一般的 Vue 项目,组件是在构建时打包进去的——import MyComponent from './MyComponent.vue',Webpack/Vite 把它打进 chunk,页面刷新就加载。这是最常见也最简单的模式。

ServerManager 的插件系统有另一个需求:用户安装一个插件,插件的 Vue 页面要在不重新构建前端的情况下出现在界面上——侧边栏多一个菜单项、仪表盘多一个卡片、设置页多一个配置面板。

这就需要运行时加载 Vue 单文件组件(.vue 文件),也就是把”编译 → 打包 → 部署”这个流程从构建时挪到运行时。技术上靠的是 vue3-sfc-loader

插件元数据的声明式注册

插件后端在 on_load() 生命周期钩子里声明自己有哪些前端组件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class MyPlugin(BasePlugin):
def on_load(self):
# 注册一个带路由的页面组件
self.register_frontend_component(
component_name='MyDashboard',
component_type='dashboard',
entry_file='MyDashboard.vue',
title='我的仪表盘',
icon='mdi-chart-line',
dashboard={
'default_size': 'large',
'allowed_sizes': ['medium', 'large', 'wide'],
'order': 10,
},
)

# 注册一个配置页面
self.register_config_entry(
component_name='MyConfig',
entry_file='MyConfig.vue',
title='我的插件配置',
)

register_frontend_component()PluginRegistry 里做了一系列校验:

1
2
3
4
5
6
7
8
9
10
11
12
13
def normalize_frontend_entry_file(entry_file: str, allowed_extensions=None) -> str:
allowed_extensions = tuple(allowed_extensions or ('.vue', '.js', '.ts'))
entry_file = str(entry_file or '').strip().replace('\\', '/')
if not entry_file:
raise ValueError("前端入口文件不能为空")
if os.path.isabs(entry_file):
raise ValueError("前端入口文件不能使用绝对路径")
parts = [part for part in entry_file.split('/') if part]
if not parts or any(part == '..' for part in parts):
raise ValueError("前端入口文件不能包含路径穿越")
if not entry_file.endswith(allowed_extensions):
raise ValueError(f"前端入口文件必须是: {', '.join(allowed_extensions)}")
return '/'.join(parts)

绝对路径?拒绝。父目录引用 ..?拒绝。扩展名不对?拒绝。这是插件的输入,不能让它搞出任意文件读取。

四种组件类型

component_type 决定了组件在界面上的呈现位置:

  • page:完整的路由页面,出现在侧边栏菜单里,有 route_path
  • widget:小的挂件,可以嵌入到其他页面
  • dashboard:仪表盘卡片组件,有尺寸约束(small/medium/large/wide)
  • config:配置页面,出现在系统设置的插件配置区域

每种类型的注册参数略有不同,但核心数据结构是一样的。注册完成后,API 端点 /api/plugins/frontend/ 会返回所有已启用插件的前端组件列表。

仪表盘布局系统

dashboard 类型组件的元数据最丰富,因为它需要布局约束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def normalize_dashboard_definition(definition: Dict[str, Any]) -> Dict[str, Any]:
allowed_sizes = _normalize_dashboard_sizes(definition.get('allowed_sizes'))
default_size = _normalize_dashboard_size(definition.get('default_size'))
min_size = _normalize_dashboard_size(definition.get('min_size'))
if default_size not in allowed_sizes:
default_size = allowed_sizes[0]
if min_size not in allowed_sizes:
min_size = allowed_sizes[0]
return {
'title': str(definition.get('title') or '').strip(),
'icon': str(definition.get('icon') or '').strip(),
'default_size': default_size,
'min_size': min_size,
'allowed_sizes': allowed_sizes,
'order': int(definition.get('order') or 0),
'default_hidden': bool(definition.get('default_hidden', False)),
}

尺寸有四种:small(1 格宽)、medium(2 格宽)、large(2 行 2 列)、wide(3 格宽)。插件声明自己支持哪些尺寸,前端布局引擎从中选一个最合适的。

还有 layoutConstraints(最小/最大宽高)和 displayModes(卡片、简略、详细三种展示模式的切换),这些校验委托给 normalize_layout_constraints()normalize_display_modes() 来保证数据格式正确。

插件之间的依赖声明

插件可以声明自己依赖其他插件:

1
2
3
4
5
6
7
8
9
10
11
def register_plugin_dependencies(self, plugin_id: str, dependencies: List[Dict]):
normalized = []
for dep in dependencies or []:
if not isinstance(dep, dict) or not dep.get('id'):
raise ValueError(f"无效的插件依赖定义: {dep}")
normalized.append({
'id': str(dep['id']).strip(),
'version': str(dep.get('version') or '').strip(),
'optional': bool(dep.get('optional', False)),
})
self._plugin_dependencies[plugin_id] = normalized

依赖不只是声明——在跨插件 API 调用时,系统会检查调用方是否声明了对目标插件的依赖:

1
2
3
4
5
6
def _assert_cross_plugin_access(self, caller_plugin_id, target_plugin_id, permission, callable_name=''):
if caller_plugin_id != target_plugin_id:
if not self.plugin_depends_on(caller_plugin_id, target_plugin_id):
raise PermissionError(
f"插件 {caller_plugin_id} 与插件 {target_plugin_id} 未声明依赖关系"
)

你想调别人的 API?先在 manifest 里声明依赖。这防止了隐式耦合——没有”恰好能用”这回事,依赖关系是白纸黑字写好的。

跨插件通信:API 和 Hook

插件之间的通信分两种模式:

API 调用——一个插件主动调用另一个插件暴露的方法:

1
2
3
4
5
# 插件 A 注册 API
self.register_plugin_api('send_sms', handler)

# 插件 B 调用插件 A 的 API
result = self.call_plugin_api('plugin_a_id', 'send_sms', phone, message)

Hook 触发——一个插件触发事件,另一个插件响应:

1
2
3
4
5
# 插件 A 注册 Hook
self.register_plugin_hook('on_order_created', handler)

# 插件 B 触发插件 A 的 Hook
result = self.trigger_plugin_hook('plugin_a_id', 'on_order_created', order_data)

两者都经过沙箱。调用方和被调用方的代码都在 PluginSandbox.execute_safe_async() 里执行,权限检查在两端都做。

运行时加载 Vue 组件

后端注册完了,前端的活怎么干?

插件的 .vue 文件存在服务端的 plugins/<plugin_id>/frontend/ 目录下。前端通过 API 拿到组件列表后,对每个 entry_file 发起 HTTP 请求获取源码,然后用 vue3-sfc-loader 在浏览器里编译:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 前端伪代码(简化版)
import { loadModule } from 'vue3-sfc-loader'

async function loadPluginComponent(pluginId, entryFile) {
const url = `/api/plugins/${pluginId}/frontend/${entryFile}`
const component = await loadModule(url, {
modules: {
vue: Vue,
'vue-router': VueRouter,
vuetify: Vuetify,
// ... 框架需要的全局依赖
},
})
return defineAsyncComponent(() => component)
}

vue3-sfc-loader 会在浏览器里解析 .vue 文件的 <template><script><style>,编译成可渲染的组件对象。这样一来,插件不需要走主项目的构建流程——装了就在,卸了就消失,前端不需要重新构建。

安全考量

运行时加载的代码天然有不信任的问题。后端已经通过 AST 校验和沙箱做了代码层面的安全限制(另一篇文章讲过),前端这边也有防护:

  1. 入口文件路径校验——绝对路径、路径穿越都被拒绝
  2. 扩展名白名单——只允许 .vue.js.ts
  3. 权限控制——每个组件都可以声明 required_permission,前端拿到 API 返回的组件列表后,只渲染当前用户有权限访问的组件
  4. 样式隔离——插件组件的样式影响范围需要控制,Vuetify 的 scoped 样式部分解决了这个问题

热插拔:装了就在,卸了就消失

插件卸载时,PluginRegistry.unregister_plugin() 会清理所有注册信息——API 路由、前端组件、配置入口、仪表盘模块、节点动作、数据库模型引用,统统删干净:

1
2
3
4
5
6
7
8
9
10
11
def unregister_plugin(self, plugin_id: str):
self._plugins.pop(plugin_id, None)
self._plugin_contexts.pop(plugin_id, None)
self._node_actions.pop(plugin_id, None)
self._api_routes.pop(plugin_id, None)
self._frontend_components.pop(plugin_id, None)
self._config_entries.pop(plugin_id, None)
# 连 Python 模块都从 sys.modules 里删掉
module_name = self._plugin_modules.pop(plugin_id, None)
if module_name and module_name in sys.modules:
del sys.modules[module_name]

sys.modules 里的引用都被清除,确保 Python 解释器不会缓存旧版代码。前端下次拉取组件列表时,卸载的插件就不在了——get_frontend_components_for_menu()get_dashboard_modules_for_enabled_plugins() 都只返回 status='enabled' 的插件。


运行时加载 Vue 组件不是银弹——每次加载都有编译开销,首屏会比静态打包慢一点,SSR 也变得更复杂(需要服务端有 vue3-sfc-loader 的 Node.js 实例)。但在插件系统这种场景下,灵活性比极致性能更重要:管理员装一个插件就能用,不用改代码、不用重新构建、不用等发版。

