这个项目用的 Peewee + SQLite。Peewee 自带 migrate() 模块,也有 Alembic 这样的第三方方案。但我们一概没用——不是不知道,是试了之后发现,IoT 后端的迁移场景跟 Web 应用不一样:字段多、关联杂、中间表是 Peewee 自动生成的,而且经常要在已有数据上修索引、回填默认值。

Alembic 每次生成一个迁移脚本文件,时间戳命名,堆了几十个文件之后,部署到现场设备上——树莓派的 SD 卡读写本来就慢,迁移脚本跑一半断电了,数据库就废了。

最后选了最朴素的方案:启动时跑一遍 apply_migrations(),纯 SQL,幂等,不加版本号。跑了半年,20 多个补丁,一次没翻车。

核心思路:先检查再执行,每步都幂等

整个迁移函数的结构就三层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def apply_migrations(db):
# 第一层:加字段
changed |= add_column_if_missing("entity", "extra", "TEXT")
changed |= add_column_if_missing("rover", "runtime_config", "TEXT")
changed |= add_column_if_missing("rover", "config_version", "INTEGER DEFAULT 1")

# 第二层:修索引
changed |= fix_modbus_register_through_unique_index()
changed |= ensure_runtime_config_version_unique_index()

# 第三层:建新表
if not table_exists("rover_runtime_config_version"):
RoverRuntimeConfigVersion.create_table(safe=True)

# 第四层:回填数据
db.execute_sql("UPDATE rover SET config_version = 1 WHERE config_version IS NULL OR config_version <= 0")

每一层都自己检查”要不要做”,不依赖外部状态。add_column_if_missing 检查列是否存在,table_exists 检查表是否存在,索引修复检查索引结构是否正确。

加字段:SQLite 的 ALTER TABLE 只有 ADD COLUMN

SQLite 的 ALTER TABLE 支持非常有限:只能 ADD COLUMN,不能 DROP COLUMN(3.35.0 之前),不能 MODIFY COLUMN,不能加 UNIQUE 约束。

所以加字段是最安全的操作:

1
2
3
4
5
def add_column_if_missing(table_name, column_name, column_def):
if column_exists(table_name, column_name):
return False
db.execute_sql(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}")
return True

column_exists 用的是 PRAGMA table_info

1
2
3
def column_exists(table_name, column_name):
rows = db.execute_sql(f"PRAGMA table_info('{table_name}')").fetchall()
return any(row[1] == column_name for row in rows)

注意 column_def 里要带默认值,比如 INTEGER DEFAULT 1。SQLite 的 ADD COLUMN 对新行生效,已有行的该列值是 NULL——除非你显式指定了 DEFAULT。但即使指定了 DEFAULT,已有行仍然是 NULL(SQLite 的怪癖),所以后面还得跟一条 UPDATE 回填。

修索引:那个 Peewee 自动生成的中间表

这是最头疼的一个。Peewee 的 ManyToManyField 会自动创建中间表,表名是 modbus_device_config_modbus_device_register_through(对,就是这么长)。

早期代码没管唯一约束,Peewee 默认给中间表加了个单列唯一索引——只限制了 modbusdeviceconfig_id。这意味着同一个配置只能关联一个寄存器,但实际上一台 Modbus 设备有十几个寄存器。Insert 不会报错(因为索引只管唯一性不管存在性),但数据会悄悄丢。

修复分两步:先删错误索引,再建正确的联合索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def fix_modbus_register_through_unique_index():
table_name = "modbus_device_config_modbus_device_register_through"
if not table_exists(table_name):
return False

# 删除错误的单列唯一索引
indexes = db.execute_sql(f"PRAGMA index_list('{table_name}')").fetchall()
for idx in indexes:
index_name = idx[1]
is_unique = bool(idx[2])
if not is_unique:
continue
cols = get_index_columns(index_name)
if len(cols) == 1 and cols[0] == "modbusdeviceconfig_id":
db.execute_sql(f"DROP INDEX IF EXISTS {index_name}")

# 建联合唯一索引
indexes = db.execute_sql(f"PRAGMA index_list('{table_name}')").fetchall()
has_pair_unique = False
for idx in indexes:
cols = set(get_index_columns(idx[1]))
if cols == {"modbusdeviceconfig_id", "modbusdeviceregister_id"}:
has_pair_unique = True
break

if not has_pair_unique:
db.execute_sql(
f"CREATE UNIQUE INDEX IF NOT EXISTS idx_modbus_config_register_unique "
f"ON {table_name}(modbusdeviceconfig_id, modbusdeviceregister_id)"
)

为什么要先检查再建?因为迁移可能跑多次(重启就跑一次),不能假设索引一定不存在。CREATE UNIQUE INDEX IF NOT EXISTS 本身是幂等的,但前面的 DROP INDEX 不是——如果跑第二次,索引已经被删了,不需要再删。

表重命名:vl_recognition_task → inference_task

产品早期只有视觉识别任务,表叫 vl_recognition_task。后来加了其他推理任务,表名得改。SQLite 支持 ALTER TABLE ... RENAME TO

1
2
3
4
5
6
7
8
if table_exists("vl_recognition_task") and not table_exists("inference_task"):
db.execute_sql("ALTER TABLE vl_recognition_task RENAME TO inference_task")
add_column_if_missing("inference_task", "task_type", "VARCHAR(255) DEFAULT 'vl_recognition'")
add_column_if_missing("inference_task", "source_type", "VARCHAR(255)")
add_column_if_missing("inference_task", "source_id", "INTEGER")
add_column_if_missing("inference_task", "input_params", "TEXT DEFAULT '{}'")
add_column_if_missing("inference_task", "result_data", "TEXT")
db.execute_sql("UPDATE inference_task SET task_type = 'vl_recognition' WHERE task_type IS NULL OR task_type = ''")

关键条件:table_exists("vl_recognition_task") and not table_exists("inference_task")。两个表不能同时存在——如果 inference_task 已经存在,说明迁移已经做过了,跳过。如果两个都不存在,说明是新部署,建表由 db.create_tables() 处理,也跳过。

重命名之后加新字段、回填 task_type,一条龙搞定。老数据无缝过渡。

版本唯一索引修复:先清数据再建索引

rover_runtime_config_version 表需要 (rover_id, version_no) 的联合唯一索引。但历史数据里可能有重复版本号——早期代码没做校验,同一个 rover 的 version_no=1 出现了三行。

建唯一索引之前必须先去重,否则 CREATE UNIQUE INDEX 会失败:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def ensure_runtime_config_version_unique_index():
# 先标准化异常版本号
db.execute_sql(f"UPDATE {table_name} SET version_no = 1 WHERE version_no IS NULL OR version_no <= 0")

# 查重
duplicate_groups = db.execute_sql(
f"SELECT COUNT(1) FROM ("
f" SELECT rover_id, version_no, COUNT(1) AS c"
f" FROM {table_name} GROUP BY rover_id, version_no HAVING c > 1"
f")"
).fetchone()

if dup_count > 0:
# 同一 rover + version_no 只保留 id 最大的那一行
db.execute_sql(
f"DELETE FROM {table_name} WHERE id IN ("
f" SELECT t.id FROM {table_name} t"
f" JOIN ("
f" SELECT rover_id, version_no, MAX(id) AS keep_id"
f" FROM {table_name} GROUP BY rover_id, version_no HAVING COUNT(1) > 1"
f" ) d ON t.rover_id = d.rover_id AND t.version_no = d.version_no"
f" WHERE t.id <> d.keep_id"
f")"
)

保留 MAX(id) 那一行——id 自增,最大的就是最新的。删完重复数据,再建索引就稳了。

然后还要把 rover 表里的 config_version 同步成实际最大版本号:

1
2
3
4
5
6
db.execute_sql(
"UPDATE rover SET config_version = COALESCE("
" (SELECT MAX(v.version_no) FROM rover_runtime_config_version v WHERE v.rover_id = rover.id),"
" CASE WHEN config_version IS NULL OR config_version <= 0 THEN 1 ELSE config_version END"
")"
)

COALESCE 兜底:如果版本历史表里有数据就取最大值,没有就保持原值,原值为空就默认 1。

回填默认值的那些 SQL

迁移的最后一步往往是回填。历史数据里新加的字段全是 NULL,业务代码如果没做空值判断就会崩。

1
2
3
4
5
6
db.execute_sql("UPDATE rover SET config_version = 1 WHERE config_version IS NULL OR config_version <= 0")
db.execute_sql("UPDATE rover SET rover_auth_enabled = 0 WHERE rover_auth_enabled IS NULL")
db.execute_sql(
"UPDATE rover_model_artifact SET conversion_status='completed' "
"WHERE conversion_status IS NULL OR conversion_status = ''"
)

这些 UPDATE 语句跑在 SQLite 上,几万行数据也就几百毫秒。如果是 MySQL/PostgreSQL 上百万行的大表,得考虑分批 UPDATE 避免锁表——但 IoT 后端部署在树莓派上,数据量不会超过几千行,一条 UPDATE 全搞定。

为什么不用 Alembic

试过,但遇到几个问题:

  1. 现场设备断电:树莓派拔电源是常态。Alembic 迁移脚本执行到一半断电,alembic_version 表已经更新但 SQL 没跑完,重启后 Alembic 认为迁移已完成,实际上缺字段
  2. Peewee 中间表:Alembic 不认识 Peewee 自动生成的中间表名,生成迁移脚本时会报错
  3. 回填逻辑:Alembic 的 upgrade() 里写数据回填总觉得不太对劲,它设计上更适合改 schema 而不是改数据

我们的方案没版本号,每次启动全量检查。好处是天然幂等:断电了?重启再跑一遍,已经做过的操作会跳过。坏处是每次启动多几十毫秒的检查时间——在 IoT 场景里完全可以接受。

一点经验

  • SQLite 的 PRAGMA table_infoPRAGMA index_list 是迁移的好帮手,比 try/except 优雅
  • ALTER TABLE ADD COLUMN 带上 DEFAULT,减少回填 SQL 的数量
  • 联合唯一索引建之前一定先查重复数据,否则 CREATE 会失败
  • 表重命名用 ALTER TABLE RENAME TO,别新建表再 INSERT INTO SELECT——后者数据量大时锁表时间太长
  • 每条迁移操作都假设自己可能被执行多次,写之前先检查

说白了,SQLite 迁移的核心原则就一个:别假设上一次迁移一定成功了。每一步都自己验证,该补就补,该跳就跳。

做 IoT 后端做到一半,需求来了:要接巡检机器人。不是那种遥控小车,是果园里自动跑、自己建图、识别病虫害、汇报异常的那种。

后端要干的活:注册小车、收心跳、下发任务、收图片跑识别、收传感器数据、显示实时位置、建图进度推送、兴趣点标注、视频流转发、异常报警……光看 API 文档就有 40 多个端点。

这篇不打算逐个 API 讲,而是挑几个真正让我掉头发的点:ROS 地图解码、实时数据推送的内存架构、兴趣点去重、还有配置版本管理。

ROS OccupancyGrid → PNG:三次翻转的故事

小车上跑的是 ROS2,建图用的 SLAM,输出的是 OccupancyGrid 消息。这个消息的 data 字段是 zlib 压缩的 int8[],值域是 0~100(概率)或 -1(未知)。

前端要 PNG,不是原始栅格。转换过程看着简单,但地图方向踩了坑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _grid_zlib_to_png(zlib_data: bytes, width: int, height: int) -> bytes:
raw = zlib.decompress(zlib_data)
grid = np.frombuffer(raw, dtype=np.int8).reshape((height, width))

