WebSocket 节点管理里那些没法绕开的工程问题

ServerManager 这个项目要管一堆远端节点——可能是云服务器、可能是边缘的工控机、也可能是机器人小车。中控服务端要给它们下指令、收它们的状态。HTTP 轮询太重,所以一开始就选了 WebSocket。

WebSocket 一行代码就连上了,难的是上线之后的那些事——网络抖一下连接断了、token 过期了得重新认证、消息怎么防篡改、命令怎么不让节点端乱执行。下面把这几块各自的思路捋一下。

永远在重连

节点客户端的主体是一个 while True:连接、跑消息循环、断开、等几秒、再连。所有异常都不能让它跳出最外层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import aiohttp
import asyncio
import json

class NodeWebSocketClient:
def __init__(self):
self._session: aiohttp.ClientSession = None
self._ws: aiohttp.ClientWebSocketResponse = None

async def run_forever(self) -> None:
while True:
try:
self._session, auth_status, access_token = await self._authenticate()
if not auth_status:
await asyncio.sleep(5)
continue

async with self._session.ws_connect(
self._ws_url(access_token),
autoping=True,
) as ws:
logger.success("WebSocket 已连接")
self._ws = ws
await self._message_handler()

except aiohttp.ClientError as err:
logger.error(f"WebSocket 连接失败: {err}")
finally:
self._ws = None
await asyncio.sleep(5)

autoping=True 让 aiohttp 自动发 ping,对付 NAT 超时和负载均衡器主动断流。

固定 5 秒重连间隔,在节点规模一大就要换成带抖动的退避——想象一下,一万台机器同一秒全断了再同一秒全重连,服务端那边瞬间就被打挂了。

Bearer Token + 消息签名

光有 Token 是不够的。Token 只能证明”这个客户端有权连”,但连接建立之后,每一条消息是不是真的来自这个客户端、有没有被中间人改过——那是另一个问题。

所以关键操作的消息额外做一层 HMAC 签名。

HTTP 拿 token:

1
2
3
4
5
6
7
async def _authenticate(self):
async with aiohttp.ClientSession() as session:
async with session.post(self._auth_url, json=self._auth_data) as resp:
data = await resp.json()
if resp.status == 200:
return session, True, data["access_token"]
return session, False, None

消息签名用 HMAC-SHA256,key 是节点专属 token(和 access_token 不是一个东西,是节点注册时发的长期密钥):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import hashlib
import hmac

def generate_signature(node_token: str, action: str, payload: dict) -> str:
message = f"{action}:{json.dumps(payload, sort_keys=True)}"
signature = hmac.new(
node_token.encode(),
message.encode(),
hashlib.sha256,
).hexdigest()
return signature

async def websocket_send_json(self, data: dict) -> None:
action = data.get("action", "")
payload = data.get("data", {})

if self._needs_signature(action):
data["_sign"] = generate_signature(self.node_token(), action, payload)

await self._ws.send_str(json.dumps(data))

服务端验证:

1
2
3
def verify_signature(node_token: str, action: str, payload: dict, signature: str) -> bool:
expected = generate_signature(node_token, action, payload)
return hmac.compare_digest(expected, signature)

两个细节要划重点:

hmac.compare_digest 千万别图省事换成 ==——后者有时序攻击风险。这种事自己写很容易写错,能用标准库的就用,别炫技。

sort_keys=True 也是必须的——客户端和服务端要是用不同语言,dict 序列化的字段顺序可能不一致,签名当场对不上。

Action 分发

服务端下发的消息固定有个 action 字段。客户端按 action 分发到对应处理器,写法就是个注册表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class NodeActionDispatcher:
def __init__(self, client: NodeWebSocketClient):
self._client = client
self._handlers: Dict[str, Callable] = {
"execute_command": self._handle_execute_command,
"download_file": self._handle_download_file,
"update_config": self._handle_update_config,
"restart": self._handle_restart,
}