前面写过一篇关于 WebSocket 节点管理的文章,聊了连接保活、重连、认证这些事。但连接建立只是开始——消息在 WebSocket 连接上明文飞来飞去,中间人要是能插一脚,就能伪造指令让节点执行任意 shell 命令、删文件、拉镜像……后果不寒而栗。

TLS 能防窃听,但防不了应用层的伪造——万一节点 token 泄露了呢?万一有人拿到了 WebSocket 连接的权限呢?所以 ServerManager 在应用层又加了一道签名。不是所有消息都签,只签关键操作——那些真正有破坏力的。

三个级别,不该签的不浪费

先看动作注册表。每个 WebSocket 动作都有一个安全级别:

1
2
3
4
class SecurityLevel(Enum):
UNSIGNED = "unsigned" # 不需要签名
CHANNEL_AUTH = "channel_auth" # 连接认证过了就行
SIGNED = "signed" # 必须 HMAC 签名

心跳这种消息——node:heartbeat——级别是 CHANNEL_AUTH,只要 WebSocket 连接本身认证过了就够,不签名。但下面这些动作全是 SIGNED

  • execute:run_shell——跑 shell 命令
  • terminal:create_session——开终端
  • file_manager:write——写文件
  • docker:container:exec——容器里执行命令
  • node:close——关掉节点

光是看动作名就觉得脊背发凉的操作,全都需要签名验证。顺便说一下,这套注册表注册了 77+ 个动作,光 Docker 相关的就三十多个——镜像拉取删除、容器创建销毁重启、网络增删、卷管理、Compose 全家桶。每一个都被标注了方向(Panel→Node 还是 Node→Panel 还是双向)和安全级别。

1
2
3
4
5
6
7
8
# 注册表的设计——frozen dataclass,不可变
@dataclass(frozen=True)
class ActionSpec:
name: str
direction: Direction
security: SecurityLevel
payload_schema: Optional[Type] = None
description: str = ""

frozen=True 是有意为之的——注册表一旦建好就不应该被谁偷偷改掉。这跟整条链路的设计哲学一致:权限声明是静态的、可审计的、不可运行时篡改的。

签名的计算:把消息揉成一根绳