# 三色映射:0=空闲(254), >50=障碍(0), 其余=未知(205)
image = np.full(grid.shape, 205, dtype=np.uint8)
image[grid == 0] = 254
image[grid > 50] = 0

# ROS 地图的坐标系原点在左下角,图像在左上角,得上下翻转
image_to_save = np.flipud(image)

ok, buf = cv2.imencode('.png', image_to_save)
return buf.tobytes()
  • zlib.decompress → 解压
  • np.frombuffer(..., dtype=np.int8).reshape(...) → 一维转二维
  • np.flipud → 翻转 Y 轴

三色映射的阈值 50 是 ROS 的惯例:0 是确定空闲,100 是确定障碍,中间值是”可能”。阈值 50 以上画成黑色障碍,0 画成白色通道,其他画成灰色未知区域。

一开始没做 flipud,前端显示的地图是上下颠倒的——因为 ROS 的 OccupancyGrid 原点在左下角(跟数学坐标系一样),但 PNG 的像素从左上角开始排。翻了之后才正常。

实时数据推送:五套内存缓存

巡检车后端有五种需要实时推送的数据,每种都有独立的内存缓存和订阅者列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 视频帧
_frame_cache: Dict[int, bytes] = {}
_stream_subscribers: Dict[int, list] = defaultdict(list)

# 实时位置
_rover_position_cache: Dict[int, Dict[str, Any]] = {}

# 建图进度
_map_progress_chunks: Dict[str, Dict[int, bytes]] = {}
_map_progress_subscribers: Dict[int, list] = defaultdict(list)

# 兴趣点
_interest_points_cache: Dict[int, List[dict]] = defaultdict(list)
_interest_points_subscribers: Dict[int, list] = defaultdict(list)

# 任务状态
_task_subscribers: Dict[int, list] = defaultdict(list)

为什么不用 Redis?因为这些数据的生命周期就是”当前在线的连接”,小车下线就作废了,存 Redis 反而多一次序列化开销。内存里直接存 bytes/dict,SSE 端点直接 yield,延迟最低。

推送模式统一是 SSE(视频帧之外),每个 SSE 端点长这样:

1
2
3
4
5
6
7
8
9
10
@router.get("/rover/{rover_id}/stream/position")
async def stream_position(rover_id: int):
queue = asyncio.Queue(maxsize=64)
_position_subscribers[rover_id].append(queue)
async def generate():
while True:
payload = await asyncio.wait_for(queue.get(), timeout=30)
if payload is None: break
yield f"data: {json.dumps(payload)}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")

数据源(心跳上报、建图进度等)在收到数据时往订阅者队列里 put_nowait

1
2
3
4
5
for q in _map_progress_subscribers.get(rover_id, []):
try:
q.put_nowait({"type": "chunk", ...})
except asyncio.QueueFull:
pass # 队列满了就丢,实时数据不怕丢帧

实时数据的哲学:宁可丢帧,不能卡住。所以队列都有 maxsize,满了就 pass。前端收不到一帧位置更新无所谓,下一帧马上就来了。

兴趣点去重:HTTP 和 WebSocket 的双通道问题

兴趣点(比如识别到病害的位置)有两个上报通道:HTTP REST 和 WebSocket。小车走 HTTP 上报稳定可靠,WebSocket 走实时推送延迟低。问题是同一个兴趣点可能两个通道都上报一次,前端就显示两个点。

解法:近邻去重。

1
2
3
4
5
6
7
8
9
10
11
12
13
def _append_interest_point(rover_id, payload):
point = _normalize_interest_point_payload(rover_id, payload)
with _interest_points_lock:
bucket = _interest_points_cache[rover_id]
# 只检查最近 5 个点
for last in reversed(bucket[-5:]):
if (
abs(float(last.get("x", 0.0)) - point["x"]) < 1e-4
and abs(float(last.get("y", 0.0)) - point["y"]) < 1e-4
and str(last.get("label") or "") == point["label"]
):
return last # 重复,返回已有条目
bucket.append(point)

三个条件全满足才判重:x 坐标差小于 0.0001、y 坐标差小于 0.0001、label 相同。0.0001 在地图坐标系里大约是 0.1 毫米——足够区分两个不同位置,又不会因为浮点精度误判。

只检查最近 5 个而不是全部,是因为兴趣点有时间顺序,很早之前的点不可能和当前点重复,遍历全部纯属浪费。

任务协调器:收拾烂摊子

小车任务有五种状态:pending → dispatched → running → completed/failed。现实比状态机复杂得多:

  • 小车断线了,任务卡在 running
  • 小车重启了,任务卡在 dispatched
  • 新任务建图,但上一个建图任务还没标记完成

解法:一个 30 秒执行一次的协调器 _periodic_rover_reconciliation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def _periodic_rover_reconciliation(interval_sec=30):
while True:
rovers = Rover.select()
for rover in rovers:
if rover.current_status in ("idle", "offline"):
# 检查是否有卡住的任务
active_tasks = RoverTask.select().where(
RoverTask.rover == rover,
RoverTask.task_status.in_(["pending", "dispatched", "running"])
)
for task in active_tasks:
if task.task_status == "dispatched" and _task_age(task) > 60:
_mark_task_auto_failed(task, reason="调度超时")
elif task.task_status == "running" and _task_age(task) > 60:
_mark_task_auto_failed(task, reason="执行超时,小车已离线")
await asyncio.sleep(interval_sec)

任务自动失败后,前端 SSE 推送一条 refresh 事件,前端重新拉取任务列表,用户看到的是任务从”运行中”变成”失败(超时)“,而不是永远卡在那里转圈。

配置版本管理:YAML 导入导出 + 回滚

小车的运行参数(速度、相机参数、建图分辨率等)是个嵌套 JSON。需求要求:支持版本历史、模板、YAML 导入导出、回滚。

版本表设计:

1
2
3
4
5
class RoverRuntimeConfigVersion(BaseModel):
rover = ForeignKeyField(Rover)
version_no = IntegerField() # 递增版本号
config_data = TextField() # JSON 快照
created_at = DateTimeField()

回滚操作其实就是”创建一个新版本,内容是旧版本的快照”:

1
2
3
4
5
6
7
8
def rollback_config(rover_id, target_version):
target = RoverRuntimeConfigVersion.get(rover=rover_id, version_no=target_version)
current_max = max_version(rover_id)
new_version = current_max + 1
RoverRuntimeConfigVersion.create(
rover=rover_id, version_no=new_version,
config_data=target.config_data # 复制旧版本内容
)

不做物理回退(DELETE + UPDATE),只做逻辑追加。这样版本历史永远不会断,审计也方便。

YAML 导入导出是个意外的坑:小车的配置字段命名风格是 ROS 的 snake_case,但前端要 camelCase。导出时做了一次转换,导入时又转回来。中间出了个 bug,一个布尔值 "true" 字符串没被正确转回 True,导致小车把 true 当成文件路径去读了……后来加了个深度遍历的类型修复函数才搞定。

视频流:帧缓存 + WebSocket 广播

视频流没走 SSE,走了 WebSocket + JPEG 帧的方式。小车端每帧发送一个 base64 编码的 JPEG,后端缓存最新一帧,同时广播给所有订阅的前端 WebSocket 连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
_frame_cache: Dict[int, bytes] = {}          # rover_id → 最新 JPEG bytes
_stream_subscribers: Dict[int, list] = defaultdict(list) # rover_id → [asyncio.Queue]

@router.websocket("/ws/rover/{rover_id}/stream")
async def rover_stream_ws(websocket: WebSocket, rover_id: int):
await websocket.accept()
queue = asyncio.Queue(maxsize=8)
_stream_subscribers[rover_id].append(queue)
try:
while True:
frame = await asyncio.wait_for(queue.get(), timeout=30)
if frame is None: break
await websocket.send_bytes(frame)
except WebSocketDisconnect:
pass
finally:
_stream_subscribers[rover_id].remove(queue)

为什么用 JPEG 帧而不是 RTSP/HLS?因为果园里的网络经常只有 2G 信号,RTSP 需要持续连接,断线重连很麻烦。JPEG 帧模式天然容错——丢一帧不影响下一帧,延迟也更可控。后端还配置了 mediamtx 作为 RTSP 中转(给延迟要求不高的监控场景用),两种模式共存。

写在最后

巡检车后端最花精力的不是单个功能的实现,而是状态一致性。小车随时可能断线、重连、换 IP,后端要能正确处理所有中间状态。上面那些协调器、去重、缓存清理,本质上都是在处理”分布式系统中的最终一致性”问题——只不过这个分布式系统只有两台机器(后端和小车),网络还特别烂。

经验总结就一条:永远不要假设小车还活着。每个操作都要考虑”如果小车下一秒就断线了,这个状态还能不能正确收敛?”

项目做到中期,设备数据在 DeviceManager 里进进出出,自动化引擎要监听变化,前端要实时推送,InfluxDB 要存历史——所有人都在直接读 DeviceManager 的字典,耦合得一塌糊涂。

有天我翻 Home Assistant 的源码,发现它有个很优雅的抽象:一切皆实体。温湿度传感器是实体,继电器开关是实体,连”今天的日出时间”都是实体。上层逻辑只认实体 ID 和状态值,不关心底层是 Modbus 还是 MQTT。

我想了想,这个东西我得有一个。

四个单例撑起的骨架

实体系统拆成四个核心组件,全部单例:

  • StateMachine:存所有实体的当前状态,是整个系统的单一真相源
  • EventBus:事件发布/订阅,状态变了就广播
  • EntityRegistry:实体注册表,管生命周期和索引
  • ServiceRegistry:服务注册表,turn_onturn_off 这类动作走这里调

数据流长这样:

1
2
3
4
5
驱动层读到数据 → DeviceManager → StateMachine.set() → EventBus.fire(STATE_CHANGED)

┌────────┼────────┐
↓ ↓ ↓
自动化引擎 SSE推送 InfluxDB存储

所有消费者通过 EventBus 订阅,谁也不直接调谁。

StateMachine:那个 NaN 陷阱

StateMachine 的核心就一个 set() 方法——设状态、判变化、发事件。但”判变化”这三个字,坑比想象的深。

1
2
3
4
5
6
7
8
9
10
11
12
13
def _states_equal(a, b):
# 坑1:float('nan') != float('nan') 是 True
# 但语义上,两个 NaN 应该视为"相等"
if isinstance(a, float) and isinstance(b, float):
if math.isnan(a) and math.isnan(b):
return True

# 坑2:True == 1 是 True(Python 的"贴心"设计)
# 但开关状态 True 和数值 1 完全是两码事
if type(a) is bool or type(b) is bool:
return type(a) is type(b) and a == b

return a == b

第一个坑:Modbus 传感器偶尔返回 NaN(比如断线时寄存器值是 0x7FFF)。Python 里 NaN != NaN 返回 True,如果直接拿 == 判,系统会认为状态每秒都在变——事件风暴,前端疯狂刷新,InfluxDB 疯狂写入。

第二个坑:继电器状态是 True/False,某个寄存器原始值恰好是 1/0。Python 觉得 True == 1,但语义上一个表示”开”,一个表示”1 摄氏度”。如果判等通过了,状态变化事件就不触发,前端就卡在那儿不变。

set() 方法里还有个细节——状态没变时不发事件,只更新 last_reported 时间戳。这避免了轮询场景下每秒重复广播相同值的问题:

1
2
3
4
5
6
7
state_changed = old_state is None or not _states_equal(old_state.state, new_state)
attributes_changed = old_state is None or old_state.attributes.to_dict() != new_attributes.to_dict()

