做 IoT 后端做到一半,需求来了:要接巡检机器人。不是那种遥控小车,是果园里自动跑、自己建图、识别病虫害、汇报异常的那种。
后端要干的活:注册小车、收心跳、下发任务、收图片跑识别、收传感器数据、显示实时位置、建图进度推送、兴趣点标注、视频流转发、异常报警……光看 API 文档就有 40 多个端点。
这篇不打算逐个 API 讲,而是挑几个真正让我掉头发的点:ROS 地图解码、实时数据推送的内存架构、兴趣点去重、还有配置版本管理。
ROS OccupancyGrid → PNG:三次翻转的故事
小车上跑的是 ROS2,建图用的 SLAM,输出的是 OccupancyGrid 消息。这个消息的 data 字段是 zlib 压缩的 int8[],值域是 0~100(概率)或 -1(未知)。
前端要 PNG,不是原始栅格。转换过程看着简单,但地图方向踩了坑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| def _grid_zlib_to_png(zlib_data: bytes, width: int, height: int) -> bytes: raw = zlib.decompress(zlib_data) grid = np.frombuffer(raw, dtype=np.int8).reshape((height, width))
image = np.full(grid.shape, 205, dtype=np.uint8) image[grid == 0] = 254 image[grid > 50] = 0
image_to_save = np.flipud(image)
ok, buf = cv2.imencode('.png', image_to_save) return buf.tobytes()
|
zlib.decompress → 解压np.frombuffer(..., dtype=np.int8).reshape(...) → 一维转二维np.flipud → 翻转 Y 轴
三色映射的阈值 50 是 ROS 的惯例:0 是确定空闲,100 是确定障碍,中间值是”可能”。阈值 50 以上画成黑色障碍,0 画成白色通道,其他画成灰色未知区域。
一开始没做 flipud,前端显示的地图是上下颠倒的——因为 ROS 的 OccupancyGrid 原点在左下角(跟数学坐标系一样),但 PNG 的像素从左上角开始排。翻了之后才正常。
实时数据推送:五套内存缓存
巡检车后端有五种需要实时推送的数据,每种都有独立的内存缓存和订阅者列表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| _frame_cache: Dict[int, bytes] = {} _stream_subscribers: Dict[int, list] = defaultdict(list)
_rover_position_cache: Dict[int, Dict[str, Any]] = {}
_map_progress_chunks: Dict[str, Dict[int, bytes]] = {} _map_progress_subscribers: Dict[int, list] = defaultdict(list)
_interest_points_cache: Dict[int, List[dict]] = defaultdict(list) _interest_points_subscribers: Dict[int, list] = defaultdict(list)
_task_subscribers: Dict[int, list] = defaultdict(list)
|
为什么不用 Redis?因为这些数据的生命周期就是”当前在线的连接”,小车下线就作废了,存 Redis 反而多一次序列化开销。内存里直接存 bytes/dict,SSE 端点直接 yield,延迟最低。
推送模式统一是 SSE(视频帧之外),每个 SSE 端点长这样:
1 2 3 4 5 6 7 8 9 10
| @router.get("/rover/{rover_id}/stream/position") async def stream_position(rover_id: int): queue = asyncio.Queue(maxsize=64) _position_subscribers[rover_id].append(queue) async def generate(): while True: payload = await asyncio.wait_for(queue.get(), timeout=30) if payload is None: break yield f"data: {json.dumps(payload)}\n\n" return StreamingResponse(generate(), media_type="text/event-stream")
|
数据源(心跳上报、建图进度等)在收到数据时往订阅者队列里 put_nowait:
1 2 3 4 5
| for q in _map_progress_subscribers.get(rover_id, []): try: q.put_nowait({"type": "chunk", ...}) except asyncio.QueueFull: pass
|
实时数据的哲学:宁可丢帧,不能卡住。所以队列都有 maxsize,满了就 pass。前端收不到一帧位置更新无所谓,下一帧马上就来了。
兴趣点去重:HTTP 和 WebSocket 的双通道问题
兴趣点(比如识别到病害的位置)有两个上报通道:HTTP REST 和 WebSocket。小车走 HTTP 上报稳定可靠,WebSocket 走实时推送延迟低。问题是同一个兴趣点可能两个通道都上报一次,前端就显示两个点。
解法:近邻去重。
1 2 3 4 5 6 7 8 9 10 11 12 13
| def _append_interest_point(rover_id, payload): point = _normalize_interest_point_payload(rover_id, payload) with _interest_points_lock: bucket = _interest_points_cache[rover_id] for last in reversed(bucket[-5:]): if ( abs(float(last.get("x", 0.0)) - point["x"]) < 1e-4 and abs(float(last.get("y", 0.0)) - point["y"]) < 1e-4 and str(last.get("label") or "") == point["label"] ): return last bucket.append(point)
|
三个条件全满足才判重:x 坐标差小于 0.0001、y 坐标差小于 0.0001、label 相同。0.0001 在地图坐标系里大约是 0.1 毫米——足够区分两个不同位置,又不会因为浮点精度误判。
只检查最近 5 个而不是全部,是因为兴趣点有时间顺序,很早之前的点不可能和当前点重复,遍历全部纯属浪费。
任务协调器:收拾烂摊子
小车任务有五种状态:pending → dispatched → running → completed/failed。现实比状态机复杂得多:
- 小车断线了,任务卡在
running - 小车重启了,任务卡在
dispatched - 新任务建图,但上一个建图任务还没标记完成
解法:一个 30 秒执行一次的协调器 _periodic_rover_reconciliation:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| async def _periodic_rover_reconciliation(interval_sec=30): while True: rovers = Rover.select() for rover in rovers: if rover.current_status in ("idle", "offline"): active_tasks = RoverTask.select().where( RoverTask.rover == rover, RoverTask.task_status.in_(["pending", "dispatched", "running"]) ) for task in active_tasks: if task.task_status == "dispatched" and _task_age(task) > 60: _mark_task_auto_failed(task, reason="调度超时") elif task.task_status == "running" and _task_age(task) > 60: _mark_task_auto_failed(task, reason="执行超时,小车已离线") await asyncio.sleep(interval_sec)
|
任务自动失败后,前端 SSE 推送一条 refresh 事件,前端重新拉取任务列表,用户看到的是任务从”运行中”变成”失败(超时)“,而不是永远卡在那里转圈。
配置版本管理:YAML 导入导出 + 回滚
小车的运行参数(速度、相机参数、建图分辨率等)是个嵌套 JSON。需求要求:支持版本历史、模板、YAML 导入导出、回滚。
版本表设计:
1 2 3 4 5
| class RoverRuntimeConfigVersion(BaseModel): rover = ForeignKeyField(Rover) version_no = IntegerField() config_data = TextField() created_at = DateTimeField()
|
回滚操作其实就是”创建一个新版本,内容是旧版本的快照”:
1 2 3 4 5 6 7 8
| def rollback_config(rover_id, target_version): target = RoverRuntimeConfigVersion.get(rover=rover_id, version_no=target_version) current_max = max_version(rover_id) new_version = current_max + 1 RoverRuntimeConfigVersion.create( rover=rover_id, version_no=new_version, config_data=target.config_data )
|
不做物理回退(DELETE + UPDATE),只做逻辑追加。这样版本历史永远不会断,审计也方便。
YAML 导入导出是个意外的坑:小车的配置字段命名风格是 ROS 的 snake_case,但前端要 camelCase。导出时做了一次转换,导入时又转回来。中间出了个 bug,一个布尔值 "true" 字符串没被正确转回 True,导致小车把 true 当成文件路径去读了……后来加了个深度遍历的类型修复函数才搞定。
视频流:帧缓存 + WebSocket 广播
视频流没走 SSE,走了 WebSocket + JPEG 帧的方式。小车端每帧发送一个 base64 编码的 JPEG,后端缓存最新一帧,同时广播给所有订阅的前端 WebSocket 连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| _frame_cache: Dict[int, bytes] = {} _stream_subscribers: Dict[int, list] = defaultdict(list)
@router.websocket("/ws/rover/{rover_id}/stream") async def rover_stream_ws(websocket: WebSocket, rover_id: int): await websocket.accept() queue = asyncio.Queue(maxsize=8) _stream_subscribers[rover_id].append(queue) try: while True: frame = await asyncio.wait_for(queue.get(), timeout=30) if frame is None: break await websocket.send_bytes(frame) except WebSocketDisconnect: pass finally: _stream_subscribers[rover_id].remove(queue)
|
为什么用 JPEG 帧而不是 RTSP/HLS?因为果园里的网络经常只有 2G 信号,RTSP 需要持续连接,断线重连很麻烦。JPEG 帧模式天然容错——丢一帧不影响下一帧,延迟也更可控。后端还配置了 mediamtx 作为 RTSP 中转(给延迟要求不高的监控场景用),两种模式共存。
写在最后
巡检车后端最花精力的不是单个功能的实现,而是状态一致性。小车随时可能断线、重连、换 IP,后端要能正确处理所有中间状态。上面那些协调器、去重、缓存清理,本质上都是在处理”分布式系统中的最终一致性”问题——只不过这个分布式系统只有两台机器(后端和小车),网络还特别烂。
经验总结就一条:永远不要假设小车还活着。每个操作都要考虑”如果小车下一秒就断线了,这个状态还能不能正确收敛?”