签名算法其实简单,关键是覆盖面要对:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def generate_signature(token: str, action: str, payload: dict) -> dict:
timestamp = str(time.time())
nonce = secrets.token_hex(16)
payload_hash = hashlib.sha256(
json.dumps(payload, sort_keys=True, separators=(',', ':')).encode('utf-8')
).hexdigest()
message = f"{action}:{timestamp}:{nonce}:{payload_hash}"
signature = hmac.new(
token.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
return {
"timestamp": timestamp,
"nonce": nonce,
"signature": signature,
}

几点设计考量:

载荷单独 hash。签名的消息不直接包含 payload JSON,而是包含 payload 的 SHA-256 摘要。这样签名跟载荷可以分开传输——WebSocket 消息里 payload 放 data 字段,签名放 _sign 字段,结构清晰。

sort_keys=True + separators=(',', ':')。JSON 序列化在不同的 Python 版本、不同的解释器里,key 的顺序可能不一样。加 sort_keys 确保无论谁算出来的 JSON 字符串都一样。separators=(',', ':') 去掉空格,避免空白字符的差异。

nonce 是 32 字节的随机十六进制。每次签名都不同,哪怕 payload 一模一样。

验证:时间漂移 + 重放防御

收到消息后,验证这步比生成要关键得多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def verify_signature(token, action, payload, sign_data, nonce_scope=None):
timestamp = sign_data.get("timestamp")
nonce = sign_data.get("nonce")
signature = sign_data.get("signature")

# 1. 时间窗校验——5分钟容差
req_time = float(timestamp)
if abs(time.time() - req_time) > ALLOWED_TIME_DRIFT: # 300秒
return False

# 2. 重算签名并比对
payload_hash = hashlib.sha256(
json.dumps(payload, sort_keys=True, separators=(',', ':')).encode('utf-8')
).hexdigest()
message = f"{action}:{timestamp}:{nonce}:{payload_hash}"
expected = hmac.new(
token.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(expected, signature):
return False

# 3. Nonce 唯一性——同一个 scope 内 nonce 只能用一次
if nonce_scope is not None and not _claim_nonce(nonce_scope, action, nonce):
return False

return True

三道关,一道比一道狠:

时间漂移——节点和服务端的时钟不可能完全同步,5 分钟的容差覆盖了 NTP 偶尔漂移的场景。超过 5 分钟的消息直接拒绝。

签名比对——hmac.compare_digest() 而不是 ==,前者是恒定时间比较,防时序攻击。用 == 的话,攻击者可以通过比较响应时间逐字符爆破签名。

Nonce 防重放——_claim_nonce() 用 Redis 的 cache.add() 实现原子性 nonce 消耗。add 只在 key 不存在时成功,刚好可以判断 nonce 是否已经被用过。nonce 的 key 包含 action 的哈希和 scope(一般是 Consumer类名:节点UUID),所以不同节点的同动作 nonce 不会冲突,但同一节点同一动作的同一个 nonce 只能用一次:

1
2
3
4
5
def _claim_nonce(nonce_scope, action, nonce):
scope_hash = hashlib.sha256(str(nonce_scope or "default").encode("utf-8")).hexdigest()[:16]
action_hash = hashlib.sha256(str(action or "").encode("utf-8")).hexdigest()[:16]
key = f"{NONCE_CACHE_PREFIX}:{scope_hash}:{action_hash}:{nonce}"
return bool(cache.add(key, "1", timeout=ALLOWED_TIME_DRIFT + 30))

TTL 设成 330 秒(比 300 秒时间窗多 30 秒),确保过期 nonce 的 key 自动清理,Redis 不会堆积无用数据。

在 WebSocket 消费者里自动签名/验签

WebSocket Consumer 的基类 AsyncBaseConsumer 把签名和验证做成了自动行为——正常业务代码完全不用管:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class AsyncBaseConsumer(AsyncWebsocketConsumer, ABC):
verify_incoming_signatures = True # 入站方向验签
sign_outgoing_actions = True # 出站方向签名

async def receive(self, text_data=None, bytes_data=None):
data = json.loads(text_data)
action = data.get('action')
payload = data.get('data', None)

# 如果这个动作需要签名,但消息里没有 _sign 字段——直接拒绝
if self.verify_incoming_signatures and needs_signature(action):
sign_data = data.get('_sign')
if not sign_data:
return await self.send(json.dumps({
"action": "error:signature_required",
"data": {"action": action}
}))
node_token = self.__get_node_token()
if not verify_signature(node_token, action, payload or {}, sign_data,
nonce_scope=self.__signature_nonce_scope()):
return await self.close(3001) # 签名验证失败——断开连接

# 分发到对应的 action handler
if action in self.actions:
await self.actions[action](self, payload)

async def send_action(self, action, payload=None):
data = {"action": action, "data": deepcopy(payload)}
if self.sign_outgoing_actions and needs_signature(action):
node_token = self.__get_node_token()
if node_token:
data['_sign'] = generate_signature(node_token, action, payload or {})
await self.send(json.dumps(data, cls=ComplexEncoder))

两处亮点:

签名验证失败断开连接(code 3001),不是返回错误消息然后继续。这很重要——既然签名对不上,说明对方要么不是合法节点,要么通信已经出了问题,再聊下去没有意义。

缺少签名时返回 error:signature_required 而不是断连,这是一个阶梯式的安全策略:可能对方还没实现签名功能(比如旧版节点),给个提示比直接踢掉更友好。但合法的伪造签名?那就是攻击了,断连没商量。

日志也要脱敏

顺带一提,WebSocket 消息进日志的时候,密码、token、密钥这些字段会被脱敏:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SENSITIVE_LOG_KEYS = {
'password', 'passwd', 'pwd', 'token', 'node_token',
'client_token', 'server_token', 'access', 'refresh',
'authorization', 'signature', '_sign', 'secret', 'secret_key',
}

def sanitize_ws_payload(value, depth=0):
if isinstance(value, dict):
result = {}
for key, item in value.items():
if str(key).lower() in SENSITIVE_LOG_KEYS:
result[key] = '***'
else:
result[key] = sanitize_ws_payload(item, depth + 1)
return result
# ...

签名值本身(signature_sign)也被脱敏了。日志里留下的只是一堆星号,攻击者就算拿到日志也反推不出 node_token。

小结

这套 HMAC 签名机制不是在造轮子——它是在 WebSocket 这种无状态长连接上,用最小的开销实现了:

  1. 消息完整性——payload 被篡改会导致签名校验失败
  2. 消息来源认证——只有持有 node_token 的一方才能生成合法签名
  3. 防重放——同一个 nonce 只能用一次,截获消息重发无效
  4. 时间窗保护——过期消息自动作废
  5. 阶梯式响应——缺少签名给提示,伪造签名直接断连

配合 TLS,就形成了两层保护:TLS 保证传输加密,HMAC 保证即使连接被劫持,关键指令也无法伪造或重放。

巡检车上的 YOLO 推理节点跑在树莓派上,每秒处理 2 帧。检测到病虫害就往后端上传图片。问题在于:车停着不动的时候,相机拍到的画面几乎一样,YOLO 检测到的框也几乎一样——但每帧都上传,后端收到的全是重复图片。

一帧 JPEG 85 质量大约 200KB,每秒 2 帧 = 400KB/s。果园 WiFi 带宽有限,YOLO 上传把其他数据(心跳、传感器、地图)全挤掉了。

解法:两重变化检测——SSIM 图像相似度 + bbox 签名去重。相似的帧不传。

SSIM 变化检测:20 行实现结构相似度

SSIM(Structural Similarity Index)衡量两张图的相似程度,范围 0~1,1 表示完全相同。阈值设为 0.985——低于这个值才认为”画面变了”。

关键是:不拿原图算 SSIM,先把图缩到 160×90 灰度再算。640×400 的 RGB 图算 SSIM 大概 20ms,160×90 灰度图只要 0.5ms——在树莓派上差别很大。

1
2
3
def _to_change_gray(self, frame):
small = cv2.resize(frame, (160, 90), interpolation=cv2.INTER_AREA)
return cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)

SSIM 的计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@staticmethod
def _compute_ssim(prev_gray, curr_gray):
a = prev_gray.astype(np.float32)
b = curr_gray.astype(np.float32)
c1 = (0.01 * 255) ** 2
c2 = (0.03 * 255) ** 2

mu_a = cv2.GaussianBlur(a, (11, 11), 1.5)
mu_b = cv2.GaussianBlur(b, (11, 11), 1.5)
mu_a2 = mu_a * mu_a
mu_b2 = mu_b * mu_b
mu_ab = mu_a * mu_b

sigma_a2 = cv2.GaussianBlur(a * a, (11, 11), 1.5) - mu_a2
sigma_b2 = cv2.GaussianBlur(b * b, (11, 11), 1.5) - mu_b2
sigma_ab = cv2.GaussianBlur(a * b, (11, 11), 1.5) - mu_ab

num = (2 * mu_ab + c1) * (2 * sigma_ab + c2)
den = (mu_a2 + mu_b2 + c1) * (sigma_a2 + sigma_b2 + c2)
den = np.where(den == 0.0, 1e-6, den)
ssim_map = num / den
return float(np.mean(ssim_map))

这是 SSIM 的标准公式,用 GaussianBlur 替代显式窗口卷积——OpenCV 的 GaussianBlur 底层做了优化,比手写卷积快得多。11×11 高斯核、σ=1.5 是论文推荐值。

整个计算没有调用 skimage 或其他重型库,纯 NumPy + OpenCV,部署简单。

bbox 签名去重:框没变就不传

SSIM 检测画面变化,但有时候画面变了框没变——车稍微挪了一下,SSIM 降到了 0.98,但 YOLO 检测到的病虫害位置完全一样。这种帧传上去也没用。

bbox 签名:把检测框的位置归一化后编码成 tuple,跟上次上传的签名比对。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@staticmethod
def _build_detection_signature(detections, frame_w, frame_h):
inv_w = 1.0 / float(max(1, frame_w))
inv_h = 1.0 / float(max(1, frame_h))
signature = []
for det in detections:
x1, y1, x2, y2 = det.box_xyxy
signature.append((
int(det.class_id),
round(float(det.confidence), 2),
round(float(x1) * inv_w, 3),
round(float(y1) * inv_h, 3),
round(float(x2) * inv_w, 3),
round(float(y2) * inv_h, 3),
))
signature.sort()
return tuple(signature)

归一化到 [0, 1] 范围是因为:同一场景在不同分辨率下检测,框的像素坐标不同但归一化位置相同。round(..., 3) 精度是千分之几像素,足够区分不同位置但容许微小抖动。

双重判定逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _should_upload_changed(self, frame, detections):
curr_gray = self._to_change_gray(frame)
curr_sig = self._build_detection_signature(detections, frame_w, frame_h)

if self._last_uploaded_gray_small is None:
return True, curr_gray, curr_sig # 第一帧,必须传

boxes_changed = curr_sig != self._last_uploaded_signature
if self.upload_change_method == 'boxes':
return boxes_changed, curr_gray, curr_sig # 纯框签名模式

ssim = self._compute_ssim(self._last_uploaded_gray_small, curr_gray)
changed = boxes_changed or (ssim < 0.985)
return changed, curr_gray, curr_sig

两种模式可选:

  • boxes 模式:只在检测框变化时上传。适合检测目标很少、位置变化明显的场景
  • ssim 模式(默认):框变了或画面变了都上传。覆盖面更广,适合画面变化频繁的场景

两个条件是 OR 关系:只要有一个变化就传。宁可多传几张,也不能漏掉真正的新检测结果。

上传队列:满了丢旧的

变化检测之后,符合条件的帧进入上传队列。队列 maxsize=32,单独线程消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def _enqueue_upload(self, payload):
try:
self._upload_queue.put_nowait(payload)
return
except queue.Full:
pass

if self._upload_drop_oldest:
try:
self._upload_queue.get_nowait() # 丢弃最旧的
self._upload_queue.task_done()
except queue.Empty:
pass
try:
self._upload_queue.put_nowait(payload)
except queue.Full:
pass

upload_queue_drop_oldest=True:队列满了丢弃最旧的帧。为什么不丢最新的?因为最新帧包含最新的位置信息——位置对了才能在地图上正确标注病虫害。旧帧的位置信息已经过时了。

模型热替换:换模型不重启

后端可以通过运行时配置下发新的 ONNX 模型 URL,推理节点下载并替换模型,不用重启 Docker:

1
2
3
4
5
6
def _download_file(self, uri, target_path, label):
temp = target.with_suffix(target.suffix + '.tmp')
resp = requests.get(uri, timeout=20)
temp.write_bytes(resp.content)
temp.replace(target) # 原子替换
return True

tempfile + os.replace 保证原子性——下载中途断电,.tmp 文件是不完整的,但原始模型文件不受影响。下次启动用旧模型继续跑。

下载完后重建 ONNX session:

1
2
3
def _rebuild_engine(self, model_path, conf_threshold, iou_threshold):
engine = YoloOnnxEngine(model_path=model_path, ...)
self.engine = engine

_rebuild_engine_on_image 之外被调用,不会中断正在执行的推理。下一次 _on_image 回调时自动用新模型。

实际效果

指标无变化检测SSIM + 签名去重
上传帧率(静止时)2 fps~0.1 fps
上传帧率(移动时)2 fps~1.5 fps
带宽占用~400 KB/s~30 KB/s(静止时)
漏检率0≈0(OR 逻辑兜底)

静止时带宽从 400KB/s 降到 30KB/s——降了 92%。心跳和传感器数据再也不被挤掉了。

0.985 的 SSIM 阈值是调出来的:0.99 太敏感(光照微变就传),0.97 太迟钝(病虫害移出画面了还不传)。0.985 在树莓派上的实际测试效果最好。

Nav2 启动后偶尔导航不工作。看日志,bt_navigator 报 “backup action server not available”,然后加载 BT XML 失败,节点进入 inactive 状态。重启又好了——典型的竞态条件。

这个问题困扰了我们两周。后来写了个 200 行的脚本彻底解决了。

问题根因

Nav2 的启动流程是这样的:

  1. Lifecycle Manager 按配置顺序激活节点:controller_serverplanner_serverbehavior_serverbt_navigatorwaypoint_follower
  2. bt_navigator 激活时加载 BT XML 文件,BT XML 里引用了 BackUpSpinWait 等 action
  3. bt_navigator 尝试注册这些 action client,发现 behavior_server 的 action server 还没就绪
  4. 加载失败 → bt_navigator 变成僵尸节点

为什么 action server 没就绪?因为 Lifecycle Manager 激活 behavior_server 后,节点状态虽然变成了 active,但 action server 的注册和发布需要几百毫秒。Lifecycle Manager 不等这个——它立即去激活下一个节点。

这个时序在性能好的机器上几乎不出现,在树莓派上(CPU 慢、DDS 发现延迟大)出现率超过 30%。

解法:把 bt_navigator 从 Lifecycle Manager 里摘出来

关键思路:Lifecycle Manager 只管除了 bt_navigator 以外的节点。等所有 action server 确实就绪了,再手动 configure + activate bt_navigator

第一步:Launch 配置排除 bt_navigator

1
2
3
4
5
6
7
8
9
# bringup.launch.py
lifecycle_nodes = [
'controller_server',
'planner_server',
'behavior_server',
# 'bt_navigator', ← 注释掉,不归 lifecycle_manager 管
'waypoint_follower',
'smoother_server',
]

第二步:等 action server 就绪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ACTION_SERVERS = {
'back_up': BackUp,
'spin': Spin,
'wait': Wait,
'drive_on_heading': DriveOnHeading,
'assisted_teleop': AssistedTeleop,
'follow_path': FollowPath,
'compute_path_to_pose': ComputePathToPose,
'compute_path_through_poses': ComputePathThroughPoses,
'smooth_path': SmoothPath,
}

class ActivateBtNavigator(Node):
def __init__(self):
self._action_clients = {}
for name, action_type in ACTION_SERVERS.items():
self._action_clients[name] = ActionClient(self, action_type, name)

self._phase = 'waiting'
self._timer = self.create_timer(2.0, self._tick)

def _check_servers(self):
ready, not_ready = [], []
for name, client in self._action_clients.items():
if client.server_is_ready():
ready.append(name)
else:
not_ready.append(name)

if not not_ready:
self._phase = 'configuring'

每 2 秒检查一次,9 个 action server 全部 server_is_ready() 才进入下一步。超时 120 秒放弃。

第三步:手动 configure + activate

1
2
3
4
5
6
7
8
9
10
11
12
13
def _do_configure(self):
req = ChangeState.Request()
req.transition.id = Transition.TRANSITION_CONFIGURE
future = self._change_state_cli.call_async(req)
future.add_done_callback(self._on_configure_done)
self._phase = 'configure_pending'

def _do_activate(self):
req = ChangeState.Request()
req.transition.id = Transition.TRANSITION_ACTIVATE
future = self._change_state_cli.call_async(req)
future.add_done_callback(self._on_activate_done)
self._phase = 'activate_pending'

通过 bt_navigator/change_state 服务调用 ROS2 生命周期状态机,先 CONFIGURE 再 ACTIVATE——跟 Lifecycle Manager 内部做的事一模一样,只是我们手动控制了时序。

第四步:启动脚本加入 launch

1
2
3
4
5
6
7
# navigation_base.launch.py
Node(
package='navigation',
executable='wait_for_action_servers',
name='activate_bt_navigator',
arguments=['--ros-args', ...],
),

在 lifecycle_manager 之后启动。它自己会等,不会阻塞其他节点。

为什么不用 sleep

你可能会想:在 launch 文件里加个 TimerAction(delay=10.0) 不就行了?

不行。10 秒在大部分时候够了,但在 CPU 占用高的时候不够——behavior_server 可能 12 秒才就绪。sleep 太短问题还在,太长拖慢启动。而且 sleep 不检查实际状态——即使 action server 1 秒就准备好了,也得干等 10 秒。

主动轮询 server_is_ready() 是唯一可靠的方法:action server 一好就继续,不好就一直等,有超时兜底。

还要等 service

除了 action server,bt_navigator 还依赖一些 service,比如 reinitialize_global_localization(AMCL 重定位服务)。如果这个 service 没好,BT 里的 ComputePathToPose 节点可能在运行时找不到服务客户端。

1
2
3
4
5
6
7
8
self.declare_parameter('required_services', ['/reinitialize_global_localization'])

for service_name in self._required_services:
self._service_clients[service_name] = self.create_client(Empty, service_name)

# 在 _check_servers 里一起检查
if (not not_ready) and (not not_ready_services):
self._phase = 'configuring'

service 列表通过参数配置,不同部署可以加不同的依赖。

效果

改之前:树莓派上 30%+ 概率启动失败,需要手动重启 Docker。改之后:100% 成功启动,额外等待时间 2~8 秒(取决于 action server 就绪速度)。

这个 workaround 不是最优雅的——理想情况下 Nav2 的 Lifecycle Manager 应该自己处理这个竞态。但在他们修之前,200 行脚本是性价比最高的解法。

巡检车上装了两个距离传感器:MS200 激光雷达和 Aurora 930 Pro 深度相机。激光雷达 360° 扫描但前方有盲区,深度相机只看前方但近距离精度高。SLAM 和 Nav2 需要一条统一的 /scan 话题——把两个传感器的数据合成一条扫描线,就是 Scan Fusion 要干的活。

听起来简单:把深度相机的点投影到雷达坐标系,重叠区域取最近的。但实际做起来有三个坑:坐标系变换、点数不一致、和那个让 SLAM 崩溃的可变长度问题。

融合逻辑:前方 ±70° 取最近

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def _fuse_scan_ranges(self, raw, depth, transform):
fused = list(raw.ranges) # 以雷达扫描为底

# 深度相机每个点:坐标变换 → 投影到雷达角度 → 取最近
for r in depth.ranges:
# 1. 深度相机坐标系下的点
x_d = r * math.cos(angle)
y_d = r * math.sin(angle)

# 2. 变换到雷达坐标系
x_r = c * x_d - s * y_d + tx
y_r = s * x_d + c * y_d + ty
range_r = math.hypot(x_r, y_r)
angle_r = math.atan2(y_r, x_r)

# 3. 超出前方 ±70° 丢弃
if abs(angle_r) > self._fuse_front_half_angle_rad:
continue

# 4. 投射到雷达对应的角度 bin
idx = int((angle_r - raw_angle_min) / raw_angle_inc + 0.5)

# 5. 取最近障碍物
current = fused[idx]
if (not math.isfinite(current)) or (range_r < current):
fused[idx] = float(range_r)

return fused

五步流水线:深度点 → 坐标变换 → 角度过滤 → bin 映射 → 最近优先。

min 融合策略很关键:如果雷达说前方 1.5m 有东西,深度相机说 0.3m 有东西,那 0.3m 是对的——深度相机在近距离更准。反过来,如果雷达说有东西但深度没检测到,雷达的值保留——深度相机可能因为反光丢了数据。

坐标变换:TF2 自动查

深度相机和雷达不在同一个位置,中间有个物理偏移和旋转。ROS2 的 TF2 系统维护了所有坐标系的变换关系:

1
2
3
4
5
6
7
8
9
10
11
try:
transform = self._tf_buffer.lookup_transform(
raw.header.frame_id, # 目标:雷达坐标系
depth.header.frame_id, # 源:深度相机坐标系
rclpy.time.Time(),
timeout=rclpy.duration.Duration(seconds=0.05),
)
except TransformException:
# TF 不可用,退化为纯雷达模式
self._scan_pub.publish(self._build_output_scan(raw, list(raw.ranges), ...))
return

TF 不可用时不融合,直接透传原始雷达数据——降级而不是崩溃。

固定点数:SLAM 的隐性要求

这是最大的坑。SLAM 算法(slam_toolbox)在初始化时记录第一条 LaserScan 的 angle_minangle_maxangle_incrementranges 长度,后续所有帧必须跟第一帧完全一致。如果某一帧的 ranges 少了几个点,SLAM 直接崩溃。

问题是:深度相机的点数会变。相机帧率波动时,深度转激光扫描的输出点数可能从 360 变成 358 或 362。融合后的点数跟着变——SLAM 就炸了。

解法:强制固定点数,用最近邻重采样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def _normalize_scan_payload(self, template, ranges, intensities):
if not self._enforce_fixed_scan_size:
return ranges, intensities

# 确定目标点数
if self._fixed_scan_size > 0:
target_size = self._fixed_scan_size
else:
# 第一帧的点数作为标准
if self._expected_scan_size <= 0:
self._expected_scan_size = max(1, len(ranges))
target_size = self._expected_scan_size

normalized_ranges = self._resample_nearest(ranges, target_size, fill=template.range_max)
normalized_intensities = self._resample_nearest(intensities, target_size, fill=0.0)
return normalized_ranges, normalized_intensities

最近邻重采样:

1
2
3
4
5
6
7
8
9
10
11
12
@staticmethod
def _resample_nearest(values, target_size, fill):
src = len(values)
if src == target_size:
return list(values)

scale = float(src - 1) / float(target - 1)
out = []
for i in range(target):
idx = int(round(i * scale))
out.append(float(values[idx]))
return out

从 358 点到 360 点:多出来的 2 个位置通过线性插值找最近的原始点填充。从 362 到 360:多余的 2 个点被均匀丢弃。变化在 ±2 点以内时,对精度几乎没有影响。

深度超时:相机卡了就别用

深度相机偶尔会卡帧——ROS topic 几百毫秒没有新消息。如果用过期的深度数据融合,车已经走了 0.5 米了,融合出来的障碍物位置全错。

1
2
3
if depth is None or depth_age > self._depth_timeout_sec:  # 0.4 秒
self._scan_pub.publish(self._build_output_scan(raw, list(raw.ranges), ...))
return

0.4 秒超时。深度数据超过 0.4 秒未更新就丢弃,退化为纯雷达模式。这个阈值比雷达扫描周期(~100ms)大不少,留了余量处理网络延迟。

运行时热更新

所有融合参数都可以通过 /agri/runtime_config 话题热更新——不用重启节点就能开关融合、调整角度范围、修改最小距离。

1
2
3
4
5
6
7
8
9
def _on_runtime_config(self, msg):
payload = json.loads(msg.data)
self._apply_runtime_config_payload(payload)

def _apply_runtime_config_payload(self, payload):
fusion_cfg = runtime_cfg.get('scan_fusion')
self._depth_fusion_enabled = bool(fusion_cfg.get('depth_fusion_enabled', ...))
self._fuse_front_half_angle_deg = max(0.0, float(fusion_cfg.get('fuse_front_half_angle_deg', ...)))
...

带版本号和签名去重,同一个配置不会重复应用。这在调试时特别有用——现场发现融合角度太窄导致漏检,改个配置就生效,不用 SSH 上去重启 Docker。

效果

指标纯雷达雷达 + 深度融合
前方最小检测距离~0.3m~0.1m
低矮障碍检测容易漏(激光扫过去)能检测到
玻璃/反光表面检测不到看深度相机
SLAM 稳定性稳定固定点数后稳定
CPU 开销0+3%(重采样+融合)

3% 的 CPU 换来前方近距离检测能力的质变,在树莓派上完全可以接受。

巡检车在甘蔗田里跑,卡住是家常便饭。可能是轮子陷进泥里,可能是两侧甘蔗秆夹住了,也可能是 Nav2 规划了一条窄缝路但车身太宽过不去。

Nav2 自带的恢复行为——原地旋转、后退、清除代价地图——在开阔环境还行,甘蔗田里经常越转越卡。我们需要一个更聪明的脱困策略:先看哪个方向能走,再往那个方向走。

核心思路:射线投射 + 楔形评分

算法分三步:

  1. 从 local costmap 出发,360° 分成 12 个楔形区域
  2. 每个楔形投射 5 条射线,算平均自由距离
  3. 选得分最高的方向:距离远 + 方向偏前方优先
1
2
3
4
5
6
7
        楔形 0
/ | \
/ 射线射线射线射线射线 \
楔形11 🤖 楔形 1
\ /
\ | /
楔形 6(正后方)

射线投射:逐格走,碰到障碍停下

1
2
3
4
5
6
7
8
9
10
11
def _cast_ray(self, grid, start_col, start_row, angle, max_cells, width, height, resolution):
dx = math.cos(angle)
dy = math.sin(angle)
for step in range(1, max_cells + 1):
col = int(start_col + dx * step)
row = int(start_row + dy * step)
if col < 0 or col >= width or row < 0 or row >= height:
return step * resolution # 出界,返回到边界的距离
if grid[row, col] >= self.lethal_threshold: # 200 = 致命障碍
return (step - 1) * resolution # 碰到障碍,返回前一格的距离
return max_cells * resolution # 射线全长无障碍

逐格步进,简单粗暴但可靠。max_cells = 1.0m / resolution,也就是最远看 1 米。在甘蔗田里 1 米够了——太远了看出去全是障碍,没有指导意义。

楔形评分:远 + 偏前方 = 好

1
2
3
4
5
6
7
8
9
10
11
12
for i in range(self.num_wedges):  # 12
wedge_center = -math.pi + (i + 0.5) * wedge_angle
ray_distances = []
for r in range(self.rays_per_wedge): # 5
ray_angle = wedge_center + (r - 2) * (wedge_angle / 5)
free_dist = self._cast_ray(grid, robot_col, robot_row, ray_angle, ...)
ray_distances.append(free_dist)

avg_distance = sum(ray_distances) / len(ray_distances)
angle_diff = abs(normalize_angle(wedge_center - robot_yaw))
penalty = 0.3 * (angle_diff / math.pi)
score = avg_distance - penalty

0.3 的航向惩罚权重意味着:正前方 0.7m 的通道和正后方 1.0m 的通道得分相同。如果前后都差不多远,优先往前走——因为车大概率是从后面来的,往前走是脱离当前困境最自然的方向。

全方向都堵死了?纯后退

如果所有楔形的平均自由距离都低于 0.15m,说明车被完全包围——360° 都是障碍。这时候任何转向都没用,唯一的选择是倒车。

1
2
3
4
5
6
7
if all_low:
return EscapeVector(
angle=normalize_angle(robot_yaw + math.pi), # 正后方
distance=best_distance,
score=best_score,
fallback=True, # 标记为降级模式
)

降级模式不旋转,直接以 0.1 m/s 的速度倒退最多 3 秒。低速倒退是为了安全——万一后面也有东西,撞上不至于太惨。

两步执行:先转向,再前进

选定方向后,脱困动作分两步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def execute_recovery(self, escape, robot_yaw, publish_twist_fn, stop_fn, is_safety_blocked_fn):
# 第一步:原地旋转到逃逸方向
yaw_error = normalize_angle(escape.angle - robot_yaw)
rotate_duration = abs(yaw_error) / 0.6 # 0.6 rad/s 角速度
if rotate_duration > 0.05:
publish_twist_fn(0.0, rotate_dir * 0.6, min(rotate_duration, 5.0))

# 第二步:低速前进到安全区域
forward_duration = min(escape.distance / 0.10, 5.0) # 0.10 m/s
if forward_duration > 0.05:
publish_twist_fn(0.10, 0.0, min(forward_duration, 5.0))

stop_fn()
return RecoveryResult.SUCCESS

两步之间都有安全检查——如果安全节点发了刹车信号,立即停止。超时 15 秒强制结束,防止脱困动作无限执行。

为什么不用 Nav2 的恢复行为

Nav2 的 BackUp 只会直线后退,Spin 只会原地旋转,ClearCostmap 只是清除障碍记录——三个行为互相独立,不感知代价地图的内容。

12 楔射线算法的优势在于看地图选方向。如果车卡在通道中间,左边是墙右边是缝,Nav2 的后退可能会把车推到更窄的地方;射线算法知道右边 30° 有 0.8m 的通道,会先转向再前进。

而且这个算法是纯计算模块,不依赖 ROS 节点——可以在任何地方调用,甚至可以在安全节点里用。

几个参数

参数默认值说明
num_wedges12360° / 12 = 30° 一个楔形
rays_per_wedge5每楔 5 条射线,抗噪声
max_ray_distance_m1.0看多远
heading_penalty0.3方向权重
escape_linear_vel0.10脱困线速度,慢一点安全
escape_angular_vel0.6脱困角速度
total_timeout_sec15.0总超时

30° 一个楔形是经验值——再细了(比如 10°)方向太多不好选,再粗了(比如 60°)精度不够。5 条射线抗单条射线噪声:万一某条射线正好穿过一个障碍物间隙,其他 4 条会把它拉回来。

IoT 设备跟后端通信,最怕三件事:别人冒充你的设备发假数据、数据在半路被改了、连着连着断了没人管。

我们的巡检车跑在果园里,通过 WiFi 连后端。WiFi 不靠谱,网络随时断;果园不是机房,谁都能插根网线进来。所以通信层得自己管安全。

这套通信客户端大概 600 行,但把证书生命周期、请求签名、WebSocket 自动重连全包了。这篇拆开讲。

自签证书:首次开机自动生成

巡检车出厂时没有证书。如果要求人工配证书,部署 50 台车得配 50 次——不现实。

解法:首次启动自动生成 RSA 2048 密钥 + 自签证书

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def _ensure_certificate_material(self):
cert_path = Path(self.client_cert_file)
key_path = Path(self.client_key_file)

if cert_path.is_file() and key_path.is_file():
return # 已有,跳过

if not self.cert_auto_bootstrap:
raise RuntimeError("巡检车证书材料缺失")

# 生成 RSA 2048 私钥
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
key_pem = private_key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=NoEncryption(),
)
self._write_bytes(str(key_path), key_pem)