if not state_changed and not attributes_changed and not force_update:
old_state.last_reported = now
self._event_bus.fire(EventType.STATE_REPORTED, ...) # 轻量级事件
return old_state

STATE_REPORTEDSTATE_CHANGED 是两个事件。前端可以只订阅 STATE_CHANGED(值变了才更新 UI),InfluxDB 可以订阅 STATE_REPORTED(每次轮询都记录,不管变没变)。

EventBus:线程池分发,别让慢监听器拖死发布者

EventBus 的 fire() 方法在触发监听器时,不是直接调,而是扔进线程池:

1
2
3
4
5
6
7
8
9
10
11
12
class EventBus:
def __init__(self):
self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="event_bus_")

def _call_listeners(self, event):
with self._lock:
listeners = list(self._listeners.get(event_type, []))
once_listeners = list(self._once_listeners.get(event_type, []))
all_listeners = list(self._all_listeners)

for listener in listeners + once_listeners + all_listeners:
self._executor.submit(self._safe_call_listener, listener, event)

为什么?因为有个血泪教训:早期 InfluxDB 写入卡了 3 秒,直接把 StateMachine 的 set() 堵住,导致所有设备的轮询数据排着队进不来,整个系统假死。

扔进线程池之后,InfluxDB 写入再慢也不影响状态更新和事件广播。监听器里抛异常也不怕,_safe_call_listener 会兜住。

另外支持三种监听模式:

  • listen(event_type, fn):常规订阅
  • listen_once(event_type, fn):触发一次自动注销(启动时等 SYSTEM_STARTED 事件特别好用)
  • listen_all(fn):所有事件都收(调试和审计用)

还有个 100 条的事件历史缓冲,出问题时可以回查最后 100 条事件。

EntityRegistry:三层索引

实体注册表不只是个字典,它维护了三层索引:

1
2
3
4
self._entries: Dict[str, EntityEntry] = {}           # entity_id → 条目
self._unique_id_index: Dict[tuple, str] = {} # (platform, unique_id) → entity_id
self._device_entities: Dict[int, List[str]] = {} # device_id → [entity_id]
self._platform_entities: Dict[str, List[str]] = {} # platform → [entity_id]
  • unique_id_index:同一设备重新注册时,通过 (platform, unique_id) 找到已有条目,更新而不是重复创建
  • device_entities:设备下线时一键清理所有关联实体
  • platform_entities:卸载某个协议驱动时,清理该平台的所有实体

ServiceRegistry:bridge 兜底写入

服务注册表内置了 turn_onturn_offtoggle 三个通用服务。但有个问题:有些实体只在 StateMachine 里有状态(从数据库预加载的),没有在 EntityRegistry 里注册对应的 Entity 实例——也就没有 turn_on() 方法。

怎么办?两层写入策略

1
2
3
4
5
6
7
8
9
10
11
def _handle_generic_turn_on(self, call):
for entity_id in entity_ids:
entity = self._entity_registry.get_entity(entity_id)
if entity and hasattr(entity, "turn_on"):
entity.turn_on(**call.data) # 第一层:实体自己的方法
elif self._is_writable_via_bridge(entity_id):
# 第二层:bridge 兜底写设备
# EntitySystemBridge 监听了 CALL_SERVICE 事件,
# 从状态属性中提取 device_id 和 entity_db_id,
# 直接调 DeviceManager.set_entity_state()
results.append({"entity_id": entity_id, "success": True})

_is_writable_via_bridge 会检查状态机里该实体的 read_only 属性。不是只读的,就交给 bridge 的 CALL_SERVICE 事件监听器去兜底写。

这套双路径的设计,让新实体系统和旧 DeviceManager 可以和平共处。新注册的实体走 ServiceRegistry 的标准路径,老数据走 bridge 兜底,谁也不碍着谁。

Bridge:连接新旧世界的胶水

EntitySystemBridge 是整个实体系统最难写的部分——它要把新系统的事件翻译成旧系统听得懂的语言。

1
2
3
4
5
6
7
8
9
10
class EntitySystemBridge:
def attach(self, device_manager):
# 新 → 旧:状态变化触发自动化引擎
self._event_bus.listen(EventType.STATE_CHANGED, self._on_state_changed)

# 新 → 旧:服务调用转发到设备驱动
self._event_bus.listen(EventType.CALL_SERVICE, self._on_call_service)

# 旧 → 新:设备数据同步到实体状态
# DeviceManager 调 bridge.sync_device_to_entities()

_on_state_changed 里有个关键操作:从实体 ID device_3_temperature 中解析出 device_id=3,再从状态属性的 extra 字段取 entity_db_id,最后调用 automation_engine.on_entity_changed()。这样自动化引擎根本不需要知道实体系统的存在。

启动预热:InfluxDB 回填

系统重启后,实体状态全是 None,前端一看满屏空白——这体验太差了。

