巡检机器人的后端全栈——ROS 地图解码、实时视频流和那一堆内存缓存

做 IoT 后端做到一半,需求来了:要接巡检机器人。不是那种遥控小车,是果园里自动跑、自己建图、识别病虫害、汇报异常的那种。

后端要干的活:注册小车、收心跳、下发任务、收图片跑识别、收传感器数据、显示实时位置、建图进度推送、兴趣点标注、视频流转发、异常报警……光看 API 文档就有 40 多个端点。

这篇不打算逐个 API 讲,而是挑几个真正让我掉头发的点:ROS 地图解码、实时数据推送的内存架构、兴趣点去重、还有配置版本管理。

ROS OccupancyGrid → PNG:三次翻转的故事

小车上跑的是 ROS2,建图用的 SLAM,输出的是 OccupancyGrid 消息。这个消息的 data 字段是 zlib 压缩的 int8[],值域是 0~100(概率)或 -1(未知)。

前端要 PNG,不是原始栅格。转换过程看着简单,但地图方向踩了坑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _grid_zlib_to_png(zlib_data: bytes, width: int, height: int) -> bytes:
raw = zlib.decompress(zlib_data)
grid = np.frombuffer(raw, dtype=np.int8).reshape((height, width))

# 三色映射:0=空闲(254), >50=障碍(0), 其余=未知(205)
image = np.full(grid.shape, 205, dtype=np.uint8)
image[grid == 0] = 254
image[grid > 50] = 0

# ROS 地图的坐标系原点在左下角,图像在左上角,得上下翻转
image_to_save = np.flipud(image)

ok, buf = cv2.imencode('.png', image_to_save)
return buf.tobytes()
  • zlib.decompress → 解压
  • np.frombuffer(..., dtype=np.int8).reshape(...) → 一维转二维
  • np.flipud → 翻转 Y 轴

三色映射的阈值 50 是 ROS 的惯例:0 是确定空闲,100 是确定障碍,中间值是”可能”。阈值 50 以上画成黑色障碍,0 画成白色通道,其他画成灰色未知区域。

一开始没做 flipud,前端显示的地图是上下颠倒的——因为 ROS 的 OccupancyGrid 原点在左下角(跟数学坐标系一样),但 PNG 的像素从左上角开始排。翻了之后才正常。

实时数据推送:五套内存缓存

巡检车后端有五种需要实时推送的数据,每种都有独立的内存缓存和订阅者列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 视频帧
_frame_cache: Dict[int, bytes] = {}
_stream_subscribers: Dict[int, list] = defaultdict(list)

# 实时位置
_rover_position_cache: Dict[int, Dict[str, Any]] = {}

# 建图进度
_map_progress_chunks: Dict[str, Dict[int, bytes]] = {}
_map_progress_subscribers: Dict[int, list] = defaultdict(list)

# 兴趣点
_interest_points_cache: Dict[int, List[dict]] = defaultdict(list)
_interest_points_subscribers: Dict[int, list] = defaultdict(list)

# 任务状态
_task_subscribers: Dict[int, list] = defaultdict(list)

为什么不用 Redis?因为这些数据的生命周期就是”当前在线的连接”,小车下线就作废了,存 Redis 反而多一次序列化开销。内存里直接存 bytes/dict,SSE 端点直接 yield,延迟最低。

推送模式统一是 SSE(视频帧之外),每个 SSE 端点长这样:

1
2
3
4
5
6
7
8
9
10
@router.get("/rover/{rover_id}/stream/position")
async def stream_position(rover_id: int):
queue = asyncio.Queue(maxsize=64)
_position_subscribers[rover_id].append(queue)
async def generate():
while True:
payload = await asyncio.wait_for(queue.get(), timeout=30)
if payload is None: break
yield f"data: {json.dumps(payload)}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")

数据源(心跳上报、建图进度等)在收到数据时往订阅者队列里 put_nowait

1
2
3
4
5
for q in _map_progress_subscribers.get(rover_id, []):
try:
q.put_nowait({"type": "chunk", ...})
except asyncio.QueueFull:
pass # 队列满了就丢,实时数据不怕丢帧

实时数据的哲学:宁可丢帧,不能卡住。所以队列都有 maxsize,满了就 pass。前端收不到一帧位置更新无所谓,下一帧马上就来了。

兴趣点去重:HTTP 和 WebSocket 的双通道问题

兴趣点(比如识别到病害的位置)有两个上报通道:HTTP REST 和 WebSocket。小车走 HTTP 上报稳定可靠,WebSocket 走实时推送延迟低。问题是同一个兴趣点可能两个通道都上报一次,前端就显示两个点。

解法:近邻去重。

1
2
3
4
5
6
7
8
9
10
11
12
13
def _append_interest_point(rover_id, payload):
point = _normalize_interest_point_payload(rover_id, payload)
with _interest_points_lock:
bucket = _interest_points_cache[rover_id]
# 只检查最近 5 个点
for last in reversed(bucket[-5:]):
if (
abs(float(last.get("x", 0.0)) - point["x"]) < 1e-4
and abs(float(last.get("y", 0.0)) - point["y"]) < 1e-4
and str(last.get("label") or "") == point["label"]
):
return last # 重复,返回已有条目
bucket.append(point)

