项目做到一定规模,你会发现自己写了一大堆”约定”:别用 getattr 分发消息、签名策略必须从注册表读、新动作必须加 ActionSpec、路径拼接必须走 safe_join……这些约定写在文档里,新人来了未必看,自己过两个月也未必记得。

ServerManager 的 Node 端踩过这个坑。重构的时候在项目规范文档里写了一堆规则——“不许用 getattr 分发动作”“签名策略必须读注册表”“所有路径必须走 safe_join”。写了没两天,PR 里照样出现 getattr(self, action_name)。不是开发者不守规矩,是规矩只在文档里,代码不会自己拒绝你。

后来把关键约束写成了脚本,CI 一跑就知道有没有人犯规。这篇聊聊这些脚本都检查什么、怎么检查,以及为什么这么做而不是那么做。

check_action_registry:Panel 和 Node 的动作表对不齐?

ServerManager 是双仓库架构——Panel(Django 后端)和 Node(Python 客户端)各自维护一份 WebSocket 动作注册表。Panel 定义了 77+ 个动作,Node 也注册了对应的一批。两边必须完全同步,否则 Panel 下发了一个 Node 不认识的动作(或者更糟的,Node 忽略了一个应该签名验证的动作),线上就会出问题。

check_action_registry.py 干的事很简单——从两边各 import 注册表,然后找差异:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--panel", required=False)
parser.add_argument("--report-only", action="store_true")
args = parser.parse_args()
node_actions = get_all_actions()
missing = []
if args.panel:
panel_root = Path(args.panel)
sys.path.insert(0, str(panel_root))
from apps.node_manager.protocol import get_all_actions as panel_get_all
panel_actions = panel_get_all()
for action, spec in panel_actions.items():
direction = spec.direction.value if isinstance(spec.direction, Enum) else str(spec.direction)
if direction in {"server_to_node", "bidirectional"} and action not in node_actions:
missing.append(action)
print(json.dumps({
"node_actions": len(node_actions),
"missing_panel_to_node_actions": missing,
}, ensure_ascii=False, indent=2))
return 0 if args.report_only or not missing else 1

有几个设计决策值得说:

只检查 Panel→Node 方向的缺失server_to_nodebidirectional 的动作是 Panel 一定会下发给 Node 的。如果 Node 没注册这种动作,那消息来了就没人接。但 Node→Panel 方向的动作是 Node 主动发的,Panel 那边多几个不影响——Node 不会因为 Panel 不认识自己的上报消息而崩溃。

退出码不是 0 就是 1。CI 脚本最实用的信号就是退出码——0 表示通过,1 表示有缺失。--report-only 选项让脚本变成只输出 JSON 不退出 1,用于仪表盘展示。

import 真实注册表而不是读 JSON。脚本直接从 Python 模块 import get_all_actions(),拿到的是运行时的真实数据。如果改成读静态 JSON 文件,那 JSON 可能是过期的。

在 CI 里加一行:

1
- python scripts/check_action_registry.py --panel /path/to/ServerManager

有人改了 Panel 的注册表忘了同步 Node,CI 就红了。不用再靠人肉 grep 两个仓库来对齐了。

check_architecture:谁在偷偷用 getattr?

重构前的代码大量使用 getattr(self, action_name) 来分发 WebSocket 消息。重构后改成了显式的 MappingProxyType 分发表,但谁能保证以后没人偷偷摸摸又写回 getattr

check_architecture.py 里有个 AST 扫描器干的就是这事:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def dynamic_usage():
findings = []
for path in sorted(ROOT.glob("**/*.py")):
if any(part in {".venv", "__pycache__", ".git"} for part in path.parts):
continue
try:
tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
except (SyntaxError, UnicodeDecodeError):
continue
rel = path.relative_to(ROOT).as_posix()
for node in ast.walk(tree):
if isinstance(node, ast.Call):
name = dotted(node.func).rsplit(".", 1)[-1]
if name in {"getattr", "setattr", "hasattr", "__import__"}:
findings.append({"path": rel, "line": node.lineno, "symbol": name})
return findings

遍历项目里所有 .py 文件(排除 .venv__pycache__ 等),解析成 AST,找所有 Call 节点,看函数名是不是 getattrsetattrhasattr__import__

找到就报出来,不管合理不合理。为什么不做白名单?因为白名单本身就可能过时。与其维护一个”这里的 getattr 是允许的”列表,不如全部报出来让人工判断。CI 输出里有文件名和行号,扫一眼就知道哪些是新加的需要处理,哪些是历史遗留需要迁移。

dotted() 函数把嵌套属性调用还原成完整路径:

1
2
3
4
5
6
7
def dotted(node):
if isinstance(node, ast.Name):
return node.id
if isinstance(node, ast.Attribute):
parent = dotted(node.value)
return f"{parent}.{node.attr}" if parent else node.attr
return ""

这样 os.path.join 会被识别为 os.path.join,而不是只识别出 join。脚本只取最后一个部分(.rsplit(".", 1)[-1]),因为约束是针对内置函数名的,不需要完整路径。

check_security:安全矩阵骨架

目前 check_security.py 只是个骨架——输出一个空的 JSON:

1
print(json.dumps({"status": "security matrix scaffold", "cases": []}, ensure_ascii=False, indent=2))

但从名字和项目规划文档就能看出意图:安全矩阵检查脚本要做的,是对每个安全相关的动作(签名、路径、命令、文件操作、Docker 操作)验证其测试覆盖率。

实现思路已经在 check_test_matrix.py 里有了雏形——扫描 tests/ 目录下的所有测试文件。安全矩阵脚本最终要做的是:从 ActionSpec 注册表里读出所有标记为 critical=True 的动作,然后检查对应的测试文件里有没有安全边界测试。

比如 execute:run_shell 标记为 critical,那 tests/security/test_command_policy.py 里必须有”合法命令通过”和”危险命令被拦”两组测试用例。缺了就报出来。

这是一个”让测试覆盖率和安全策略显式绑定”的思路——不是靠人记哪些操作需要安全测试,而是注册表和数据驱动。

check_test_matrix:测试文件清单

这个脚本更简单,就是列出所有测试文件:

1
2
3
root = Path(__file__).resolve().parents[1]
tests = sorted(path.relative_to(root).as_posix() for path in root.glob("tests/**/*.py"))
print(json.dumps({"tests": tests}, ensure_ascii=False, indent=2))

单独看没什么用,但配合其他脚本就变得有价值了。比如 check_security.py 要看”有没有 test_docker_security.py“,就得知道 tests/ 下到底有哪些文件。check_test_matrix.py 提供了这份清单。

export_action_registry:注册表导出

export_action_registry.py 把注册表导出成 JSON 文件:

1
2
actions = [asdict(spec) for spec in get_all_actions().values()]
Path(args.output).write_text(json.dumps(actions, ensure_ascii=False, indent=2), encoding="utf-8")

这东西看起来多此一举——注册表不是已经在代码里了吗?为什么要导出 JSON?

用途有两:

Panel 端的协议文档。导出的 JSON 可以丢进 Panel 的文档或 API 描述里,让前端开发者知道有哪些动作可以用,不需要翻 Node 端的 Python 代码。

CI 脚本的数据源check_action_registry.py 是 import Python 模块来比较的。但如果 Panel 是另一个语言写的(比如 Node.js),import 不进来怎么办?导出 JSON 文件作为接口协议的中间格式,两边各 import 自己的注册表、和 JSON 对比,就不需要语言层面的互操作了。

为什么不用 linter 和 type checker?

有人会问:这些约束为什么不放到 mypy 或 ruff 的规则里?

getattr 检查——ruff 确实可以配置规则禁止某些函数调用,但 getattr 在很多场景下是合理的(比如动态配置读取)。你要的是”在 WebSocket 分发代码里不用 getattr”,不是”全局禁止 getattr”。AST 扫描可以根据上下文(文件路径、类名)做更精细的判断,linter 的规则做不了这么细。

注册表同步——这是跨仓库的检查,需要同时 import 两个项目的代码。mypy 和 ruff 都是在单仓库范围内跑的,没有”对比两个仓库的数据结构”这个能力。

安全矩阵——需要结合运行时数据(ActionSpec 注册表)和文件系统状态(测试文件),这超出了 linter 的能力范围。

linter 和 type checker 解决的是语言层面的问题——类型错误、未定义变量、代码风格。架构约束解决的是设计层面的问题——“新动作必须注册到分发表”“签名策略必须从注册表读取”“路径必须走 safe_join”。这些问题不是代码写错了,是代码违反了设计意图。

怎么把这些脚本串进 CI

GitHub Actions 里加一个 job:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
check-architecture:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- run: pip install -r requirements.txt
- name: Check dynamic usage
run: python scripts/check_architecture.py dynamic-usage --report-only
- name: Check action registry sync
run: |
git clone --depth 1 https://github.com/org/ServerManager.git /tmp/panel
python scripts/check_action_registry.py --panel /tmp/panel
- name: Export action registry
run: python scripts/export_action_registry.py --output reports/action_registry.json

三个步骤:扫描动态用法、对比注册表、导出注册表。任何一个非零退出码都会让整个 job 变红。

从”写文档”到”写检查”的思维转变

写文档是”请各位注意”,写检查脚本是”不遵守就 CI 红”。两种方式的差异不是写不写的问题,是执行力的区别。

ServerManager 的这批脚本还不是完善的——check_security.py 还是个骨架,check_test_matrix.py 只是列文件。但框架已经搭好了:注册表是数据、AST 分析是工具、CI 是执行者。每加一个新约束,不用在文档里多写一行”请注意”,只要在对应的脚本里加一个检查条件,CI 就替你守着这条规矩。

项目是活的,规矩也应该是活的。用脚本守规矩,改脚本就能改规矩,比改文档靠谱——因为文档改了没人看,脚本改了 CI 下次跑就生效。

ServerManager 做的是远程管理节点——浏览器连 Panel,Panel 通过 WebSocket 连 Node,Node 在本地操作 Docker。听起来链路不长,但要把镜像、容器、网络、卷、配置、Compose 六类资源的四十多个操作都可靠地搬上 WebSocket,里面的弯弯绕并不少。

第一个问题:六类资源怎么统一路由?

Docker 的 API 表面上一堆方法,但抽象出来就六类——image、container、network、volume、config、compose。每个操作都是”资源.动作”的组合,比如 docker:image:pulldocker:container:start

消息到达 Node 的分发器后,按冒号拆分:

1
2
3
4
5
6
7
8
9
10
11
async def dispatch_docker(self, action: str, payload, raw_message: dict) -> bool:
if not await self.verify_if_required(action, payload, raw_message, label="Docker动作"):
return False
await self.docker_manager_action(action, payload)

async def docker_manager_action(self, action: str, data):
parts = action.split(":")
if len(parts) != 3 or parts[0] != "docker":
logger.warning(f"非法Docker动作: {action}")
return
await self.client.services.docker_manager.handle(parts[1], parts[2], data or {})

三段式 docker:resource:operation,第二段是资源类型,第三段是操作名。DockerManager.handle(resource, operation, payload) 收到后分发到 __image__container 等内部方法。为什么不用一个大的 if-else?因为六类资源的方法各有各的参数校验和返回格式,塞在一个方法里三四百行,拆开每个资源一个方法,代码结构清晰得多。

安全分层:读操作畅通,写操作要令牌

不是所有 Docker 操作都一样危险。docker:image:listdocker:container:kill 的安全级别显然不同。

dockerManager.py 里,安全检查分层了:

1
2
3
4
5
6
7
8
9
10
11
12
13
async def handle(self, resource: str, operation: str, payload: dict):
if not self.__safe_enabled("docker_manager", False):
return await self.__send_error(payload, resource, operation, "Docker管理未启用")

read_operations = {"list", "inspect", "history", "stats", "logs", "info", "df", "version", "events", "get", "validate"}
is_read = operation in read_operations or (
resource == "compose" and operation in {"list", "ps", "logs", "config"}
)

if not is_read:
required_flag = RESOURCE_FLAGS.get(resource)
if required_flag and not self.__safe_enabled(required_flag, False):
return await self.__send_error(payload, resource, operation, "该Docker写操作未启用")

总开关 docker_manager 控制能不能用 Docker 功能。如果打开了,读操作(list、inspect、logs 之类)不需要额外权限——让监控数据流出去问题不大。

但写操作要看更细粒度的权限:拉镜像要 docker_image_manage,搞容器要 docker_container_manage,Compose 要……好吧,目前没有单独的 docker_compose_manage,用的是 docker_manager 总开关。不过关键是所有安全检查都在业务逻辑之前,不会出现”先操作了再检查权限”的情况。