async def dispatch(self, action: str, payload: dict, raw_data: dict) -> bool:
handler = self._handlers.get(action)
if handler is None:
logger.warning(f"未知的 action: {action}")
return True

try:
await handler(payload)
except Exception as e:
logger.error(f"处理 action {action} 失败: {e}")
await self._client.websocket_send_json({
"action": f"{action}_response",
"data": {"success": False, "error": str(e)},
})

return True

async def _handle_execute_command(self, payload: dict):
command = payload.get("command")
timeout = payload.get("timeout", 30)

if not self._check_command_policy(command):
raise ValueError(f"命令被安全策略禁止: {command}")

result = await self._execute_command(command, timeout)

await self._client.websocket_send_json({
"action": "execute_command_response",
"data": {"success": True, "output": result},
})

注意未知 action 用的是 warning,不是 error,并且返回 True 继续处理后续消息——服务端可能下发了节点不认识的新 action(比如客户端版本太老),但这绝不应该让节点断连。

消息循环用了 Python 3.10 的 match 来分发消息类型,比一堆 if elif 清爽:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
async def _message_handler(self) -> None:
while self._ws and not self._ws.closed:
msg = await self._ws.receive()

match msg.type:
case aiohttp.WSMsgType.TEXT:
try:
data = json.loads(msg.data)
except json.JSONDecodeError:
logger.warning("收到非法 JSON")
continue

action = data.get("action")
payload = data.get("data")

if not isinstance(action, str) or not action:
logger.warning("收到缺少 action 的消息")
continue

should_continue = await self._dispatcher.dispatch(action, payload, data)
if not should_continue:
return

case aiohttp.WSMsgType.CLOSE:
logger.info("连接已断开")
return

命令策略

execute_command 这种 action 是 RCE 风险最大的地方——服务端被打穿,就能直接控制节点。所以节点这边自己也得有一道把关。

最低限度是一个黑名单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class CommandPolicy:
def __init__(self, config: dict):
self._disable_list = config.get("disable_command_list", "")
self._strict = config.get("strict", True)

def check(self, command: str) -> bool:
disabled = [c.strip() for c in self._disable_list.split(",") if c.strip()]

for pattern in disabled:
if pattern == "*":
return False
if pattern.startswith("*") and command.endswith(pattern[1:]):
return False
if pattern.endswith("*") and command.startswith(pattern[:-1]):
return False
if command == pattern:
return False

return True

说句实在话:黑名单的安全性永远低于白名单——总能找到等价命令绕过 (rm -rf 禁了还有 find ... -delete 嘛)。

但白名单在通用节点管理这种场景几乎没法定义——你哪知道用户会想跑啥命令?所以这里的策略只能定位为”防呆”,不是真正的安全边界。

真正的安全边界应该在节点之外——以受限用户运行节点、用 cgroup/容器隔离。

运行时服务

节点上有一堆模块——终端服务、文件服务、监控服务——都依赖同一个会话和连接。包成一个 NodeRuntimeServices 集中管理生命周期,省心:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class NodeRuntimeServices:
def __init__(self, client, session, connection, access_token, config_getter):
self.client = client
self.session = session
self.connection = connection
self.access_token = access_token
self.config_getter = config_getter

@classmethod
def create(cls, client, session, connection, access_token, config_getter, command_policy):
services = cls(client, session, connection, access_token, config_getter)
services.terminal_service = TerminalService(session, connection)
services.file_service = FileService(session, connection)
services.monitor_service = MonitorService(session, connection)
return services

async def close(self):
await self.terminal_service.close()
await self.file_service.close()
await self.monitor_service.close()

注意 config_getter 是个 callable 而不是 dict——配置可能在运行时被服务端下发更新,传 getter 才能拿到最新值。

要是图省事传了 dict,会出现”改了配置但节点还用旧的”这种说不清的灵异问题,做配置热更新的时候被这个坑撞过,痛苦面具直接戴稳。

欢迎关注我的其它发布渠道