# 用私钥签发自签证书
cert_pem = self._generate_bootstrap_certificate(private_key)
self._write_text(str(cert_path), cert_pem)

证书的 CN(Common Name)设为 rover_code,比如 cane_rover_001。后端收到注册请求时,用证书里的公钥验签名——如果签名对得上,说明请求确实来自持有私钥的设备。

证书自动续签:到期前 15 天换新

证书有效期 365 天。如果等过期了再换,车就再也连不上后端了——因为签名验不过去。

1
2
3
4
5
6
7
8
9
10
11
12
def _maybe_renew_certificate(self):
current_cert = x509.load_pem_x509_certificate(self._client_cert_pem.encode("utf-8"))
now = datetime.datetime.now(datetime.timezone.utc)
deadline = now + datetime.timedelta(days=15) # 到期前 15 天
if current_cert.not_valid_after_utc > deadline:
return False # 还早着呢

# 续签:用同一个私钥,新有效期
renewed_pem = self._generate_self_signed_certificate(current_cert)
self._write_text(self.client_cert_file, renewed_pem)
self._cert_sync_required = True # 标记需要同步给后端
return True

续签后设 _cert_sync_required = True,下次注册/心跳时把新证书的 PEM 和指纹上报给后端,后端更新数据库里的记录。全程自动,不需要人介入。