解法分两步:

  1. 从 SQLite 加载所有实体定义,在 StateMachine 中创建初始状态(值为 None
  2. 从 InfluxDB 查最近 24 小时的数据,把最新值填回去
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def _fill_initial_values_from_influxdb(state_machine, context, influx_service=None):
all_states = state_machine.get_all()
device_entities = {} # device_id → [(entity_id, entity_db_id)]
for entity_id, state in all_states.items():
if state.state is not None:
continue # 已有值,跳过
device_id = state.attributes.extra.get("device_id")
entity_db_id = state.attributes.extra.get("entity_db_id")
if device_id and entity_db_id:
device_entities.setdefault(device_id, []).append((entity_id, str(entity_db_id)))

for device_id, entities in device_entities.items():
raw_data = influx.query_device_data(device_id=device_id, start_time=start_time, limit=len(entities) * 2)
for entity_id, entity_db_id_str in entities:
records = raw_data.get(entity_db_id_str)
if records:
_, latest_value = records[0]
state_machine.set(entity_id=entity_id, new_state=latest_value, ...)

重启后前端立刻能看到上次的值,等新数据轮询进来再自然更新。用户感知不到重启。

回过头看

这个实体系统花了大概三周,但省下来的调试时间远不止三周。之前出了问题要翻 DeviceManager、自动化引擎、SSE 推送三个地方找数据流,现在顺着 EventBus 的事件链一路跟下去就行。

最大的教训是那个 True == 1 的坑——找了两天才定位到。Python 的动态类型在 IoT 场景里真是个定时炸弹,传感器值和布尔状态在底层都是数字,但语义完全不同。类型严格一点,bug 少一堆。

事情是这样的:现场一台 Modbus 温湿度传感器,地址 0~5 六个寄存器,一帧 read_holding_registers 全读完,多舒服。

结果隔壁厂家塞进来个奇葩从机,地址 0 和地址 100 之间隔了一百个空位,批量读直接超时——更绝的是,同一串口上还挂着另外三台正常的设备,那台超时一卡就是好几秒,别的设备跟着一块儿断线。

那之后我花了一周重写了 Modbus 驱动的读取逻辑,从”一刀切批量”变成了”自适应批量+优雅降级”。这篇把这几个机制拆开讲讲。

先说问题:批量读取不是万能药

Modbus RTU 的 Read Holding Registers 最多一次读 125 个寄存器,看起来很爽。但现实比协议文档残酷:

  1. 地址不连续:有的设备寄存器散落在 0、100、200,中间全是空洞,读 200 个地址只用了 3 个
  2. 部分从机不支持批量:便宜的表只认单寄存器读取,批量请求直接返回异常码
  3. 串口是共享的:同一串口多个从机,一台设备卡住,别的全得等
  4. 写入被读取挡道:用户点”开灯”,后台还在慢悠悠轮询二十组寄存器,等拿到锁黄花菜都凉了

所以核心矛盾就一个:怎么在”尽量批量”和”随时能降级”之间找平衡?

智能寄存器分组

第一步,把散落的寄存器按地址排序,能合并的合并,该拆开的拆开。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def _build_smart_groups(self, registers, read_actions, batchable_actions, device_id):
groups = []
sorted_regs = sorted(list(registers), key=lambda r: r.address)

for reg in sorted_regs:
action = _get_read_action(reg)
if not action:
continue

# 设备已经被标记为不支持批量?直接单寄存器
if action not in batchable_actions or not self._check_batch_support(device_id, action):
groups.append({"action": action, "start": reg.address, "count": reg.register_count, "registers": [reg], "single": True})
continue

# 尝试合并到上一组
if groups:
last = groups[-1]
if last["action"] == action and not last.get("single"):
gap = reg.address - (last["start"] + last["count"])
new_count = (reg.address + reg.register_count) - last["start"]
# 间隙 <= 10 且合并后不超过 125,就合并
if 0 <= gap <= self.MAX_ADDRESS_GAP and new_count <= self.MAX_BATCH_REGISTERS:
last["registers"].append(reg)
last["count"] = new_count
merged = True
# 合不了就开新组
if not merged:
groups.append({"action": action, "start": reg.address, "count": reg.register_count, "registers": [reg]})

三个关键参数:

  • MAX_BATCH_REGISTERS = 125:协议上限
  • MAX_ADDRESS_GAP = 10:地址间隙小于这个值就填充合并,省一次请求
  • BATCH_FAIL_THRESHOLD = 3:连续失败这么多次就降级

举个例子,寄存器地址是 0、1、2、5、6、100、101,分组结果就是:

1
2
组1: addr=0, count=7  (0~6 合并,gap=3 填充)
组2: addr=100, count=2 (100~101)

地址 5 和 6 之间没有 gap,0~2 和 5~6 之间 gap 是 2(地址 3、4 是空的),小于 10 所以合并。但 6 和 100 之间 gap 是 94,超了,必须拆开。

自适应降级:失败了就老实一个个读

分组策略再好,也挡不住有些从机就是不给面子。核心思路:先试试批量,失败就当场回退单寄存器,连续失败三次就记住”这家伙不支持批量”,下次直接单读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _check_batch_support(self, device_id, action):
with self._lock:
support = self._device_batch_support.get(device_id, {}).get(action)
return support is not False # None=未知,先试批量

def _mark_batch_fail(self, device_id, action):
with self._lock:
count = self._device_batch_fail_count.get(device_id, {}).get(action, 0) + 1
self._device_batch_fail_count[device_id][action] = count
if count >= self.BATCH_FAIL_THRESHOLD:
self._device_batch_support[device_id][action] = False
logger.warning(f"设备 {device_id} 批量读取({action})连续失败{count}次,已切换为单寄存器模式")
return True # 到达阈值
return False

读取逻辑里的关键分支:

1
2
3
4
5
6
7
8
9
10
result = execute_modbus_action(client, action=group["action"], address=group["start"], count=group["count"], slave=slave)

if result.isError():
# 批量失败,回退到单寄存器
threshold_reached = self._mark_batch_fail(device.id, group["action"])
fallback_data = self._read_single_registers(client, group["registers"], action, slave, device_name, port)
data.update(fallback_data)
else:
# 批量成功,重置失败计数
self._mark_batch_success(device.id, group["action"])

三态设计:None(未知,默认试批量)、True(确认支持)、False(确认不支持)。新设备进来默认 None,先试一次批量,成了就标 True,败了也只败一次——因为回退逻辑保证这一轮数据不会丢。

还有个细节:降级是按 action 类型的。有些设备读保持寄存器支持批量,但读输入寄存器不行。不能一刀切。

写优先:别让轮询挡了用户的指令

Modbus 串口是半双工的,读和写不能同时进行。我们系统里读取是周期轮询,写入是用户触发的。如果读取正在扫二十组寄存器,用户这时候点”开灯”,等读完才能写,体验就是——灯过了好几秒才亮。

解法:写请求打断读循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 读取循环中,每组之间检查
for group in groups:
# 检查有没有写请求在等
write_pending = self._port_write_pending.get(port)
if write_pending and write_pending.is_set():
break # 立即让出锁

io_lock.acquire(timeout=1)
try:
# ... 读取当前组 ...
finally:
io_lock.release() # 每组读完立即释放

# 释放后再检查一次
if write_pending and write_pending.is_set():
break

写入端:

1
2
3
4
5
6
7
8
9
10
11
12
13
def write(self, device, target_register, value):
# 通知读循环让路
write_pending.set()

# 等待 IO 锁,最坏情况 = 一组读取的超时
_write_lock_timeout = self.SERIAL_TIMEOUT * (self.MODBUS_RETRIES + 1) + 0.5
acquired = io_lock.acquire(timeout=_write_lock_timeout)

try:
_do_write(client)
finally:
write_pending.clear() # 写完清除标记
io_lock.release()

关键点:

  1. 读取是逐组获取/释放锁的,不是一把锁锁到底
  2. write_pending 是个 Event,设置后读循环会在组间检查到并主动退出
  3. 写操作的超时上限是 SERIAL_TIMEOUT × (retries+1) + 0.5s,也就是最坏等一组读取完成

用户体验从”等 5 秒”变成了”最多等 0.5 秒”。

串口引用计数:多设备共用一个口

Modbus RTU 的物理现实:一个 RS485 串口可以挂 247 个从机。所以同一个 /dev/ttyUSB0 上可能连着温湿度、继电器、电表,它们共享同一个 ModbusSerialClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def connect(self, device):
port = cached_config.modbus_config.serial
if port in self._clients:
# 串口已打开,直接复用
self._port_ref_count[port] += 1
return True

# 新建客户端
client = ModbusSerialClient(port=port, ...)
if client.connect():
self._clients[port] = client
self._port_ref_count[port] = 1

def disconnect(self, device):
self._port_ref_count[port] -= 1
if self._port_ref_count[port] <= 0:
self._clients[port].close() # 最后一个设备断开才关串口

听起来简单,但配合 IO 锁就有个坑:同一串口上的设备轮询是串行的(Modbus 协议要求),但不同串口的设备可以并行。所以 IO 锁是按端口的,不是全局的。

写去抖:别让继电器打电报

前端有时候手抖,按钮连点两下;或者自动化规则快速重复触发。对于继电器来说,0.5 秒内同一寄存器写同一个值,完全是浪费——更糟的是,有些继电器会真的吸合-断开-再吸合,发出”哒哒哒”的声音。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
WRITE_DEBOUNCE_TIME = 0.5  # 秒

def write(self, device, target_register, value):
write_key = (device.id, target_register.address)
value_hash = hash(str(value))
current_time = time.time()

with self._lock:
if write_key in self._write_debounce_cache:
last_time, last_value_hash = self._write_debounce_cache[write_key]
if current_time - last_time < self.WRITE_DEBOUNCE_TIME and value_hash == last_value_hash:
return True # 重复请求,静默吸收

self._write_debounce_cache[write_key] = (current_time, value_hash)

只去重”同值”的写请求,不同值还是要正常下发。0.5 秒内同一个继电器先写 ON 再写 OFF?那得正常执行,不能吞掉。

离线检测与写复活

设备连续读取失败 5 次,标记为离线。但有意思的是——写成功一次就复活

为什么?因为读取失败可能是寄存器地址不对、从机只支持写不支持读,或者只是响应慢。但如果写命令成功返回了,说明从机在线、通信正常、地址也没错。这时候再把读失败计数清零,让下一轮轮询重新尝试读取,比傻等靠谱得多。

1
2
3
4
5
6
7
if success:
with self._lock:
old_fail = self._device_read_fail_count.get(device.id, 0)
if old_fail > 0:
self._device_read_fail_count[device.id] = 0
self._device_timeout_count[device.id] = 0
logger.info(f"Modbus设备 {device.name} 写入成功,重置离线状态(原连续读取失败 {old_fail} 次)")

几个调参建议

跑了几个月下来,这几个参数比较关键:

参数默认值调参思路
SERIAL_TIMEOUT0.3s9600 波特率下偏紧,115200 可以再压
DEVICE_READ_TIMEOUT15s单设备总超时,超了跳过剩余组
MAX_ADDRESS_GAP10间隙里读到的垃圾值会被丢弃,10 是个安全值
BATCH_FAIL_THRESHOLD3太小容易误判,太大降级太慢
WRITE_DEBOUNCE_TIME0.5s继电器场景够用,电机控制可能要更长

这套机制跑下来,最直观的感受是:现场那种”时好时坏”的从机再也不用人工切模式了。好的时候批量读,快;坏的时候自动降级,稳;后来自己又恢复了,下次重启又会重新试批量——因为状态只在内存里,不落盘。

说白了,工业现场你得假设每个设备都可能不守规矩,然后让系统自己去适应它。

游戏自动化项目里有个绕不开的东西:主数据库

游戏里每张卡的数值、每个技能的效果、每个事件的触发条件、每个 NPC 的对话——这些数据都不是写死在客户端代码里的,而是放在一个外部数据库里,客户端运行时去读。这样运营才能不发新版本就调整数值、加新卡。

社区里通常有大佬把这个数据库导出成 JSON / Protobuf / Yaml 给大家用。我们项目里也是依赖外部仓库(gakumasu-diff)提供的导出文件。

问题来了——这个数据库的表结构成百上千,字段也成百上千。每张表你都得在 Python 侧定义一个对应的 dataclass,才能写代码时有类型提示、IDE 自动补全、避免拼错字段名。

手写 dataclass?

1
2
3
表数:~400
平均字段数:15
总字段:~6000

手写一遍要写到天荒地老,且每次游戏更新还得追着改。

正确做法是代码生成——根据导出数据自己推断 schema,自动产出 Python dataclass 文件。我们就这么干的。这篇说说怎么落地。

思路:从数据反推 schema

游戏数据库导出后大致长这样(简化版):

1
2
3
4
5
6
7
8
9
10
11
12
{
"ProduceCard": [
{"id": "card_001", "name": "集中+3", "cost": 4, "effect_id": "buff_focus", "rarity": "R"},
{"id": "card_002", "name": "快攻", "cost": 6, "effect_id": "attack_basic", "rarity": "SR"},
...
],
"ProduceItem": [
{"id": "item_001", "name": "营养剂", "stamina_recovery": 30},
...
],
...
}

每张表(key)对应一个 list,list 里每个元素结构基本一致。

要从这堆数据反推出 dataclass,做两件事:

  1. 遍历 list 的所有元素,收集所有字段名 + 推断每个字段的类型
  2. 把表名 + 字段信息渲染成 Python 代码文件

听起来简单。真做下来,类型推断是难点。

类型推断的几个层次

最朴素的版本:

1
2
3
4
5
6
7
8
9
10
11
12
def infer_type(values):
sample = values[0]
if isinstance(sample, bool):
return "bool"
elif isinstance(sample, int):
return "int"
elif isinstance(sample, float):
return "float"
elif isinstance(sample, str):
return "str"
else:
return "Any"

第一版就这么写的。然后就被打脸——

问题一:只看第一条样本。某个字段在第一条是 null,于是被推断为 NoneType。其他几百条是 int,全无视。

问题二:bool 在 Python 里是 int 的子类。isinstance(True, int) 返回 True。判断顺序错了就把 bool 当 int。

问题三:null 值很常见。同一个字段,有的条目有值有的没有,应该是 Optional[X],不是 X

问题四:嵌套结构。某个字段是个 dict,得递归推断。

问题五:list 字段。[1, 2, 3]list[int]["a", "b"]list[str],空 list 是 list[Any]

第二版的推断函数变成这样(伪代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def infer_type(values: list) -> str:
non_null = [v for v in values if v is not None]
has_null = len(non_null) < len(values)

if not non_null:
return "None"

# 顺序很重要:bool 必须先判,否则会被 int 吞
types = set()
for v in non_null:
if isinstance(v, bool):
types.add("bool")
elif isinstance(v, int):
types.add("int")
elif isinstance(v, float):
types.add("float")
elif isinstance(v, str):
types.add("str")
elif isinstance(v, list):
inner = infer_type(v) if v else "Any"
types.add(f"list[{inner}]")
elif isinstance(v, dict):
types.add("dict[str, Any]") # 真要严,递归生成嵌套 dataclass
else:
types.add("Any")

# 数值类型混合:int + float 算 float
if types == {"int", "float"}:
result = "float"
elif len(types) == 1:
result = types.pop()
else:
result = "Union[" + ", ".join(sorted(types)) + "]"

if has_null:
result = f"Optional[{result}]"

return result

类型推断的核心思想是”扫全样本、合并所有可能、null 单独标记”。这套逻辑做完,绝大多数字段都能推出靠谱类型。

渲染成 Python 代码

有了表名和字段类型,剩下的就是模板渲染。Jinja2 就够用:

1
2
3
4
5
6
7
8
9
# {{ table_name }}.py - AUTO GENERATED, DO NOT EDIT
from dataclasses import dataclass
from typing import Optional, Union, Any

@dataclass(frozen=True)
class {{ class_name }}:
{% for field_name, field_type in fields.items() %}
{{ field_name }}: {{ field_type }}
{% endfor %}

输出文件一定要带醒目的 “AUTO GENERATED, DO NOT EDIT” 注释——不然后面肯定有人在生成文件里手改,再次生成的时候改动全没了,事故的标准剧本。

frozen=True 是个好习惯——主数据是只读的,dataclass 也不允许修改。哪天发现某段代码在改主数据 dataclass,那基本是设计错了。

几个生成器的设计点

1. 输出文件按表名拆分

不要把几百个 dataclass 塞一个大文件。一表一个文件:

1
2
3
4
5
6
src/entity/master_db/
├── __init__.py
├── produce_card.py
├── produce_item.py
├── support_card.py
└── ...

好处:

  • IDE 跳转友好(光标点 ProduceCard 直接跳到那个文件)
  • diff 友好(每次生成只动有变化的表,git diff 干净)
  • import 友好(要哪个表导哪个)

2. 提供一个聚合入口

按表拆分之后,调用方不希望写一堆 import。所以再生成个 __init__.py

1
2
3
4
5
6
7
# __init__.py - AUTO GENERATED
from .produce_card import ProduceCard
from .produce_item import ProduceItem
from .support_card import SupportCard
...

__all__ = ["ProduceCard", "ProduceItem", "SupportCard", ...]

业务代码就能:

1
from src.entity.master_db import ProduceCard

3. 处理新增/删除字段

游戏每次更新可能:

  • 新增字段
  • 删除字段
  • 修改字段类型(罕见但发生过)

直接重新生成会覆盖旧的 dataclass,调用方代码可能突然挂掉——某个被删的字段,调用方还在引用。

解决思路是生成时检测变化,把变化报告给开发者

1
2
3
4
5
[Schema Diff Report]
ProduceCard:
+ new_field: int (新增)
- old_field (删除!调用方需要更新)
~ rarity: str -> int (类型变化!调用方需要更新)

打印一份变化报告,开发者一眼看到”哦这次更新动了什么”,提前修业务代码。CI 里把这份 diff 作为 review 内容。

4. 不要让生成器决定文件名命名风格

外部数据库里的表名可能是 ProduceCard(PascalCase)、produce_card(snake_case)、甚至 Produce.Card(带点)。

生成器要做的事是归一化命名

  • 表名 → 文件名:produce_card.py(snake_case)
  • 表名 → 类名:ProduceCard(PascalCase)
  • 字段名 → Python 属性:保持 snake_case,特殊字符替换

命名转换函数写干净,集中处理,不要在 Jinja 模板里搞字符串处理——模板里写一堆字符串操作的代码后面没人能 review。

一个意外好处:用生成器做契约检查

生成器跑一遍就能检测出数据本身的异常

  • 某个字段在 99% 的条目里都是 int,但 1% 的条目突然是 str → 数据脏了?
  • 某张表的字段集合两次生成之间剧烈变化 → 上游导出脚本是不是出了问题?

我们的做法是生成器本身会做几种校验,发现异常打 warning。这等于免费的”主数据健康检查”——光是用着用着,就能帮你发现数据源的问题。

跑起来什么样

最终用户调用就一行命令:

1
python devtools/generate_game_database_schemas.py

内部干了这么些事:

  1. 读取最新的主数据 JSON
  2. 遍历每张表,做类型推断
  3. 和上次生成的 schema diff 对比,报告变化
  4. 渲染 Python 文件
  5. 更新 __init__.py 聚合
  6. 自动跑 ruff format 美化输出

一次跑下来几秒钟。对比手写 dataclass 维护几百个表的工作量,这工具早就把开发成本赚回来无数次了

几条心得

写完这套之后总结:

1. 生成的代码也是代码——要 commit 进仓库,不要在运行时生成。

跑代码生成会拉 master 数据、跑推断、写文件,这些都不该在生产运行时发生。生成阶段一次性做完,把产物 commit 进 git,运行时只读。

2. 生成器本身要有测试。

生成器看似”工具脚本”,但它的输出影响整个项目的类型系统。测试要覆盖:类型推断准确性、命名转换正确性、edge case(空 list、全 null 字段)。这些不测,哪天生成出来一坨坏代码全靠运气没崩。

3. 给生成的文件加显眼的”不要改”标记。

文件头加大字:

1
2
# THIS FILE IS AUTO GENERATED, DO NOT EDIT MANUALLY.
# Run `python devtools/generate_game_database_schemas.py` to regenerate.

文件名前缀加 _generated_ 也行——总之要让人一眼看出”这不是手写的”。

4. 别试图 100% 推断完美。

类型推断永远会有边界 case。允许某些字段最终落到 Any,标个 TODO 注释,让开发者后续手动 override(通过单独的 patch 文件)。强求 100% 自动推断会让生成器复杂度爆炸。

收个尾

代码生成这个套路很老了,但在数据驱动的项目里永远值得用一次

几百张表、几千字段,手写不仅累、还出错率高、还跟不上游戏更新节奏。换成”读数据 → 推断 → 渲染”的小流水线,把人力释放出来去做真正需要思考的事——比如这些数据该怎么组合、能解锁什么决策能力

工程师的时间应该用来做机器做不到的事。能自动生成的东西,别手写。

CLIP 这两年在视觉任务里成了瑞士军刀——图搜图、零样本分类、跨模态检索,啥都能搭一手。但 OpenAI 那版 CLIP 模型偏大,跑在桌面用户的设备上有点吃力。

后来 Apple 出了 MobileCLIP,小、快、精度还不错。第二代 MobileCLIP2 更新了下,更香。

问题是——MobileCLIP2 官方代码基于 PyTorch,没现成的 ONNX 文件。我们的项目要跑在用户的桌面(Windows/macOS/Linux 全要支持),PyTorch 那一坨依赖装起来用户得疯——光 torch 包就上 G。ONNX Runtime 加一个几十 MB 的模型文件就完事,对比鲜明。

所以得自己导出。听起来一行 torch.onnx.export 的事,实际坑不少。

为什么不一行搞定

最朴素的版本:

1
2
3
4
5
6
7
import torch
from mobileclip import create_model

model = create_model("mobileclip_s2")
model.eval()
dummy = torch.randn(1, 3, 256, 256)
torch.onnx.export(model, dummy, "mobileclip2.onnx", opset_version=17)

跑一下,立马得到一堆错误。常见的几类:

  • 某个算子在 opset_version 不支持——升级 opset,或者绕过去
  • 动态 shape 没声明——batch 维度希望可变,得显式标 dynamic_axes
  • forward 返回的不是 tensor——是个 dict / namedtuple,ONNX 不认
  • 训练时和推理时分支不同——某个 if self.training 路径里 ONNX 跟踪出错

每个都得单独治。

坑一:模型 forward 输出结构

MobileCLIP 这种双塔模型,forward 一般同时返回 image embedding 和 text embedding。结构大概是这样:

1
2
3
4
5
def forward(self, image, text):
image_features = self.encode_image(image)
text_features = self.encode_text(text)
logit_scale = self.logit_scale.exp()
return image_features, text_features, logit_scale

但我们的实际使用场景是只要 image encoder——用 CLIP 给图片编码进向量库,文本侧根本不参与。强行导出整模型,多带了一倍多的参数,纯浪费。

正确做法是只导出 visual encoder 子图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class VisualOnly(torch.nn.Module):
def __init__(self, base_model):
super().__init__()
self.encoder = base_model.image_encoder
# 如果有 projection 层,也带上
self.projection = base_model.image_projection

def forward(self, image):
feat = self.encoder(image)
feat = self.projection(feat)
return feat # 单个 tensor,ONNX 友好

visual = VisualOnly(model)
visual.eval()
torch.onnx.export(visual, dummy, "mobileclip2_visual.onnx", ...)

我们项目里 devtools/export_mobileclip2.py 就是这么干的。模型体积砍掉一半多,推理时也省去了文本侧那一坨无用计算。

坑二:动态 batch 维度

桌面应用里的图像识别,一次推理几张图是不固定的。一会儿单张(实时识别),一会儿几十张(批量学习一组新卡)。

如果导出时 batch 维度被固定成 1,运行时遇到批量推理就只能循环单张,性能拉胯。

声明动态维度的写法:

1
2
3
4
5
6
7
8
9
10
torch.onnx.export(
visual, dummy, "mobileclip2_visual.onnx",
opset_version=17,
input_names=["image"],
output_names=["features"],
dynamic_axes={
"image": {0: "batch"}, # 第 0 维(batch)允许变
"features": {0: "batch"},
},
)

不写 dynamic_axes,ONNX 默认把 dummy 输入的 shape 完全固化。运行时给个 batch=8 进去,直接报错——这是新手最常见的坑。

坑三:opset_version 怎么选

这个版本号说白了就是”ONNX 协议版本”。版本越高支持的算子越多、越新,但 ONNX Runtime 也得跟得上。

实战经验:

  • 太低(比如 opset 9)——很多现代模型用的算子(比如某些 attention 实现、Einsum、新版的归一化)不支持,导出报错
  • 太高(比如 opset 20+)——ONNX Runtime 部分版本不认,模型加载就挂

我们的稳妥选择是 opset 17——这是个甜蜜点:支持绝大多数视觉模型的算子,ORT 几乎所有近期版本都能加载。

要是模型里有 Flash Attention 之类的新东西,可能要到 opset 18+。出问题先把 opset 降一档试试。

坑四:导出后的精度漂移

模型导出后跑一下,结果跟 PyTorch 原模型不完全一致——这几乎是必然的。差异来源:

  • 算子实现差异——同一个 LayerNorm,PyTorch 和 ONNX Runtime 内部实现的数值路径略有差异
  • fp32/fp16 转换——如果你导出时做了量化,差异更大
  • 预处理对齐——图像归一化的 mean/std 写错一位就完蛋

差异大概多少算正常?

  • 小数值差异(每个分量差 1e-4 ~ 1e-3):正常,业务上感知不到
  • 明显差异(差 0.01+):不正常,去查实现差异
  • 方向差异(embedding 相似度算出来差异巨大):肯定哪里错了,回去捋一遍

验证方法很简单——挑十几张图,分别用 PyTorch 和 ONNX 推理一遍,算两边 embedding 的余弦相似度。正常应该 ≥ 0.999。如果只有 0.99 甚至更低,赶紧排查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import numpy as np
import torch
import onnxruntime as ort

torch_model.eval()
session = ort.InferenceSession("mobileclip2_visual.onnx")

for i, img in enumerate(test_images):
with torch.no_grad():
f_torch = torch_model(img).numpy()
f_onnx = session.run(["features"], {"image": img.numpy()})[0]

cos = (f_torch * f_onnx).sum() / (np.linalg.norm(f_torch) * np.linalg.norm(f_onnx))
print(f"img {i}: cos={cos:.6f}")

坑五:预处理对齐

CLIP 类模型对预处理特别敏感。OpenAI CLIP 的官方预处理是:

1
BGR → RGB → resize → center crop → ToTensor → Normalize(CLIP_MEAN, CLIP_STD)

MobileCLIP 用的是 ImageNet 标准 mean/std:

1
2
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]

导出 ONNX 时一定要确认 mean/std 用对。预处理用错了 mean/std,模型推理出来的 embedding 直接乱套——和 PyTorch 端的差异会非常大。

我们的策略是把预处理写在 Python 侧而非 ONNX 内部——ONNX 模型只接受标准化后的 tensor,预处理由 OpenCV 完成:

1
2
3
4
5
6
7
8
def preprocess(img_bgr: np.ndarray) -> np.ndarray:
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
img_rgb = letterbox_resize(img_rgb, 256)
img_rgb = center_crop(img_rgb, 256)
img_f = img_rgb.astype(np.float32) / 255.0
img_f = (img_f - MEAN) / STD
img_f = img_f.transpose(2, 0, 1) # HWC → CHW
return img_f[np.newaxis, :] # 加 batch 维

为啥不把预处理也塞进 ONNX?两个原因:

  1. debug 难度——预处理在 Python 里改起来快,进了 ONNX 改一次重新导出一次,反复横跳痛苦
  2. 算子兼容——某些预处理操作(letterbox、bilinear interpolate 等)在不同 EP 上行为可能不一致

预处理放外面,ONNX 只管纯模型推理,逻辑清爽。

坑六:模型存档后还能再优化一刀

导出来的 ONNX 模型不是最终态。用 onnxsim 或 ORT 自带的优化器跑一遍,体积和速度都能再压一截:

1
2
3
4
5
6
7
import onnx
from onnxsim import simplify

model = onnx.load("mobileclip2_visual.onnx")
simplified, ok = simplify(model)
assert ok
onnx.save(simplified, "mobileclip2_visual.opt.onnx")

onnxsim 主要做常量折叠、算子融合、去除冗余节点。我们项目里实测能把模型从 60+ MB 压到 50 MB 左右,推理速度也快 5%~10%。

但有个前提:优化后必须重新跑一遍精度验证。优化器偶尔会引入数值差异(罕见但发生过),不验证就直接上线,等于裸奔。

实战推理代码

导出 + 优化完之后,业务里调用就很标准了:

1
2
3
4
5
6
7
8
9
10
11
class CLIPModelFromONNX:
def __init__(self, model_path: Path):
providers = self._select_providers() # CoreML / DirectML / CPU
self.session = ort.InferenceSession(str(model_path), providers=providers)

def encode(self, img_bgr: np.ndarray) -> np.ndarray:
tensor = preprocess(img_bgr)
feat = self.session.run(["features"], {"image": tensor})[0]
# L2 归一化,方便后续做余弦相似度
feat = feat / np.linalg.norm(feat, axis=1, keepdims=True)
return feat

得到的 feature 直接喂进项目里的”视觉记忆库”(另一篇专门聊了这块),就能玩各种相似度检索了。

收个尾

把 PyTorch 模型导出成 ONNX,这事儿写在博客里看着很简单——一行 export 命令搞定。

真正动手才知道,导出只是个起点。子图剥离、动态维度、精度验证、预处理对齐、优化器、版本兼容性,每一项都能让你卡几个小时。

但回报很值——一个能跑在用户桌面、不用装 PyTorch、占用几十 MB、推理几十毫秒的 CLIP,是把视觉智能塞进普通应用的关键。

下次想给项目加个图像理解能力,别一上来就 import torch,先想想这事儿能不能用一个 ONNX 模型搞定。多数时候能。

游戏自动化里 OCR 是个又爱又恨的东西。

爱的是它”啥都能读”——按钮文字、对话内容、分数、排名,丢进去给你识别个七七八八。 恨的是它。一张 1920×1080 的全屏截图扔给 OCR 引擎,CPU 单线程上要花 200~500ms。每次决策都跑一遍,UI 卡得跟 PPT 似的。

更糟的是——全屏 OCR 召回一堆你根本不关心的文字。游戏里到处都是漂浮的提示、装饰文字、状态栏,OCR 全识别出来,你再写正则去过滤,吃力不讨好。

正经的做法是先把”目标在哪儿”框出来,再对那个小区域做 OCR。一个 30×100 像素的区域,OCR 大概 10~20ms,差一个数量级。

问题是——目标在哪儿?

这就是这篇要聊的主角:HSV 颜色蒙版

为什么是 HSV,不是 RGB

游戏 UI 里很多元素颜色高度一致——分数的橙色字、排名的蓝色背景条、警告的红色边框。这些”颜色明确”的元素是天然的”信标”。

直接在 RGB 空间挑颜色阈值很难——光照、Alpha 混合、压缩都会让 R/G/B 三个分量同时漂移。比如同样一抹”游戏里的橙色”,在不同截图里可能是 (255, 165, 0)、(248, 158, 12)、(250, 170, 5) 这种小幅波动,写 RGB 范围特别难调。

HSV 空间就友好得多:

  • H(色相):颜色本身,最稳定的维度
  • S(饱和度):颜色鲜艳程度
  • V(明度):颜色明暗

游戏里的橙色 H 大概在 [10, 25] 这个区间,S 和 V 给一个宽范围(比如 [100, 255] 和 [100, 255])就能稳稳框住。不管亮一点暗一点,H 不太会跑出去。

转换一步到位:

1
2
3
4
import cv2

hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv, (10, 100, 100), (25, 255, 255))

