TDengine 时序数据在 Django 里的分层接入:Redis 缓实时,超级表存历史

管服务器的,监控数据是个大头——CPU 使用率、每核占用、内存、网络流量、GPU 温度和显存,每个节点每几秒就报一次。ServerManager 用了三层数据架构来存这些东西:PostgreSQL 管业务数据,Redis 缓实时状态,TDengine 存历史时序。

为啥要用三种数据库?PostgreSQL 的表结构是给”一个用户一条记录”准备的,不是给”一个节点每秒一条记录连写三个月”准备的。我们试过把监控数据全写 PostgreSQL,一个月后那张表上千万行,查询直接卡死。

TDengine 天生就是干这个的——超级表(stable)定义结构,子表(table)按节点自动分片,数据保留策略设个 KEEP 180d,过期数据自动清理,查询毫秒级返回。

超级表和子表:一个节点一张表

TDengine 的超级表和子表模型跟关系型数据库的”一张大表 ALL_IN_ONE”思路不同。打个比方:超级表是模板,子表是从模板按标签实例化出来的。每个节点创建自己的子表,标签就是节点 UUID:

1
2
3
4
5
6
7
8
9
10
11
12
-- 超级表,定义数据结构
CREATE STABLE IF NOT EXISTS st_node_usage (
ts TIMESTAMP,
cpu_usage INT,
memory_used BIGINT,
swap_used BIGINT,
disk_io_read BIGINT,
disk_io_write BIGINT,
loadavg_1m DOUBLE,
loadavg_5m DOUBLE,
loadavg_15m DOUBLE
) TAGS (node_uuid NCHAR(37));

写数据时,st_node_usage_<uuid_safe> 是子表名,<uuid_safe> 是 UUID 去掉连字符后的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
async def write_node_usage(cls, node_uuid, cpu_usage, memory_used, ...):
node_uuid = _validate_node_uuid(node_uuid)
await ensure_node_subtables(node_uuid) # 确保子表存在
uuid_safe = str(node_uuid).replace('-', '_')
subtable = f"{TD_SUBTABLE_NODE_USAGE}_{uuid_safe}"
ts = int(timestamp.timestamp() * 1000) if timestamp else "NOW"
sql = (
f"INSERT INTO {subtable} VALUES "
f"({ts}, {cpu_usage}, {memory_used}, {swap_used}, "
f"{disk_io_read}, {disk_io_write}, "
f"{loadavg_1m}, {loadavg_5m}, {loadavg_15m})"
)
return await tdengine_client.ainsert(sql)

ensure_node_subtables() 会检查子表是否存在,不存在就创建。之所以不在启动时全建,是因为节点可能动态上下线,按需创建更合理。

批量写入:一条 SQL 插多行

单个指标分开写太浪费了。TDengine 支持一条 INSERT 语句写多张子表的数据,write_node_usage_batch() 就是这么干的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def write_node_usage_batch(cls, node_uuid, cpu_usage, memory_used, ...,
cpu_cores, network_io, accelerators=None,
timestamp=None):
uuid_safe = str(node_uuid).replace('-', '_')
ts = int(timestamp.timestamp() * 1000) if timestamp else "NOW"

results = []

# 1. 整机使用率
subtable = f"{TD_SUBTABLE_NODE_USAGE}_{uuid_safe}"
sql = f"INSERT INTO {subtable} VALUES ({ts}, {cpu_usage}, ...)"
results.append(await tdengine_client.ainsert(sql))

# 2. 每核 CPU —— 合并成一条 INSERT
if cpu_cores:
core_inserts = []
for core_data in cpu_cores:
await ensure_cpu_core_subtable(node_uuid, core_data['core_index'])
core_subtable = f"{TD_SUBTABLE_NODE_CPU_CORE}_{uuid_safe}_{core_data['core_index']}"
core_inserts.append(f"{core_subtable} VALUES ({ts}, {core_data['usage']})")
sql = "INSERT INTO " + " ".join(core_inserts)
results.append(await tdengine_client.ainsert(sql))

# 3. 网络流量 —— 同理合并
if network_io:
net_inserts = []
for port_name, io_data in network_io.items():
await ensure_network_subtable(node_uuid, port_name)
port_safe = port_name.replace('.', '_').replace('-', '_')[:40]
net_subtable = f"{TD_SUBTABLE_NODE_NETWORK}_{uuid_safe}_{port_safe}"
net_inserts.append(f"{net_subtable} VALUES ({ts}, {io_data['bytes_sent']}, {io_data['bytes_recv']})")
sql = "INSERT INTO " + " ".join(net_inserts)
results.append(await tdengine_client.ainsert(sql))

# 4. 加速器(GPU/NPU)
if accelerators:
# ... 同理

return all(results) if results else True

TDengine 的 INSERT 语法允许同时插入多张子表的数据,用空格隔开就行。这样做减少了网络往返次数,一个有 16 核 CPU 和 2 个网口的节点,一次批量写入只需要 4 条 SQL(整机、CPU 核心、网络、加速器),而不是 19 条。

SQL 注入防护:UUID 和网卡名的白名单校验

子表名是拼接的(TDengine 的 REST API 不支持参数化查询),所以拼表名之前要做严格校验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
_UUID_RE = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.IGNORECASE)
_PORT_NAME_RE = re.compile(r'^[A-Za-z0-9_.\-]{1,40}$')