RESOURCE_FLAGS 这个映射表也是显式声明的:

1
2
3
4
5
6
7
RESOURCE_FLAGS = {
"image": "docker_image_manage",
"container": "docker_container_manage",
"network": "docker_network_manage",
"volume": "docker_volume_manage",
"config": "docker_config_manage",
}

跟前面那篇 Action Registry 的思路一脉相承——安全策略是数据,不是散落在代码里的字符串常量。

同步转异步:不能把事件循环堵住

Docker 的 Python SDK 是同步的——client.images.list()client.containers.get() 这些调用都会阻塞。但 Node 端跑的是 asyncio 事件循环,一个 docker.images.pull("python:3.12") 可能要跑几十秒。要是直接在协程里调,整个 WebSocket 的接收循环就卡住了。

解决办法是 asyncio.to_thread

1
2
result = await asyncio.to_thread(self.__execute, resource, operation, payload)
await self.__send_success(payload, resource, operation, result)

一行把同步调用甩到线程池里,事件循环不阻塞,其他 WebSocket 消息照常接收和处理。to_thread 内部用的是 loop.run_in_executor(None, func),默认线程池大小够用了。

镜像自动拉取:拉还是不拉?

创建容器的时候有个细节——用户指定的镜像本地不存在怎么办?

1
2
3
4
5
6
7
8
9
10
11
12
def __ensure_container_image(self, client, image: str, payload: dict):
try:
client.images.get(image)
return None # 本地有,直接用
except (ImageNotFound, NotFound):
pass

if not self.__coerce_bool(payload.get("pull_missing"), True):
raise RuntimeError(f"节点本机缺少镜像 {image},请先拉取镜像或启用自动拉取")
if not self.__safe_enabled("docker_image_manage", False):
raise RuntimeError("节点本机缺少镜像,自动拉取需要启用 docker_image_manage")
return self.__pull_image_reference(client, image, payload)

三步走:先查本地有没有,没有的话看 pull_missing 参数(默认 True),再看有没有 docker_image_manage 权限。 Defaults toward 便利(自动拉),但安全策略随时可以拦住。

这里有个容易忽略的问题——pull_missing 默认是 True。如果你没关掉这个选项,创建容器的操作可能会隐式触发一个耗时很长的镜像拉取。在自动化场景下这是利好,但如果网络不好或者镜像仓库连不上,就会有一个请求卡在那很久。docker_manager 没有设全局超时来兜底拉镜像操作,这是个需要改进的点。

Compose 管理:在节点上用 docker compose CLI

Docker Compose 的操作没走 Python SDK——因为 SDK 对 Compose 的支持太弱了。取而代之的是直接调 docker compose 命令行:

1
2
3
4
5
6
7
8
9
10
11
12
13
def __compose_command(self, project_dir: Path, command: list[str],
json_lines: bool = False, check: bool = True):
if not shutil.which("docker"):
raise RuntimeError("当前系统未安装docker命令")
result = subprocess.run(
["docker", "compose", *command],
cwd=str(project_dir),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=300,
)
# ...

配置文件存在 data/docker_compose/<project>/compose.yml 下,项目名就是目录名。这里有个路径安全检查:

1
2
3
4
5
6
7
8
def __compose_project_dir(self, project: str, create: bool = False) -> Path:
project = str(project or "").strip()
if not project.replace("-", "").replace("_", "").isalnum():
raise ValueError("Compose项目名只能包含字母、数字、下划线和短横线")
base = COMPOSE_BASE_DIR.resolve()
project_dir = (base / project).resolve()
if base != project_dir and base not in project_dir.parents:
raise ValueError("Compose项目路径非法")

项目名只允许字母数字下划线短横线,然后做路径穿越检查。同样是那条铁律——用户输入的路径不能逃出预设的根目录。

容器执行:docker exec 的远程版

docker:container:exec 是个值得单独说的操作。它允许通过 Panel 在节点上的容器里执行命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def __container_exec(self, payload: dict):
command = payload.get("command") or payload.get("cmd")
if isinstance(command, str):
command = command.strip()
if not command:
raise ValueError("容器执行命令不能为空")
result = self.__get_container(payload).exec_run(
command,
stdout=True, stderr=True,
tty=self.__coerce_bool(payload.get("tty"), False),
privileged=self.__coerce_bool(payload.get("privileged"), False),
user=payload.get("user") or "",
workdir=payload.get("workdir") or None,
environment=payload.get("environment") if isinstance(payload.get("environment"), dict) else None,
)

privileged=True 这个选项是故意暴露的——有些运维场景确实需要在特权容器里操作。但 docker:container:execSIGNED 级别的动作,签名验不过去连接直接断。安全边界不在参数白名单里,而在 HMAC 签名和应用层权限控制里。

Docker 配置管理:写 daemon.json 的正确姿势

docker:config:update 直接改 /etc/docker/daemon.json,听起来很危险。代码里做了几层保护:

1
2
3
4
5
6
7
8
9
10
11
12
13
def __config(self, operation: str, payload: dict):
if operation == "update":
if os.name == "nt":
raise RuntimeError("Windows节点暂不支持修改daemon.json")
data = self.__daemon_config_data(payload)
DAEMON_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
if DAEMON_CONFIG_PATH.exists():
backup = DAEMON_CONFIG_PATH.with_suffix(f".json.bak")
shutil.copy2(DAEMON_CONFIG_PATH, backup)
DAEMON_CONFIG_PATH.write_text(
json.dumps(data, ensure_ascii=False, indent=2) + "\n",
encoding="utf-8"
)

先备份旧配置(.json.bak),再写入新配置。万一新配置写坏了,至少有个回退点。

reloadrestart_service 更是通过 subprocess.run 调用 systemctl reload dockersystemctl restart docker,这些是标准的服务管理命令,不是什么黑魔法。但它们也需要 docker_config_manage 权限开关开着才能执行。

返回值的 JSON 安全化

Docker SDK 返回的对象经常包含不能直接 JSON 序列化的类型(比如 datetimeImage 对象等)。所有返回到 WebSocket 的数据都要过一遍 JSON 安全化:

1
2
3
4
5
6
@staticmethod
def __json_safe(value: Any):
try:
return json.loads(json.dumps(value, default=str))
except Exception:
return str(value)

default=str 是个万能兜底——任何不能序列化的类型都转成字符串。虽然丢失了原始类型信息,但保证了 WebSocket 消息永远不会因为序列化报错而中断。比起精心设计每个类型的序列化器,这种简单粗暴的做法在运维场景下更靠谱——最怕的不是数据格式不完美,而是操作到一半因为序列化炸了。

错误处理的分层

Docker 操作可能出各种错——镜像不存在、容器名冲突、网络被占用、权限不够……DockerManager 的错误处理分了三层:

参数校验层——在调 Docker SDK 之前就拦住明显不合法的输入:

1
2
3
4
5
6
7
@staticmethod
def __required(payload: dict, *keys: str) -> Any:
for key in keys:
value = payload.get(key)
if value not in (None, ""):
return value
raise ValueError(f"缺少参数: {'/'.join(keys)}")

__required 支持多个候选 key——container/container_id/id 都能匹配,这在对齐 Panel 和 Node 的数据结构差异时很实用。

Docker SDK 异常层——ImageNotFoundAPIError 这些异常会在 handle 方法的外层 try-catch 里被统一捕获,然后通过 __send_error 发回 Panel。

未知异常兜底——任何 Exception 都会被捕获,error 消息发回 Panel,不会导致节点断连或者 WebSocket 消息循环崩溃。

四十条操作的完整画面

把六类资源的操作列出来,就是这么大一张表:

资源操作安全
imagelist, inspect, history, pull, remove, tag, prunepull/remove/tag/prune 需 docker_image_manage
containerlist, inspect, stats, logs, create, start, stop, restart, kill, pause, unpause, remove, rename, update, exec写操作需 docker_container_manage
networklist, inspect, create, remove, connect, disconnect, prune写操作需 docker_network_manage
volumelist, inspect, create, remove, prune写操作需 docker_volume_manage
configget, validate, update, reload, restart_service写操作需 docker_config_manage
composelist, save, ps, logs, config, up, down, start, stop, restart, pull, build, rm, delete写操作需 docker_manager

每一个操作都是 WebSocket action → 安全检查 → Docker SDK 调用 → JSON 安全化 → WebSocket response 的标准流程。看起来枯燥,但这种一致性的好处是出了问题能快速定位——任何 Docker 操作失败,查日志就能看到 action、resource、operation、error 四个字段,不用猜。

关于直播室和手术室的关系——WebSocket 是直播室,Docker SDK 是手术室。直播室里消息飞来飞去出了问题可以打断、可以重试,手术室里出了问题就是真出了问题。这中间的缓冲层——安全检查、参数校验、错误回传——每一层都是保护手术室不出事故的双保险。

ServerManager 节点端的代码最早是用 getattr(self, action_name) 来分发 WebSocket 消息的——消息里带个 action 字段,拿到什么就叫什么方法。这写法简洁、直觉、上手飞快,项目初期 Rapid Prototyping 的时候没毛病。

但项目从”能跑”到”多人协作”再到”上线运维”,这种分发方式就开始处处漏风了。这篇文章聊聊从草台班子怎么一步步走到 Action Registry,以及中间踩过的那些坑。

草台班子时代:getattr 什么都答应

早期的分发代码长这样:

1
2
3
4
5
6
async def handle_action(self, action: str, data: dict):
handler = getattr(self, f"handle_{action}", None)
if handler:
await handler(data)
else:
logger.warning(f"Unknown action: {action}")

看着挺干净,但问题一个接一个冒出来:

安全问题getattr 会匹配到任何同名属性——你不小心写了个 handle_close_all_connections 的内部方法,外部发一条 action: "close_all_connections" 就能触发。属性名即接口,而且这个接口没有任何访问控制,任何人只要知道了方法名就能调用。

签名检查散落各处。每个关键操作方法内部都要自己检查签名:

1
2
3
4
5
6
7
async def handle_execute_command(self, data):
sign_data = data.get("_sign")
if not sign_data:
return await self.close_ws(3001)
if not verify_signature(self.node_token(), "execute:run_shell", data, sign_data):
return await self.close_ws(3001)
# ... 然后才是业务逻辑

每个 handler 开头三行几乎一模一样,复制粘贴了四十多次。哪天签名逻辑要改,就得 grep 全项目一个个修。

安全开关也是手工 ifexecute_command 要查 config['safe']['execute_command']connect_terminal 要查 config['safe']['connect_terminal']……安全开关和动作名称之间的映射只存在于开发者的脑子里,没有代码强制对应。

没人知道有哪些动作。新加一个动作,写个方法就行了。删一个动作?没人知道它在哪里被引用过。Panel 那边下发了节点已经不支持的动作?节点默默忽略,连个日志都不打。

第一步:显式注册表——从”有什么方法”变成”声明了什么接口”

重构的第一步是把隐式的 getattr 分发换成显式的注册表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class NodeActionDispatcher:
def __init__(self, client):
self.client = client
self.actions = MappingProxyType({
"node:close": self.close,
"node:init_config": self.init_node_config,
"terminal:create_session": self.terminal_create_session,
"terminal:close_session": self.terminal_close_session,
"terminal:input": self.terminal_input,
"terminal:resize": self.terminal_resize,
"execute:run_shell": self.execute_shell,
"file_manager:list": self.file_manager_list,
"file_manager:read": self.file_manager_read,
# ...
})

改动不大但意义不小——MappingProxyType 让这个分发表在运行时不可修改,而且方法到动作的映射变成了一目了然的显式声明。现在要加一个动作,必须显式写进表里,不可能通过发个消息就意外调到内部方法。

分发的逻辑也清晰多了:

1
2
3
4
5
6
async def dispatch(self, action: str, payload, raw_message: dict) -> bool:
handler = self.actions.get(action)
if handler is None:
logger.error(f"Undefined action: {action}")
return True
# ...

注意 return True——未知动作只打个 warning 然后继续运行,不会断连接。版本迭代的时候,新 Panel 下发了一个老 Node 不认识的动作,通常不是断连的理由。

第二步:ActionSpec——一个动作不只是个名字

注册表有了,但动作还只是字符串到函数的映射。签名要不要查?安全开关是哪个?这动作是从 Panel 发向 Node、还是 Node 报给 Panel、还是双向?这些信息全靠开发者记住——谁也说不全。

于是有了 ActionSpec