mask 是个二值图——目标颜色的像素是白,其他是黑。

从蒙版到 ROI

光有蒙版还不够,你要的是”位置”,不是”颜色像素”。

接下来通常这几步:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 1. 形态学清理:去噪点、连通断裂区域
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)

# 2. 找轮廓
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

# 3. 过滤掉面积太小的(多半是噪点)
contours = [c for c in contours if cv2.contourArea(c) > 100]

# 4. 拿每个轮廓的外接矩形
rois = [cv2.boundingRect(c) for c in contours]

每个 roi 就是一个 (x, y, w, h)——目标颜色在画面里的一块区域。

接下来对每个 ROI 切一小块送 OCR:

1
2
3
for (x, y, w, h) in rois:
crop = img[y:y+h, x:x+w]
text = ocr_engine.infer(crop).text

一张全屏 OCR 200ms+,换成”HSV 蒙版定位 + 几个小区域 OCR”,总耗时通常压到 50ms 以内,而且只读你关心的内容,不混进噪音

一个具体例子

举个项目里的真实场景——竞技场结算后要读排名:

画面上排名的数字是橙色背景白字,整张画面到处都是其他文字(玩家昵称、分数、奖励列表)。全屏 OCR 会把所有文字一锅端,你还得从结果里猜哪个是排名。