三个条件全满足才判重:x 坐标差小于 0.0001、y 坐标差小于 0.0001、label 相同。0.0001 在地图坐标系里大约是 0.1 毫米——足够区分两个不同位置,又不会因为浮点精度误判。

只检查最近 5 个而不是全部,是因为兴趣点有时间顺序,很早之前的点不可能和当前点重复,遍历全部纯属浪费。

任务协调器:收拾烂摊子

小车任务有五种状态:pending → dispatched → running → completed/failed。现实比状态机复杂得多:

  • 小车断线了,任务卡在 running
  • 小车重启了,任务卡在 dispatched
  • 新任务建图,但上一个建图任务还没标记完成

解法:一个 30 秒执行一次的协调器 _periodic_rover_reconciliation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def _periodic_rover_reconciliation(interval_sec=30):
while True:
rovers = Rover.select()
for rover in rovers:
if rover.current_status in ("idle", "offline"):
# 检查是否有卡住的任务
active_tasks = RoverTask.select().where(
RoverTask.rover == rover,
RoverTask.task_status.in_(["pending", "dispatched", "running"])
)
for task in active_tasks:
if task.task_status == "dispatched" and _task_age(task) > 60:
_mark_task_auto_failed(task, reason="调度超时")
elif task.task_status == "running" and _task_age(task) > 60:
_mark_task_auto_failed(task, reason="执行超时,小车已离线")
await asyncio.sleep(interval_sec)

任务自动失败后,前端 SSE 推送一条 refresh 事件,前端重新拉取任务列表,用户看到的是任务从”运行中”变成”失败(超时)“,而不是永远卡在那里转圈。

配置版本管理:YAML 导入导出 + 回滚

小车的运行参数(速度、相机参数、建图分辨率等)是个嵌套 JSON。需求要求:支持版本历史、模板、YAML 导入导出、回滚。

版本表设计:

1
2
3
4
5
class RoverRuntimeConfigVersion(BaseModel):
rover = ForeignKeyField(Rover)
version_no = IntegerField() # 递增版本号
config_data = TextField() # JSON 快照
created_at = DateTimeField()

回滚操作其实就是”创建一个新版本,内容是旧版本的快照”:

1
2
3
4
5
6
7
8
def rollback_config(rover_id, target_version):
target = RoverRuntimeConfigVersion.get(rover=rover_id, version_no=target_version)
current_max = max_version(rover_id)
new_version = current_max + 1
RoverRuntimeConfigVersion.create(
rover=rover_id, version_no=new_version,
config_data=target.config_data # 复制旧版本内容
)

不做物理回退(DELETE + UPDATE),只做逻辑追加。这样版本历史永远不会断,审计也方便。

YAML 导入导出是个意外的坑:小车的配置字段命名风格是 ROS 的 snake_case,但前端要 camelCase。导出时做了一次转换,导入时又转回来。中间出了个 bug,一个布尔值 "true" 字符串没被正确转回 True,导致小车把 true 当成文件路径去读了……后来加了个深度遍历的类型修复函数才搞定。

视频流:帧缓存 + WebSocket 广播

视频流没走 SSE,走了 WebSocket + JPEG 帧的方式。小车端每帧发送一个 base64 编码的 JPEG,后端缓存最新一帧,同时广播给所有订阅的前端 WebSocket 连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
_frame_cache: Dict[int, bytes] = {}          # rover_id → 最新 JPEG bytes
_stream_subscribers: Dict[int, list] = defaultdict(list) # rover_id → [asyncio.Queue]

@router.websocket("/ws/rover/{rover_id}/stream")
async def rover_stream_ws(websocket: WebSocket, rover_id: int):
await websocket.accept()
queue = asyncio.Queue(maxsize=8)
_stream_subscribers[rover_id].append(queue)
try:
while True:
frame = await asyncio.wait_for(queue.get(), timeout=30)
if frame is None: break
await websocket.send_bytes(frame)
except WebSocketDisconnect:
pass
finally:
_stream_subscribers[rover_id].remove(queue)

为什么用 JPEG 帧而不是 RTSP/HLS?因为果园里的网络经常只有 2G 信号,RTSP 需要持续连接,断线重连很麻烦。JPEG 帧模式天然容错——丢一帧不影响下一帧,延迟也更可控。后端还配置了 mediamtx 作为 RTSP 中转(给延迟要求不高的监控场景用),两种模式共存。

写在最后

巡检车后端最花精力的不是单个功能的实现,而是状态一致性。小车随时可能断线、重连、换 IP,后端要能正确处理所有中间状态。上面那些协调器、去重、缓存清理,本质上都是在处理”分布式系统中的最终一致性”问题——只不过这个分布式系统只有两台机器(后端和小车),网络还特别烂。

经验总结就一条:永远不要假设小车还活着。每个操作都要考虑”如果小车下一秒就断线了,这个状态还能不能正确收敛?”

欢迎关注我的其它发布渠道