ServerManager 这个项目要管一堆远端节点——可能是云服务器、可能是边缘的工控机、也可能是机器人小车。中控服务端要给它们下指令、收它们的状态。HTTP 轮询太重,所以一开始就选了 WebSocket。
WebSocket 一行代码就连上了,难的是上线之后的那些事——网络抖一下连接断了、token 过期了得重新认证、消息怎么防篡改、命令怎么不让节点端乱执行。下面把这几块各自的思路捋一下。
永远在重连 节点客户端的主体是一个 while True:连接、跑消息循环、断开、等几秒、再连。所有异常都不能让它跳出最外层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import aiohttpimport asyncioimport jsonclass NodeWebSocketClient : def __init__ (self ): self ._session: aiohttp.ClientSession = None self ._ws: aiohttp.ClientWebSocketResponse = None async def run_forever (self ) -> None : while True : try : self ._session, auth_status, access_token = await self ._authenticate() if not auth_status: await asyncio.sleep(5 ) continue async with self ._session.ws_connect( self ._ws_url(access_token), autoping=True , ) as ws: logger.success("WebSocket 已连接" ) self ._ws = ws await self ._message_handler() except aiohttp.ClientError as err: logger.error(f"WebSocket 连接失败: {err} " ) finally : self ._ws = None await asyncio.sleep(5 )
autoping=True 让 aiohttp 自动发 ping,对付 NAT 超时和负载均衡器主动断流。
固定 5 秒重连间隔,在节点规模一大就要换成带抖动的退避——想象一下,一万台机器同一秒全断了再同一秒全重连,服务端那边瞬间就被打挂了。
Bearer Token + 消息签名 光有 Token 是不够的。Token 只能证明”这个客户端有权连”,但连接建立之后,每一条消息是不是真的来自这个客户端、有没有被中间人改过——那是另一个问题。
所以关键操作的消息额外做一层 HMAC 签名。
HTTP 拿 token:
1 2 3 4 5 6 7 async def _authenticate (self ): async with aiohttp.ClientSession() as session: async with session.post(self ._auth_url, json=self ._auth_data) as resp: data = await resp.json() if resp.status == 200 : return session, True , data["access_token" ] return session, False , None
消息签名用 HMAC-SHA256,key 是节点专属 token(和 access_token 不是一个东西,是节点注册时发的长期密钥):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import hashlibimport hmacdef generate_signature (node_token: str , action: str , payload: dict ) -> str : message = f"{action} :{json.dumps(payload, sort_keys=True )} " signature = hmac.new( node_token.encode(), message.encode(), hashlib.sha256, ).hexdigest() return signatureasync def websocket_send_json (self, data: dict ) -> None : action = data.get("action" , "" ) payload = data.get("data" , {}) if self ._needs_signature(action): data["_sign" ] = generate_signature(self .node_token(), action, payload) await self ._ws.send_str(json.dumps(data))
服务端验证:
1 2 3 def verify_signature (node_token: str , action: str , payload: dict , signature: str ) -> bool : expected = generate_signature(node_token, action, payload) return hmac.compare_digest(expected, signature)
两个细节要划重点:
hmac.compare_digest 千万别图省事换成 ==——后者有时序攻击风险。这种事自己写很容易写错,能用标准库的就用,别炫技。
sort_keys=True 也是必须的——客户端和服务端要是用不同语言,dict 序列化的字段顺序可能不一致,签名当场对不上。
Action 分发 服务端下发的消息固定有个 action 字段。客户端按 action 分发到对应处理器,写法就是个注册表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class NodeActionDispatcher : def __init__ (self, client: NodeWebSocketClient ): self ._client = client self ._handlers: Dict [str , Callable ] = { "execute_command" : self ._handle_execute_command, "download_file" : self ._handle_download_file, "update_config" : self ._handle_update_config, "restart" : self ._handle_restart, } async def dispatch (self, action: str , payload: dict , raw_data: dict ) -> bool : handler = self ._handlers.get(action) if handler is None : logger.warning(f"未知的 action: {action} " ) return True try : await handler(payload) except Exception as e: logger.error(f"处理 action {action} 失败: {e} " ) await self ._client.websocket_send_json({ "action" : f"{action} _response" , "data" : {"success" : False , "error" : str (e)}, }) return True async def _handle_execute_command (self, payload: dict ): command = payload.get("command" ) timeout = payload.get("timeout" , 30 ) if not self ._check_command_policy(command): raise ValueError(f"命令被安全策略禁止: {command} " ) result = await self ._execute_command(command, timeout) await self ._client.websocket_send_json({ "action" : "execute_command_response" , "data" : {"success" : True , "output" : result}, })
注意未知 action 用的是 warning,不是 error,并且返回 True 继续处理后续消息——服务端可能下发了节点不认识的新 action(比如客户端版本太老),但这绝不应该让节点断连。
消息循环用了 Python 3.10 的 match 来分发消息类型,比一堆 if elif 清爽:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 async def _message_handler (self ) -> None : while self ._ws and not self ._ws.closed: msg = await self ._ws.receive() match msg.type : case aiohttp.WSMsgType.TEXT: try : data = json.loads(msg.data) except json.JSONDecodeError: logger.warning("收到非法 JSON" ) continue action = data.get("action" ) payload = data.get("data" ) if not isinstance (action, str ) or not action: logger.warning("收到缺少 action 的消息" ) continue should_continue = await self ._dispatcher.dispatch(action, payload, data) if not should_continue: return case aiohttp.WSMsgType.CLOSE: logger.info("连接已断开" ) return
命令策略 execute_command 这种 action 是 RCE 风险最大的地方——服务端被打穿,就能直接控制节点。所以节点这边自己也得有一道把关。
最低限度是一个黑名单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class CommandPolicy : def __init__ (self, config: dict ): self ._disable_list = config.get("disable_command_list" , "" ) self ._strict = config.get("strict" , True ) def check (self, command: str ) -> bool : disabled = [c.strip() for c in self ._disable_list.split("," ) if c.strip()] for pattern in disabled: if pattern == "*" : return False if pattern.startswith("*" ) and command.endswith(pattern[1 :]): return False if pattern.endswith("*" ) and command.startswith(pattern[:-1 ]): return False if command == pattern: return False return True
说句实在话:黑名单的安全性永远低于白名单——总能找到等价命令绕过 (rm -rf 禁了还有 find ... -delete 嘛)。
但白名单在通用节点管理这种场景几乎没法定义——你哪知道用户会想跑啥命令?所以这里的策略只能定位为”防呆”,不是真正的安全边界。
真正的安全边界应该在节点之外——以受限用户运行节点、用 cgroup/容器隔离。
运行时服务 节点上有一堆模块——终端服务、文件服务、监控服务——都依赖同一个会话和连接。包成一个 NodeRuntimeServices 集中管理生命周期,省心:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class NodeRuntimeServices : def __init__ (self, client, session, connection, access_token, config_getter ): self .client = client self .session = session self .connection = connection self .access_token = access_token self .config_getter = config_getter @classmethod def create (cls, client, session, connection, access_token, config_getter, command_policy ): services = cls(client, session, connection, access_token, config_getter) services.terminal_service = TerminalService(session, connection) services.file_service = FileService(session, connection) services.monitor_service = MonitorService(session, connection) return services async def close (self ): await self .terminal_service.close() await self .file_service.close() await self .monitor_service.close()
注意 config_getter 是个 callable 而不是 dict——配置可能在运行时被服务端下发更新,传 getter 才能拿到最新值。
要是图省事传了 dict,会出现”改了配置但节点还用旧的”这种说不清的灵异问题,做配置热更新的时候被这个坑撞过,痛苦面具直接戴稳。