我们的做法(简化版):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 步骤 1:HSV 蒙版抓"排名背景的橙色"
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv, ORANGE_LOWER, ORANGE_UPPER)

# 步骤 2:形态学清理
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, KERNEL)

# 步骤 3:取连通区域,按面积排序,最大的就是排名背景
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
contours = sorted(contours, key=cv2.contourArea, reverse=True)
if not contours:
return None # 没找到,可能不在这个画面
x, y, w, h = cv2.boundingRect(contours[0])

# 步骤 4:扩一圈,给 OCR 留点 padding
pad = 5
x, y, w, h = max(0, x-pad), max(0, y-pad), w+2*pad, h+2*pad

# 步骤 5:对这小块做 OCR
crop = img[y:y+h, x:x+w]
result = ocr_engine.infer(crop)
rank = parse_rank(result.text)

这套流程跑完十几毫秒,准确率比全屏 OCR 高得多——因为输入就只有”排名”那一块。

对应项目里的代码可以看 src/utils/contest_overlay_tools.py,思路一致,多了几层鲁棒性兜底。

几个常见坑

坑一:HSV 颜色范围调不准

老老实实拿真实截图调。OpenCV 自带个交互工具不算好用,最方便的还是写个小脚本——加滑动条,实时看蒙版效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
def nothing(x): pass

cv2.namedWindow("tuner")
cv2.createTrackbar("H_min", "tuner", 0, 179, nothing)
cv2.createTrackbar("H_max", "tuner", 179, 179, nothing)
# S/V 同理

while True:
h_min = cv2.getTrackbarPos("H_min", "tuner")
# ... 拿到所有阈值
mask = cv2.inRange(hsv, (h_min, s_min, v_min), (h_max, s_max, v_max))
cv2.imshow("mask", mask)
if cv2.waitKey(1) == 27: break

拖几下滑动条,立马看到颜色范围对不对。比闷头猜数字快十倍。

项目里有个 devtools/hsv_tools.py 就是这个用途,专门用来快速调阈值。

坑二:颜色范围要带样本验证

调出来一组阈值,绝对不能只在一张图上验证就上线

游戏里不同场景的同种颜色可能略有差异——白天关卡和夜晚关卡的”橙色按钮”,HSV 值能差出去十几度。同一种 UI 元素至少要在 10~20 张不同场景截图上验证,确保都能稳定召回。

我们的规矩是:新加一个 HSV 蒙版规则,必须附带至少 10 张样本测试,跑过才能合进主线。

坑三:颜色冲突

游戏里有时候多个不同 UI 元素用了相近的颜色。HSV 蒙版会同时召回所有——你以为找到了 A,结果是 B。

解决办法不止一个:

  • 加形状约束:A 是横长条,B 是方块——用宽高比过滤
  • 加位置约束:A 总在屏幕上半部分——直接对 y 范围加限制
  • 加面积约束:A 比 B 大很多——按面积区间过滤
  • 多颜色融合:A 的标志性边缘有蓝色,B 没有——两个蒙版做与运算

实战里通常至少两个约束叠加才稳,单一颜色蒙版很容易误识别。

坑四:截图压缩带来的颜色漂移

如果截图来源经过 JPG 压缩(比如某些投屏方案),颜色会有可见的偏移和锯齿。HSV 阈值得放宽,宁可多召回一些再过滤。

更稳的做法是在管线源头就避免 JPG——能拿 PNG / 原始 numpy 数组就别走 JPG。但有时候是底层方案限制,没得选,那就阈值放宽 + 后续约束严格。

什么时候还是该用全屏 OCR

不是所有场景都能 HSV 蒙版。有几类情况老老实实全屏 OCR:

  • 目标颜色没有显著特征——比如普通对话框的黑字白底,颜色不独特
  • 目标位置完全不可预测——比如某个事件提示可能出现在屏幕任意位置
  • 第一次见的新页面,还没分析过 UI 结构——先全屏 OCR 拿到所有文本,分析完再写局部规则

但这些场景应该是少数。主流程的每一步都该有”先框定区域再 OCR”的优化,留全屏 OCR 给真正必要的地方。

收个尾

写自动化项目时间长了你会发现,“性能优化”这个话题往往不是优化代码本身,而是减少不必要的工作量

OCR 跑全屏,单次 200ms;HSV 蒙版定位后跑局部,10ms。差 20 倍。

模型本身没变,参数也没调,就是”先用便宜的方法收窄范围”这一个小改动。

这种思路在视觉自动化里到处适用——能用 YOLO 框定的就别让 CLIP 全图比对;能用形状判断的就别让 OCR 读字;能用本地缓存的就别每次去查服务端。让昂贵操作只对必要数据生效,是这类项目最重要的工程嗅觉。

下次写到 ocr_engine.infer(full_screen_img) 之前,先问一下自己——能不能用 HSV / YOLO / 颜色统计先收一收?多半能。

让 Claude / Cursor / Copilot 帮你写代码这事儿,听起来很美。真用起来你就知道——AI 写代码,跟刚毕业又心高气傲的新人差不多:能写、能跑、看着挺像模像样,但你不盯紧了,第二天你就在屎山里游泳。

不是说 AI 不行——是它不知道这个项目的规矩。它没经历过你踩过的坑,没看过那个失败现场,不知道为什么”全屏 OCR”在这个项目里是禁忌。

所以我们项目里有一份 AGENTS.md,145 行,专门写给 AI 看的。这篇说说每一条背后的真实账。

为什么要写这种文件

最早不写。让 Cursor 在项目里自由发挥,结果:

  • 同样的工具方法,AI 重复造了三份,分散在三个文件
  • AI 喜欢用 getattr(obj, 'xxx', None)——可读性灾难
  • AI 写新功能时完全无视项目已有的命名规范、目录结构
  • 测试里硬编码本机绝对路径
  • 写到一半把已有的功能误删,理由是”看起来没用到”

每一条都得人肉去 review、回滚、改正。比自己写还累。

后来想通了——AI 不是不能写,是不知道边界。把边界写出来,明确告诉它”这种事不能做”“那种事必须做”,效果立马不一样。

AGENTS.md 就这么诞生了。

文件长什么样

挑几段贴出来。完整文件 145 行,分几个大块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
## 基本要求
* 所有回复必须使用简体中文。
* 代码注释必须使用中文。
* 所有方法必须带有文档注释和类型标注,按 Python 官方风格编写。
* 不理解需求、资料不足、样本不足、规则冲突时,必须停止并询问用户,禁止盲做。
* 不得伪造测试通过、删除失败样本、跳过失败用例或隐藏失败原因。

## 图像识别要求
* 禁止使用强模板匹配作为核心方案。
* 禁止写死绝对坐标。
* 禁止假定固定分辨率、固定截图尺寸或固定 UI 缩放。
* 识别逻辑应使用相对位置、检测框、轮廓、颜色、OCR、目标检测、多信号融合等方式。

## 代码风格
* 字符串匹配必须使用 src/utils/string_tools.py
* OpenCV 可复用方法在 src/utils/opencv_tools.py
* 容易造成性能问题的方法需使用 src/utils/performance_tools.py 装饰
* 游戏主数据库读取需使用 src/utils/game_database_tools.py
* 点击 Yolo_Box/元素尽量使用 device.click_element() 非自行判断的坐标不允许使用 device.click()
* 尽可能不使用全屏 OCR,效率过低,可以使用 HSV+蒙版提取参考点后再 OCR

看着像古板老板娘列的车间规矩——其实每一条背后都有一次具体的”事故”。

几条规则的真实账

挑几条最有代表性的展开说说。

“禁止使用强模板匹配作为核心方案”

模板匹配是新手最容易上手的方案——截一张按钮的图,cv2.matchTemplate 一调,搞定。

我们早期就这么干的。然后游戏更新了一次,按钮换了配色——所有模板全废。再然后用户用 1920×1080 之外的分辨率——所有模板全废。再再然后某个皮肤更换——所有模板部分失效。

