ServerManager 的一个常见操作是:Panel 决定往某个节点下发一个文件——可能是安装包、配置文件、或者插件资源包。文件存在 MinIO 对象存储里,节点需要通过某种方式拿到它。
听起来简单——发个 URL 让节点下载不就完了?但把这事拆开看,从”Panel 发一个下载指令”到”节点硬盘上多了一个完整的文件”,中间的安全缝合比你想象的要多。
第一步:授权——不能谁都能从 MinIO 拿东西
MinIO 本身有自己的认证体系,但直接把 MinIO 的 access key 给节点?那节点就能拿任何存储桶里的任何文件了。权限太大了。
所以 Panel 先要做一个授权接口。节点带着自己的 JWT(就是 HMAC 签名那篇里拿到的 access token)去问 Panel:“我有个下载任务,给我一个能用的 URL”。Panel 验证 JWT 合法性、检查下载权限、然后向 MinIO 请求一个预签名 URL——这个 URL 绑定了特定的文件、过了有效期就废,而且只能 GET。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| async def __get_download_url(self, download_task: DownloadTaskConfig) -> tuple[str | None, str | None]: headers = {"Authorization": f"Bearer {self.__access_token}"} params = {"task": download_task.task_id, "file": download_task.file_id} async with self.__session.get( self.__url, params=params, headers=headers, timeout=aiohttp.ClientTimeout(total=15) ) as response: data = await response.json() if response.status != 200 or data.get("status") != 1: raise RuntimeError(f"下载授权失败({response.status}): {data.get('msg')}") payload = data.get("data") or {} return payload.get("url"), payload.get("file_name")
|
这段代码说明几件事:
JWT 只用来请求授权,不直接访问 MinIO。JWT 的权限是”以某节点的身份向 Panel 请求下载 URL”,而不是”直接读取对象存储”。
预签名 URL 是一次性的桥梁。MinIO 签发 URL 时绑定了 bucket、object key、过期时间。节点拿到后直接向 MinIO 发 GET——这个过程中 Panel 不参与,不转发,不代理,完全没有性能瓶颈。
file_name 从授权接口拿回。MinIO 存储的文件名可能是一个 SHA-256 哈希值,真实文件名由 Panel 维护。节点从授权接口同时拿到 URL 和真实文件名,两份信息同源,不存在不一致。
第二步:参数校验——恶意输入在门口就拦住
下载请求进入节点代码的第一件事不是去拿 URL,而是校验参数:
1 2 3 4 5 6 7 8 9
| class DownloadTaskConfig: def __init__(self, task_id: str, file_id: str, file_name: str | None, save_path: str, check_hash: bool, file_hash: str): self.task_id = validate_uuid(task_id, "task_id") self.file_id = validate_sha256(file_id, "file_id") self.file_name = file_name self.save_path = save_path self.check_hash = check_hash self.file_hash = validate_sha256(file_hash or file_id, "file_hash")
|
validate_uuid 只允许 [0-9a-fA-F-]{1,64} 的格式,validate_sha256 必须是 64 位十六进制。这意味着如果你往 task_id 里塞 ../../etc/passwd 这种路径穿越字符,在校验层直接就弹回去了。
然后是保存路径的解析:
1 2 3 4 5 6 7
| def resolve_download_path(self, requested_path: str | None) -> str: base = safe_data_dir("download", create=True) if self.safe_enabled("allow_custom_download_path", False) and requested_path: return str(Path(requested_path).expanduser().resolve()) if not requested_path or os.path.isabs(str(requested_path)): return str(base) return str(safe_join(base, requested_path, create_dir=True))
|
三种情况:
- 默认行为——所有文件都下到
data/download/ 下 - 开启
allow_custom_download_path(默认关闭)——允许 Panel 指定绝对路径。这东西明显有风险,所以安全开关默认关着 - Panel 给了相对路径——用
safe_join 拼接,防路径穿越
safe_join 的实现在 security.py 里,它对路径逃逸做的是零容忍:
1 2 3 4 5 6 7 8
| def safe_join(base: str | os.PathLike, *parts: str | os.PathLike, create_dir: bool = False) -> Path: base_path = Path(base).resolve() target = base_path.joinpath(*[str(part) for part in parts]).resolve() if target != base_path and base_path not in target.parents: raise SecurityError(f"路径越界: {target}") if create_dir: target.mkdir(parents=True, exist_ok=True) return target
|
Path.resolve() 会展开所有符号链接和 ..,然后检查目标路径是否在 base 里面。../../etc/passwd 这种老把戏在这里完全不管用——resolve() 之后它是 /etc/passwd,不在 data/download/ 的父路径里,直接 SecurityError。
第三步:下载——边下边哈希
到了真正下载的环节,代码用了流式读取而不是一次性加载:
1 2 3 4 5 6 7 8 9 10 11 12 13
| async with self.__session.get(download_url, timeout=aiohttp.ClientTimeout(total=300)) as response: if response.status != 200: filename = sanitize_filename(download_task.file_name or authorized_file_name, download_task.file_id) target_path = safe_join(save_dir, filename) sha256 = hashlib.sha256() with open(target_path, 'wb') as file: while True: chunk = await response.content.read(8192) sha256.update(chunk) if not chunk: break file.write(chunk)
|
这段代码暗藏了几个不明显的决策:
边下边算哈希。每次 read(8192) 之后立即 sha256.update(chunk),不用等全下完再算。这意味着内存里永远只有 8KB 的数据量——就算文件有几个 G,内存也扛得住。
文件名再次被 sanitize_filename 过滤:
1 2 3 4 5 6 7
| def sanitize_filename(filename: Any, fallback: str) -> str: fallback = os.path.basename(str(fallback or "download")) filename = os.path.basename(str(filename or fallback)) filename = filename.replace("\x00", "").strip() if not filename or filename in {".", ".."}: filename = fallback or "download" return filename[:255]
|
os.path.basename() 剥离所有路径前缀,replace("\x00", "") 去掉空字节攻击,. 和 .. 被替换成 fallback,最终截断到 255 字符。就算有人把文件名设成 ../../../etc/crontab,到这里也变成了 crontab。
safe_join 是最后一道保险。万一 sanitize_filename 漏了什么(虽然不应该),目标路径仍然会被检查是否在 save_dir 下。
第四步:哈希校验——下载完的文件对不对?
下载完了不算完,还得确保文件完整:
1 2 3 4 5 6 7 8
| sha256 = sha256.hexdigest() if download_task.check_hash and str(download_task.file_hash) != str(sha256): return self.__send_websocket_action('file_download:failure', { 'task': download_task.task_id, 'file': download_task.file_id, 'error_type': "文件哈希校验失败", 'error_content': f"{download_task.file_hash} != {sha256}" })
|
哈希校验是可选的(check_hash 参数控制),但一旦开启就是硬校验——哈希对不上就算下载失败。这里用的是 SHA-256,跟 file_id 本身的格式一致,因为 Panel 那边存文件时就用 SHA-256 做了去重标识。
第五步:并发控制和队列——别把节点搞挂了
多文件同时下的时候,不能任所有下载一起跑。代码里有个简单的线程池控制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class DownloadFileUtil: __max_download_thread: int = 4 __download_threads: int = 0 __thread_lock: Lock()
def __handle_download_queue(self): while True: if self.__download_queue.empty(): break if self.__download_threads >= self.__max_download_thread: time.sleep(0.3) continue task = self.__download_queue.get() asyncio.run_coroutine_threadsafe( self.__download(task), self.__loop ) self.__increment_download_threads()
|
最多 4 个并发下载,超了就 0.3 秒轮询等一个空位。锁保护的是计数值 _download_threads,下载完了 _decrement_download_threads。
这里有个有意思的选择——用了 asyncio.run_coroutine_threadsafe 把下载协程扔到事件循环里跑,而不是在下载线程里直接 await。因为下载方法本身是 async 的(aiohttp 的流式读取只有异步版),但队列调度在同步线程里跑的。这是 aiohttp + threading 混合时的常见模式,因为 WebSocket 收到下载指令的时候,处理线程不能被阻塞着等下载完。
第六步:状态汇报——Panel 得知道下完了没有
下载结果通过 WebSocket 回报给 Panel。成功和失败走同一个 file_download:success/failure 动作:
1 2 3 4 5 6 7 8 9 10 11 12
| self.__send_websocket_action('file_download:success', { 'task': download_task.task_id, 'file': download_task.file_id, })
self.__send_websocket_action('file_download:failure', { 'task': download_task.task_id, 'file': download_task.file_id, 'error_type': "文件哈希校验失败", 'error_content': f"{download_task.file_hash} != {sha256}" })
|
WS 消息的发送本身也走了队列——__websocket_message_queue。 Почему?因为下载在后台线程里跑,但 websocket_send_json 是异步的。后台线程不能直接 await,得把消息塞进队列,让专门的线程从队列里取消息,再通过 asyncio.run_coroutine_threadsafe 扔回事件循环发出去。
这是一个很实用的模式——生产者(下载线程)和消费者(WS 发送线程)解耦,中间用一个 Queue 做缓冲。关连接的时候 close() 方法往两个队列各塞一个 None,让消费者线程优雅退出。
整条链路一份清单
从一个下载指令到文件落盘,整个安全缝合点是这样的:
| 环节 | 防护对象 | 手段 |
|---|
| 参数入口 | UUID/SHA-256 格式 | validate_uuid, validate_sha256 |
| 路径拼接 | 路径穿越 | safe_join + resolve() + 父路径检查 |
| 文件名净化 | ../, NUL, . | sanitize_filename + basename |
| 下载授权 | 无权限访问 MinIO | JWT + 预签名 URL |
| 传输过程 | 窃听/篡改 | TLS + 流式读取 |
| 文件完整性 | 损坏/被替换 | SHA-256 流式校验 |
| 自定义路径 | 写到危险位置 | allow_custom_download_path 默认关 |
| 并发保护 | 资源耗尽 | 4 线程上限 + 队列 |
| 状态汇报 | Panel 不知道结果 | WS 成功/失败回调 |
每一层单独看都算不上多高明,但叠在一起就构成了完整的纵深防御。最怕的不是某个环节被突破,而是压根没想到那个环节需要防护。这条链路里每一层都是踩过坑之后加上的——不是过度设计,是血泪换来的经验。