在同一个进程里跑不可信代码:Python 插件沙箱的七层防线

ServerManager 是一个服务器集群管理系统,有个野心不小的插件系统——让用户自己写 Python 代码跑在服务端,而且不用开子进程、不用容器隔离,直接在 Django ASGI 进程里跑。听起来就挺危险的,对吧?

这事的难点不在于”让插件跑起来”——exec() 就行——难点在于”阻止插件做不该做的事”。一个恶意插件能 import os 然后 os.system('rm -rf /'),能 __import__('subprocess'),能通过 obj.__class__.__bases__[0].__subclasses__() 链回溯到任意类型……Python 的动态性让沙箱成了猫鼠游戏。

折腾了大半年,最终在这套系统里落下了七层防线。每一层单独看都有绕过的可能,但叠在一起,攻击面就缩到很小了。

第一层:AST 静态校验——代码还没跑就先拦一轮

插件代码在执行之前,先过一遍 AST(抽象语法树)校验。ASTValidator 会遍历整棵语法树,找出所有的 importfrom ... import 语句,检查是否涉及被禁止的模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ASTValidator:
BLOCKED_ATTRIBUTE_NAMES = {
'__class__', '__bases__', '__mro__', '__subclasses__',
'__globals__', '__code__', '__closure__', '__func__',
# ... 更多危险属性
}
BLOCKED_REFLECTION_FUNCTIONS = {'vars', 'dir', 'globals', 'locals'}

@staticmethod
def validate(source_code: str, filepath: str = '',
blocked_modules: Optional[Set[str]] = None) -> List[str]:
tree = ast.parse(source_code)
warnings = []
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
if _is_module_blocked(alias.name, blocked_modules):
warnings.append(f"{filepath}: 导入被禁止的模块: {alias.name}")
if isinstance(node, ast.Attribute):
if node.attr in ASTValidator.BLOCKED_ATTRIBUTE_NAMES:
warnings.append(f"{filepath}: 访问被禁止的属性: {node.attr}")
return warnings

subprocessos.systemctypes 这种直接能起shell的模块,在这一层就打回去了。同时也在 AST 层面拦截了对 __class____globals__ 这类反射属性的访问。

但这只是第一道门。字符串拼接能绕 AST——getattr(os, 'sys'+'tem') 这种写法 AST 看不出问题。所以还得往下加。

第二层:sys.meta_path 导入拦截——运行时也休想偷偷 import

AST 校验是在加载前做的,但插件运行时可能通过 __import__()importlib 动态导入。这就要靠 sys.meta_path 来拦截了。

_PluginImportFinder 注册在 sys.meta_path 的最前面,每次 Python 解释器执行 import 都会先过它这一关:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class _PluginImportFinder:
def find_spec(self, fullname, path, target=None):
ctx = _plugin_context.get()
if ctx is None:
return None # 非插件上下文,放行

plugin_id = ctx.get('plugin_id', 'unknown')
permissions = ctx.get('permissions', [])

# 关键模块(subprocess, os.system 等):直接报错
critical = get_critical_blocked_modules()
if _is_module_blocked(fullname, critical):
raise ImportError(f"插件无权导入模块: {fullname}")

# 根据权限列表过滤
blocked = get_blocked_modules_for_permissions(permissions)
if _is_module_blocked(fullname, blocked):
raise ImportError(f"插件无权导入被禁止的模块: {fullname}")

# 内部模块(apps.*, django.* 等):需要走白名单
root_module = fullname.split('.')[0]
if root_module in ('apps', 'middleware', 'consumers', 'util', 'django'):
if not self._is_safe_internal_import(fullname, permissions):
raise ImportError(f"插件无权直接导入: {fullname}")

return None # 放行

这里有个细节很关键——_trusted_internal_import 这个 ContextVar。框架自己的代码在替插件做事时(比如 BasePlugin.get_nodes()),需要临时打开信任通道。但这个通道只在框架代码的栈帧里有效,插件代码试图import还是会被拦。