短短两个月模板库重做了三轮。最后下狠心全部迁到 YOLO + OCR + CLIP 的多信号融合。这条规则就是那段血泪的产物

不写在 AGENTS.md 里?AI 看你代码里有 cv2.matchTemplate 的痕迹,下次写新功能保准给你来一段——它觉得”这个项目已经在这么做了”。

“尽可能不使用全屏 OCR,效率过低”

OCR 跑全屏是个昂贵操作。一张 1920×1080 截图丢给 OCR,CPU 单线程上要花 200~500ms。每次决策都这么搞,UI 直接卡死。

正确做法是先用便宜的方式(HSV 蒙版、YOLO 框)定位到目标区域,再对小区域做 OCR。30×100 像素的 OCR 大概 10~20ms,差一个数量级。

AI 不知道这个性能差距。它写代码图省事,“管它呢,OCR 跑一遍最直接”。写在规则里,每次它要做识别都得想一下”是不是该先收窄区域”。

“点击 Yolo_Box/元素尽量使用 device.click_element(),非自行判断的坐标不允许使用 device.click()”

click_element 接受一个 YOLO 检测框,内部会算出框中心点再点击。click(x, y) 接受坐标。

为啥要分?因为坐标是相对于截图分辨率的。同一个像素坐标在不同分辨率下指向完全不同的 UI 元素。click_element 内部会做坐标系转换,click 不会。

AI 第一反应永远是 click(x, y)——它觉得直接。然后用户分辨率一换,所有点击全偏。

规则写明白,AI 至少会先尝试 click_element,找不到合适的 element 再考虑 click

“能复用的工具方法尽量写到 src/utils 中”

这条针对的是 AI 最喜欢的毛病——重复造轮子

让 AI 加个新功能,它需要个”模糊字符串匹配”。它不会去翻你 string_tools.py 看有没有,直接在新文件里写一个 def fuzzy_match(...)。下个月需要类似功能时,又写一个略有不同的 def similar_match(...)。半年后你的项目里散落着八个”模糊匹配”实现,谁都不知道该用哪个。

规则写明白,AI 会先去 src/utils 翻翻。即使它还是想新写一个,至少会留个”我没找到合适的现有方法”的说明,方便你 review 时纠正。

“不理解需求、资料不足、规则冲突时,必须停止并询问用户”

这条是反 AI 本能的——AI 天生爱”自圆其说”。你给它一个模糊需求,它会自己脑补一个合理的解释然后开始动手。代码写得很顺,但走的不是你要的方向。

写明白”不懂就问”,AI 至少在执行前会先确认。哪怕只是确认 30% 的细节,也比闷头干完后推倒重来强。

“不得伪造测试通过、删除失败样本、跳过失败用例或隐藏失败原因”

这条够直白,听起来还有点搞笑——但真的发生过。

AI 跑测试发现某个用例失败,第一反应是”分析失败原因然后修”。当它修不动的时候,它会非常自然地”修改测试本身让它通过”——把 assert 改宽松、把测试 skip 掉、甚至直接 mock 掉被测函数。

第一次发现这事儿的时候我盯着 diff 看了好几分钟才反应过来。AI 没”恶意”——它只是把”测试通过”当成 KPI 了,达成 KPI 的最短路径就是改测试。

写规则不能让你完全杜绝,但能极大降低发生率。每次 review 也得专门盯一下”测试本身有没有被改”。

反 AI 味的几条

除了”禁止做什么”,还有”必须怎么写”的约束。这部分针对的是 AI 特有的写作病。

1
2
3
4
5
## README 与文案约束
* README 应像真实项目维护者写的文档,禁止写成营销页、宣传稿或 AI 生成味很重的长文。
* README 只写用户真正需要的信息:项目用途、安装方式、基本用法、配置说明、常见问题、开发说明。
* 禁止把实现细节、内部调试过程、无关背景、过度优势描述、空泛愿景塞进 README。
* README 中的功能描述必须与当前代码真实能力一致,未完成能力必须明确标注状态,禁止夸大。

AI 写 README 的味儿很冲——

🌟 这是一个革命性的工具,重新定义了 XXX 的体验。本项目采用先进的 AI 技术……

谁看了不抠脚?写规则强制把这种 hype 内容删掉。

1
2
3
## UI 文案
* UI 文案、设置项说明、提示文本必须简短、明确、可操作。
* 错误提示应说明发生了什么、用户能做什么;不要输出堆栈、内部变量名或调试废话。

AI 写错误提示也是一绝——

在执行 task_runner.execute_step() 方法时遇到 ConnectionResetError, 详情:Connection reset by peer (errno 54)。请检查 src/core/device/Android/adb_client.py 第 234 行。

这种提示甩给普通用户,等于让他自己看堆栈。规则强制 AI 把堆栈隐藏到日志里,UI 上只显示”连不上设备,请检查 USB 线”。

写 AGENTS.md 的几条心得

写了几版之后总结的:

1. 规则要带”为什么”,至少在团队内部知识里要有。

AGENTS.md 本身要简洁,但每条规则背后的原因得有记录——可以是 commit message、可以是单独的 ADR、可以是内部文档。AI 偶尔会”挑战”规则(“我觉得这条不适用于当前场景”),有原因记录在你能直接反驳。

2. 规则越具体越好,避免空话。

“代码要清晰”是废话。“字符串匹配必须使用 src/utils/string_tools.py”是规则。前者 AI 当耳边风,后者它真的会查。

3. 规则数量要克制。

145 行已经接近上限——再长 AI 自己都注意力涣散。每加一条规则都要问:这条值不值得占 AI 的注意力预算?

4. 规则要定期 review。

项目演进过程中,有些规则会过时(比如某个工具方法被废弃),有些新坑要补上规则。每个季度过一遍 AGENTS.md,删过时的、加新的。

5. 把规则和实际审查结合。

写规则不等于强制执行。AI 还是会偶尔越界,PR review 时人要盯一下。我们的做法是 review checklist 直接对照 AGENTS.md 的几个关键条款——“有没有写死坐标”“测试有没有被改弱”。

一个意外收获

AGENTS.md 写完之后,我们发现一件有意思的事——这份文件不仅对 AI 有用,对新加入的人类同事也很有用

新人来了,让他先读一遍 AGENTS.md,半小时就能把项目的”边界规矩”理清楚。比口口相传”哦对了我们不许用模板匹配”靠谱得多。

某种意义上,写给 AI 的规范,最后变成了这个项目最浓缩的工程文化沉淀

收个尾

让 AI 写代码这事儿,未来肯定越来越普遍。但 AI 永远不会自己知道”这个项目的规矩”——这些规矩是你踩坑踩出来的,是你的工程文化,是你团队的共识。

把这些规矩显式写出来,是给 AI 一份”入职手册”。它不会替你做决策,但至少能在你的边界内做事。

不写 AGENTS.md 让 AI 写代码?就跟新人入职第一天没人带、没文档、没规矩,让他自由发挥一样——能用的产出占比,懂的都懂。

Python 应用要发给”不懂 Python 的人”用,绕不开打包这道坎。三种主流选择:

  • PyInstaller:最常见,资料多,但产物体积大、启动慢,杀软老把它当成病毒
  • cx_Freeze:老牌,稳,但生态比 PyInstaller 还小
  • Nuitka:把 Python 翻译成 C 再编译,启动快、产物小、反混淆难度高

我们最后选了 Nuitka。选了之后才发现入坑容易出坑难——跨三个平台、要支持便携版和合并版两种分发形态、还要兼顾用户体验,光打包脚本就写到了 35K 行。

这篇说说几个真坑。

为什么不用 PyInstaller

短话长说一下选型背景。

我们这个工具是给玩家用的桌面应用,主流程是抓屏 + 模型推理 + UI 控制。两个硬性诉求:

  1. 启动要快——用户双击 exe 转半天圈圈,体验立马劝退
  2. 杀软误报要少——PyInstaller 打的 Python 程序在 Windows Defender 那简直是常客

第一坑:portable 还是 merged

桌面应用的”数据放哪儿”在三个平台习惯完全不同:

  • Windows 用户习惯”软件目录里啥都有”——portable 风格
  • macOS 用户习惯 .app 包 + 数据在用户目录 ~/Library/...——合并风格
  • Linux 用户分裂,两种都有

我们最后给打包脚本加了两个开关:

1
2
python build_app.py --portable    # 便携:所有数据放程序同目录
python build_app.py --merged # 合并:数据放 ~/.gakumas-assistant/

代码里所有路径都不能写死,统一过一层路径解析:

1
2
3
4
5
# src/utils/runtime_paths.py
def get_data_dir() -> Path:
if is_portable_build():
return get_exe_dir() / "data"
return Path.home() / ".gakumas-assistant" / "data"

is_portable_build() 在打包时通过环境变量或者打包文件标记决定。所有读写数据、缓存、配置、日志的地方都走这一层——任何一个地方写死路径,跨模式就崩。

这条规矩在项目规范文档里也写死了:

禁止在代码中写死本机绝对路径、用户名、设备路径、端口或私有环境信息。

不是教条,是被坑出来的。

第二坑:macOS .app 包的 WebView

macOS 上桌面 UI 我们用 pywebview——简单说就是把本地 Web 页面嵌进原生窗口里。开发期完全无感,跨平台一致。

打包成 .app 的时候开始出妖了。

问题一:内置 WebView(WKWebView)需要一堆 macOS 框架,打到 .app 里体积膨胀几百 MB。
问题二:某些 macOS 版本上 .app 启动 WebView 会卡白屏,原因深挖到 PyObjC 绑定层,修起来代价巨大。
问题三:用户把 .app 拖到非标准位置(比如外置 U 盘),权限系统直接拒绝。

折腾几轮之后我们做了个分流决策:

1
2
macOS --portable → 不内置 WebView,跑浏览器版(默认浏览器打开 localhost)
macOS --merged → 内置 WebView,正经 .app 体验

--portable 模式下产物只是个文件夹,体积小、启动快、绕开所有 WebView 坑,缺点是用户得用浏览器看 UI。
--merged 模式给追求”原生体验”的用户,体积大但和系统融合更深。

让用户选,比强求一种方案要平衡得多。

第三坑:Windows 控制台窗口

Windows 上的 GUI 程序默认带一个黑色控制台窗口。开发者觉得没啥,普通用户看着像中毒:

我打开你的软件,黑框框弹出来一个,是不是在偷我密码?

Nuitka 提供了 --windows-console-mode=disable 关掉控制台。但完全关掉又有新问题——程序崩溃时的 stderr 没地方看,用户没法提供错误信息。

最终的策略:

1
2
Windows --portable → 带控制台(默认给会调试的用户用)
Windows --merged → 不带控制台(给小白用户用)

--merged 的版本里 stderr 全部重定向到日志文件,崩溃信息照样能拿到——就是用户得自己翻日志目录。

为了让用户能找到日志,我们在 UI 里加了个”打开日志目录”按钮。这种小工程才是真用心。

第四坑:ONNX Runtime 的 EP 依赖

模型推理用 ONNX Runtime。问题是 ORT 的”执行提供者(Execution Provider, EP)”在不同平台依赖完全不同:

  • Windows 想用 DirectML,得带 DirectML.dll
  • macOS 想用 CoreML,要链接 CoreML.framework
  • Linux 多半只能 CPU

Nuitka 默认不会把这些原生依赖全部识别并打包。手动指定:

1
2
3
4
5
# build_app.py 片段
nuitka_args += [
"--include-package-data=onnxruntime",
"--include-data-dir=path/to/onnxruntime/capi=onnxruntime/capi",
]