1
2
3
4
5
6
7
8
@dataclass(frozen=True)
class ActionSpec:
action: str
direction: ActionDirection # "panel_to_node" | "node_to_panel" | "bidirectional" | ...
critical: bool = False # 需要 HMAC 签名吗?
protocol_version: int = 1
visibility: Literal["public_protocol", "internal"] = "public_protocol"
safe_flag: str = "" # 对应 config.toml 里的安全开关名

frozen=True 是深思熟虑的——注册表建好就不该被运行时修改,这是安全策略的数据基础,不是配置项。

注册用的是批量注册的写法,看着直观:

1
2
3
4
5
6
_bulk(("execute:run_shell",), "panel_to_node", critical=True, safe_flag="execute_command")
_bulk(("terminal:create_session", "terminal:close_session", "terminal:input", "terminal:resize"),
"panel_to_node", critical=True, safe_flag="connect_terminal")
_bulk(("node:heartbeat", "node:upload_running_data"), "node_to_panel")
_bulk(("docker:image:pull", "docker:container:create", "docker:container:stop"),
"panel_to_node", critical=True, safe_flag="docker_manager")

一眼就能看出来:哪些动作需要签名、哪些方向是 Panel→Node、对应哪个安全开关。信息从散落在各处变成集中声明,可审计、可交叉校验。

第三步:签名策略从手工 if 变成注册表驱动

之前每个 handler 要自己查签名,现在 needs_signature() 直接查注册表:

1
2
3
4
5
def needs_signature(action: str) -> bool:
spec = _ACTIONS.get(action)
if spec:
return spec.critical
return action.startswith("plugin:")

两行查完,不需要在每个 handler 里写 if not sign_data: return。验签的代码集中在 dispatcher 的一个方法里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def verify_if_required(self, action: str, payload, raw_message: dict,
label: str = "关键动作") -> bool:
if not needs_signature(action):
return True
sign_data = raw_message.get("_sign")
if not sign_data:
logger.warning(f"{label}缺少签名: {action}")
await self.client.close_current_ws(code=3001)
return False
if not verify_signature(self.client.node_token(), action, payload or {}, sign_data):
logger.warning(f"{label}签名验证失败: {action}")
await self.client.close_current_ws(code=3001)
return False
return True

一个方法管所有动作的验签,签名的逻辑不可能跟业务逻辑对不上——因为签名策略是注册表声明的,不是复制粘贴的。

安全开关的检查也类似——注册表里有 safe_flag,dispatcher 就能自动映射:

1
2
3
4
5
# 文件管理类操作
async def file_manager_list(self, data):
if not self.client.safe_enabled("file_manager", True):
return
await self.client.services.file_manager.list_dir(data or {})

safe_flag="file_manager" 对应 config.toml 里的 [safe] 段。只要注册表写对了,安全开关永远跟动作是对得上的。

第四步:协议校验脚本——让 Panel 和 Node 的动作表保持同步

ServerManager 是两个独立仓库——Panel 一个,Node 一个。ActionSpec 注册表在 Node 端定义,Panel 那边也有一份协议定义。两边怎么保证一致?

答案是脚本:

1
python scripts/check_action_registry.py --panel /Volumes/Projects/ServerManager

这个脚本会同时解析两边的注册表,然后对比:

  • Panel 定义了但 Node 没注册的动作(下发了无法处理)
  • Node 注册了但 Panel 没有的动作(死代码或者版本不同步)
  • 两边 critical 标记不一致的动作(安全策略有差异)
  • 两边 direction 不一致的动作(协议语义冲突)

脚本在 CI 里跑,任何一方改了注册表没同步另一方,CI 就会红。这不是什么高深的技术,但在多人协作、双仓库维护的场景下,自动化的协议一致校验防止了太多低级失误。

动作分发的三层结构

重构后分发的完整流程长这样:

1
2
3
4
5
6
7
8
9
10
11
收到 WS 消息 → 解析 action 字符串

查 ActionRegistry:这个动作存在吗?→ 不存在,打日志,继续

查 ActionSpec:需要签名吗?→ 需要,验签失败,断连(code=3001)

查 ActionSpec:需要安全开关吗?→ 需要,开关关着,忽略

查 MappingProxyType 分发表:调用对应 handler

handler 内部解析参数,执行业务逻辑

四个决议点,每个都基于注册表里的声明数据,没有一处需要开发者”记住”什么。想加一个新动作?往注册表加一行 ActionSpec,往分发表加一个方法引用,签个名,配好安全开关——CI 脚本会告诉你有没有遗漏。

为什么不用 getattr 之外的其他方案?

有人可能会问:getattr 不好,那用字符串到函数的字典不就行了?为什么还要搞 ActionSpec?

字典是最小可行的改进,它解决了”误触发内部方法”的问题,但解决不了后面三个:签名策略怎么查?安全开关怎么映射?方向怎么校验?这些信息如果只存在于代码的各个角落,那和维护 getattr 版本没太大区别——只是从”散落在方法里”变成了”散落在常量里”。

ActionSpec 的核心价值在于协议即数据。一个 frozen=True 的 dataclass 既是注册表,又是文档,又是校验依据。代码从注册表读取策略,脚本从注册表校验一致性,安全开关从注册表映射动作——都指向同一个数据源。这才是正儿八经的 single source of truth。

从草台到正规,到底值不值?

坦白说,项目初期用 getattr 完全没问题。十几个动作的时候,谁管什么注册表——写个方法就完事了。

但动作数上了四五十,涉及签名、安全开关、双向通信、插件扩展,再加上 Panel 和 Node 两个仓库同步维护,getattr 那套就开始还债了——安全检查遗漏、签名策略不一致、新旧版本动作对不上……这些问题每一个都够你查半天。

重构花了一周,但省下来的是以后每次动动作都有注册表替你守门,CI 脚本替你校验,代码阅读者一眼就能看明白”这个动作需要签名、方向是 Panel→Node、对应 docker_image_manage 开关”。

从”写代码只要能跑”到”写代码要让别人能安全地改动”,中间差的就是这种数据驱动的声明式设计。

前面两篇分别聊了 WebSocket 节点管理的工程问题和 HMAC 签名防篡改。但有个场景一直没细说——用户在浏览器里点”连接终端”,填入 SSH 密码,这条密码怎么安全地送到远端节点?

最直觉的做法是浏览器把密码明文扔给 Panel,Panel 再明文通过 WebSocket 转给节点。TLS 能防窃听没错,但中间的 Panel 服务端日志、节点日志、甚至 WebSocket 消息调试窗口里,密码都在那裸奔。换个思路,要是有人拿到了 Panel 数据库的访问权,存下来的日志里全是明文密码——想想都瘆人。

所以在 ServerManager 里,终端密码走的是一条完全不同的路:加密票据

密码不走明文,走票据

整个流程可以这么理解——密码是一张舞会的入场券,只有持票人能在指定时间段内入场,过了时间作废,而且这张票本身不包含任何有意义的原文信息。

实际做了这些:

  1. Panel 用 Fernet 对称加密,把密码和时间戳打包成票据
  2. 票据通过 WebSocket 送到节点
  3. 节点用同样的密钥解密票据,取出密码
  4. 校验时间戳——票据超过 45 秒就过期
  5. 密码用完即丢,不落盘、不进日志

这样就算有人中间截获了 WebSocket 消息,他能看到的也就是一串 Fernet token——没有密钥解不开,而且 45 秒后这事就翻篇了。

Fernet 密钥从哪来?

Fernet 是 cryptography 库里的对称加密方案,它的密钥是 32 字节的 base64url 编码字符串。但 ServerManager 里没有额外的密钥分发机制——密钥从 node_token 派生:

1
2
3
4
5
6
7
8
from cryptography.fernet import Fernet, InvalidToken
import base64
import hashlib

def _derive_fernet_key(node_token: str) -> bytes:
"""用 node_token 的 SHA-256 派生 Fernet 密钥"""
digest = hashlib.sha256(node_token.encode('utf-8')).digest()
return base64.urlsafe_b64encode(digest)

为什么选 node_token?因为 Panel 和 Node 都已经知道它——Node 注册时 Panel 就发下来了,HMAC 签名用的也是同一个 token。不需要额外的密钥协商,不需要密钥交换协议,现有的信任链就够了。

但要注意一点:node_token 本身千万不要以任何形式出现在 WebSocket 消息、日志、或 API 响应里。它只参与 HMAC 签名计算和 Fernet 密钥派生,结果是不可逆的摘要或密文。这是整条信任链的根——泄露了它,签名和加密就都形同虚设了。

Panel 端:签发票据

Panel 收到浏览器的终端连接请求后,先生成票据,再把票据塞进 WebSocket 消息发给 Node:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from cryptography.fernet import Fernet
import hashlib
import base64
import json
import time

def issue_terminal_ticket(node_token: str, password: str, ttl: int = 45) -> str:
key = base64.urlsafe_b64encode(hashlib.sha256(node_token.encode()).digest())
fernet = Fernet(key)
payload = json.dumps({
"password": password,
"expires_at": time.time() + ttl,
})
return fernet.encrypt(payload.encode()).decode()

expires_at 写进票据内部——Fernet 自身有时间戳(默认 TTL 60 秒),但那是 Fernet 层面的过期校验。我们再加上业务层的 expires_at,是为了做到更精确的控制,也为了在必要时给出更友好的错误信息。

票据生成后在 WebSocket 消息里长这样:

1
2
3
4
5
6
7
8
9
10
11
{
"action": "terminal:create_session",
"data": {
"host": "10.0.1.5",
"port": 22,
"username": "root",
"password_encrypted": "gAAAAABl...",
"index": 3
},
"_sign": { "timestamp": "...", "nonce": "...", "signature": "..." }
}

password_encrypted 就是 Fernet 票据。明文密码从来没出现在这条消息里。

顺带说一句,terminal:create_session 是个 SIGNED 级别的动作——没有 _sign 字段,节点直接断连。就算有人想伪造终端连接请求,没有 node_token 算不出合法签名。

Node 端:解密并校验

节点收到消息后的处理在 action_dispatcher.py 里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def decrypt_terminal_password(self, payload: dict | None) -> str:
payload = payload if isinstance(payload, dict) else {}
encrypted = payload.get("password_encrypted")
if not encrypted:
return ""
try:
import base64
import hashlib

token = self.client.node_token().encode("utf-8")
key = base64.urlsafe_b64encode(hashlib.sha256(token).digest())
raw = Fernet(key).decrypt(str(encrypted).encode("utf-8"), ttl=45)
data = self.client.decode_json(raw.decode("utf-8"))
if data.get("expires_at", 0) and float(data["expires_at"]) < self.client.now():
raise ValueError("终端密码票据已过期")
return str(data.get("password") or "")
except (InvalidToken, ValueError, TypeError) as e:
logger.warning(f"终端密码票据解密失败: {e}")
return ""

几层防线叠在一起:

Fernet 的 ttl=45。Fernet 在解密时自动检查 token 自身的时间戳是否在 45 秒内生成。这个校验是加密层面的——攻击者没法篡改 Fernet 内部的时间戳,因为 Fernet token 带有 HMAC。

业务层的 expires_at。即使 Fernet 校验通过(说明票据是最近生成的),还得看业务时间戳有没有过期。双重时间校验,谁也不敢偷懒。

解密失败返回空字符串,而不是报错断连。这跟签名验证的策略不一样——签名验证失败直接 close(3001),因为那意味着连接可信度出了大问题。但密码解密失败,可能只是 Panel 版本太老没加密、或者票据格式不对,没必要把整个连接都干掉。终端连接创建会失败,但节点继续运行。

为什么不用 RSA 非对称加密?

有人可能会问:既然 Panel 要加密、Node 要解密,为什么不用 RSA——Panel 拿 Node 的公钥加密密码,Node 用私钥解密?这样连节点都不用存对称密钥了。

实际上在这个场景下,RSA 反而更麻烦:

密钥分发问题。RSA 需要每个节点生成密钥对、把公钥注册到 Panel。对 ServerManager 这种”一个 Panel 管一堆 Node”的架构来说,多了一个密钥生命周期要管理。

性能没必要。密码就几十个字节,RSA 的性能优势在这里毫无意义。对称加密的加解密速度远快于 RSA,虽然几十字节数据差异可以忽略。

信任模型相同。node_token 已经是 Panel 和 Node 共享的密钥,再用 RSA 生成一对密钥只是增加了一个等效的信任根,安全边际并没有提升——node_token 泄露,RSA 私钥当然也可以从节点拿走。

Fernet 自身的优势。Fernet token 自带时间戳和 HMAC,这意味着即使攻击者截获了 token,他既不能解密(没有密钥),也不能延长时间窗口(篡改时间戳会导致 HMAC 校验失败)。RSA 加密后的密文没有这些校验机制。