def _validate_node_uuid(node_uuid: str) -> str:
uuid_str = str(node_uuid or '').strip()
if not _UUID_RE.match(uuid_str):
raise ValueError(f"无效的 node_uuid 格式: {uuid_str}")
return uuid_str

def _validate_port_name(port_name: str) -> str:
name = str(port_name or '').strip()
if not _PORT_NAME_RE.match(name):
raise ValueError(f"无效的网卡名: {name}")
return name

UUID 必须是标准格式 xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx,网卡名只允许字母数字点连字符下划线,长度不超过 40。标签值通过 _escape_tdengine_string() 转义单引号。表名里的连字符换成下划线(TDengine 表名不允许连字符),长度限制在 60 字符。

这些校验不是做样子——在节点端就开始了(因为节点是跑在远端机器上的 Agent,数据也是它报上来的),但服务端再校验一遍是深度防御。

Redis 缓实时:查询先看缓存

历史数据查 TDengine,但”最近一次上报”的实时数据查 TDengine 就太慢了——你要 SELECT LAST_ROW(*) 扫一遍超级表。所以实时数据缓到 Redis 里,键名 node_usage_<uuid>,节点每次上报时更新:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@staticmethod
async def get_last_node_usage(node_uuid: str) -> Optional[dict]:
# 先查 Redis
# Redis 层没在 query.py 里,而是在 Node Manager 的 cache 里
# 这里是兜底:Redis 没命中时查 TDengine
uuid_safe = str(node_uuid).replace('-', '_')
subtable = f"{TD_SUBTABLE_NODE_USAGE}_{uuid_safe}"
sql = f"SELECT LAST_ROW(*) FROM {subtable}"
try:
rows = await tdengine_client.aquery(sql)
if rows:
row = rows[0]
row['timestamp'] = _ms_to_datetime(row.get('ts', 0))
return row
except Exception:
pass
return None

实际使用中,仪表盘展示当前状态时直接走 Redis 缓存(毫秒级),展示历史趋势图时走 TDengine 查询(秒级返回 7 天数据)。两种数据库各司其职,不互相抢活。

时间范围查询

历史数据查询都接受 start_timeend_time 参数,内部转成毫秒时间戳:

1
2
3
4
5
6
7
8
def _parse_to_ms(time_str: str) -> int:
for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d'):
try:
dt = datetime.strptime(time_str, fmt)
return int(dt.timestamp() * 1000)
except ValueError:
continue
return int(datetime.now().timestamp() * 1000)

支持四种时间格式,都不行就用当前时间。SQL 里用 ts >= start_ms AND ts <= end_ms 做范围过滤,走的是 TDengine 的时间分区索引,查询基本是 O(分区数) 而不是 O(总行数)。

数据生命周期:180 天自动过期

TDengine 的超级表在创建时可以指定 KEEP 参数:

1
CREATE STABLE IF NOT EXISTS st_node_usage (...) TAGS (node_uuid NCHAR(37)) KEEP 180d;

这意味着写入的数据 180 天后自动删除,不需要写 cron job 去清理。对比 PostgreSQL 方案要手动 DELETE FROM node_usage WHERE ts < now() - interval '180 days',还得担心删除大表锁表的问题。

查询聚合:一次返回全维度数据

query_performance_record() 把整机使用率、每核 CPU、网络流量、加速器数据按时间戳对齐,合并成一条查询结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@staticmethod
async def query_performance_record(node_uuid, start_time, end_time,
device="_all", processor_count=0, memory_total=0):
usage_rows = await TDengineQuery.query_node_usage(node_uuid, start_time, end_time)
if not usage_rows:
return []

# 按设备类型筛选
cpu_core_map = {} # {ts: [{core, usage}, ...]}
net_map = {} # {ts: [{name, bytes_sent, bytes_recv}, ...]}
accelerator_map = {}

if device in ("_all", "cpu"):
cpu_core_rows = await TDengineQuery.query_cpu_core_usage(...)
for r in cpu_core_rows:
cpu_core_map.setdefault(r.get('ts', 0), []).append({...})

# ... network, accelerator 同理

result = []
for row in usage_rows:
ts = row.get('ts', 0)
item = {"timestamp": row['timestamp']}
if device == "_all" or device == "cpu":
item["cpu_usage"] = row.get('cpu_usage', 0)
item["cpu_cores_usage"] = cpu_core_map.get(ts, [])
if device == "_all" or device == "network":
item["network_usage"] = net_map.get(ts, [])
# ...
result.append(item)
return result

device 参数控制查询维度——"cpu" 只查 CPU 相关,"network" 只查网络,"_all" 全查。前端仪表盘可以按需请求不同维度的数据,不需要一次全拉。


三种数据库不是炫技,是各管各最能管的:PostgreSQL 管用户、权限、节点信息这种关系型数据;Redis 管在线状态、实时监控这些 TTL 短、读写频率高的数据;TDengine 管按时间排列的历史监控数据,自动过期、自动聚合。这种分层让每种数据库都在自己最擅长的场景下工作,而不是逼着 PostgreSQL 做它不擅长的事。

欢迎关注我的其它发布渠道