另一个细节是 third_party_caller 的检测。当插件调用了 requests.get(),requests 库内部会调用 import——这时候调用栈里顶层是插件代码,但中间经过第三方库。系统通过 _is_plugin_import_caller() 判断调用源,如果第三方库有 package.import 权限就放行其内部 import,但不让插件借道。

第三层:运行时 builtins 篡改——把 eval、exec、getattr 全换成替身

即便前两层拦住了 import,插件还可以用 Python 内置函数搞事。eval()exec() 能执行任意字符串,getattr() 能绕过属性名检查,globals()locals() 能一览所有局部变量。

runtime_guard() 是一个上下文管理器,在插件执行前挨个替换 builtins 里的危险函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@contextlib.contextmanager
def runtime_guard():
patches = []

def patch(obj, attr, replacement):
original = getattr(obj, attr)
patches.append((obj, attr, original))
setattr(obj, attr, replacement)

# 直接封杀的:eval、exec、compile、breakpoint
for name in ('eval', 'exec', 'compile', 'breakpoint'):
patch(builtins, name, _blocked_runtime_call_factory(getattr(builtins, name), name))

# 属性访问守卫:getattr、setattr、delattr
patch(builtins, 'getattr', _guarded_getattr_factory(builtins.getattr))
patch(builtins, 'setattr', _guarded_setattr_factory(builtins.setattr))
patch(builtins, 'delattr', _guarded_delattr_factory(builtins.delattr))

# 反射函数封杀:globals、locals、vars、dir
for name in ('globals', 'locals', 'vars', 'dir'):
patch(builtins, name, _guarded_reflection_call_factory(getattr(builtins, name), name))

try:
yield
finally:
# 还原——从后往前还原,确保嵌套时不出问题
for obj, attr, original in reversed(patches):
setattr(obj, attr, original)

替换不是永久的——finally 块保证还原,而且用的是 reversed(patches) 确保嵌套场景不出问题。这也意味着框架代码在 runtime_guard 之外不会受影响。

被阻断的函数会检查调用来源:

1
2
3
4
5
6
7
8
def _blocked_runtime_call_factory(original_func, name: str):
def _inner(*args, **kwargs):
# 框架代码信任标记为 True,且调用栈顶部不是插件代码——放行
if _trusted_internal_import.get() and not _called_from_plugin_frame(caller_globals):
return original_func(*args, **kwargs)
# 否则直接报错
raise PermissionError(f"插件 {plugin_id} 无权调用危险函数: {name}")
return _inner

第四层:getattr/setattr 属性守卫——堵死反射绕行

前面说了 getattr(obj, '__class__') 是常见绕行手段。第三层把 getattr 换成了 _guarded_getattr,它会在每次调用时检查属性名是不是在黑名单里:

1
2
3
4
5
6
7
8
BLOCKED_ATTRIBUTE_NAMES = {
'__class__', '__bases__', '__mro__', '__subclasses__',
'__globals__', '__code__', '__closure__', '__func__',
'__self__', '__module__', '__dict__', '__defaults__',
'gi_frame', 'gi_code', 'cr_frame', 'cr_code', 'tb_frame',
'f_globals', 'f_locals', 'func_globals',
'modules', 'path_hooks', 'meta_path',
}

这串名单几乎涵盖了 Python 里所有能从对象逃逸到其他命名空间的属性。__class__.__bases__[0].__subclasses__() 这条经典沙箱逃逸路线,在第一步 .__class__ 就被拦住了。

第五层:RestrictedPython 编译——AST 级别的终极保险

RestrictPython 是 Zope 基金会的老牌沙箱库,它的 compile_restricted() 在 AST 层面做额外限制:禁止未赋值就使用的变量名、拦截不安全的属性访问等。它还要求定义 _getattr__getitem__getiter_ 这些钩子函数,在编译后的代码里每次属性访问都会调用这些钩子。

我们自定义了 plugin_guarded_getattr

1
2
3
4
5
6
def plugin_guarded_getattr(obj, name):
if name in BLOCKED_ATTRIBUTE_NAMES:
raise AttributeError(f'"_getattr_" not allowed to access dangerous name "{name}"')
if name.startswith('_') and name not in ALLOWED_DUNDER_METHODS:
raise AttributeError(f'"_getattr_" not allowed to access private name "{name}"')
return getattr(obj, name)