请求签名:不是 OAuth,不是 mTLS,是自己搓的

每个 HTTP 请求都带签名,签的内容覆盖五个字段:

1
2
3
4
5
6
7
8
9
10
def _auth_headers(self, method, path, payload):
ts = int(time.time())
payload_hash = hashlib.sha256(self._payload_bytes(payload)).hexdigest()
message = f"{method.upper()}\n{path}\n{payload_hash}\n{ts}\n{self.rover_code}".encode("utf-8")
signature = base64.b64encode(self._sign_message(message)).decode("ascii")
return {
"X-Rover-Code": self.rover_code,
"X-Rover-Timestamp": str(ts),
"X-Rover-Signature": signature,
}

签名消息的格式:METHOD\nPATH\nSHA256(PAYLOAD)\nTIMESTAMP\nROVER_CODE,换行分隔。

为什么签这么多东西?

  • METHOD + PATH:防止把 POST 请求的签名挪到 DELETE 上用
  • SHA256(PAYLOAD):防止篡改请求体(payload 可能很大,签 hash 而不是原文)
  • TIMESTAMP:防重放攻击——后端检查时间戳,超过 5 分钟的请求直接拒
  • ROVER_CODE:防止 A 车的签名被挪到 B 车的请求上

签名算法支持四种密钥类型:

1
2
3
4
5
6
7
8
9
def _sign_message(self, message):
if isinstance(self._private_key, rsa.RSAPrivateKey):
return self._private_key.sign(message, padding.PKCS1v15(), hashes.SHA256())
if isinstance(self._private_key, ec.EllipticCurvePrivateKey):
return self._private_key.sign(message, ec.ECDSA(hashes.SHA256()))
if isinstance(self._private_key, ed25519.Ed25519PrivateKey):
return self._private_key.sign(message)
if isinstance(self._private_key, ed448.Ed448PrivateKey):
return self._private_key.sign(message)

RSA + PKCS1v15 是最通用的,Ed25519 是最快的。现场部署哪种都行,代码自动适配。

WebSocket:断了自动重连,消息不丢

WebSocket 是小车和后端的双向通道——后端下发任务、控制指令,小车上报状态。WiFi 断了是常态,重连必须自动。

1
2
3
4
5
6
7
def _ws_loop(self):
while not self._stop_event.is_set():
self._run_single_ws_session()
if self._stop_event.is_set():
break
self._connected.clear()
time.sleep(2.0) # 断了等 2 秒再连

外层循环管重连,内层 _run_single_ws_session 管单次会话。

发送端用有界队列(maxsize=300),单独线程消费:

1
2
3
4
5
6
7
8
9
10
11
12
def _ws_sender_loop(self, ws_app, session_stop_event):
while not session_stop_event.is_set():
payload = self._send_queue.get(timeout=0.3)
# 如果连接断了或者换了新 session,把消息塞回队列
if self._ws_app is not ws_app:
self._send_queue.put_nowait(payload)
break
if not self._connected.is_set():
self._send_queue.put_nowait(payload) # 塞回去,等重连
time.sleep(0.5)
continue
ws_app.send(json.dumps(payload))

关键设计:发送失败的消息塞回队列。不是丢了,是等重连成功后再发。这意味着短暂的断线不会丢失心跳和状态上报。

WebSocket 连接也带签名——在握手 header 里加 X-Rover-Signature,跟 HTTP 请求同一个签名逻辑:

1
2
3
4
def _build_ws_headers(self):
ws_path = urlparse(self.ws_url).path or "/"
headers = self._auth_headers("WS", ws_path, {})
return [f"{k}: {v}" for k, v in headers.items()]

mTLS 支持:如果后端要求双向证书

签名和 mTLS 不是二选一。签名是应用层防护,mTLS 是传输层防护。两个都开的话,即使网络被中间人劫持,没有客户端证书 TLS 握手就过不去;即使证书被偷了,没有私钥也签不出合法的请求。

1
2
3
4
5
if self.security_enabled:
self._http_client_cert = (self.client_cert_file, self.client_key_file)
if ws_secure:
self._ws_sslopt["certfile"] = self.client_cert_file
self._ws_sslopt["keyfile"] = self.client_key_file

HTTP 用 requestscert 参数,WebSocket 用 sslopt 字典,都是标准接口。

一个踩过的坑:路径尾部斜杠

后端有个路由是 /api/rover/map/upload,某些部署(Nginx 反代)会 301 重定向到 /api/rover/map/upload/(带尾斜杠)。重定向后请求路径变了,但签名里还是原路径——后端验签失败,返回 403。

解法:如果 403 且错误信息包含”签名校验失败”,自动用带尾斜杠的路径重试一次:

1
2
3
4
5
6
7
8
9
10
11
def upload_map(self, payload):
response = self._request("POST", "/api/rover/map/upload", payload)
if response is not None:
return response

last_error = (self._last_error or "").lower()
if ("http 403" in last_error) and ("签名校验失败" in last_error or "signature" in last_error):
retry = self._request("POST", "/api/rover/map/upload/", payload)
if retry is not None:
self._logger.info("地图上传重试(带尾斜杠)成功")
return retry

丑,但管用。根本解法是 Nginx 配置里关掉路径规范化,或者后端路由同时注册两个路径。

通用的前沿探索算法不管用——至少在甘蔗田里不管用。

甘蔗田的结构是平行的狭长通道,行距大约 1.5 米,行长约 100 米。通用算法选前沿点只看距离和簇大小,结果车走到一条通道尽头,转 180° 回头,走回起点,再进下一条——效率低到令人发指。更糟的是,它会在两条通道的交叉口反复选点,来回兜圈。

后来重写了探索算法,加了通道方向连续性和通道尽头检测。效果立竿见影:车走到通道尽头自然转入相邻通道,像蛇形走位一样逐行扫描,效率翻了三倍。

前沿提取:OpenCV 一把梭

前沿的定义:free 栅格(值=0)且相邻有 unknown 栅格(值=-1)。提取方法很暴力:

1
2
3
unknown_u8 = unknown.astype(np.uint8)
adj_unknown = cv2.dilate(unknown_u8, np.ones((3, 3), np.uint8), iterations=1).astype(bool)
frontier_mask = free & adj_unknown

膨胀一次 unknown,跟 free 做 AND——相邻有 unknown 的 free 格子就是前沿。然后用 cv2.connectedComponents 提取连通簇,每个簇就是一个候选探索区域。

BFS 可达性:别选到墙后面的前沿

前沿点可能在墙的另一面——看起来近,实际上到不了。通用算法不管这个,直接选最近的前沿,然后 Nav2 规划失败,重试,又失败。

解法:从机器人位置做 BFS,计算到所有 free 栅格的距离。只选可达的前沿点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _bfs_distance(self, free, start_x, start_y):
dist = np.full((h, w), -1, dtype=np.int32)
queue = deque()
dist[start_y, start_x] = 0
queue.append((start_x, start_y))

while queue:
cx, cy = queue.popleft()
cd = dist[cy, cx]
for dx, dy in ((-1, 0), (1, 0), (0, -1), (0, 1)):
nx, ny = cx + dx, cy + dy
if 0 <= nx < w and 0 <= ny < h and free[ny, nx] and dist[ny, nx] < 0:
dist[ny, nx] = cd + 1
queue.append((nx, ny))
return dist

4 邻域 BFS,O(W×H) 时间。地图一般 200×200,毫秒级完成。

有个细节:如果机器人当前位置不在 free 上(比如刚好在 unknown 区域),算法会搜索周围 20 格内最近的 free 格子作为 BFS 起点。这在建图初期很常见——车刚启动,周围大部分都是 unknown。

通道方向连续性:顺着走,别回头

这是最关键的创新。评分公式:

1
2
3
score = reachable_dist
+ 0.4 * heading_penalty * reachable_dist # 方向偏差惩罚
- 0.03 * cluster_size # 大簇奖励

heading_penalty = |angle_to_goal - robot_yaw| / π,范围 0~1。正前方是 0,正后方是 1。

0.4 的权重意味着:正前方 10 米远的前沿和正后方 6 米远的前沿得分相同。车会优先往前走,而不是回头走已经走过的路。

大簇奖励 -0.03 × size 是因为:大前沿簇通常是未探索通道的入口,小簇可能是噪声或通道边缘的零碎前沿。给大簇加分,引导车进入新通道。

通道尽头检测:前方没路了,转隔壁

光有方向连续性还不够。车走到通道尽头,前方没有前沿了,但左右两侧可能有——那是相邻通道。如果方向惩罚太重,车会一直回头,永远不转入相邻通道。

解法:检测前方 ±45° 扇区内是否有前沿。如果没有,降低侧面前沿的方向惩罚:

1
2
3
4
5
6
7
8
9
10
11
12
13
has_forward = any(
abs(angle_diff) < π/4 for _, angle_diff in candidates
)

for goal, angle_diff in candidates:
heading_penalty = angle_diff / π
if not has_forward:
heading_penalty *= 0.3 # 通道尽头,大幅降低方向惩罚
score = (
world_dist
+ 0.4 * heading_penalty * world_dist
- 0.03 * goal.size
)

heading_penalty *= 0.3 把 90° 侧向的惩罚从 0.5 降到 0.15。侧面前沿的得分一下子就比回头的前沿高了——车自然转入相邻通道。

效果:车走到通道尽头,不回头,直接右转进下一条通道,像犁田一样逐行扫描。

黑名单:别再去失败的地方

Nav2 有时候规划失败——前沿点看起来可达,但路径上有窄缝过不去。不加处理的话,探索算法下一轮又选同一个点,又失败,死循环。

1
2
3
4
5
6
7
8
9
10
def add_failed_goal(self, x, y):
self._failed_goals.append((x, y))
if len(self._failed_goals) > 100:
self._failed_goals = self._failed_goals[-100:]

def _is_blacklisted(self, point):
for fg in self._failed_goals:
if distance(point, fg) < 0.5: # 0.5 米半径内
return True
return False

失败目标 0.5 米半径内的新候选全部跳过。最多记 100 个,防止内存泄漏。

