这台车是给农田巡检用的——装上小型激光雷达和深度相机,自己在田垄之间走、自己建图、检测到的异常上报回服务端。主控选的树莓派 5 16G,跑 ROS2 Humble。
实话说,树莓派的算力是真紧张——SLAM 跑起来 CPU 长期 60%+,Nav2 的代价地图刷新跟着拉满,再叠加摄像头流和 WebSocket 客户端,一不留神就热到降频。这篇就记一下为了让它”勉强能跑顺”踩过的一堆坑。
用 Docker 部署 ROS2
不在树莓派上直接装。理由不是 Docker 多优雅,而是直接装 ROS2 Humble + colcon + 一堆 nav2 包,整个系统会被污染得跟战场似的;编译还慢。Docker 至少能把环境隔住、版本钉死:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| FROM ros:humble-ros-base
RUN apt-get update && apt-get install -y \ ros-humble-slam-toolbox \ ros-humble-navigation2 \ ros-humble-nav2-bringup \ ros-humble-robot-localization \ && rm -rf /var/lib/apt/lists/*
COPY src/ /opt/rover_ws/src/ COPY scripts/ /opt/rover_ws/scripts/
RUN cd /opt/rover_ws && \ . /opt/ros/humble/setup.sh && \ colcon build --symlink-install
COPY scripts/entrypoint.sh /entrypoint.sh RUN chmod +x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"]
|
compose 配置要注意两件事:
network_mode: host——ROS2 的 DDS 必须用主机网络,桥接模式下节点之间互相发现不到,这个坑能让你查一下午devices 映射——激光雷达和深度相机的设备文件得显式挂进去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| version: '3.8'
services: rover-edge: build: . container_name: rover-edge privileged: true network_mode: host volumes: - ./src:/opt/rover_ws/src - ./data/dev_ws:/opt/rover_ws/data - /dev:/dev environment: - ROS_DOMAIN_ID=0 - FORCE_COLCON_BUILD=1 devices: - /dev/ttyUSB0:/dev/ttyUSB0 - /dev/video0:/dev/video0
|
privileged: true 其实不一定真需要,但实际部署中我没空一个个细抠 cap,加上图省事——不是好习惯,但够用。
slam_toolbox 的 async 模式在 ARM 上明显比 cartographer 稳——后者在 Humble 这边的构建脚本一直有点幺蛾子,能不碰就不碰。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| from launch import LaunchDescription from launch_ros.actions import Node
def generate_launch_description(): return LaunchDescription([ Node( package='slam_toolbox', executable='async_slam_toolbox_node', name='slam_toolbox', output='screen', parameters=[{ 'use_sim_time': False, 'slam_toolbox.map_file_name': '', 'slam_toolbox.mode': 'mapping', 'slam_toolbox.scan_topic': '/scan', 'slam_toolbox.publish_map': True, }], ), ])
|
建图流程很常规——先遥控走一圈,再保存:
1 2 3 4
| ros2 launch agri_rover_edge slam.launch.py ros2 run teleop_twist_keyboard teleop_twist_keyboard ros2 service call /slam_toolbox/save_map slam_toolbox/srv/SaveMap \ "{name: {data: 'farm_map'}}"
|
农田场景里有个反复出现的坑——田间路径又长又直,回环少。SLAM 在长直线上累计误差大,回到起点时可能错位半米以上——地图直接画歪了。
后来加了几个明显的人工路标(停车桩、铁丝架),故意让车经过、回扫,靠这些路标让算法触发 loop closure,地图才慢慢稳下来。
Nav2 在算力紧张时的参数
Nav2 默认配置在 PC 上跑得飞起,搬到树莓派上等同自爆——costmap 刷新和路径规划都太勤快。把几个频率拍下来才能喘口气:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| local_costmap: local_costmap: ros__parameters: update_frequency: 2.0 width: 3 height: 3 resolution: 0.05
global_costmap: global_costmap: ros__parameters: update_frequency: 1.0 width: 20 height: 20 resolution: 0.05
planner_server: ros__parameters: GridBased: tolerance: 0.5
controller_server: ros__parameters: FollowPath: max_vel_x: 0.3 max_vel_theta: 0.5
|
tolerance: 0.5 这个比较关键——默认值是 0.25。农田地面凹凸不平,定位误差天然就在 20-30cm,容差给太小,Nav2 反复重规划,一直觉得”还没到、还没到”——车在原地疯狂调整角度,看着像跳广场舞。
最大速度限到 0.3 m/s 倒不是为了省算力,是因为车的底盘抖动跟速度强相关。再快一点,激光雷达点云的畸变肉眼可见——SLAM 这边就要骂街了。
覆盖巡检路径生成
农田巡检不是 A 到 B 那种简单导航,而是要把整块田走一遍——类似扫地机器人的”弓字形”。手写一段路径生成,输入是地图栅格 + 行间距:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import numpy as np from typing import List, Tuple
def generate_coverage_route( map_data: np.ndarray, resolution: float, origin: Tuple[float, float], line_spacing: float = 1.0, ) -> List[Tuple[float, float]]: traversable = map_data == 0 rows, cols = np.where(traversable) min_row, max_row = rows.min(), rows.max() min_col, max_col = cols.min(), cols.max()
def to_world(row, col): x = origin[0] + col * resolution y = origin[1] + row * resolution return (x, y)
route = [] direction = 1 current_row = min_row
while current_row <= max_row: if direction == 1: cols_range = range(min_col, max_col + 1) else: cols_range = range(max_col, min_col - 1, -1)
for col in cols_range: if traversable[current_row, col]: route.append(to_world(current_row, col))
current_row += int(line_spacing / resolution) direction *= -1
return route
|
这写法对凸多边形田块够用了。但要是地块是凹的(中间有障碍物),生成的路径会”穿障碍”——实际跑的时候 Nav2 会绕开重规划,体验就有点拉胯。
比较正确的做法是按可达区域分块再分别覆盖,但目前业务上还用不到,先这样将就着。
与服务端的 WebSocket
车上线 → 跟服务端建长连接 → 服务端下指令、车上报状态。WebSocket 客户端写得朴素,关键是断线重连和”消息按 action 分发”:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| import asyncio import json import websockets
class RoverEdgeClient: def __init__(self, server_url: str): self.server_url = server_url self._ws = None
async def connect(self): while True: try: async with websockets.connect(self.server_url) as ws: self._ws = ws logger.success("已连接到服务端") await self._message_handler() except Exception as e: logger.error(f"连接失败: {e}") await asyncio.sleep(5)
async def _message_handler(self): async for message in self._ws: data = json.loads(message) action = data.get("action") if action == "navigate": await self._handle_navigate(data["data"]) elif action == "start_mapping": await self._handle_start_mapping() elif action == "stop_mapping": await self._handle_stop_mapping() elif action == "update_config": await self._handle_update_config(data["data"])
async def _handle_navigate(self, payload): target_x = payload["x"] target_y = payload["y"] goal_pose = self._create_goal_pose(target_x, target_y) self._nav_client.send_goal(goal_pose) await self._report_navigation_status("navigating")
async def report_position(self, x, y, theta): await self._ws.send(json.dumps({ "action": "position_update", "data": {"x": x, "y": y, "theta": theta} }))
async def report_anomaly(self, anomaly_type, details): await self._ws.send(json.dumps({ "action": "anomaly", "data": {"type": anomaly_type, "details": details} }))
|
asyncio.sleep(5) 是个偷懒的固定间隔。讲究的话应该用指数退避,避免雷击效应——不过这个场景只有一台车,无所谓。
异常检测
巡检过程中要持续盯几件事:“我没卡死、我还能动、电池还能撑、激光雷达还在出数据”。一个简单的注册表把这些检查项管起来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| class AnomalyDetector: def __init__(self): self._anomaly_types = { "low_battery": self._check_low_battery, "stuck": self._check_stuck, "motor_stall": self._check_motor_stall, "lidar_timeout": self._check_lidar_timeout, "depth_stream_error": self._check_depth_stream, "nav2_timeout": self._check_nav2_timeout, }
def check(self, rover_state: dict) -> List[dict]: anomalies = [] for anomaly_type, checker in self._anomaly_types.items(): if checker(rover_state): anomalies.append({ "type": anomaly_type, "position": rover_state["position"], "timestamp": time.time(), "details": rover_state, }) return anomalies
def _check_low_battery(self, state: dict) -> bool: return state.get("battery_voltage", 0) < 3.3
def _check_stuck(self, state: dict) -> bool: if state.get("speed", 0) > 0.01: self._last_moving_time = time.time() return time.time() - self._last_moving_time > 30
|
检测到异常不只是发条消息,还会在地图上打一个 marker——农户后续可以照着这个地图位置直接走过去查看。这个比”发邮件告诉你东 35 北 12 处异常”实用太多了。