前面写过一篇关于 WebSocket 节点管理的文章,聊了连接保活、重连、认证这些事。但连接建立只是开始——消息在 WebSocket 连接上明文飞来飞去,中间人要是能插一脚,就能伪造指令让节点执行任意 shell 命令、删文件、拉镜像……后果不寒而栗。
TLS 能防窃听,但防不了应用层的伪造——万一节点 token 泄露了呢?万一有人拿到了 WebSocket 连接的权限呢?所以 ServerManager 在应用层又加了一道签名。不是所有消息都签,只签关键操作——那些真正有破坏力的。
三个级别,不该签的不浪费
先看动作注册表。每个 WebSocket 动作都有一个安全级别:
1 2 3 4
| class SecurityLevel(Enum): UNSIGNED = "unsigned" CHANNEL_AUTH = "channel_auth" SIGNED = "signed"
|
心跳这种消息——node:heartbeat——级别是 CHANNEL_AUTH,只要 WebSocket 连接本身认证过了就够,不签名。但下面这些动作全是 SIGNED:
execute:run_shell——跑 shell 命令terminal:create_session——开终端file_manager:write——写文件docker:container:exec——容器里执行命令node:close——关掉节点
光是看动作名就觉得脊背发凉的操作,全都需要签名验证。顺便说一下,这套注册表注册了 77+ 个动作,光 Docker 相关的就三十多个——镜像拉取删除、容器创建销毁重启、网络增删、卷管理、Compose 全家桶。每一个都被标注了方向(Panel→Node 还是 Node→Panel 还是双向)和安全级别。
1 2 3 4 5 6 7 8
| @dataclass(frozen=True) class ActionSpec: name: str direction: Direction security: SecurityLevel payload_schema: Optional[Type] = None description: str = ""
|
frozen=True 是有意为之的——注册表一旦建好就不应该被谁偷偷改掉。这跟整条链路的设计哲学一致:权限声明是静态的、可审计的、不可运行时篡改的。
签名的计算:把消息揉成一根绳
签名算法其实简单,关键是覆盖面要对:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| def generate_signature(token: str, action: str, payload: dict) -> dict: timestamp = str(time.time()) nonce = secrets.token_hex(16) payload_hash = hashlib.sha256( json.dumps(payload, sort_keys=True, separators=(',', ':')).encode('utf-8') ).hexdigest() message = f"{action}:{timestamp}:{nonce}:{payload_hash}" signature = hmac.new( token.encode('utf-8'), message.encode('utf-8'), hashlib.sha256 ).hexdigest() return { "timestamp": timestamp, "nonce": nonce, "signature": signature, }
|
几点设计考量:
载荷单独 hash。签名的消息不直接包含 payload JSON,而是包含 payload 的 SHA-256 摘要。这样签名跟载荷可以分开传输——WebSocket 消息里 payload 放 data 字段,签名放 _sign 字段,结构清晰。
sort_keys=True + separators=(',', ':')。JSON 序列化在不同的 Python 版本、不同的解释器里,key 的顺序可能不一样。加 sort_keys 确保无论谁算出来的 JSON 字符串都一样。separators=(',', ':') 去掉空格,避免空白字符的差异。
nonce 是 32 字节的随机十六进制。每次签名都不同,哪怕 payload 一模一样。
验证:时间漂移 + 重放防御
收到消息后,验证这步比生成要关键得多:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| def verify_signature(token, action, payload, sign_data, nonce_scope=None): timestamp = sign_data.get("timestamp") nonce = sign_data.get("nonce") signature = sign_data.get("signature") req_time = float(timestamp) if abs(time.time() - req_time) > ALLOWED_TIME_DRIFT: return False payload_hash = hashlib.sha256( json.dumps(payload, sort_keys=True, separators=(',', ':')).encode('utf-8') ).hexdigest() message = f"{action}:{timestamp}:{nonce}:{payload_hash}" expected = hmac.new( token.encode('utf-8'), message.encode('utf-8'), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(expected, signature): return False if nonce_scope is not None and not _claim_nonce(nonce_scope, action, nonce): return False return True
|
三道关,一道比一道狠:
时间漂移——节点和服务端的时钟不可能完全同步,5 分钟的容差覆盖了 NTP 偶尔漂移的场景。超过 5 分钟的消息直接拒绝。
签名比对——hmac.compare_digest() 而不是 ==,前者是恒定时间比较,防时序攻击。用 == 的话,攻击者可以通过比较响应时间逐字符爆破签名。
Nonce 防重放——_claim_nonce() 用 Redis 的 cache.add() 实现原子性 nonce 消耗。add 只在 key 不存在时成功,刚好可以判断 nonce 是否已经被用过。nonce 的 key 包含 action 的哈希和 scope(一般是 Consumer类名:节点UUID),所以不同节点的同动作 nonce 不会冲突,但同一节点同一动作的同一个 nonce 只能用一次:
1 2 3 4 5
| def _claim_nonce(nonce_scope, action, nonce): scope_hash = hashlib.sha256(str(nonce_scope or "default").encode("utf-8")).hexdigest()[:16] action_hash = hashlib.sha256(str(action or "").encode("utf-8")).hexdigest()[:16] key = f"{NONCE_CACHE_PREFIX}:{scope_hash}:{action_hash}:{nonce}" return bool(cache.add(key, "1", timeout=ALLOWED_TIME_DRIFT + 30))
|
TTL 设成 330 秒(比 300 秒时间窗多 30 秒),确保过期 nonce 的 key 自动清理,Redis 不会堆积无用数据。
在 WebSocket 消费者里自动签名/验签
WebSocket Consumer 的基类 AsyncBaseConsumer 把签名和验证做成了自动行为——正常业务代码完全不用管:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| class AsyncBaseConsumer(AsyncWebsocketConsumer, ABC): verify_incoming_signatures = True sign_outgoing_actions = True
async def receive(self, text_data=None, bytes_data=None): data = json.loads(text_data) action = data.get('action') payload = data.get('data', None)
if self.verify_incoming_signatures and needs_signature(action): sign_data = data.get('_sign') if not sign_data: return await self.send(json.dumps({ "action": "error:signature_required", "data": {"action": action} })) node_token = self.__get_node_token() if not verify_signature(node_token, action, payload or {}, sign_data, nonce_scope=self.__signature_nonce_scope()): return await self.close(3001)
if action in self.actions: await self.actions[action](self, payload)
async def send_action(self, action, payload=None): data = {"action": action, "data": deepcopy(payload)} if self.sign_outgoing_actions and needs_signature(action): node_token = self.__get_node_token() if node_token: data['_sign'] = generate_signature(node_token, action, payload or {}) await self.send(json.dumps(data, cls=ComplexEncoder))
|
两处亮点:
签名验证失败断开连接(code 3001),不是返回错误消息然后继续。这很重要——既然签名对不上,说明对方要么不是合法节点,要么通信已经出了问题,再聊下去没有意义。
缺少签名时返回 error:signature_required 而不是断连,这是一个阶梯式的安全策略:可能对方还没实现签名功能(比如旧版节点),给个提示比直接踢掉更友好。但合法的伪造签名?那就是攻击了,断连没商量。
日志也要脱敏
顺带一提,WebSocket 消息进日志的时候,密码、token、密钥这些字段会被脱敏:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| SENSITIVE_LOG_KEYS = { 'password', 'passwd', 'pwd', 'token', 'node_token', 'client_token', 'server_token', 'access', 'refresh', 'authorization', 'signature', '_sign', 'secret', 'secret_key', }
def sanitize_ws_payload(value, depth=0): if isinstance(value, dict): result = {} for key, item in value.items(): if str(key).lower() in SENSITIVE_LOG_KEYS: result[key] = '***' else: result[key] = sanitize_ws_payload(item, depth + 1) return result
|
签名值本身(signature、_sign)也被脱敏了。日志里留下的只是一堆星号,攻击者就算拿到日志也反推不出 node_token。
小结
这套 HMAC 签名机制不是在造轮子——它是在 WebSocket 这种无状态长连接上,用最小的开销实现了:
- 消息完整性——payload 被篡改会导致签名校验失败
- 消息来源认证——只有持有 node_token 的一方才能生成合法签名
- 防重放——同一个 nonce 只能用一次,截获消息重发无效
- 时间窗保护——过期消息自动作废
- 阶梯式响应——缺少签名给提示,伪造签名直接断连
配合 TLS,就形成了两层保护:TLS 保证传输加密,HMAC 保证即使连接被劫持,关键指令也无法伪造或重放。