黑名单没有过期时间——因为如果这个点真的到不了,永远不该再去;如果后来地图更新了,通道变宽了,BFS 可达性会自然找到新的前沿点。

地图边界过滤

前沿点如果紧贴地图边缘,Nav2 的 worldToMap 容易越界抖动。过滤掉距离边缘 1 格以内的候选:

1
2
3
4
margin = 1
if goal_mx <= margin or goal_mx >= (w - 1 - margin) or \
goal_my <= margin or goal_my >= (h - 1 - margin):
continue

跟通用算法比

通用前沿探索通道感知探索
选点策略最近距离优先方向连续 + 通道尽头自适应
通道尽头掉头回头自然转入相邻通道
不可达前沿反复尝试直到超时BFS 预过滤 + 黑名单
100m 通道扫描~3 次回头0 次回头,蛇形扫完
兜圈概率高(交叉口反复选点)低(方向惩罚抑制回头)

甘蔗田里跑下来,同样 2000 平方米的区域,通用算法要 25 分钟,通道感知版 8 分钟。差距主要来自消除了回头和兜圈。

巡检车在甘蔗田里跑着跑着,有人走过来把它搬走了——系统完全不知道,导航还在傻乎乎地算路径,电机还在空转。更离谱的:车从台阶上摔下去了,IMU 数据疯狂跳动,但代码只是在日志里打了几行 warning,该走的路还在走。

这些事真发生过。后来我写了一套基于 IMU 的安全状态机,四个独立的检测器,每个都走”阈值 + 时间窗口 + 状态机 + 滞回恢复”的模式。这篇把这几个检测器的逻辑拆开讲。

统一模式:阈值 + 时间窗口 + 滞回

所有检测器共享同一种设计:

  1. 信号超过阈值 → 开始计时
  2. 持续超过阈值达 N 秒 → 触发事件
  3. 信号回落到(阈值 - 滞回值)以下 → 开始恢复计时
  4. 持续低于恢复阈值达 M 秒 → 解除事件

滞回是关键。没有滞回的话,信号在阈值附近抖动,状态就会反复触发/解除,下游的导航系统跟着反复启停,比不检测还糟糕。

拿起检测:你的车被人抱走了

这是最常见的场景——工作人员搬车换位置,或者熊孩子把车提起来看。

判断依据两条路,满足任一即进入候选状态:

姿态路径:倾斜角 ≥ 50° 且 IMU 静止(加速度在 0.85g~1.15g,角速度 ≤ 18°/s)。持续 0.8 秒确认。

动态路径:加速度超出了”正常站着”的范围(< 0.7g 或 > 1.3g),或者角速度 ≥ 90°/s。持续 0.35 秒确认。比姿态路径快,因为车被猛地拎起来时加速度变化先于姿态变化。

1
2
3
4
5
6
7
8
pickup_pose_candidate = tilt >= 50.0 and stationary
pickup_dynamic_candidate = (
accel_magnitude_g <= 0.70
or accel_magnitude_g >= 1.30
or gyro_dps >= 90.0
)
pickup_candidate = pickup_pose_candidate or pickup_dynamic_candidate
pickup_confirm_target = 0.8 if pickup_pose_candidate else 0.35

恢复条件:倾斜角降到 50° - 8° = 42° 以下,且静止 0.8 秒。8° 的滞回防止车刚放下还没放稳就解除刹车。

摔落检测:自由落体 + 冲击

这个是最戏剧性的:车从高处掉下来。

检测分两步——先检测自由落体,再等冲击:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 第一步:自由落体
if accel_magnitude_g < 0.3: # 加速度小于 0.3g = 几乎失重
freefall_start_t = now
in_freefall = True

# 失重结束,检查持续时间
freefall_duration = now - freefall_start_t
if freefall_duration >= 0.08: # 至少 80ms
waiting_impact = True
impact_deadline = now + 0.50 # 500ms 内等待冲击

# 第二步:冲击
if waiting_impact:
if accel_magnitude_g >= 2.8: # 2.8g 冲击 = 着地了
fall_triggered = True
elif now > impact_deadline:
waiting_impact = False # 没等到冲击,可能是假警报

为什么不是”加速度大就算摔”?因为车正常行驶过减速带时加速度也能到 2g+。自由落体 → 冲击的时序组合才有意义:先失重再砸地,这是摔落的物理特征,过个坑不具备。

恢复条件比拿起更严格:倾斜 ≤ 15°、加速度正常、角速度低,三个条件同时满足 0.8 秒。摔了之后得确认车真的稳了。

侧翻检测:70° 倒在那儿

最简单的检测器:倾斜 ≥ 70° 且静止 1 秒。

为什么还要加”静止”?因为车高速转弯时瞬时倾斜可能很大,但在动态中 IMU 的倾斜计算精度下降。静止 + 大倾斜 = 真的翻了。

1
2
3
4
if tilt >= 70.0 and stationary:
tipover_start_t = now
if now - tipover_start_t >= 1.0:
tipover_triggered = True

恢复:倾斜回到 70° - 8° = 62° 以下且静止。8° 滞回防止车被扶到 69° 又倒回去的抖动。

深度相机:障碍物和坑洞

IMU 检测的是”车出事了”,深度相机检测的是”前面有东西”。

从深度图中间裁一个 ROI,计算两个比例:

  • obstacle_ratio:距离小于 0.45m 的像素占比 ≥ 6% → 有障碍物
  • invalid_ratio:深度值无效(0 或 inf)的像素占比 ≥ 55% → 有坑洞

坑洞检测有个特别坑的问题:甘蔗叶子和反光表面也会让深度值无效。深度相机说”前面是坑”,但其实是甘蔗叶子挡住了。

解法:雷达交叉验证。激光雷达不受反光影响,如果雷达说前方 30° 扇区内最近障碍物 > 0.35m(也就是”前面没问题”),那深度相机的坑洞判定就要求更高的 invalid_ratio(从 55% 提高到 90%)才触发。

1
2
3
4
5
6
7
if (
is_pit
and lidar_cross_validation_enabled
and (not lidar_confirms_obstacle())
and invalid_ratio < 0.90
):
is_pit = False # 雷达说没事,深度大概率是误报

坡道检测:不是障碍物,是斜坡

深度图里障碍物和坡道看起来一样——都是”近处有东西”。区别在于:坡道的深度是从远到近单调递减的,障碍物是突然出现的。

把 ROI 从上到下分成 4 个条带,计算每个条带的中位深度。如果从上到下深度严格递减(远处深近处浅,符合透视),且梯度 > 0.25m,且 IMU 俯仰角 < 25°,那就是坡道而不是障碍物——不触发刹车。

1
2
3
4
5
6
7
8
for i in range(1, len(medians)):
if medians[i] >= medians[i - 1]:
return False # 不单调递减,不是坡道

gradient = medians[0] - medians[-1]
if gradient < 0.25: return False
if abs(pitch_deg) > 25.0: return False
return True

IMU 俯仰角的二次验证很关键:真坡道上俯仰角会变化,如果深度看起来像坡道但 IMU 说车是平的,那可能是深度噪声。

帧数确认:别被一帧噪声骗了

所有深度检测都走帧数确认:连续 2 帧检测到障碍物才触发,连续 3 帧没有才解除。触发比解除快——安全第一,宁可误刹不可漏刹。

1
2
3
4
5
6
7
8
9
10
11
def _update_block_state(self, blocked_candidate, reason, ...):
if blocked_candidate:
self._block_candidate_streak += 1
self._clear_candidate_streak = 0
if self._block_candidate_streak >= self.block_confirm_frames: # 2
self._set_blocked(True, reason, ...)
else:
self._clear_candidate_streak += 1
self._block_candidate_streak = 0
if self._clear_candidate_streak >= self.clear_confirm_frames: # 3
self._set_blocked(False, '', ...)

几个调参经验

参数默认值怎么调
拿起倾斜角50°车体重心低可降到 45°,高可提到 55°
坠落失重阈值0.3g太高会被颠簸误触发
坠落冲击阈值2.8g过减速带大约 2g,留点余量
侧翻确认时间1.0s太短会被急转弯误判
恢复滞回核心参数,太小抖动太大
坑洞无效比55%有雷达交叉验证可降到 45%

这套东西跑了几个月,最深的感触是:田野里没有”干净”的传感器数据。深度相机被甘蔗叶子骗,IMU 被减速带骗,雷达被地面反射骗。单一传感器做安全判断是不靠谱的,必须交叉验证 + 时间窗口 + 滞回,三层过滤把误报压到可接受的水平。

IoT 项目有个很实际的问题:后端的 IP 地址怎么告诉设备?

Web 应用有域名,配个 DNS 就完事。IoT 设备跑在内网,没域名,IP 还是 DHCP 动态分配的。你要在每台传感器、每个智能插座上硬编码后端地址?下次路由器重启 IP 一变,全部失联。

mDNS(Multicast DNS)就是干这个的——设备在局域网里喊一声”谁是 iot-backend?“,后端自动应答”我是,IP 是 192.168.1.100,端口 8800”。苹果的 Bonjour、安卓的 NsdManager、Linux 的 Avahi,全支持。