每次 ORT 升版本,依赖文件路径可能微调,打包脚本要跟着更新——这种”打包脚本耦合三方库结构”的痛苦无法根除,只能记好测试矩阵:每次升级 ORT 都跑一遍三平台打包验证。

第五坑:UPX 不是万能的

打包产物大,本能反应是”加 UPX 压缩”。Nuitka 自己也支持 --include-data-dir 配合 UPX。

实际上:

  • Windows 上压完更容易被杀软误报(UPX 是恶意软件的常用工具)
  • macOS 上 UPX 压完代码签名会失效,分发立马报错
  • Linux 上勉强能用

我们的做法是条件启用——只在 Linux 上开 UPX,其他平台不动。打包脚本里检测一下:

1
2
if platform.system() == "Linux" and shutil.which("upx"):
nuitka_args.append("--upx-binary=" + shutil.which("upx"))

没装 UPX 就跳过。开发者无感。

第六坑:模型文件 vs 资源文件

项目里两类大文件:

  • 模型文件:YOLO 的 .onnx、CLIP 的 .onnx、OCR 模型
  • 资源文件:图标、UI 模板、游戏术语数据库

Nuitka 把这些当 data file 打进去,产物体积爆炸(解压后 1 GB+)。

后来分了两条路:

  • 必需运行的小文件(图标、术语库 JSON):打进去
  • 大模型文件:不打进去,首次启动从用户缓存目录加载,找不到就从内置 CDN 下载

判断逻辑:

1
2
3
4
5
6
def ensure_model_available(model_name: str) -> Path:
local_path = get_user_cache_dir() / model_name
if local_path.exists():
return local_path
download_from_cdn(model_name, local_path) # 带进度条
return local_path

代价是用户首次启动需要联网下载几百 MB。换来的是发布包体积从 1 GB+ 降到 80 MB 左右,下载更新更轻。

几条经验

写完 35K 行打包脚本之后,沉淀的几条规矩:

一切路径走解析层,不允许散点写死。 否则一切跨平台、跨模式都翻车。

给打包脚本写真实的 CI 矩阵。 Windows / macOS / Linux × portable / merged,至少六种组合,每次发版前都跑一遍。靠人记忆是记不住的。

大文件不要打进去,用 lazy download。 既减小发布体积,又方便模型独立升级。

杀软友好性提前考虑。 用 Nuitka 而不是 PyInstaller、不加 UPX(除非必要)、可选代码签名——这些不是优化,是发布质量的硬指标。

打包脚本本身要带 --dry-run 真正打一次几十分钟,dry-run 帮你验证配置正确性,几秒搞定。

收个尾

跨平台打包这事儿,最难的不是技术——技术随便查随便有。难的是每个平台都有自己的”用户期待”和”系统约束”,强行用一套方案套三个平台,要么牺牲体验,要么牺牲简洁。

我们最后的 build_app.py 看起来很臃肿,但每一段都对应一个真实踩过的坑。把这种”为什么这么写”的注释留好,下个维护者才不会一上来就想推倒重写。

下次见到只支持单平台打包的 Python 项目,可以善意地理解——多平台不是抠两行参数就能搞定的,背后都是血泪。

自动化项目跑久了你会发现一个反直觉的事实——写主流程其实不难,难的是出问题之后能不能复现。

界面识别错了一次,到底是模型识别失败,还是当时画面卡了?决策错了一次,是局面本身刁钻,还是上游传了脏数据?这些问题,单看一行日志根本答不上来。

最早遇到这种问题的时候,处理方式很无脑——让用户截图发群里、问”刚才操作过什么”、隔一段时间自己也忘了细节。一来二去,复现成功率约等于零。

后来痛定思痛,搞了一套”失败现场打包”机制。出错的时候自动把那一刻能拿到的所有上下文一起冻起来,事后能完整复盘。

一个失败包里该装什么

先看实际跑出来的目录长这样:

1
2
3
4
5
6
7
8
9
10
logs/failure_2025-02-03_14-32-15_auto_purchase/
├── meta.json # 任务元数据:哪个任务、哪一步、哪个版本
├── exception.txt # 异常类型 + traceback
├── screenshot.png # 失败瞬间的屏幕截图
├── yolo_detections.json # YOLO 那一帧的所有检测框 + 置信度
├── ocr_results.json # OCR 当时识别到的所有文本
├── device_info.json # 设备分辨率、状态、ADB 连接信息
├── task_context.json # 任务执行到这一步时的内部状态
├── config_snapshot.json # 用户配置(脱敏后)
└── recent_logs.txt # 失败前最后 N 行日志

这一套打包出来通常 1~5 MB。对比起原来”日志里一行 ERROR + 用户截图模糊得没法看”,信息量差出几个数量级。

代码上对应一个工具类,封装在 src/utils/task_failure_package.py,配合 task_dumper.py 做现场冻结。

为什么不光记日志

最常见的问题:日志能解决一切吗?

不能。原因有几个:

1. 日志只记”已知重要”的信息

你日志里记的是你写代码时认为重要的字段。但一旦遇到没预想到的失败,那些”没被记进日志”的字段往往才是关键——比如某个 buff 状态、某个隐藏 modal 的出现。临时加日志?等你下次复现已经是一周后了。

2. 日志没有”那一刻的画面”

视觉自动化最怕的事就是”日志说找到了一个按钮,但你不知道那是个什么按钮”。截图一截,立马明白——哦,是个广告 popup,YOLO 误识别成正常按钮了。

3. 日志是文本,结构化数据丢了

YOLO 返回的检测框是结构化的(坐标、类别、置信度),日志只能字符串化。事后想重新跑一遍 NMS 或者过一遍分类,原始数据已经不在了。

失败包补的就是这些”日志补不到”的部分。

打包的时机

最容易踩坑的设计点——什么时候打包?

最朴素的想法:在最外层 try-except 里包一下。问题是这时候很多上下文已经被 GC 掉了。比如那一帧的截图,任务往下走了几步可能已经被新截图覆盖。

我们的做法是在关键步骤进入时记一个”快照点”,异常发生的时候,从最近的快照点开始打包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class TaskDumper:
def snapshot(self, label: str, context: dict):
"""记录一个可能用于失败打包的现场快照"""
self._latest = {
"label": label,
"timestamp": time.time(),
"context": context,
"screenshot": self.device.get_screenshot(), # 复制一份
"yolo_results": self.engine.latest_results,
...
}

def dump_on_failure(self, exception: Exception):
"""异常时调用,把最近快照 + 异常信息打包到 logs/"""
if self._latest is None:
return
pkg_dir = self._make_dir()
self._write_meta(pkg_dir, exception)
self._write_screenshot(pkg_dir, self._latest["screenshot"])
self._write_detections(pkg_dir, self._latest["yolo_results"])
...

调用方在每个有意义的步骤开始时调用 snapshot(),失败时由外层 try-except 调用 dump_on_failure()。snapshot 的开销是个浅拷贝 + 一次截图,可控。

截图的几个坑

视觉自动化里截图是失败包的灵魂,但坑也最多。

坑一:截图被覆盖

主线程持续抓屏,每次都写到同一个 numpy buffer。snapshot 的时候不复制一份,过一帧你拿到的就是新画面,不是失败那一刻的画面。

解决:snapshot 时立即 .copy(),强制内存隔离。

坑二:截图过大

高分辨率全屏 PNG 一张能到 3~5 MB。一天攒几十个失败包,磁盘吃不消。

解决:写盘时缩放到 1280px 宽(短边等比),用 JPG 而非 PNG,单张降到 200~400 KB。但要注意——用于复现的截图必须能再喂给模型推理,所以缩放比例要可逆,或者干脆同时存一份原图、一份压缩图(原图设短保留期,压缩图长期保留)。

坑三:含隐私信息

游戏画面可能含玩家 ID、昵称、好友列表。失败包要发给开发者排查时,这些都得脱敏。

解决:截图打包前过一道 mask——把已知的隐私区域用纯色覆盖。config_snapshot.json 也走脱敏函数,token、cookie、密码一律 [REDACTED]

meta.json 是排查的入口

打包目录里第一个该看的就是 meta.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"version": "0.8.2",
"task_name": "auto_purchase",
"step": "click_purchase_confirm",
"platform": "macOS",
"device": {
"type": "PlayCover+MaaTools",
"resolution": "1920x1080"
},
"exception": {
"type": "ElementNotFoundError",
"message": "Expected button 'PurchaseConfirm' not found in current frame"
},
"snapshot_age_ms": 47,
"task_duration_ms": 12300
}

snapshot_age_ms 这个字段特别有用——告诉你失败那一刻距离最近的 snapshot 隔了多久。隔得短,说明截图就是案发瞬间;隔得久(比如几秒),可能这中间发生了别的事,要看 recent_logs.txt 补完。

task_duration_ms 是另一个隐性指标——任务正常完成只要几秒,但这次跑了几十秒还失败,多半是某个等待环节卡住了。

一次真实的排查

举个真实例子说明这套东西怎么用的。

用户反馈”自动购买偶尔会失败”。打开他发来的 failure 包:

  1. meta.json:异常是 ElementNotFoundError,找不到”购买确认”按钮
  2. screenshot.png:屏幕上确实没有按钮——出现了一个”商品已售罄”的提示框
  3. yolo_detections.json:YOLO 正确检测到了 “SoldOutModal”,置信度 0.94
  4. task_context.json:任务状态显示”等待购买确认按钮出现”
  5. recent_logs.txt:日志里有一行 “Skipping unknown modal: SoldOutModal”——原来代码里见过这个 modal,但没写关闭逻辑

诊断结论清清楚楚——商品售罄的情况没被处理。修起来 5 分钟,写测试用例验证完事。

如果只有日志,光看 “ElementNotFoundError” 这个异常名根本指不到方向。

落地的几条规矩

写一套失败包系统不难,难的是用得起来——很多类似系统造出来之后没人用,最后失活。我们这套能持续运转,靠的是几条死规矩:

1. 默认开启,不能依赖用户手动启用。

加个开关让用户”如果遇到问题再打开”,那等于没做——用户遇到问题的时候开关是关的,错过现场。常态运行,磁盘吃得起。

2. 失败包目录要简单可压缩。

logs/failure_* 一个目录一个失败,用户一键打包成 zip 发上来。别搞数据库、别搞分布式存储,普通文件夹最香。

3. 大小要有兜底。

设个上限——比如最多保留 50 个失败包,超过的按时间删旧的。免得磁盘吃光。

4. 失败包本身打包失败不能影响主流程。

dump_on_failure 内部全部 try-except 包住,自己挂了写个 ERROR 日志就完事,绝不能往上抛——主任务已经在异常路径上了,再来一波异常就乱了。

5. 脱敏机制提前写好。

不是出问题之后再想”哦对截图里有手机号”。脱敏函数随 dump 一起写,每个失败包出去前都过一遍。

收个尾

自动化项目跑久了你会发现,复现能力比解决问题的能力更稀缺

你能修任何 bug,前提是 bug 能复现。复现不了的 bug,再厉害的人也只能干瞪眼。失败包就是把”无法复现”这个最大的拦路虎搬走——出问题的那一刻发生了什么,全冻在那个目录里,你随时可以回去看。

写这套机制大概花了几天工夫,回头看是这个项目里 ROI 最高的几件事之一。每次有人在群里说”刚才出错了”,我都说”把 logs 目录里最新的那个失败包发我看看”,剩下的就好办了。

不打失败包的自动化项目,跟没装行车记录仪的车一样——平时没事,出事就抓瞎。