WebSocket 消息防篡改:给控制指令加 HMAC 签名

前面写过一篇关于 WebSocket 节点管理的文章,聊了连接保活、重连、认证这些事。但连接建立只是开始——消息在 WebSocket 连接上明文飞来飞去,中间人要是能插一脚,就能伪造指令让节点执行任意 shell 命令、删文件、拉镜像……后果不寒而栗。

TLS 能防窃听,但防不了应用层的伪造——万一节点 token 泄露了呢?万一有人拿到了 WebSocket 连接的权限呢?所以 ServerManager 在应用层又加了一道签名。不是所有消息都签,只签关键操作——那些真正有破坏力的。

三个级别,不该签的不浪费

先看动作注册表。每个 WebSocket 动作都有一个安全级别:

1
2
3
4
class SecurityLevel(Enum):
UNSIGNED = "unsigned" # 不需要签名
CHANNEL_AUTH = "channel_auth" # 连接认证过了就行
SIGNED = "signed" # 必须 HMAC 签名

心跳这种消息——node:heartbeat——级别是 CHANNEL_AUTH,只要 WebSocket 连接本身认证过了就够,不签名。但下面这些动作全是 SIGNED

  • execute:run_shell——跑 shell 命令
  • terminal:create_session——开终端
  • file_manager:write——写文件
  • docker:container:exec——容器里执行命令
  • node:close——关掉节点

光是看动作名就觉得脊背发凉的操作,全都需要签名验证。顺便说一下,这套注册表注册了 77+ 个动作,光 Docker 相关的就三十多个——镜像拉取删除、容器创建销毁重启、网络增删、卷管理、Compose 全家桶。每一个都被标注了方向(Panel→Node 还是 Node→Panel 还是双向)和安全级别。

1
2
3
4
5
6
7
8
# 注册表的设计——frozen dataclass,不可变
@dataclass(frozen=True)
class ActionSpec:
name: str
direction: Direction
security: SecurityLevel
payload_schema: Optional[Type] = None
description: str = ""

frozen=True 是有意为之的——注册表一旦建好就不应该被谁偷偷改掉。这跟整条链路的设计哲学一致:权限声明是静态的、可审计的、不可运行时篡改的。

签名的计算:把消息揉成一根绳

