管服务器的,监控数据是个大头——CPU 使用率、每核占用、内存、网络流量、GPU 温度和显存,每个节点每几秒就报一次。ServerManager 用了三层数据架构来存这些东西:PostgreSQL 管业务数据,Redis 缓实时状态,TDengine 存历史时序。
为啥要用三种数据库?PostgreSQL 的表结构是给”一个用户一条记录”准备的,不是给”一个节点每秒一条记录连写三个月”准备的。我们试过把监控数据全写 PostgreSQL,一个月后那张表上千万行,查询直接卡死。
TDengine 天生就是干这个的——超级表(stable)定义结构,子表(table)按节点自动分片,数据保留策略设个 KEEP 180d,过期数据自动清理,查询毫秒级返回。
超级表和子表:一个节点一张表 TDengine 的超级表和子表模型跟关系型数据库的”一张大表 ALL_IN_ONE”思路不同。打个比方:超级表是模板,子表是从模板按标签实例化出来的。每个节点创建自己的子表,标签就是节点 UUID:
1 2 3 4 5 6 7 8 9 10 11 12 CREATE STABLE IF NOT EXISTS st_node_usage ( ts TIMESTAMP , cpu_usage INT , memory_used BIGINT , swap_used BIGINT , disk_io_read BIGINT , disk_io_write BIGINT , loadavg_1m DOUBLE , loadavg_5m DOUBLE , loadavg_15m DOUBLE ) TAGS (node_uuid NCHAR (37 ));
写数据时,st_node_usage_<uuid_safe> 是子表名,<uuid_safe> 是 UUID 去掉连字符后的版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 async def write_node_usage (cls, node_uuid, cpu_usage, memory_used, ... ): node_uuid = _validate_node_uuid(node_uuid) await ensure_node_subtables(node_uuid) uuid_safe = str (node_uuid).replace('-' , '_' ) subtable = f"{TD_SUBTABLE_NODE_USAGE} _{uuid_safe} " ts = int (timestamp.timestamp() * 1000 ) if timestamp else "NOW" sql = ( f"INSERT INTO {subtable} VALUES " f"({ts} , {cpu_usage} , {memory_used} , {swap_used} , " f"{disk_io_read} , {disk_io_write} , " f"{loadavg_1m} , {loadavg_5m} , {loadavg_15m} )" ) return await tdengine_client.ainsert(sql)
ensure_node_subtables() 会检查子表是否存在,不存在就创建。之所以不在启动时全建,是因为节点可能动态上下线,按需创建更合理。
批量写入:一条 SQL 插多行 单个指标分开写太浪费了。TDengine 支持一条 INSERT 语句写多张子表的数据,write_node_usage_batch() 就是这么干的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 async def write_node_usage_batch (cls, node_uuid, cpu_usage, memory_used, ..., cpu_cores, network_io, accelerators=None , timestamp=None ): uuid_safe = str (node_uuid).replace('-' , '_' ) ts = int (timestamp.timestamp() * 1000 ) if timestamp else "NOW" results = [] subtable = f"{TD_SUBTABLE_NODE_USAGE} _{uuid_safe} " sql = f"INSERT INTO {subtable} VALUES ({ts} , {cpu_usage} , ...)" results.append(await tdengine_client.ainsert(sql)) if cpu_cores: core_inserts = [] for core_data in cpu_cores: await ensure_cpu_core_subtable(node_uuid, core_data['core_index' ]) core_subtable = f"{TD_SUBTABLE_NODE_CPU_CORE} _{uuid_safe} _{core_data['core_index' ]} " core_inserts.append(f"{core_subtable} VALUES ({ts} , {core_data['usage' ]} )" ) sql = "INSERT INTO " + " " .join(core_inserts) results.append(await tdengine_client.ainsert(sql)) if network_io: net_inserts = [] for port_name, io_data in network_io.items(): await ensure_network_subtable(node_uuid, port_name) port_safe = port_name.replace('.' , '_' ).replace('-' , '_' )[:40 ] net_subtable = f"{TD_SUBTABLE_NODE_NETWORK} _{uuid_safe} _{port_safe} " net_inserts.append(f"{net_subtable} VALUES ({ts} , {io_data['bytes_sent' ]} , {io_data['bytes_recv' ]} )" ) sql = "INSERT INTO " + " " .join(net_inserts) results.append(await tdengine_client.ainsert(sql)) if accelerators: return all (results) if results else True
TDengine 的 INSERT 语法允许同时插入多张子表的数据,用空格隔开就行。这样做减少了网络往返次数,一个有 16 核 CPU 和 2 个网口的节点,一次批量写入只需要 4 条 SQL(整机、CPU 核心、网络、加速器),而不是 19 条。
SQL 注入防护:UUID 和网卡名的白名单校验 子表名是拼接的(TDengine 的 REST API 不支持参数化查询),所以拼表名之前要做严格校验:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 _UUID_RE = re.compile (r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$' , re.IGNORECASE) _PORT_NAME_RE = re.compile (r'^[A-Za-z0-9_.\-]{1,40}$' )def _validate_node_uuid (node_uuid: str ) -> str : uuid_str = str (node_uuid or '' ).strip() if not _UUID_RE.match (uuid_str): raise ValueError(f"无效的 node_uuid 格式: {uuid_str} " ) return uuid_strdef _validate_port_name (port_name: str ) -> str : name = str (port_name or '' ).strip() if not _PORT_NAME_RE.match (name): raise ValueError(f"无效的网卡名: {name} " ) return name
UUID 必须是标准格式 xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx,网卡名只允许字母数字点连字符下划线,长度不超过 40。标签值通过 _escape_tdengine_string() 转义单引号。表名里的连字符换成下划线(TDengine 表名不允许连字符),长度限制在 60 字符。
这些校验不是做样子——在节点端就开始了(因为节点是跑在远端机器上的 Agent,数据也是它报上来的),但服务端再校验一遍是深度防御。
Redis 缓实时:查询先看缓存 历史数据查 TDengine,但”最近一次上报”的实时数据查 TDengine 就太慢了——你要 SELECT LAST_ROW(*) 扫一遍超级表。所以实时数据缓到 Redis 里,键名 node_usage_<uuid>,节点每次上报时更新:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @staticmethod async def get_last_node_usage (node_uuid: str ) -> Optional [dict ]: uuid_safe = str (node_uuid).replace('-' , '_' ) subtable = f"{TD_SUBTABLE_NODE_USAGE} _{uuid_safe} " sql = f"SELECT LAST_ROW(*) FROM {subtable} " try : rows = await tdengine_client.aquery(sql) if rows: row = rows[0 ] row['timestamp' ] = _ms_to_datetime(row.get('ts' , 0 )) return row except Exception: pass return None
实际使用中,仪表盘展示当前状态时直接走 Redis 缓存(毫秒级),展示历史趋势图时走 TDengine 查询(秒级返回 7 天数据)。两种数据库各司其职,不互相抢活。
时间范围查询 历史数据查询都接受 start_time 和 end_time 参数,内部转成毫秒时间戳:
1 2 3 4 5 6 7 8 def _parse_to_ms (time_str: str ) -> int : for fmt in ('%Y-%m-%d %H:%M:%S' , '%Y-%m-%dT%H:%M' , '%Y-%m-%dT%H:%M:%S' , '%Y-%m-%d' ): try : dt = datetime.strptime(time_str, fmt) return int (dt.timestamp() * 1000 ) except ValueError: continue return int (datetime.now().timestamp() * 1000 )
支持四种时间格式,都不行就用当前时间。SQL 里用 ts >= start_ms AND ts <= end_ms 做范围过滤,走的是 TDengine 的时间分区索引,查询基本是 O(分区数) 而不是 O(总行数)。
数据生命周期:180 天自动过期 TDengine 的超级表在创建时可以指定 KEEP 参数:
1 CREATE STABLE IF NOT EXISTS st_node_usage (...) TAGS (node_uuid NCHAR (37 )) KEEP 180 d;
这意味着写入的数据 180 天后自动删除,不需要写 cron job 去清理。对比 PostgreSQL 方案要手动 DELETE FROM node_usage WHERE ts < now() - interval '180 days',还得担心删除大表锁表的问题。
查询聚合:一次返回全维度数据 query_performance_record() 把整机使用率、每核 CPU、网络流量、加速器数据按时间戳对齐,合并成一条查询结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @staticmethod async def query_performance_record (node_uuid, start_time, end_time, device="_all" , processor_count=0 , memory_total=0 ): usage_rows = await TDengineQuery.query_node_usage(node_uuid, start_time, end_time) if not usage_rows: return [] cpu_core_map = {} net_map = {} accelerator_map = {} if device in ("_all" , "cpu" ): cpu_core_rows = await TDengineQuery.query_cpu_core_usage(...) for r in cpu_core_rows: cpu_core_map.setdefault(r.get('ts' , 0 ), []).append({...}) result = [] for row in usage_rows: ts = row.get('ts' , 0 ) item = {"timestamp" : row['timestamp' ]} if device == "_all" or device == "cpu" : item["cpu_usage" ] = row.get('cpu_usage' , 0 ) item["cpu_cores_usage" ] = cpu_core_map.get(ts, []) if device == "_all" or device == "network" : item["network_usage" ] = net_map.get(ts, []) result.append(item) return result
device 参数控制查询维度——"cpu" 只查 CPU 相关,"network" 只查网络,"_all" 全查。前端仪表盘可以按需请求不同维度的数据,不需要一次全拉。
三种数据库不是炫技,是各管各最能管的:PostgreSQL 管用户、权限、节点信息这种关系型数据;Redis 管在线状态、实时监控这些 TTL 短、读写频率高的数据;TDengine 管按时间排列的历史监控数据,自动过期、自动聚合。这种分层让每种数据库都在自己最擅长的场景下工作,而不是逼着 PostgreSQL 做它不擅长的事。