所以在这个”两端共享密钥”的场景下,Fernet 是更自然的选择。

日志安全:密码绝不上墙

decrypt_terminal_password 的日志只记录”解密失败”这个事实,不记录密文原文、不记录解密结果、不记录 node_token。而在 WebSocket 消息的日志脱敏里,password_encrypted 也会被替换成 ***

1
2
3
4
5
6
SENSITIVE_LOG_KEYS = {
'password', 'passwd', 'pwd', 'password_encrypted',
'token', 'node_token', 'client_token',
'access', 'refresh', 'authorization',
'signature', '_sign', 'secret',
}

passwordpassword_encrypted 都在脱敏列表里。就算有人翻遍了所有日志,他也找不到密码的任何痕迹——不管是明文还是密文。

回头看整条链路

把视角拉远一点,终端密码的安全传输其实是之前那套 HMAC 签名体系的自然延伸:

  • 签名保证的是”这条消息没被篡改”——密码票据里没有任何东西能被替掉
  • 加密保证的是”看不懂”——截获了也解不开
  • 时间窗保证的是”过期作废”——45 秒后就算解开了也没用
  • 日志脱敏保证的是”不留痕”——事后审计找不到密码

四层叠加,就算其中某一层出了问题(比如 TLS 配置失误被中间人截获),其他层仍然能兜住。安全从来不是单一技术能搞定的事,得像洋葱一样一层一层的。

ServerManager 的一个常见操作是:Panel 决定往某个节点下发一个文件——可能是安装包、配置文件、或者插件资源包。文件存在 MinIO 对象存储里,节点需要通过某种方式拿到它。

听起来简单——发个 URL 让节点下载不就完了?但把这事拆开看,从”Panel 发一个下载指令”到”节点硬盘上多了一个完整的文件”,中间的安全缝合比你想象的要多。

第一步:授权——不能谁都能从 MinIO 拿东西

MinIO 本身有自己的认证体系,但直接把 MinIO 的 access key 给节点?那节点就能拿任何存储桶里的任何文件了。权限太大了。

所以 Panel 先要做一个授权接口。节点带着自己的 JWT(就是 HMAC 签名那篇里拿到的 access token)去问 Panel:“我有个下载任务,给我一个能用的 URL”。Panel 验证 JWT 合法性、检查下载权限、然后向 MinIO 请求一个预签名 URL——这个 URL 绑定了特定的文件、过了有效期就废,而且只能 GET。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def __get_download_url(self, download_task: DownloadTaskConfig) -> tuple[str | None, str | None]:
headers = {"Authorization": f"Bearer {self.__access_token}"}
params = {"task": download_task.task_id, "file": download_task.file_id}
async with self.__session.get(
self.__url,
params=params,
headers=headers,
timeout=aiohttp.ClientTimeout(total=15)
) as response:
data = await response.json()
if response.status != 200 or data.get("status") != 1:
raise RuntimeError(f"下载授权失败({response.status}): {data.get('msg')}")
payload = data.get("data") or {}
return payload.get("url"), payload.get("file_name")

这段代码说明几件事:

JWT 只用来请求授权,不直接访问 MinIO。JWT 的权限是”以某节点的身份向 Panel 请求下载 URL”,而不是”直接读取对象存储”。

预签名 URL 是一次性的桥梁。MinIO 签发 URL 时绑定了 bucket、object key、过期时间。节点拿到后直接向 MinIO 发 GET——这个过程中 Panel 不参与,不转发,不代理,完全没有性能瓶颈。

file_name 从授权接口拿回。MinIO 存储的文件名可能是一个 SHA-256 哈希值,真实文件名由 Panel 维护。节点从授权接口同时拿到 URL 和真实文件名,两份信息同源,不存在不一致。

第二步:参数校验——恶意输入在门口就拦住

下载请求进入节点代码的第一件事不是去拿 URL,而是校验参数:

1
2
3
4
5
6
7
8
9
class DownloadTaskConfig:
def __init__(self, task_id: str, file_id: str, file_name: str | None,
save_path: str, check_hash: bool, file_hash: str):
self.task_id = validate_uuid(task_id, "task_id")
self.file_id = validate_sha256(file_id, "file_id")
self.file_name = file_name
self.save_path = save_path
self.check_hash = check_hash
self.file_hash = validate_sha256(file_hash or file_id, "file_hash")

validate_uuid 只允许 [0-9a-fA-F-]{1,64} 的格式,validate_sha256 必须是 64 位十六进制。这意味着如果你往 task_id 里塞 ../../etc/passwd 这种路径穿越字符,在校验层直接就弹回去了。

然后是保存路径的解析:

1
2
3
4
5
6
7
def resolve_download_path(self, requested_path: str | None) -> str:
base = safe_data_dir("download", create=True)
if self.safe_enabled("allow_custom_download_path", False) and requested_path:
return str(Path(requested_path).expanduser().resolve())
if not requested_path or os.path.isabs(str(requested_path)):
return str(base)
return str(safe_join(base, requested_path, create_dir=True))

三种情况:

  1. 默认行为——所有文件都下到 data/download/
  2. 开启 allow_custom_download_path(默认关闭)——允许 Panel 指定绝对路径。这东西明显有风险,所以安全开关默认关着
  3. Panel 给了相对路径——用 safe_join 拼接,防路径穿越

safe_join 的实现在 security.py 里,它对路径逃逸做的是零容忍:

1
2
3
4
5
6
7
8
def safe_join(base: str | os.PathLike, *parts: str | os.PathLike, create_dir: bool = False) -> Path:
base_path = Path(base).resolve()
target = base_path.joinpath(*[str(part) for part in parts]).resolve()
if target != base_path and base_path not in target.parents:
raise SecurityError(f"路径越界: {target}")
if create_dir:
target.mkdir(parents=True, exist_ok=True)
return target

Path.resolve() 会展开所有符号链接和 ..,然后检查目标路径是否在 base 里面。../../etc/passwd 这种老把戏在这里完全不管用——resolve() 之后它是 /etc/passwd,不在 data/download/ 的父路径里,直接 SecurityError

第三步:下载——边下边哈希

到了真正下载的环节,代码用了流式读取而不是一次性加载:

1
2
3
4
5
6
7
8
9
10
11
12
13
async with self.__session.get(download_url, timeout=aiohttp.ClientTimeout(total=300)) as response:
if response.status != 200:
# ... 错误处理
filename = sanitize_filename(download_task.file_name or authorized_file_name, download_task.file_id)
target_path = safe_join(save_dir, filename)
sha256 = hashlib.sha256()
with open(target_path, 'wb') as file:
while True:
chunk = await response.content.read(8192)
sha256.update(chunk)
if not chunk:
break
file.write(chunk)

这段代码暗藏了几个不明显的决策:

边下边算哈希。每次 read(8192) 之后立即 sha256.update(chunk),不用等全下完再算。这意味着内存里永远只有 8KB 的数据量——就算文件有几个 G,内存也扛得住。

文件名再次被 sanitize_filename 过滤

1
2
3
4
5
6
7
def sanitize_filename(filename: Any, fallback: str) -> str:
fallback = os.path.basename(str(fallback or "download"))
filename = os.path.basename(str(filename or fallback))
filename = filename.replace("\x00", "").strip()
if not filename or filename in {".", ".."}:
filename = fallback or "download"
return filename[:255]

os.path.basename() 剥离所有路径前缀,replace("\x00", "") 去掉空字节攻击,... 被替换成 fallback,最终截断到 255 字符。就算有人把文件名设成 ../../../etc/crontab,到这里也变成了 crontab

safe_join 是最后一道保险。万一 sanitize_filename 漏了什么(虽然不应该),目标路径仍然会被检查是否在 save_dir 下。

第四步:哈希校验——下载完的文件对不对?

下载完了不算完,还得确保文件完整:

1
2
3
4
5
6
7
8
sha256 = sha256.hexdigest()
if download_task.check_hash and str(download_task.file_hash) != str(sha256):
return self.__send_websocket_action('file_download:failure', {
'task': download_task.task_id,
'file': download_task.file_id,
'error_type': "文件哈希校验失败",
'error_content': f"{download_task.file_hash} != {sha256}"
})

哈希校验是可选的(check_hash 参数控制),但一旦开启就是硬校验——哈希对不上就算下载失败。这里用的是 SHA-256,跟 file_id 本身的格式一致,因为 Panel 那边存文件时就用 SHA-256 做了去重标识。

第五步:并发控制和队列——别把节点搞挂了

多文件同时下的时候,不能任所有下载一起跑。代码里有个简单的线程池控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class DownloadFileUtil:
__max_download_thread: int = 4
__download_threads: int = 0
__thread_lock: Lock()

def __handle_download_queue(self):
while True:
if self.__download_queue.empty():
break
if self.__download_threads >= self.__max_download_thread:
time.sleep(0.3)
continue
task = self.__download_queue.get()
asyncio.run_coroutine_threadsafe(
self.__download(task), self.__loop
)
self.__increment_download_threads()

最多 4 个并发下载,超了就 0.3 秒轮询等一个空位。锁保护的是计数值 _download_threads,下载完了 _decrement_download_threads

这里有个有意思的选择——用了 asyncio.run_coroutine_threadsafe 把下载协程扔到事件循环里跑,而不是在下载线程里直接 await。因为下载方法本身是 async 的(aiohttp 的流式读取只有异步版),但队列调度在同步线程里跑的。这是 aiohttp + threading 混合时的常见模式,因为 WebSocket 收到下载指令的时候,处理线程不能被阻塞着等下载完。

第六步:状态汇报——Panel 得知道下完了没有

下载结果通过 WebSocket 回报给 Panel。成功和失败走同一个 file_download:success/failure 动作:

1
2
3
4
5
6
7
8
9
10
11
12
# 成功
self.__send_websocket_action('file_download:success', {
'task': download_task.task_id,
'file': download_task.file_id,
})
# 失败
self.__send_websocket_action('file_download:failure', {
'task': download_task.task_id,
'file': download_task.file_id,
'error_type': "文件哈希校验失败",
'error_content': f"{download_task.file_hash} != {sha256}"
})

WS 消息的发送本身也走了队列——__websocket_message_queue。 Почему?因为下载在后台线程里跑,但 websocket_send_json 是异步的。后台线程不能直接 await,得把消息塞进队列,让专门的线程从队列里取消息,再通过 asyncio.run_coroutine_threadsafe 扔回事件循环发出去。

这是一个很实用的模式——生产者(下载线程)和消费者(WS 发送线程)解耦,中间用一个 Queue 做缓冲。关连接的时候 close() 方法往两个队列各塞一个 None,让消费者线程优雅退出。

整条链路一份清单

从一个下载指令到文件落盘,整个安全缝合点是这样的:

环节防护对象手段
参数入口UUID/SHA-256 格式validate_uuid, validate_sha256
路径拼接路径穿越safe_join + resolve() + 父路径检查
文件名净化../, NUL, .sanitize_filename + basename
下载授权无权限访问 MinIOJWT + 预签名 URL
传输过程窃听/篡改TLS + 流式读取
文件完整性损坏/被替换SHA-256 流式校验
自定义路径写到危险位置allow_custom_download_path 默认关
并发保护资源耗尽4 线程上限 + 队列
状态汇报Panel 不知道结果WS 成功/失败回调

每一层单独看都算不上多高明,但叠在一起就构成了完整的纵深防御。最怕的不是某个环节被突破,而是压根没想到那个环节需要防护。这条链路里每一层都是踩过坑之后加上的——不是过度设计,是血泪换来的经验。

分布式系统里有个经典问题:业务数据写数据库成功了,但通知消息还没发出去——进程就挂了。或者反过来,消息先发了,业务数据入库失败——消费端收到了一个不存在的事件。

Django 项目里这问题尤其常见。你在一个 transaction.atomic() 里做了这两件事:

1
2
3
4
5
6
with transaction.atomic():
order = Order.objects.create(...)
channel_layer.group_send(f"order_{order.id}", {
"type": "order.created",
"order_id": order.id,
})

看似没问题,但如果 group_send 执行了而事务回滚了——消费端收到一个已经被回滚的 order_id,去数据库里查,查不到。反过来,如果事务提交了但 group_send 之前进程崩了——消息就丢了,消费端永远不知道这个订单被创建了。

ServerManager 的 UnitOfWorkOutboxPublisher 把这个问题彻底解决了。

UnitOfWork:transaction.atomic() 的门面

UnitOfWork 只有一层薄包装:

1
2
3
4
5
6
7
8
9
10
class UnitOfWork:
@contextmanager
def atomic(self) -> Iterator[None]:
with transaction.atomic():
yield

def on_commit(self, callback: Callable[[], object]) -> None:
transaction.on_commit(callback)

default_unit_of_work = UnitOfWork()

你没看错,就这。但 on_commit() 是关键——它注册的回调只在事务成功提交后才执行。回滚了?回调不执行。这让业务代码可以在事务里”预约”事件,不必担心事件提前泄露。

OutboxPublisher:写 DB 才是真的发了

1
2
3
4
5
6
7
8
9
@dataclass(frozen=True)
class OutboxEvent:
event_id: str
event_type: str
aggregate_type: str
aggregate_id: str
idempotency_key: str
payload: Any
status: Literal["pending", "sent", "failed"] = "pending"

OutboxEvent 也是 frozen dataclass——不可变,创建后谁也改不了。

核心方法是 publish_after_commit()

1
2
3
4
5
6
7
8
9
10
11
12
class OutboxPublisher:
def __init__(self, *, use_persistent: bool = True):
self._use_persistent = use_persistent
self._memory_queue: deque[OutboxEvent] = deque(maxlen=10000)
self._handlers: dict[str, list[Callable]] = {}

def publish_after_commit(self, event: OutboxEvent) -> OutboxEvent:
if self._use_persistent:
transaction.on_commit(lambda: self._persist_event(event))
else:
transaction.on_commit(lambda: self._enqueue_memory(event))
return event

transaction.on_commit() 是 Django 提供的功能——注册的回调在当前事务提交后才执行。所以 self._persist_event(event) 只会在数据库写入成功后被调用。

持久模式的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _persist_event(self, event: OutboxEvent) -> None:
try:
OutboxRecord.objects.update_or_create(
idempotency_key=event.idempotency_key,
defaults={
"event_id": event.event_id,
"event_type": event.event_type,
"aggregate_type": event.aggregate_type,
"aggregate_id": event.aggregate_id,
"payload": event.payload if isinstance(event.payload, dict) else {"data": str(event.payload)},
"status": "pending",
},
)
except Exception:
logger.exception("Failed to persist outbox event: %s", event.event_id)

update_or_create 配合 idempotency_key——相同幂等键的事件不会重复创建。即使 publish_after_commit() 被调用两次(比如业务代码写了两次同一个事件),数据库里也只有一条记录。