签名算法其实简单,关键是覆盖面要对:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def generate_signature(token: str, action: str, payload: dict) -> dict:
timestamp = str(time.time())
nonce = secrets.token_hex(16)
payload_hash = hashlib.sha256(
json.dumps(payload, sort_keys=True, separators=(',', ':')).encode('utf-8')
).hexdigest()
message = f"{action}:{timestamp}:{nonce}:{payload_hash}"
signature = hmac.new(
token.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
return {
"timestamp": timestamp,
"nonce": nonce,
"signature": signature,
}

几点设计考量:

载荷单独 hash。签名的消息不直接包含 payload JSON,而是包含 payload 的 SHA-256 摘要。这样签名跟载荷可以分开传输——WebSocket 消息里 payload 放 data 字段,签名放 _sign 字段,结构清晰。

sort_keys=True + separators=(',', ':')。JSON 序列化在不同的 Python 版本、不同的解释器里,key 的顺序可能不一样。加 sort_keys 确保无论谁算出来的 JSON 字符串都一样。separators=(',', ':') 去掉空格,避免空白字符的差异。

nonce 是 32 字节的随机十六进制。每次签名都不同,哪怕 payload 一模一样。

验证:时间漂移 + 重放防御

收到消息后,验证这步比生成要关键得多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def verify_signature(token, action, payload, sign_data, nonce_scope=None):
timestamp = sign_data.get("timestamp")
nonce = sign_data.get("nonce")
signature = sign_data.get("signature")

# 1. 时间窗校验——5分钟容差
req_time = float(timestamp)
if abs(time.time() - req_time) > ALLOWED_TIME_DRIFT: # 300秒
return False

# 2. 重算签名并比对
payload_hash = hashlib.sha256(
json.dumps(payload, sort_keys=True, separators=(',', ':')).encode('utf-8')
).hexdigest()
message = f"{action}:{timestamp}:{nonce}:{payload_hash}"
expected = hmac.new(
token.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(expected, signature):
return False

# 3. Nonce 唯一性——同一个 scope 内 nonce 只能用一次
if nonce_scope is not None and not _claim_nonce(nonce_scope, action, nonce):
return False

return True

三道关,一道比一道狠:

时间漂移——节点和服务端的时钟不可能完全同步,5 分钟的容差覆盖了 NTP 偶尔漂移的场景。超过 5 分钟的消息直接拒绝。

签名比对——hmac.compare_digest() 而不是 ==,前者是恒定时间比较,防时序攻击。用 == 的话,攻击者可以通过比较响应时间逐字符爆破签名。

Nonce 防重放——_claim_nonce() 用 Redis 的 cache.add() 实现原子性 nonce 消耗。add 只在 key 不存在时成功,刚好可以判断 nonce 是否已经被用过。nonce 的 key 包含 action 的哈希和 scope(一般是 Consumer类名:节点UUID),所以不同节点的同动作 nonce 不会冲突,但同一节点同一动作的同一个 nonce 只能用一次:

1
2
3
4
5
def _claim_nonce(nonce_scope, action, nonce):
scope_hash = hashlib.sha256(str(nonce_scope or "default").encode("utf-8")).hexdigest()[:16]
action_hash = hashlib.sha256(str(action or "").encode("utf-8")).hexdigest()[:16]
key = f"{NONCE_CACHE_PREFIX}:{scope_hash}:{action_hash}:{nonce}"
return bool(cache.add(key, "1", timeout=ALLOWED_TIME_DRIFT + 30))

TTL 设成 330 秒(比 300 秒时间窗多 30 秒),确保过期 nonce 的 key 自动清理,Redis 不会堆积无用数据。

在 WebSocket 消费者里自动签名/验签

WebSocket Consumer 的基类 AsyncBaseConsumer 把签名和验证做成了自动行为——正常业务代码完全不用管:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class AsyncBaseConsumer(AsyncWebsocketConsumer, ABC):
verify_incoming_signatures = True # 入站方向验签
sign_outgoing_actions = True # 出站方向签名

async def receive(self, text_data=None, bytes_data=None):
data = json.loads(text_data)
action = data.get('action')
payload = data.get('data', None)

# 如果这个动作需要签名,但消息里没有 _sign 字段——直接拒绝
if self.verify_incoming_signatures and needs_signature(action):
sign_data = data.get('_sign')
if not sign_data:
return await self.send(json.dumps({
"action": "error:signature_required",
"data": {"action": action}
}))
node_token = self.__get_node_token()
if not verify_signature(node_token, action, payload or {}, sign_data,
nonce_scope=self.__signature_nonce_scope()):
return await self.close(3001) # 签名验证失败——断开连接

# 分发到对应的 action handler
if action in self.actions:
await self.actions[action](self, payload)

async def send_action(self, action, payload=None):
data = {"action": action, "data": deepcopy(payload)}
if self.sign_outgoing_actions and needs_signature(action):
node_token = self.__get_node_token()
if node_token:
data['_sign'] = generate_signature(node_token, action, payload or {})
await self.send(json.dumps(data, cls=ComplexEncoder))

两处亮点:

签名验证失败断开连接(code 3001),不是返回错误消息然后继续。这很重要——既然签名对不上,说明对方要么不是合法节点,要么通信已经出了问题,再聊下去没有意义。

缺少签名时返回 error:signature_required 而不是断连,这是一个阶梯式的安全策略:可能对方还没实现签名功能(比如旧版节点),给个提示比直接踢掉更友好。但合法的伪造签名?那就是攻击了,断连没商量。

日志也要脱敏

顺带一提,WebSocket 消息进日志的时候,密码、token、密钥这些字段会被脱敏:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SENSITIVE_LOG_KEYS = {
'password', 'passwd', 'pwd', 'token', 'node_token',
'client_token', 'server_token', 'access', 'refresh',
'authorization', 'signature', '_sign', 'secret', 'secret_key',
}

def sanitize_ws_payload(value, depth=0):
if isinstance(value, dict):
result = {}
for key, item in value.items():
if str(key).lower() in SENSITIVE_LOG_KEYS:
result[key] = '***'
else:
result[key] = sanitize_ws_payload(item, depth + 1)
return result
# ...

签名值本身(signature_sign)也被脱敏了。日志里留下的只是一堆星号,攻击者就算拿到日志也反推不出 node_token。

小结

这套 HMAC 签名机制不是在造轮子——它是在 WebSocket 这种无状态长连接上,用最小的开销实现了:

  1. 消息完整性——payload 被篡改会导致签名校验失败
  2. 消息来源认证——只有持有 node_token 的一方才能生成合法签名
  3. 防重放——同一个 nonce 只能用一次,截获消息重发无效
  4. 时间窗保护——过期消息自动作废
  5. 阶梯式响应——缺少签名给提示,伪造签名直接断连

配合 TLS,就形成了两层保护:TLS 保证传输加密,HMAC 保证即使连接被劫持,关键指令也无法伪造或重放。

欢迎关注我的其它发布渠道