文件从服务器落到节点:一条下载链路的安全缝合

ServerManager 的一个常见操作是:Panel 决定往某个节点下发一个文件——可能是安装包、配置文件、或者插件资源包。文件存在 MinIO 对象存储里,节点需要通过某种方式拿到它。

听起来简单——发个 URL 让节点下载不就完了?但把这事拆开看,从”Panel 发一个下载指令”到”节点硬盘上多了一个完整的文件”,中间的安全缝合比你想象的要多。

第一步:授权——不能谁都能从 MinIO 拿东西

MinIO 本身有自己的认证体系,但直接把 MinIO 的 access key 给节点?那节点就能拿任何存储桶里的任何文件了。权限太大了。

所以 Panel 先要做一个授权接口。节点带着自己的 JWT(就是 HMAC 签名那篇里拿到的 access token)去问 Panel:“我有个下载任务,给我一个能用的 URL”。Panel 验证 JWT 合法性、检查下载权限、然后向 MinIO 请求一个预签名 URL——这个 URL 绑定了特定的文件、过了有效期就废,而且只能 GET。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def __get_download_url(self, download_task: DownloadTaskConfig) -> tuple[str | None, str | None]:
headers = {"Authorization": f"Bearer {self.__access_token}"}
params = {"task": download_task.task_id, "file": download_task.file_id}
async with self.__session.get(
self.__url,
params=params,
headers=headers,
timeout=aiohttp.ClientTimeout(total=15)
) as response:
data = await response.json()
if response.status != 200 or data.get("status") != 1:
raise RuntimeError(f"下载授权失败({response.status}): {data.get('msg')}")
payload = data.get("data") or {}
return payload.get("url"), payload.get("file_name")

这段代码说明几件事:

JWT 只用来请求授权,不直接访问 MinIO。JWT 的权限是”以某节点的身份向 Panel 请求下载 URL”,而不是”直接读取对象存储”。

预签名 URL 是一次性的桥梁。MinIO 签发 URL 时绑定了 bucket、object key、过期时间。节点拿到后直接向 MinIO 发 GET——这个过程中 Panel 不参与,不转发,不代理,完全没有性能瓶颈。

file_name 从授权接口拿回。MinIO 存储的文件名可能是一个 SHA-256 哈希值,真实文件名由 Panel 维护。节点从授权接口同时拿到 URL 和真实文件名,两份信息同源,不存在不一致。

第二步:参数校验——恶意输入在门口就拦住

下载请求进入节点代码的第一件事不是去拿 URL,而是校验参数:

1
2
3
4
5
6
7
8
9
class DownloadTaskConfig:
def __init__(self, task_id: str, file_id: str, file_name: str | None,
save_path: str, check_hash: bool, file_hash: str):
self.task_id = validate_uuid(task_id, "task_id")
self.file_id = validate_sha256(file_id, "file_id")
self.file_name = file_name
self.save_path = save_path
self.check_hash = check_hash
self.file_hash = validate_sha256(file_hash or file_id, "file_hash")

validate_uuid 只允许 [0-9a-fA-F-]{1,64} 的格式,validate_sha256 必须是 64 位十六进制。这意味着如果你往 task_id 里塞 ../../etc/passwd 这种路径穿越字符,在校验层直接就弹回去了。

然后是保存路径的解析:

1
2
3
4
5
6
7
def resolve_download_path(self, requested_path: str | None) -> str:
base = safe_data_dir("download", create=True)
if self.safe_enabled("allow_custom_download_path", False) and requested_path:
return str(Path(requested_path).expanduser().resolve())
if not requested_path or os.path.isabs(str(requested_path)):
return str(base)
return str(safe_join(base, requested_path, create_dir=True))

三种情况:

  1. 默认行为——所有文件都下到 data/download/
  2. 开启 allow_custom_download_path(默认关闭)——允许 Panel 指定绝对路径。这东西明显有风险,所以安全开关默认关着
  3. Panel 给了相对路径——用 safe_join 拼接,防路径穿越

safe_join 的实现在 security.py 里,它对路径逃逸做的是零容忍:

1
2
3
4
5
6
7
8
def safe_join(base: str | os.PathLike, *parts: str | os.PathLike, create_dir: bool = False) -> Path:
base_path = Path(base).resolve()
target = base_path.joinpath(*[str(part) for part in parts]).resolve()
if target != base_path and base_path not in target.parents:
raise SecurityError(f"路径越界: {target}")
if create_dir:
target.mkdir(parents=True, exist_ok=True)
return target

Path.resolve() 会展开所有符号链接和 ..,然后检查目标路径是否在 base 里面。../../etc/passwd 这种老把戏在这里完全不管用——resolve() 之后它是 /etc/passwd,不在 data/download/ 的父路径里,直接 SecurityError

第三步:下载——边下边哈希

到了真正下载的环节,代码用了流式读取而不是一次性加载:

1
2
3
4
5
6
7
8
9
10
11
12
13
async with self.__session.get(download_url, timeout=aiohttp.ClientTimeout(total=300)) as response:
if response.status != 200:
# ... 错误处理
filename = sanitize_filename(download_task.file_name or authorized_file_name, download_task.file_id)
target_path = safe_join(save_dir, filename)
sha256 = hashlib.sha256()
with open(target_path, 'wb') as file:
while True:
chunk = await response.content.read(8192)
sha256.update(chunk)
if not chunk:
break
file.write(chunk)