OutboxRecord:持久化的保证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class OutboxRecord(models.Model):
event_id = models.CharField(max_length=64, unique=True, primary_key=True)
event_type = models.CharField(max_length=128, db_index=True)
aggregate_type = models.CharField(max_length=128)
aggregate_id = models.CharField(max_length=128, db_index=True)
idempotency_key = models.CharField(max_length=128, unique=True)
payload = models.JSONField()
status = models.CharField(max_length=16, choices=[("pending", "Pending"), ("sent", "Sent"), ("failed", "Failed")], default="pending", db_index=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
retry_count = models.IntegerField(default=0)

class Meta:
db_table = "core_outbox_event"
indexes = [
models.Index(fields=["status", "created_at"], name="outbox_status_created_idx"),
]

联合索引 (status, created_at) 让 worker 可以高效地查出所有 status=pending 的记录,按创建时间排序消费。idempotency_key 的唯一约束防止重复。

Worker 消费:从 pending 到 sent

Worker 的逻辑很直白——轮询 pending 状态的事件,处理后标记为 sent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def consume_pending(self, limit: int = 100) -> list[OutboxEvent]:
if not self._use_persistent:
return list(self._memory_queue)
records = list(
OutboxRecord.objects.filter(status="pending")
.order_by("created_at")[:limit]
)
return [r.to_event() for r in records]

def mark_sent(self, event_id: str) -> None:
if self._use_persistent:
OutboxRecord.objects.filter(event_id=event_id).update(status="sent")
# 内存模式直接从队列里移除
self._memory_queue = deque(
(e for e in self._memory_queue if e.event_id != event_id),
maxlen=self._memory_queue.maxlen,
)

消息处理成功就 mark_sent,事件从 pendingsent,完成一次投递生命周期。

失败重试:三次机会

万一 Worker 处理失败了?mark_failed()retry_failed() 配合使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def mark_failed(self, event_id: str) -> None:
if self._use_persistent:
OutboxRecord.objects.filter(event_id=event_id).update(
status="failed",
retry_count=models.F("retry_count") + 1,
)

def retry_failed(self, max_retries: int = 3) -> list[OutboxEvent]:
if not self._use_persistent:
return []
records = list(
OutboxRecord.objects.filter(
status="failed",
retry_count__lt=max_retries,
).order_by("created_at")[:100]
)
ids = [r.event_id for r in records]
OutboxRecord.objects.filter(event_id__in=ids).update(status="pending")
return [r.to_event() for r in records]

失败后 retry_count 加 1,状态改为 failed。定时任务调用 retry_failed(3) 把重试次数不到 3 次的失败事件捞出来,状态改回 pending,交给 Worker 重新处理。超过 3 次的?留在 failed 状态,等人工排查。

内存模式:开发环境的简单回退

不是所有场景都需要持久化——开发环境可能跑的是 SQLite,起 Worker 太重了。use_persistent=False 切到内存模式:

1
2
3
4
5
6
7
8
9
10
11
def _enqueue_memory(self, event: OutboxEvent) -> None:
self._memory_queue.append(event)
self._dispatch_memory(event)

def _dispatch_memory(self, event: OutboxEvent) -> None:
handlers = self._handlers.get(event.event_type, [])
for handler in handlers:
try:
handler(event)
except Exception:
logger.exception("Handler failed for event: %s", event.event_id)

内存模式用 deque(maxlen=10000) 存事件,同时直接调注册的 handler。不写数据库,不启动 Worker,事务里 on_commit 触发后同步执行。当然,进程挂了就没了——开发环境可以接受。

注册 handler:

1
2
def register_handler(self, event_type: str, handler: Callable[[OutboxEvent], None]) -> None:
self._handlers.setdefault(event_type, []).append(handler)

DomainEventBus 类似,但 Outbox 的 handler 处理的是投递层面的事(发 WebSocket、调 webhooks 等),而 EventBus 的 handler 处理的是业务逻辑层面的响应。

业务代码怎么用

一个完整的例子——创建集群后发送事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from apps.core.persistence.outbox import default_outbox_publisher, OutboxEvent
from apps.core.persistence.unit_of_work import default_unit_of_work
from apps.core.contracts.auth import AuthPrincipal
import uuid
from datetime import datetime

def create_group(name, user_principal: AuthPrincipal):
with default_unit_of_work.atomic():
group = Node_Group.objects.create(name=name)

# 事务内预约事件
default_outbox_publisher.publish_after_commit(OutboxEvent(
event_id=str(uuid.uuid4()),
event_type="group.created",
occurred_at=datetime.now(),
actor=user_principal,
payload={"group_id": group.id, "name": group.name},
aggregate_type="group",
aggregate_id=str(group.id),
idempotency_key=f"group.created:{group.id}",
))

# 事务提交后,OutboxPublisher 才把事件持久化
# Worker 捞取后发 WebSocket 通知

idempotency_keygroup.created:{group.id} 格式——同一个集群的创建事件只会有一条记录,即使 publish_after_commit 被意外调用两次。

为什么不用消息队列?

Redis 的 list 已经做了任务队列(apps/workers/task_queue.py),为什么 Outbox 还要用数据库?

因为数据库事务和消息队列是两个不同的保障域。Redis list 的 LPUSH 是即时操作,没法跟 Django 的 transaction.atomic() 绑定——如果事务回滚,已经 LPUSH 的消息没法自动撤回。但 OutboxRecord 是数据库表,跟业务数据在同一个事务里——要么一起提交,要么一起回滚。这是只有关系型数据库才能给的保证。

Redis 任务队列适合”即发即忘”的任务——发邮件、生成报表、下载文件。Outbox 适合”业务数据不丢,事件就不丢”的场景——订单创建、节点上线、集群删除这种核心业务事件。

两者不冲突,各管各的。

Django 的开发哲学是”快”——一个 models.py,一个 views.py,加几行 URL 配置,功能就上线了。这里面没有业务层的概念,Model 既管持久化又管业务逻辑,View 既管 HTTP 又管权限又管数据组装。小程序这么做没问题,但 ServerManager 的代码量滚到了十几万行,面条代码开始反噬了——改一个 WebSocket 动作的权限逻辑,得在 Consumer 里翻三个文件才能找到 auth 在哪里做的。

所以我们做了一次彻底的重构,把核心层(apps/core/)从业务代码里抽出来,用 frozen dataclass 做契约、用 command bus 做调度、用 event bus 做解耦、用 outbox 做可靠投递。下面把每一块的思路和为什么这么做聊一下。

契约先行:frozen dataclass

所有跨层传递的数据结构一律用 @dataclass(frozen=True) 定义成契约。frozen=True 不是摆设——它保证对象创建后不可变,不会有谁在半路偷偷改掉 user_id 或者塞一个新字段进去。

最核心的是 AuthPrincipal——一个统一的身份类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@dataclass(frozen=True)
class AuthPrincipal:
principal_type: Literal["user", "node", "download_ticket", "anonymous"]
user_id: int | None = None
username: str = ""
node_uuid: str | None = None
role_ids: tuple[int, ...] = ()
permission_codes: tuple[str, ...] = ()
is_superuser: bool = False
is_disabled: bool = False

@property
def is_node(self) -> bool:
return self.principal_type == "node"

def has_permission(self, code: str) -> bool:
return self.is_superuser or "all" in self.permission_codes or code in self.permission_codes

无论你是 HTTP 请求里的登录用户、WebSocket 连接上的节点、一次性下载票据、还是匿名访问——都用同一个 AuthPrincipal。中间件构造它,后续所有层只读它。不需要再判断”这个 request 是用户还是节点”,看 principal_type 就行。

同级的还有 RequestContext(携带 request_id、IP、语言等)和 DomainEvent(领域事件),都是 frozen 的。在代码规范里甚至有明确禁令:不可以在 DTO 上使用 getattr/setattr

为什么用 dataclass 而不是 Pydantic?因为 dataclass 是标准库、零依赖、快。Pydantic 的验证和序列化能力在这里用不上——这些契约对象只在进程内传递,不序列化。

模块清单:声明式的权限和路由注册

Django 项目长大的一个征兆就是 URL 配置变成一坨意大利面。urls.py 里几百行 path(),你搞不清哪个 URL 要什么权限、哪个 WebSocket 动作归哪个模块管。

重构后,每个模块用 ModuleManifest 声明自己的一切:

1
2
3
4
5
6
7
8
9
10
11
12
@dataclass(frozen=True)
class ModuleManifest:
module_id: str
label: str
url_prefix: str = ""
urls_module: str = ""
permissions: tuple[PermissionSpec, ...] = ()
permission_categories: tuple[PermissionCategorySpec, ...] = ()
routes: tuple[RouteSpec, ...] = ()
websocket_actions: tuple[WebSocketActionSpec, ...] = ()
dashboard_providers: tuple[DashboardProviderSpec, ...] = ()
scheduler_jobs: tuple[SchedulerJobSpec, ...] = ()

ModuleManifestRegistry 在启动时做全量校验——模块 ID 不能重复、URL 前缀不能冲突、权限码不能重复、路由引用的权限必须已声明、WebSocket 动作不能重复……

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ModuleManifestRegistry:
def validate(self) -> list[ManifestViolation]:
violations = []
seen_modules = set()
seen_permissions = set()
seen_routes = set()
seen_ws = set()
# ...
for manifest in self._manifests:
if manifest.module_id in seen_modules:
violations.append(ManifestViolation("DUP_MODULE", ...))
# 权限引用检查:路由声明的权限必须存在于某个模块的 permissions 里
for route in manifest.routes:
violations.extend(
_validate_requirement(route.permission, all_permissions, ...)
)
return violations

这样做的好处是权限变成了一张全局表,哪些模块有哪些权限、谁依赖谁,一目了然。代码审查不需要再去找零散的 @permission_required 装饰器。

Command Bus:把 WebSocket 消息调度收口

重构之前,业务代码直接调 channel_layer.group_send() 发 WebSocket 消息,散落在各个 view 和 consumer 里。重构后,所有到节点的命令走同一条路——NodeCommandBus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class NodeCommandBus:
def __init__(self, adapter: ChannelsNodeAdapter | None = None):
self._adapter = adapter or default_channels_node_adapter

async def send(self, command: NodeCommand) -> NodeCommandResult:
payload = command.payload
if is_dataclass(payload):
payload = asdict(payload)
event = {
"type": "send.message",
"action": command.action,
"request_id": command.request_id,
"payload": payload,
"requires_signature": command.requires_signature,
}
await self._adapter.send_to_node(command.node_uuid, event)
return NodeCommandResult(request_id=command.request_id, status="sent")

业务代码只跟 NodeCommandNodeCommandResult 打交道,不碰 Channels 的底层 API。好处在哪?

  1. 单一出口——所有发往节点的消息都经过 bus,日志、签名、权限检查统一处理
  2. 可测试——测试时 inject 一个 mock adapter 就行,不需要起 Channels layer
  3. 可追踪——每条命令都有 request_id,从产生到送达全链路可追踪

新代码规范很明确:不允许在 bus 之外直接调 channel_layer.group_send()

Event Bus:模块间松耦合

有了 Command Bus 管节点方向,模块之间的横向通知就要靠 Event Bus 了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class DomainEventBus:
def __init__(self):
self._subscribers: dict[str, list[Subscriber]] = defaultdict(list)

def subscribe(self, event_type: str, subscriber: Subscriber) -> None:
self._subscribers[event_type].append(subscriber)

def publish(self, event: DomainEvent) -> list[object]:
results = []
for subscriber in tuple(self._subscribers.get(event.event_type, ())):
results.append(subscriber(event))
for subscriber in tuple(self._subscribers.get("*", ())):
results.append(subscriber(event))
return results

简单到只有两个方法,但这就够了。* 通配符订阅让审计模块可以监听所有事件做日志,不需要每个事件都显式通知它。

事件也是 frozen dataclass:

1
2
3
4
5
6
7
8
9
10
@dataclass(frozen=True)
class DomainEvent:
event_id: str
event_type: str
occurred_at: datetime
actor: AuthPrincipal | None
payload: object
aggregate_type: str = ""
aggregate_id: str = ""
idempotency_key: str = ""

idempotency_key 是给消费端用的——同一个事件处理两次不会出事。这个后面 Outbox 模式会用到。

Unit of Work + Outbox:把事务做对

Django 的 transaction.atomic() 够用,但有个经典问题:业务逻辑在事务里写了一堆东西到数据库,事务提交前发了 WebSocket 消息。结果消息先到了消费端,消费端查数据库——还没提交呢,查不到。

UnitOfWork 极其简单,就是 transaction.atomic() 的薄封装:

1
2
3
4
5
6
7
8
9
10
class UnitOfWork:
@contextmanager
def atomic(self) -> Iterator[None]:
with transaction.atomic():
yield

def on_commit(self, callback: Callable[[], object]) -> None:
transaction.on_commit(callback)

default_unit_of_work = UnitOfWork()

但配合 OutboxPublisher 就有意思了:

1
2
3
4
5
6
7
class OutboxPublisher:
def publish_after_commit(self, event: OutboxEvent) -> OutboxEvent:
if self._use_persistent:
transaction.on_commit(lambda: self._persist_event(event))
else:
transaction.on_commit(lambda: self._enqueue_memory(event))
return event

on_commit 的回调在事务成功提交后执行。如果事务回滚了,事件也不会被persist——数据一致性天然保证。

OutboxPublisher 有两种模式:持久模式写入 OutboxRecord 数据库表,供 worker 消费;内存模式用 deque 做进程内分发,适合开发环境和测试。

1
2
3
4
5
6
7
8
9
@dataclass(frozen=True)
class OutboxEvent:
event_id: str
event_type: str
aggregate_type: str
aggregate_id: str
idempotency_key: str
payload: Any
status: Literal["pending", "sent", "failed"] = "pending"

Worker 消费后改状态为 sent;失败了改 failedretry_count 加 1。有 retry_failed(max_retries=3) 方法捞回失败的事件重试。整套机制保证了”业务操作成功则事件一定投递,业务操作失败则事件一定不投递”。

不只是代码,更是约束

这次重构最有价值的不是代码本身,而是写在项目规范文档里的约束规则:

  • DTO 必须用 frozen=True 的 dataclass
  • 不允许在 DTO 上使用 getattr/setattr
  • 发往节点的消息必须走 NodeCommandBus,禁止直接 channel_layer.group_send()
  • 所有跨模块事件走 DomainEventBus
  • 有副作用的操作必须包在 UnitOfWork.atomic()
  • 事件投递必须用 OutboxPublisher.publish_after_commit()

代码是人写的,人会犯错。但架构约束写进了开发规范,lint 也能检查,新来的人只要按规范走,就不会写出面条代码。

Django 不会告诉你”这里应该分层”——它甚至鼓励你把所有东西塞进 View。但当一个项目长到十几万行,分层就是刚需,不是可选。把契约、总线、事件、Outbox 这些概念从业务代码里提取出来放进 core/ 层,剩下的业务模块只用关心自己的事。

ServerManager 是一个服务器集群管理系统,有个野心不小的插件系统——让用户自己写 Python 代码跑在服务端,而且不用开子进程、不用容器隔离,直接在 Django ASGI 进程里跑。听起来就挺危险的,对吧?

这事的难点不在于”让插件跑起来”——exec() 就行——难点在于”阻止插件做不该做的事”。一个恶意插件能 import os 然后 os.system('rm -rf /'),能 __import__('subprocess'),能通过 obj.__class__.__bases__[0].__subclasses__() 链回溯到任意类型……Python 的动态性让沙箱成了猫鼠游戏。

折腾了大半年,最终在这套系统里落下了七层防线。每一层单独看都有绕过的可能,但叠在一起,攻击面就缩到很小了。

第一层:AST 静态校验——代码还没跑就先拦一轮

插件代码在执行之前,先过一遍 AST(抽象语法树)校验。ASTValidator 会遍历整棵语法树,找出所有的 importfrom ... import 语句,检查是否涉及被禁止的模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ASTValidator:
BLOCKED_ATTRIBUTE_NAMES = {
'__class__', '__bases__', '__mro__', '__subclasses__',
'__globals__', '__code__', '__closure__', '__func__',
# ... 更多危险属性
}
BLOCKED_REFLECTION_FUNCTIONS = {'vars', 'dir', 'globals', 'locals'}

@staticmethod
def validate(source_code: str, filepath: str = '',
blocked_modules: Optional[Set[str]] = None) -> List[str]:
tree = ast.parse(source_code)
warnings = []
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
if _is_module_blocked(alias.name, blocked_modules):
warnings.append(f"{filepath}: 导入被禁止的模块: {alias.name}")
if isinstance(node, ast.Attribute):
if node.attr in ASTValidator.BLOCKED_ATTRIBUTE_NAMES:
warnings.append(f"{filepath}: 访问被禁止的属性: {node.attr}")
return warnings

subprocessos.systemctypes 这种直接能起shell的模块,在这一层就打回去了。同时也在 AST 层面拦截了对 __class____globals__ 这类反射属性的访问。

但这只是第一道门。字符串拼接能绕 AST——getattr(os, 'sys'+'tem') 这种写法 AST 看不出问题。所以还得往下加。

第二层:sys.meta_path 导入拦截——运行时也休想偷偷 import

AST 校验是在加载前做的,但插件运行时可能通过 __import__()importlib 动态导入。这就要靠 sys.meta_path 来拦截了。

_PluginImportFinder 注册在 sys.meta_path 的最前面,每次 Python 解释器执行 import 都会先过它这一关:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class _PluginImportFinder:
def find_spec(self, fullname, path, target=None):
ctx = _plugin_context.get()
if ctx is None:
return None # 非插件上下文,放行

plugin_id = ctx.get('plugin_id', 'unknown')
permissions = ctx.get('permissions', [])

# 关键模块(subprocess, os.system 等):直接报错
critical = get_critical_blocked_modules()
if _is_module_blocked(fullname, critical):
raise ImportError(f"插件无权导入模块: {fullname}")

# 根据权限列表过滤
blocked = get_blocked_modules_for_permissions(permissions)
if _is_module_blocked(fullname, blocked):
raise ImportError(f"插件无权导入被禁止的模块: {fullname}")

# 内部模块(apps.*, django.* 等):需要走白名单
root_module = fullname.split('.')[0]
if root_module in ('apps', 'middleware', 'consumers', 'util', 'django'):
if not self._is_safe_internal_import(fullname, permissions):
raise ImportError(f"插件无权直接导入: {fullname}")

return None # 放行

这里有个细节很关键——_trusted_internal_import 这个 ContextVar。框架自己的代码在替插件做事时(比如 BasePlugin.get_nodes()),需要临时打开信任通道。但这个通道只在框架代码的栈帧里有效,插件代码试图import还是会被拦。

另一个细节是 third_party_caller 的检测。当插件调用了 requests.get(),requests 库内部会调用 import——这时候调用栈里顶层是插件代码,但中间经过第三方库。系统通过 _is_plugin_import_caller() 判断调用源,如果第三方库有 package.import 权限就放行其内部 import,但不让插件借道。

第三层:运行时 builtins 篡改——把 eval、exec、getattr 全换成替身

即便前两层拦住了 import,插件还可以用 Python 内置函数搞事。eval()exec() 能执行任意字符串,getattr() 能绕过属性名检查,globals()locals() 能一览所有局部变量。

runtime_guard() 是一个上下文管理器,在插件执行前挨个替换 builtins 里的危险函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@contextlib.contextmanager
def runtime_guard():
patches = []

def patch(obj, attr, replacement):
original = getattr(obj, attr)
patches.append((obj, attr, original))
setattr(obj, attr, replacement)

# 直接封杀的:eval、exec、compile、breakpoint
for name in ('eval', 'exec', 'compile', 'breakpoint'):
patch(builtins, name, _blocked_runtime_call_factory(getattr(builtins, name), name))

# 属性访问守卫:getattr、setattr、delattr
patch(builtins, 'getattr', _guarded_getattr_factory(builtins.getattr))
patch(builtins, 'setattr', _guarded_setattr_factory(builtins.setattr))
patch(builtins, 'delattr', _guarded_delattr_factory(builtins.delattr))

# 反射函数封杀:globals、locals、vars、dir
for name in ('globals', 'locals', 'vars', 'dir'):
patch(builtins, name, _guarded_reflection_call_factory(getattr(builtins, name), name))

try:
yield
finally:
# 还原——从后往前还原,确保嵌套时不出问题
for obj, attr, original in reversed(patches):
setattr(obj, attr, original)

替换不是永久的——finally 块保证还原,而且用的是 reversed(patches) 确保嵌套场景不出问题。这也意味着框架代码在 runtime_guard 之外不会受影响。

被阻断的函数会检查调用来源:

1
2
3
4
5
6
7
8
def _blocked_runtime_call_factory(original_func, name: str):
def _inner(*args, **kwargs):
# 框架代码信任标记为 True,且调用栈顶部不是插件代码——放行
if _trusted_internal_import.get() and not _called_from_plugin_frame(caller_globals):
return original_func(*args, **kwargs)
# 否则直接报错
raise PermissionError(f"插件 {plugin_id} 无权调用危险函数: {name}")
return _inner

第四层:getattr/setattr 属性守卫——堵死反射绕行

前面说了 getattr(obj, '__class__') 是常见绕行手段。第三层把 getattr 换成了 _guarded_getattr,它会在每次调用时检查属性名是不是在黑名单里:

1
2
3
4
5
6
7
8
BLOCKED_ATTRIBUTE_NAMES = {
'__class__', '__bases__', '__mro__', '__subclasses__',
'__globals__', '__code__', '__closure__', '__func__',
'__self__', '__module__', '__dict__', '__defaults__',
'gi_frame', 'gi_code', 'cr_frame', 'cr_code', 'tb_frame',
'f_globals', 'f_locals', 'func_globals',
'modules', 'path_hooks', 'meta_path',
}

这串名单几乎涵盖了 Python 里所有能从对象逃逸到其他命名空间的属性。__class__.__bases__[0].__subclasses__() 这条经典沙箱逃逸路线,在第一步 .__class__ 就被拦住了。

第五层:RestrictedPython 编译——AST 级别的终极保险

RestrictPython 是 Zope 基金会的老牌沙箱库,它的 compile_restricted() 在 AST 层面做额外限制:禁止未赋值就使用的变量名、拦截不安全的属性访问等。它还要求定义 _getattr__getitem__getiter_ 这些钩子函数,在编译后的代码里每次属性访问都会调用这些钩子。

我们自定义了 plugin_guarded_getattr

1
2
3
4
5
6
def plugin_guarded_getattr(obj, name):
if name in BLOCKED_ATTRIBUTE_NAMES:
raise AttributeError(f'"_getattr_" not allowed to access dangerous name "{name}"')
if name.startswith('_') and name not in ALLOWED_DUNDER_METHODS:
raise AttributeError(f'"_getattr_" not allowed to access private name "{name}"')
return getattr(obj, name)

这样即便 runtime_guard 因为某种原因没生效,RestrictedPython 编译出的字节码本身也不会让插件访问危险属性。

第六层:文件系统权限控制——os.open 也不放过

文件访问的守卫覆盖得非常细致,不光拦了 open(),连 os.openos.listdiros.statos.renameos.removeos.mkdiros.chmodos.chown、甚至 os.scandir 都换成了带权限检查的版本。

更狠的是,连 dir_fd 参数都给封了。为什么?因为 dir_fd 可以让插件打开一个文件描述符,然后通过它访问本来没权限的路径——一种典型的目录穿越手法:

1
2
3
4
5
6
7
8
9
10
def _guarded_os_open_factory(original_open):
def _guarded_os_open(path, flags, mode=0o777, *args, **kwargs):
ctx = _plugin_context.get()
if ctx is None:
return original_open(path, flags, mode, *args, **kwargs)
if kwargs.get('dir_fd') is not None:
plugin_id = ctx.get('plugin_id', 'unknown')
raise PermissionError(f"插件 {plugin_id} 无权通过 dir_fd 访问文件")
# ... 黑白名单检查
return _guarded_os_open

插件能访问的文件路径只有两类:自己 data/ 目录下的,或者在 file.path:/some/dir 权限里明确声明的。路径穿越也没用,因为用了 os.path.realpath() 做规范化,再用 os.path.commonpath() 做前缀校验:

1
2
3
4
def _is_path_inside(base_path: str, candidate_path: str) -> bool:
base_real = os.path.realpath(base_path)
candidate_real = os.path.realpath(candidate_path)
return os.path.commonpath([base_real, candidate_real]) == base_real

第七层:数据库访问隔离——只能动自己的表

插件的数据库操作走了专门的 PluginDatabaseAdapter,限制了表名必须在 plugin_<plugin_id>_ 命名空间下。SQL 执行也做了多语句注入检查和 DDL 权限控制。插件要是想 DROP TABLE auth_user,门都没有。

contextvars 贯穿全局——并发安全的关键

七层防线虽然各管各的,但它们共享一套身份识别机制——contextvars.ContextVar_plugin_context 保存了当前插件的 ID、权限列表、数据目录路径。_trusted_internal_import 标记当前是否在框架信任代码的栈帧里。_third_party_caller 标记当前 import 调用是否来自第三方库。

1
2
3
4
_plugin_context: ContextVar = ContextVar('plugin_context', default=None)
_policy_path_resolution: ContextVar = ContextVar('policy_path_resolution', default=False)
_trusted_internal_import: ContextVar = ContextVar('trusted_internal_import', default=False)
_third_party_caller: ContextVar = ContextVar('_third_party_caller', default=False)

ContextVar 在 asyncio 里天然线程安全(每个协程有独立上下文),但 copy_context() 被用在同步插件的线程池执行里,确保上下文正确传播:

1
2
3
4
5
6
7
async def execute_safe_async(self, plugin_id, func, *args, ...):
ctx = copy_context()
# 同步函数放进线程池,带着自己的上下文跑
result = await asyncio.wait_for(
loop.run_in_executor(None, lambda: ctx.run(func, *args, **kwargs)),
timeout=timeout
)

七层未必够,但够了

这篇文章没有一个”结论”段告诉你”七层防线完美无缺”——那不诚实。Python 的沙箱历史上被绕过太多次了,真正安全的容器隔离还得靠 OS 级别的 namespace/cgroup。

但在这个场景下,插件是由管理员主动安装的(不是随便谁都能上传),七层防线加上权限审批流程,绝大多数攻击向量都堵住了。更重要的是,这套方案的收益很大——免去进程管理的复杂性,Django 的工作进程直接执行插件代码,延迟低、资源开销小,对单机十多节点这种规模来说刚刚好。

如果哪天插件生态做大了、开始有不信任的第三方插件,那就该考虑给每个插件开子进程或者容器了。但在那之前,这七道门够用。

在浏览器里连上一台远程服务器的桌面——这听起来像是个浏览器 RDP 客户端就能搞定的事,对吧?Apache Guacamole 就是干这个的:浏览器里跑 JavaScript,通过 guacd(Guacamole proxy daemon)把 RDP/VNC 协议翻译成 Guacamole 自己的协议,再通过 WebSocket 送到浏览器渲染。

但 ServerManager 的场景有个特殊需求——控制端(Panel)在 A 机房,被控节点(Node)可能在 B 机房甚至用户的内网里。guacd 放哪?放 Panel 这边,它直接连不上目标机器的 3389 端口;放 Node 那边,浏览器没法直接访问。

答案是一个独立 Node.js 网关进程,它不翻译协议——它做 TCP 隧道。guacd 还是在 Panel 这边跑,但它不连目标机器的 3389,而是连网关动态开的一个本地端口。网关再通过一个到 Node 的 WebSocket 连接,把 TCP 数据透传到 Node 本地,Node 再连 RDP/VNC。整个链路是这样的:

1
浏览器 ←WebSocket→ guacd ←TCP→ 网关 ←WebSocket(二进制帧)→ Node ←TCP→ 目标(3389/5900)

这套方案一共 490 行 JavaScript,但里面的门道不少。

第一层:Django 签发票据

远程桌面不是想开就开的。用户在浏览器点”远程桌面”按钮后,Django 要做三件事:

  1. 验证用户有远程桌面权限
  2. 创建一个会话记录,包含目标节点 UUID、协议类型(RDP/VNC)、目标端口等
  3. 把会话信息存进 Redis,生成一个一次性票据 token

票据验证是网关做的——网关拿到 token 后,会向 Django 的 /verify 接口发 POST 请求,Django 从 Redis 取出会话数据返回。网关和 Django 之间有一个共享密钥 REMOTE_DESKTOP_GATEWAY_VERIFY_SECRET,放在 HTTP 头 x-remote-desktop-secret 里:

1
2
3
4
5
6
7
8
9
10
11
12
async function verifyTicket(token, role) {
const headers = { 'content-type': 'application/json' };
if (VERIFY_SECRET) {
headers['x-remote-desktop-secret'] = VERIFY_SECRET;
}
const response = await fetch(VERIFY_URL, {
method: 'POST',
headers,
body: JSON.stringify({ token, role }),
});
// ...
}

为什么用 HTTP 验证而不是 WebSocket?因为网关和 Django 在同一内网,HTTP 调用简单可靠,没必要把简单的鉴权搞成 WebSocket 那么复杂。

第二层:Node.js 网关——TCP 中继的艺术

网关是整个系统的核心。它承担两个角色:管理 Node 的 WebSocket 连接,以及为 guacd 提供本地 TCP 端口。

连的是一个 Node,但一个 Node 可能同时开多个桌面——不同的协议、不同的分辨率、甚至同一个用户开了两个 RDP 会话。怎么办?每个 TCP 连接需要一个唯一标识。

答案在二进制帧格式里。当 Node 通过 WebSocket 发来二进制消息时,前 16 字节是 connection ID,后面的是 TCP 原始数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
ws.on('message', (message, isBinary) => {
if (isBinary) {
const frame = Buffer.isBuffer(message) ? message : Buffer.from(message);
if (frame.length <= 16) return; // 太短,忽略
const connectionId = frame.subarray(0, 16).toString('hex');
const socket = state.sockets.get(connectionId);
if (socket && !socket.destroyed) {
socket.write(frame.subarray(16));
}
return;
}
// 文本消息是控制信令...
});

反向也一样,当 guacd 连上本地的 TCP 端口后,网关把 TCP 数据包上 16 字节的 connection ID 头,用 WebSocket 二进制帧发给 Node:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function forwardTcpToNode(state, socket) {
const connectionId = crypto.randomBytes(16).toString('hex');
state.sockets.set(connectionId, socket);

// 告诉 Node:新连接来了
sendJson(state.nodeWs, {
type: 'tcp_connect',
connection_id: connectionId,
host: state.session.target_host,
port: state.session.target_port,
protocol: state.session.protocol,
});

socket.on('data', (chunk) => {
const frame = Buffer.concat([Buffer.from(connectionId, 'hex'), chunk]);
state.nodeWs.send(frame, { binary: true });
});
}

一个 WebSocket 连接,多路 TCP 数据。这比给每个桌面会话开一个 WebSocket 连接要高效得多——减少了连接管理的复杂度,Node 端也只需要维护一个 WebSocket。

会话状态管理

每个远程桌面会话用一个 state 对象管理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function ensureSession(sessionData) {
const sessionId = sessionData.session_id;
let state = sessions.get(sessionId);
if (!state) {
state = {
sessionId,
session: sessionData,
nodeWs: null, // Node 的 WebSocket 连接
tcpServer: null, // 为 guacd 开的本地 TCP 服务器
tcpPort: null, // TCP 端口号
sockets: new Map(), // connectionId → TCP socket 的映射
};
sessions.set(sessionId, state);
}
return state;
}

Node 连上来之后,网关为这个会话动态开启一个 TCP 服务器,绑定一个随机端口,然后把端口号告诉 Node:

1
2
3
4
5
6
7
8
9
10
11
12
async function prepareNodeRelay(state, ws) {
state.nodeWs = ws;
state.tcpServer = net.createServer((socket) => forwardTcpToNode(state, socket));
const address = await listen(state.tcpServer, RELAY_TCP_BIND_HOST, 0); // 端口 0 = 随机
state.tcpPort = address.port;
sendJson(ws, {
type: 'relay_ready',
session_id: state.sessionId,
tcp_host: state.tcpHost,
tcp_port: state.tcpPort,
});
}

端口 0 让操作系统分配一个空闲端口,避免了端口冲突。guacd 连上这个端口后,数据就走 guacd → TCP → 网关 → WebSocket → Node → TCP → 目标机器 这条路。

第三层:浏览器的连接票据

浏览器不能直接连 guacd——那样密钥就暴露在前端了。所以有一个 connect 接口,浏览器拿第一步验证过的 token 换取一个短命 connect_token:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async function handleConnectTicket(req, res) {
const body = await readJsonBody(req);
const sessionData = await verifyTicket(body.token, 'client');
const state = await waitForNode(sessionData.session_id);
const connectToken = crypto.randomBytes(24).toString('base64url');
const ticket = {
session: sessionData,
tcpHost: state.tcpHost,
tcpPort: state.tcpPort,
width: clampNumber(body.width, 1280, 640, 7680),
height: clampNumber(body.height, 720, 480, 4320),
dpi: clampNumber(body.dpi, 96, 72, 240),
expiresAt: Date.now() + CONNECT_TICKET_TTL * 1000, // 30 秒过期
};
connectTickets.set(connectToken, ticket);
setTimeout(() => connectTickets.delete(connectToken), CONNECT_TICKET_TTL * 1000);
return jsonResponse(res, 200, { status: 1, data: {
connect_token: connectToken,
expires_in: CONNECT_TICKET_TTL,
}});
}

注意 clampNumber——宽高和 DPI 都有边界限制。这不是摆设:如果恶意请求传入 99999 的宽度,Guacamole 会试图创建超大帧缓冲区,直接把 guacd 内存撑爆。

票据 30 秒自动过期,用完就销毁:

1
2
3
4
5
6
7
8
9
10
11
12
if (url.pathname === '/ws') {
const connectToken = url.searchParams.get('connect_token');
const ticket = connectTickets.get(connectToken);
if (!ticket || ticket.expiresAt <= Date.now()) {
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
}
connectTickets.delete(connectToken); // 一次性票据
const encryptedToken = encryptGuacToken(buildGuacConnection(ticket));
// ...
}

Guacamole token 加密

guacamole-lite 库需要 token 来建立到 guacd 的连接。这个 token 包含了目标机器的 host/port/分辨率/RDP 凭据等敏感信息,所以做了 AES-256-CBC 加密:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const CIPHER = 'AES-256-CBC';
const CRYPT_KEY = crypto.createHash('sha256')
.update(process.env.GUAC_TOKEN_KEY || VERIFY_SECRET || 'ServerManagerRemoteDesktopGateway')
.digest();

function encryptGuacToken(jsonData) {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(CIPHER, CRYPT_KEY, iv);
let encrypted = cipher.update(JSON.stringify(jsonData), 'utf8', 'binary');
encrypted += cipher.final('binary');
return Buffer.from(JSON.stringify({
iv: Buffer.from(iv).toString('base64'),
value: Buffer.from(encrypted, 'binary').toString('base64'),
})).toString('base64');
}

密钥从环境变量 GUAC_TOKEN_KEY 取,没有的话回退到 VERIFY_SECRET,再没有就用硬编码默认值(当然生产环境绝不能这样)。每次加密都生成随机 IV,同样的明文加密出来的密文也不一样。

等待 Node 就绪

还有一个不得不说的细节。用户点”远程桌面”后,浏览器先请求 connect 票据。但这时候 Node 可能还没连上网关——毕竟 WebSocket 连接什么时候建起来不确定。waitForNode 函数处理了这个时序问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function waitForNode(sessionId) {
const current = sessions.get(sessionId);
if (current && current.nodeWs && current.tcpPort) {
return Promise.resolve(current); // Node 已就绪
}
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error('节点远程桌面转发未就绪'));
}, NODE_WAIT_MS); // 默认 15 秒超时