这样即便 runtime_guard 因为某种原因没生效,RestrictedPython 编译出的字节码本身也不会让插件访问危险属性。

第六层:文件系统权限控制——os.open 也不放过

文件访问的守卫覆盖得非常细致,不光拦了 open(),连 os.openos.listdiros.statos.renameos.removeos.mkdiros.chmodos.chown、甚至 os.scandir 都换成了带权限检查的版本。

更狠的是,连 dir_fd 参数都给封了。为什么?因为 dir_fd 可以让插件打开一个文件描述符,然后通过它访问本来没权限的路径——一种典型的目录穿越手法:

1
2
3
4
5
6
7
8
9
10
def _guarded_os_open_factory(original_open):
def _guarded_os_open(path, flags, mode=0o777, *args, **kwargs):
ctx = _plugin_context.get()
if ctx is None:
return original_open(path, flags, mode, *args, **kwargs)
if kwargs.get('dir_fd') is not None:
plugin_id = ctx.get('plugin_id', 'unknown')
raise PermissionError(f"插件 {plugin_id} 无权通过 dir_fd 访问文件")
# ... 黑白名单检查
return _guarded_os_open

插件能访问的文件路径只有两类:自己 data/ 目录下的,或者在 file.path:/some/dir 权限里明确声明的。路径穿越也没用,因为用了 os.path.realpath() 做规范化,再用 os.path.commonpath() 做前缀校验:

1
2
3
4
def _is_path_inside(base_path: str, candidate_path: str) -> bool:
base_real = os.path.realpath(base_path)
candidate_real = os.path.realpath(candidate_path)
return os.path.commonpath([base_real, candidate_real]) == base_real

第七层:数据库访问隔离——只能动自己的表

插件的数据库操作走了专门的 PluginDatabaseAdapter,限制了表名必须在 plugin_<plugin_id>_ 命名空间下。SQL 执行也做了多语句注入检查和 DDL 权限控制。插件要是想 DROP TABLE auth_user,门都没有。

contextvars 贯穿全局——并发安全的关键

七层防线虽然各管各的,但它们共享一套身份识别机制——contextvars.ContextVar_plugin_context 保存了当前插件的 ID、权限列表、数据目录路径。_trusted_internal_import 标记当前是否在框架信任代码的栈帧里。_third_party_caller 标记当前 import 调用是否来自第三方库。

1
2
3
4
_plugin_context: ContextVar = ContextVar('plugin_context', default=None)
_policy_path_resolution: ContextVar = ContextVar('policy_path_resolution', default=False)
_trusted_internal_import: ContextVar = ContextVar('trusted_internal_import', default=False)
_third_party_caller: ContextVar = ContextVar('_third_party_caller', default=False)

ContextVar 在 asyncio 里天然线程安全(每个协程有独立上下文),但 copy_context() 被用在同步插件的线程池执行里,确保上下文正确传播:

1
2
3
4
5
6
7
async def execute_safe_async(self, plugin_id, func, *args, ...):
ctx = copy_context()
# 同步函数放进线程池,带着自己的上下文跑
result = await asyncio.wait_for(
loop.run_in_executor(None, lambda: ctx.run(func, *args, **kwargs)),
timeout=timeout
)

七层未必够,但够了

这篇文章没有一个”结论”段告诉你”七层防线完美无缺”——那不诚实。Python 的沙箱历史上被绕过太多次了,真正安全的容器隔离还得靠 OS 级别的 namespace/cgroup。

但在这个场景下,插件是由管理员主动安装的(不是随便谁都能上传),七层防线加上权限审批流程,绝大多数攻击向量都堵住了。更重要的是,这套方案的收益很大——免去进程管理的复杂性,Django 的工作进程直接执行插件代码,延迟低、资源开销小,对单机十多节点这种规模来说刚刚好。

如果哪天插件生态做大了、开始有不信任的第三方插件,那就该考虑给每个插件开子进程或者容器了。但在那之前,这七道门够用。

欢迎关注我的其它发布渠道