ServerManager 是一个服务器集群管理系统,有个野心不小的插件系统——让用户自己写 Python 代码跑在服务端,而且不用开子进程、不用容器隔离,直接在 Django ASGI 进程里跑。听起来就挺危险的,对吧?
这事的难点不在于”让插件跑起来”——exec() 就行——难点在于”阻止插件做不该做的事”。一个恶意插件能 import os 然后 os.system('rm -rf /'),能 __import__('subprocess'),能通过 obj.__class__.__bases__[0].__subclasses__() 链回溯到任意类型……Python 的动态性让沙箱成了猫鼠游戏。
折腾了大半年,最终在这套系统里落下了七层防线。每一层单独看都有绕过的可能,但叠在一起,攻击面就缩到很小了。
第一层:AST 静态校验——代码还没跑就先拦一轮 插件代码在执行之前,先过一遍 AST(抽象语法树)校验。ASTValidator 会遍历整棵语法树,找出所有的 import 和 from ... import 语句,检查是否涉及被禁止的模块。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class ASTValidator : BLOCKED_ATTRIBUTE_NAMES = { '__class__' , '__bases__' , '__mro__' , '__subclasses__' , '__globals__' , '__code__' , '__closure__' , '__func__' , } BLOCKED_REFLECTION_FUNCTIONS = {'vars' , 'dir' , 'globals' , 'locals' } @staticmethod def validate (source_code: str , filepath: str = '' , blocked_modules: Optional [Set [str ]] = None ) -> List [str ]: tree = ast.parse(source_code) warnings = [] for node in ast.walk(tree): if isinstance (node, ast.Import): for alias in node.names: if _is_module_blocked(alias.name, blocked_modules): warnings.append(f"{filepath} : 导入被禁止的模块: {alias.name} " ) if isinstance (node, ast.Attribute): if node.attr in ASTValidator.BLOCKED_ATTRIBUTE_NAMES: warnings.append(f"{filepath} : 访问被禁止的属性: {node.attr} " ) return warnings
subprocess、os.system、ctypes 这种直接能起shell的模块,在这一层就打回去了。同时也在 AST 层面拦截了对 __class__、__globals__ 这类反射属性的访问。
但这只是第一道门。字符串拼接能绕 AST——getattr(os, 'sys'+'tem') 这种写法 AST 看不出问题。所以还得往下加。
AST 校验是在加载前做的,但插件运行时可能通过 __import__() 或 importlib 动态导入。这就要靠 sys.meta_path 来拦截了。
_PluginImportFinder 注册在 sys.meta_path 的最前面,每次 Python 解释器执行 import 都会先过它这一关:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class _PluginImportFinder : def find_spec (self, fullname, path, target=None ): ctx = _plugin_context.get() if ctx is None : return None plugin_id = ctx.get('plugin_id' , 'unknown' ) permissions = ctx.get('permissions' , []) critical = get_critical_blocked_modules() if _is_module_blocked(fullname, critical): raise ImportError(f"插件无权导入模块: {fullname} " ) blocked = get_blocked_modules_for_permissions(permissions) if _is_module_blocked(fullname, blocked): raise ImportError(f"插件无权导入被禁止的模块: {fullname} " ) root_module = fullname.split('.' )[0 ] if root_module in ('apps' , 'middleware' , 'consumers' , 'util' , 'django' ): if not self ._is_safe_internal_import(fullname, permissions): raise ImportError(f"插件无权直接导入: {fullname} " ) return None
这里有个细节很关键——_trusted_internal_import 这个 ContextVar。框架自己的代码在替插件做事时(比如 BasePlugin.get_nodes()),需要临时打开信任通道。但这个通道只在框架代码的栈帧里有效,插件代码试图import还是会被拦。
另一个细节是 third_party_caller 的检测。当插件调用了 requests.get(),requests 库内部会调用 import——这时候调用栈里顶层是插件代码,但中间经过第三方库。系统通过 _is_plugin_import_caller() 判断调用源,如果第三方库有 package.import 权限就放行其内部 import,但不让插件借道。
第三层:运行时 builtins 篡改——把 eval、exec、getattr 全换成替身 即便前两层拦住了 import,插件还可以用 Python 内置函数搞事。eval() 和 exec() 能执行任意字符串,getattr() 能绕过属性名检查,globals() 和 locals() 能一览所有局部变量。
runtime_guard() 是一个上下文管理器,在插件执行前挨个替换 builtins 里的危险函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @contextlib.contextmanager def runtime_guard (): patches = [] def patch (obj, attr, replacement ): original = getattr (obj, attr) patches.append((obj, attr, original)) setattr (obj, attr, replacement) for name in ('eval' , 'exec' , 'compile' , 'breakpoint' ): patch(builtins, name, _blocked_runtime_call_factory(getattr (builtins, name), name)) patch(builtins, 'getattr' , _guarded_getattr_factory(builtins.getattr )) patch(builtins, 'setattr' , _guarded_setattr_factory(builtins.setattr )) patch(builtins, 'delattr' , _guarded_delattr_factory(builtins.delattr )) for name in ('globals' , 'locals' , 'vars' , 'dir' ): patch(builtins, name, _guarded_reflection_call_factory(getattr (builtins, name), name)) try : yield finally : for obj, attr, original in reversed (patches): setattr (obj, attr, original)
替换不是永久的——finally 块保证还原,而且用的是 reversed(patches) 确保嵌套场景不出问题。这也意味着框架代码在 runtime_guard 之外不会受影响。
被阻断的函数会检查调用来源:
1 2 3 4 5 6 7 8 def _blocked_runtime_call_factory (original_func, name: str ): def _inner (*args, **kwargs ): if _trusted_internal_import.get() and not _called_from_plugin_frame(caller_globals): return original_func(*args, **kwargs) raise PermissionError(f"插件 {plugin_id} 无权调用危险函数: {name} " ) return _inner
第四层:getattr/setattr 属性守卫——堵死反射绕行 前面说了 getattr(obj, '__class__') 是常见绕行手段。第三层把 getattr 换成了 _guarded_getattr,它会在每次调用时检查属性名是不是在黑名单里:
1 2 3 4 5 6 7 8 BLOCKED_ATTRIBUTE_NAMES = { '__class__' , '__bases__' , '__mro__' , '__subclasses__' , '__globals__' , '__code__' , '__closure__' , '__func__' , '__self__' , '__module__' , '__dict__' , '__defaults__' , 'gi_frame' , 'gi_code' , 'cr_frame' , 'cr_code' , 'tb_frame' , 'f_globals' , 'f_locals' , 'func_globals' , 'modules' , 'path_hooks' , 'meta_path' , }
这串名单几乎涵盖了 Python 里所有能从对象逃逸到其他命名空间的属性。__class__.__bases__[0].__subclasses__() 这条经典沙箱逃逸路线,在第一步 .__class__ 就被拦住了。
第五层:RestrictedPython 编译——AST 级别的终极保险 RestrictPython 是 Zope 基金会的老牌沙箱库,它的 compile_restricted() 在 AST 层面做额外限制:禁止未赋值就使用的变量名、拦截不安全的属性访问等。它还要求定义 _getattr_、_getitem_、_getiter_ 这些钩子函数,在编译后的代码里每次属性访问都会调用这些钩子。
我们自定义了 plugin_guarded_getattr:
1 2 3 4 5 6 def plugin_guarded_getattr (obj, name ): if name in BLOCKED_ATTRIBUTE_NAMES: raise AttributeError(f'"_getattr_" not allowed to access dangerous name "{name} "' ) if name.startswith('_' ) and name not in ALLOWED_DUNDER_METHODS: raise AttributeError(f'"_getattr_" not allowed to access private name "{name} "' ) return getattr (obj, name)
这样即便 runtime_guard 因为某种原因没生效,RestrictedPython 编译出的字节码本身也不会让插件访问危险属性。
第六层:文件系统权限控制——os.open 也不放过 文件访问的守卫覆盖得非常细致,不光拦了 open(),连 os.open、os.listdir、os.stat、os.rename、os.remove、os.mkdir、os.chmod、os.chown、甚至 os.scandir 都换成了带权限检查的版本。
更狠的是,连 dir_fd 参数都给封了。为什么?因为 dir_fd 可以让插件打开一个文件描述符,然后通过它访问本来没权限的路径——一种典型的目录穿越手法:
1 2 3 4 5 6 7 8 9 10 def _guarded_os_open_factory (original_open ): def _guarded_os_open (path, flags, mode=0o777 , *args, **kwargs ): ctx = _plugin_context.get() if ctx is None : return original_open(path, flags, mode, *args, **kwargs) if kwargs.get('dir_fd' ) is not None : plugin_id = ctx.get('plugin_id' , 'unknown' ) raise PermissionError(f"插件 {plugin_id} 无权通过 dir_fd 访问文件" ) return _guarded_os_open
插件能访问的文件路径只有两类:自己 data/ 目录下的,或者在 file.path:/some/dir 权限里明确声明的。路径穿越也没用,因为用了 os.path.realpath() 做规范化,再用 os.path.commonpath() 做前缀校验:
1 2 3 4 def _is_path_inside (base_path: str , candidate_path: str ) -> bool : base_real = os.path.realpath(base_path) candidate_real = os.path.realpath(candidate_path) return os.path.commonpath([base_real, candidate_real]) == base_real
第七层:数据库访问隔离——只能动自己的表 插件的数据库操作走了专门的 PluginDatabaseAdapter,限制了表名必须在 plugin_<plugin_id>_ 命名空间下。SQL 执行也做了多语句注入检查和 DDL 权限控制。插件要是想 DROP TABLE auth_user,门都没有。
contextvars 贯穿全局——并发安全的关键 七层防线虽然各管各的,但它们共享一套身份识别机制——contextvars.ContextVar。_plugin_context 保存了当前插件的 ID、权限列表、数据目录路径。_trusted_internal_import 标记当前是否在框架信任代码的栈帧里。_third_party_caller 标记当前 import 调用是否来自第三方库。
1 2 3 4 _plugin_context: ContextVar = ContextVar('plugin_context' , default=None ) _policy_path_resolution: ContextVar = ContextVar('policy_path_resolution' , default=False ) _trusted_internal_import: ContextVar = ContextVar('trusted_internal_import' , default=False ) _third_party_caller: ContextVar = ContextVar('_third_party_caller' , default=False )
ContextVar 在 asyncio 里天然线程安全(每个协程有独立上下文),但 copy_context() 被用在同步插件的线程池执行里,确保上下文正确传播:
1 2 3 4 5 6 7 async def execute_safe_async (self, plugin_id, func, *args, ... ): ctx = copy_context() result = await asyncio.wait_for( loop.run_in_executor(None , lambda : ctx.run(func, *args, **kwargs)), timeout=timeout )
七层未必够,但够了 这篇文章没有一个”结论”段告诉你”七层防线完美无缺”——那不诚实。Python 的沙箱历史上被绕过太多次了,真正安全的容器隔离还得靠 OS 级别的 namespace/cgroup。
但在这个场景下,插件是由管理员主动安装的(不是随便谁都能上传),七层防线加上权限审批流程,绝大多数攻击向量都堵住了。更重要的是,这套方案的收益很大——免去进程管理的复杂性,Django 的工作进程直接执行插件代码,延迟低、资源开销小,对单机十多节点这种规模来说刚刚好。
如果哪天插件生态做大了、开始有不信任的第三方插件,那就该考虑给每个插件开子进程或者容器了。但在那之前,这七道门够用。