const waiters = nodeWaiters.get(sessionId) || [];
waiters.push({ resolve, reject, timer });
nodeWaiters.set(sessionId, waiters);
});
}

Node 连上来之后,wakeNodeWaiters 会唤醒所有等待这个会话的 Promise:

1
2
3
4
5
6
7
8
function wakeNodeWaiters(sessionId, state) {
const waiters = nodeWaiters.get(sessionId) || [];
nodeWaiters.delete(sessionId);
waiters.forEach(({ resolve, timer }) => {
clearTimeout(timer);
resolve(state);
});
}

这样不论浏览器和 Node 谁先到,都不会丢失请求。

为什么要三层?

为什么不直接让 guacd 连 Node?因为 guacd 是一个固定进程,它不知道哪个 Node 在哪、什么时候会连上来。网关扮演的是”动态中介”的角色:Node 一旦连上,网关就为它开一个本地端口、把端口信息告诉 Node,然后让 guacd 连这个端口。

为什么不让浏览器直接连 Node?因为 Node 在内网里,浏览器从公网根本连不到。

为什么用 Node.js 而不是 Python?因为在连接密集的场景下,Node.js 的事件循环比 Python 的 asyncio 更适合做大量 TCP 连接的中继。而且 guacamole-lite 本身就是 Node.js 生态的——没必要在 Python 里再包一层。