这段代码暗藏了几个不明显的决策:

边下边算哈希。每次 read(8192) 之后立即 sha256.update(chunk),不用等全下完再算。这意味着内存里永远只有 8KB 的数据量——就算文件有几个 G,内存也扛得住。

文件名再次被 sanitize_filename 过滤

1
2
3
4
5
6
7
def sanitize_filename(filename: Any, fallback: str) -> str:
fallback = os.path.basename(str(fallback or "download"))
filename = os.path.basename(str(filename or fallback))
filename = filename.replace("\x00", "").strip()
if not filename or filename in {".", ".."}:
filename = fallback or "download"
return filename[:255]

os.path.basename() 剥离所有路径前缀,replace("\x00", "") 去掉空字节攻击,... 被替换成 fallback,最终截断到 255 字符。就算有人把文件名设成 ../../../etc/crontab,到这里也变成了 crontab

safe_join 是最后一道保险。万一 sanitize_filename 漏了什么(虽然不应该),目标路径仍然会被检查是否在 save_dir 下。

第四步:哈希校验——下载完的文件对不对?

下载完了不算完,还得确保文件完整:

1
2
3
4
5
6
7
8
sha256 = sha256.hexdigest()
if download_task.check_hash and str(download_task.file_hash) != str(sha256):
return self.__send_websocket_action('file_download:failure', {
'task': download_task.task_id,
'file': download_task.file_id,
'error_type': "文件哈希校验失败",
'error_content': f"{download_task.file_hash} != {sha256}"
})

哈希校验是可选的(check_hash 参数控制),但一旦开启就是硬校验——哈希对不上就算下载失败。这里用的是 SHA-256,跟 file_id 本身的格式一致,因为 Panel 那边存文件时就用 SHA-256 做了去重标识。

第五步:并发控制和队列——别把节点搞挂了

多文件同时下的时候,不能任所有下载一起跑。代码里有个简单的线程池控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class DownloadFileUtil:
__max_download_thread: int = 4
__download_threads: int = 0
__thread_lock: Lock()

def __handle_download_queue(self):
while True:
if self.__download_queue.empty():
break
if self.__download_threads >= self.__max_download_thread:
time.sleep(0.3)
continue
task = self.__download_queue.get()
asyncio.run_coroutine_threadsafe(
self.__download(task), self.__loop
)
self.__increment_download_threads()

最多 4 个并发下载,超了就 0.3 秒轮询等一个空位。锁保护的是计数值 _download_threads,下载完了 _decrement_download_threads

这里有个有意思的选择——用了 asyncio.run_coroutine_threadsafe 把下载协程扔到事件循环里跑,而不是在下载线程里直接 await。因为下载方法本身是 async 的(aiohttp 的流式读取只有异步版),但队列调度在同步线程里跑的。这是 aiohttp + threading 混合时的常见模式,因为 WebSocket 收到下载指令的时候,处理线程不能被阻塞着等下载完。

第六步:状态汇报——Panel 得知道下完了没有

下载结果通过 WebSocket 回报给 Panel。成功和失败走同一个 file_download:success/failure 动作:

1
2
3
4
5
6
7
8
9
10
11
12
# 成功
self.__send_websocket_action('file_download:success', {
'task': download_task.task_id,
'file': download_task.file_id,
})
# 失败
self.__send_websocket_action('file_download:failure', {
'task': download_task.task_id,
'file': download_task.file_id,
'error_type': "文件哈希校验失败",
'error_content': f"{download_task.file_hash} != {sha256}"
})

WS 消息的发送本身也走了队列——__websocket_message_queue。 Почему?因为下载在后台线程里跑,但 websocket_send_json 是异步的。后台线程不能直接 await,得把消息塞进队列,让专门的线程从队列里取消息,再通过 asyncio.run_coroutine_threadsafe 扔回事件循环发出去。

这是一个很实用的模式——生产者(下载线程)和消费者(WS 发送线程)解耦,中间用一个 Queue 做缓冲。关连接的时候 close() 方法往两个队列各塞一个 None,让消费者线程优雅退出。

整条链路一份清单

从一个下载指令到文件落盘,整个安全缝合点是这样的:

环节防护对象手段
参数入口UUID/SHA-256 格式validate_uuid, validate_sha256
路径拼接路径穿越safe_join + resolve() + 父路径检查
文件名净化../, NUL, .sanitize_filename + basename
下载授权无权限访问 MinIOJWT + 预签名 URL
传输过程窃听/篡改TLS + 流式读取
文件完整性损坏/被替换SHA-256 流式校验
自定义路径写到危险位置allow_custom_download_path 默认关
并发保护资源耗尽4 线程上限 + 队列
状态汇报Panel 不知道结果WS 成功/失败回调

每一层单独看都算不上多高明,但叠在一起就构成了完整的纵深防御。最怕的不是某个环节被突破,而是压根没想到那个环节需要防护。这条链路里每一层都是踩过坑之后加上的——不是过度设计,是血泪换来的经验。

欢迎关注我的其它发布渠道