听起来很美好。但真正写代码注册 mDNS 服务的时候,坑比想象的多。

最简版:十行代码能跑

用 Python 的 zeroconf 库,注册一个 mDNS 服务就这点代码:

1
2
3
4
5
6
7
8
9
10
11
from zeroconf import Zeroconf, ServiceInfo
import socket

zeroconf = Zeroconf()
info = ServiceInfo(
type_="_http._tcp.local.",
name="iot-backend._http._tcp.local.",
addresses=[socket.inet_aton("192.168.1.100")],
port=8800,
)
zeroconf.register_service(info)

开发机上跑,没问题。部署到现场——崩了。

第一关:IP 地址怎么拿

硬编码 IP 显然不行。得自动检测本机的局域网 IP。

直觉是 socket.gethostbyname(socket.gethostname()),但这个方法在 Docker 容器里返回 127.0.0.1,在 macOS 上返回 127.0.0.1,在有些 Linux 上直接报错。

最后写了个三级探测链:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _detect_local_ipv4():
# 第一级:通过主机名解析
local = _detect_interface_ipv4()
if local:
return local

# 第二级:UDP 探测 8.8.8.8
probes = [("8.8.8.8", 53), ("1.1.1.1", 53)]
for host, port in probes:
ip = _detect_outbound_ipv4(host, port)
if ip:
return ip

return ""

第一级 gethostname / getfqdn 解析:在正常 Linux 服务器上管用。

第二级 UDP 探测是最靠谱的——创建一个 UDP socket,connect 到 8.8.8.8:53,然后 getsockname() 取本端地址。这个技巧妙在:UDP 的 connect 不会真的发包,只是让操作系统选一个出口网卡和 IP。所以即使在完全没网的环境里也不会卡住。

1
2
3
4
5
6
7
8
9
10
11
def _detect_outbound_ipv4(target_host, target_port):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(0.5)
try:
sock.connect((target_host, target_port))
candidate = sock.getsockname()[0]
return _normalize_ipv4(candidate)
except Exception:
return ""
finally:
sock.close()

拿到的 IP 还得过 _normalize_ipv4 的验证——排除 0.0.0.0127.0.0.1169.254.x.x(link-local)、组播地址,这些都不是合法的广播 IP:

1
2
3
4
5
def _normalize_ipv4(value):
ip_obj = ipaddress.ip_address(value)
if ip_obj.is_unspecified or ip_obj.is_loopback or ip_obj.is_multicast or ip_obj.is_link_local:
return ""
return str(ip_obj)

8.8.8.8 不通就试 1.1.1.1。两个都不通?那这个机器大概率没网,mDNS 注册了也没用,返回空。

第二关:Zeroconf 注册,三种姿势

拿到 IP 后,调用 zeroconf.register_service()。但在不同版本的 zeroconf 库和不同操作系统上,这个方法的行为不一致:

  • 有些版本要求 strict=True(严格模式,检查服务名合法性),有些没有这个参数
  • 有些版本 Zeroconf(ip_version=IPVersion.V4Only) 正常,有些抛 EventLoopBlocked
  • Linux 上 IPv6 组播可能没权限,macOS 上 IPv4-only 模式偶尔卡住

所以写了三重降级策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
strategies = [
{"label": "v4_strict", "ip_version": IPVersion.V4Only, "strict": True},
{"label": "v4_relaxed", "ip_version": IPVersion.V4Only, "strict": False},
{"label": "default_relaxed", "ip_version": None, "strict": False},
]

for strategy in strategies:
try:
self.stop()
self._zeroconf = self._create_zeroconf(strategy["ip_version"])
self._service_info = ServiceInfo(...)
self._register_service(self._zeroconf, self._service_info, bool(strategy.get("strict", True)))
return True
except Exception as exc:
self.stop()

第一种:IPv4-only + 严格模式。最规范,绝大多数 Linux 服务器走这条路。

第二种:IPv4-only + 宽松模式。有些 zeroconf 版本的 strict 检查太严格(比如服务名里有下划线就报错),放宽后能过。

第三种:默认 + 宽松。放弃 IPv4-only 限制,让 zeroconf 自己选协议。这招在 macOS 上最管用——苹果的网络栈对 IPv4-only 组播有限制。

_create_zeroconf_register_service 都做了参数兼容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@staticmethod
def _create_zeroconf(ip_version):
if ip_version is None:
return Zeroconf()
try:
return Zeroconf(ip_version=ip_version)
except TypeError:
return Zeroconf() # 老版本没有 ip_version 参数

@staticmethod
def _register_service(zc, info, strict):
try:
zc.register_service(info, strict=strict)
except TypeError:
zc.register_service(info) # 老版本没有 strict 参数

这种 try/except TypeError 的写法看着粗暴,但处理 optional dependency 的参数兼容特别好用——zeroconf 从 0.28 到 0.130 的 API 变了好几次,你不知道现场装的是哪个版本。

第三关:DNS 标签规范化

mDNS 的服务名必须符合 DNS 标签规范:只允许字母、数字、连字符,长度不超过 63 字节。

用户配置里可能写 IoT后端my serviceservice_name.with.dots——全是非法的。得洗一遍:

1
2
3
4
5
6
7
8
9
10
def _normalize_service_label(value):
text = str(value or "").strip().lower()
out = []
for ch in text:
if ch.isalnum() or ch == "-":
out.append(ch)
else:
out.append("-") # 非法字符替换成连字符
normalized = "".join(out).strip("-") # 去掉首尾连字符
return normalized[:63] # DNS 标签长度上限

"IoT后端""iot----""iot"(去掉尾部连字符) "my service""my-service" "service_name.with.dots""service-name-with-dots"

服务类型也要规范化,确保以 . 结尾:

1
2
3
4
5
6
7
def _normalize_service_type(value):
service_type = str(value or "").strip()
if not service_type:
return "_http._tcp.local."
if not service_type.endswith("."):
service_type += "."
return service_type

mDNS 的类型格式是 _<proto>._<transport>.local.,漏了最后的点会导致某些客户端解析失败。

客户端怎么发现

服务端注册好了,客户端怎么用?

Linux(Avahi)

1
avahi-browse -r _http._tcp.local.

macOS

1
2
dns-sd -B _http._tcp.local.
dns-sd -L iot-backend _http._tcp.local.

Python 客户端

1
2
3
4
5
6
7
8
9
10
11
from zeroconf import ZeroconfServiceBrowser, Zeroconf

def on_service_state_change(zeroconf, service_type, name, state_change):
if state_change == ServiceStateChange.Added:
info = zeroconf.get_service_info(service_type, name)
if info:
addresses = [socket.inet_ntoa(addr) for addr in info.addresses]
print(f"发现 {name}: {addresses}:{info.port}")

zeroconf = Zeroconf()
browser = ZeroconfServiceBrowser(zeroconf, "_http._tcp.local.", handlers=[on_service_state_change])

安卓(NsdManager)

1
2
val nsdManager = context.getSystemService(Context.NSD_SERVICE) as NsdManager
nsdManager.discoverServices("_http._tcp.", NsdManager.PROTOCOL_DNS_SD, discoveryListener)

iOS(Bonjour / NetServiceBrowser)

1
2
let browser = NetServiceBrowser()
browser.searchForServices(ofType: "_http._tcp.", inDomain: "local.")

所有平台都原生支持 mDNS,不需要装额外软件。设备上线后自动发现后端,不需要配 IP。

关机时要注销

这个容易被忽略。如果后端进程直接被 kill,mDNS 记录会在局域网里残留几分钟(TTL 默认 120 秒),导致设备尝试连接一个已经不存在的地址。

1
2
3
4
5
6
7
8
9
10
11
def stop(self):
if self._zeroconf and self._service_info:
try:
self._zeroconf.unregister_service(self._service_info)
except Exception:
pass
if self._zeroconf:
try:
self._zeroconf.close()
except Exception:
pass

unregister_service 发送 goodbye 包,告诉网络”我不在了”,再 close 释放资源。try/except 兜底是因为关机时网络可能已经不可用了。

在 FastAPI 的 lifespan 里配合使用:

1
2
3
4
5
@asynccontextmanager
async def lifespan(app):
mdns_advertiser.start()
yield
mdns_advertiser.stop()

几个踩坑总结

  1. Docker 里 mDNS 不通:Docker 默认用 bridge 网络,组播包出不去。要用 --network host 或者单独配组播路由
  2. 双网卡冲突:服务器有内网和外网两块网卡,mDNS 注册的 IP 可能是外网那个。需要指定 advertise_ipv4 参数,或者依赖 UDP 探测自动选对网卡
  3. zeroconf 版本碎片化:0.28 和 0.130 的 API 差异巨大。用 try/except 兜参数,别假设用户的版本和你开发时一样
  4. mDNS 不是万能发现:它只在局域网有效。跨网段需要配 mDNS reflector(Avahi 的 enable-reflector=yes)或者上 DNS

这个 mDNS 模块代码一共 240 行,但花了两整天调通各种环境。IoT 后端不像 Web 后端跑在云上网络可控,你面对的是机房、大棚、果园——网络环境千奇百怪,代码得比环境更宽容。