三层架构看起来多余,但每一层各司其职:Django 管认证和授权,网关管连接中继,guacd 管协议翻译。哪一层挂了都可以独立重启,不影响其他层。

管服务器的,监控数据是个大头——CPU 使用率、每核占用、内存、网络流量、GPU 温度和显存,每个节点每几秒就报一次。ServerManager 用了三层数据架构来存这些东西:PostgreSQL 管业务数据,Redis 缓实时状态,TDengine 存历史时序。

为啥要用三种数据库?PostgreSQL 的表结构是给”一个用户一条记录”准备的,不是给”一个节点每秒一条记录连写三个月”准备的。我们试过把监控数据全写 PostgreSQL,一个月后那张表上千万行,查询直接卡死。

TDengine 天生就是干这个的——超级表(stable)定义结构,子表(table)按节点自动分片,数据保留策略设个 KEEP 180d,过期数据自动清理,查询毫秒级返回。

超级表和子表:一个节点一张表

TDengine 的超级表和子表模型跟关系型数据库的”一张大表 ALL_IN_ONE”思路不同。打个比方:超级表是模板,子表是从模板按标签实例化出来的。每个节点创建自己的子表,标签就是节点 UUID:

1
2
3
4
5
6
7
8
9
10
11
12
-- 超级表,定义数据结构
CREATE STABLE IF NOT EXISTS st_node_usage (
ts TIMESTAMP,
cpu_usage INT,
memory_used BIGINT,
swap_used BIGINT,
disk_io_read BIGINT,
disk_io_write BIGINT,
loadavg_1m DOUBLE,
loadavg_5m DOUBLE,
loadavg_15m DOUBLE
) TAGS (node_uuid NCHAR(37));

写数据时,st_node_usage_<uuid_safe> 是子表名,<uuid_safe> 是 UUID 去掉连字符后的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
async def write_node_usage(cls, node_uuid, cpu_usage, memory_used, ...):
node_uuid = _validate_node_uuid(node_uuid)
await ensure_node_subtables(node_uuid) # 确保子表存在
uuid_safe = str(node_uuid).replace('-', '_')
subtable = f"{TD_SUBTABLE_NODE_USAGE}_{uuid_safe}"
ts = int(timestamp.timestamp() * 1000) if timestamp else "NOW"
sql = (
f"INSERT INTO {subtable} VALUES "
f"({ts}, {cpu_usage}, {memory_used}, {swap_used}, "
f"{disk_io_read}, {disk_io_write}, "
f"{loadavg_1m}, {loadavg_5m}, {loadavg_15m})"
)
return await tdengine_client.ainsert(sql)

ensure_node_subtables() 会检查子表是否存在,不存在就创建。之所以不在启动时全建,是因为节点可能动态上下线,按需创建更合理。

批量写入:一条 SQL 插多行

单个指标分开写太浪费了。TDengine 支持一条 INSERT 语句写多张子表的数据,write_node_usage_batch() 就是这么干的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def write_node_usage_batch(cls, node_uuid, cpu_usage, memory_used, ...,
cpu_cores, network_io, accelerators=None,
timestamp=None):
uuid_safe = str(node_uuid).replace('-', '_')
ts = int(timestamp.timestamp() * 1000) if timestamp else "NOW"

results = []

# 1. 整机使用率
subtable = f"{TD_SUBTABLE_NODE_USAGE}_{uuid_safe}"
sql = f"INSERT INTO {subtable} VALUES ({ts}, {cpu_usage}, ...)"
results.append(await tdengine_client.ainsert(sql))

# 2. 每核 CPU —— 合并成一条 INSERT
if cpu_cores:
core_inserts = []
for core_data in cpu_cores:
await ensure_cpu_core_subtable(node_uuid, core_data['core_index'])
core_subtable = f"{TD_SUBTABLE_NODE_CPU_CORE}_{uuid_safe}_{core_data['core_index']}"
core_inserts.append(f"{core_subtable} VALUES ({ts}, {core_data['usage']})")
sql = "INSERT INTO " + " ".join(core_inserts)
results.append(await tdengine_client.ainsert(sql))

# 3. 网络流量 —— 同理合并
if network_io:
net_inserts = []
for port_name, io_data in network_io.items():
await ensure_network_subtable(node_uuid, port_name)
port_safe = port_name.replace('.', '_').replace('-', '_')[:40]
net_subtable = f"{TD_SUBTABLE_NODE_NETWORK}_{uuid_safe}_{port_safe}"
net_inserts.append(f"{net_subtable} VALUES ({ts}, {io_data['bytes_sent']}, {io_data['bytes_recv']})")
sql = "INSERT INTO " + " ".join(net_inserts)
results.append(await tdengine_client.ainsert(sql))

# 4. 加速器(GPU/NPU)
if accelerators:
# ... 同理

return all(results) if results else True

TDengine 的 INSERT 语法允许同时插入多张子表的数据,用空格隔开就行。这样做减少了网络往返次数,一个有 16 核 CPU 和 2 个网口的节点,一次批量写入只需要 4 条 SQL(整机、CPU 核心、网络、加速器),而不是 19 条。

SQL 注入防护:UUID 和网卡名的白名单校验

子表名是拼接的(TDengine 的 REST API 不支持参数化查询),所以拼表名之前要做严格校验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
_UUID_RE = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.IGNORECASE)
_PORT_NAME_RE = re.compile(r'^[A-Za-z0-9_.\-]{1,40}$')

def _validate_node_uuid(node_uuid: str) -> str:
uuid_str = str(node_uuid or '').strip()
if not _UUID_RE.match(uuid_str):
raise ValueError(f"无效的 node_uuid 格式: {uuid_str}")
return uuid_str

def _validate_port_name(port_name: str) -> str:
name = str(port_name or '').strip()
if not _PORT_NAME_RE.match(name):
raise ValueError(f"无效的网卡名: {name}")
return name

UUID 必须是标准格式 xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx,网卡名只允许字母数字点连字符下划线,长度不超过 40。标签值通过 _escape_tdengine_string() 转义单引号。表名里的连字符换成下划线(TDengine 表名不允许连字符),长度限制在 60 字符。

这些校验不是做样子——在节点端就开始了(因为节点是跑在远端机器上的 Agent,数据也是它报上来的),但服务端再校验一遍是深度防御。

Redis 缓实时:查询先看缓存

历史数据查 TDengine,但”最近一次上报”的实时数据查 TDengine 就太慢了——你要 SELECT LAST_ROW(*) 扫一遍超级表。所以实时数据缓到 Redis 里,键名 node_usage_<uuid>,节点每次上报时更新:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@staticmethod
async def get_last_node_usage(node_uuid: str) -> Optional[dict]:
# 先查 Redis
# Redis 层没在 query.py 里,而是在 Node Manager 的 cache 里
# 这里是兜底:Redis 没命中时查 TDengine
uuid_safe = str(node_uuid).replace('-', '_')
subtable = f"{TD_SUBTABLE_NODE_USAGE}_{uuid_safe}"
sql = f"SELECT LAST_ROW(*) FROM {subtable}"
try:
rows = await tdengine_client.aquery(sql)
if rows:
row = rows[0]
row['timestamp'] = _ms_to_datetime(row.get('ts', 0))
return row
except Exception:
pass
return None

实际使用中,仪表盘展示当前状态时直接走 Redis 缓存(毫秒级),展示历史趋势图时走 TDengine 查询(秒级返回 7 天数据)。两种数据库各司其职,不互相抢活。

时间范围查询

历史数据查询都接受 start_timeend_time 参数,内部转成毫秒时间戳:

1
2
3
4
5
6
7
8
def _parse_to_ms(time_str: str) -> int:
for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d'):
try:
dt = datetime.strptime(time_str, fmt)
return int(dt.timestamp() * 1000)
except ValueError:
continue
return int(datetime.now().timestamp() * 1000)

支持四种时间格式,都不行就用当前时间。SQL 里用 ts >= start_ms AND ts <= end_ms 做范围过滤,走的是 TDengine 的时间分区索引,查询基本是 O(分区数) 而不是 O(总行数)。

数据生命周期:180 天自动过期

TDengine 的超级表在创建时可以指定 KEEP 参数:

1
CREATE STABLE IF NOT EXISTS st_node_usage (...) TAGS (node_uuid NCHAR(37)) KEEP 180d;

这意味着写入的数据 180 天后自动删除,不需要写 cron job 去清理。对比 PostgreSQL 方案要手动 DELETE FROM node_usage WHERE ts < now() - interval '180 days',还得担心删除大表锁表的问题。

查询聚合:一次返回全维度数据

query_performance_record() 把整机使用率、每核 CPU、网络流量、加速器数据按时间戳对齐,合并成一条查询结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@staticmethod
async def query_performance_record(node_uuid, start_time, end_time,
device="_all", processor_count=0, memory_total=0):
usage_rows = await TDengineQuery.query_node_usage(node_uuid, start_time, end_time)
if not usage_rows:
return []

# 按设备类型筛选
cpu_core_map = {} # {ts: [{core, usage}, ...]}
net_map = {} # {ts: [{name, bytes_sent, bytes_recv}, ...]}
accelerator_map = {}

if device in ("_all", "cpu"):
cpu_core_rows = await TDengineQuery.query_cpu_core_usage(...)
for r in cpu_core_rows:
cpu_core_map.setdefault(r.get('ts', 0), []).append({...})

# ... network, accelerator 同理

result = []
for row in usage_rows:
ts = row.get('ts', 0)
item = {"timestamp": row['timestamp']}
if device == "_all" or device == "cpu":
item["cpu_usage"] = row.get('cpu_usage', 0)
item["cpu_cores_usage"] = cpu_core_map.get(ts, [])
if device == "_all" or device == "network":
item["network_usage"] = net_map.get(ts, [])
# ...
result.append(item)
return result

device 参数控制查询维度——"cpu" 只查 CPU 相关,"network" 只查网络,"_all" 全查。前端仪表盘可以按需请求不同维度的数据,不需要一次全拉。


三种数据库不是炫技,是各管各最能管的:PostgreSQL 管用户、权限、节点信息这种关系型数据;Redis 管在线状态、实时监控这些 TTL 短、读写频率高的数据;TDengine 管按时间排列的历史监控数据,自动过期、自动聚合。这种分层让每种数据库都在自己最擅长的场景下工作,而不是逼着 PostgreSQL 做它不擅长的事。