小车怎么跟后端安全通信——自签证书、请求签名和那个 WebSocket 重连

IoT 设备跟后端通信,最怕三件事:别人冒充你的设备发假数据、数据在半路被改了、连着连着断了没人管。

我们的巡检车跑在果园里,通过 WiFi 连后端。WiFi 不靠谱,网络随时断;果园不是机房,谁都能插根网线进来。所以通信层得自己管安全。

这套通信客户端大概 600 行,但把证书生命周期、请求签名、WebSocket 自动重连全包了。这篇拆开讲。

自签证书:首次开机自动生成

巡检车出厂时没有证书。如果要求人工配证书,部署 50 台车得配 50 次——不现实。

解法:首次启动自动生成 RSA 2048 密钥 + 自签证书

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def _ensure_certificate_material(self):
cert_path = Path(self.client_cert_file)
key_path = Path(self.client_key_file)

if cert_path.is_file() and key_path.is_file():
return # 已有,跳过

if not self.cert_auto_bootstrap:
raise RuntimeError("巡检车证书材料缺失")

# 生成 RSA 2048 私钥
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
key_pem = private_key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=NoEncryption(),
)
self._write_bytes(str(key_path), key_pem)

# 用私钥签发自签证书
cert_pem = self._generate_bootstrap_certificate(private_key)
self._write_text(str(cert_path), cert_pem)

证书的 CN(Common Name)设为 rover_code,比如 cane_rover_001。后端收到注册请求时,用证书里的公钥验签名——如果签名对得上,说明请求确实来自持有私钥的设备。

证书自动续签:到期前 15 天换新

证书有效期 365 天。如果等过期了再换,车就再也连不上后端了——因为签名验不过去。

1
2
3
4
5
6
7
8
9
10
11
12
def _maybe_renew_certificate(self):
current_cert = x509.load_pem_x509_certificate(self._client_cert_pem.encode("utf-8"))
now = datetime.datetime.now(datetime.timezone.utc)
deadline = now + datetime.timedelta(days=15) # 到期前 15 天
if current_cert.not_valid_after_utc > deadline:
return False # 还早着呢

# 续签:用同一个私钥,新有效期
renewed_pem = self._generate_self_signed_certificate(current_cert)
self._write_text(self.client_cert_file, renewed_pem)
self._cert_sync_required = True # 标记需要同步给后端
return True

续签后设 _cert_sync_required = True,下次注册/心跳时把新证书的 PEM 和指纹上报给后端,后端更新数据库里的记录。全程自动,不需要人介入。

请求签名:不是 OAuth,不是 mTLS,是自己搓的

每个 HTTP 请求都带签名,签的内容覆盖五个字段:

1
2
3
4
5
6
7
8
9
10
def _auth_headers(self, method, path, payload):
ts = int(time.time())
payload_hash = hashlib.sha256(self._payload_bytes(payload)).hexdigest()
message = f"{method.upper()}\n{path}\n{payload_hash}\n{ts}\n{self.rover_code}".encode("utf-8")
signature = base64.b64encode(self._sign_message(message)).decode("ascii")
return {
"X-Rover-Code": self.rover_code,
"X-Rover-Timestamp": str(ts),
"X-Rover-Signature": signature,
}

签名消息的格式:METHOD\nPATH\nSHA256(PAYLOAD)\nTIMESTAMP\nROVER_CODE,换行分隔。

为什么签这么多东西?

  • METHOD + PATH:防止把 POST 请求的签名挪到 DELETE 上用
  • SHA256(PAYLOAD):防止篡改请求体(payload 可能很大,签 hash 而不是原文)
  • TIMESTAMP:防重放攻击——后端检查时间戳,超过 5 分钟的请求直接拒
  • ROVER_CODE:防止 A 车的签名被挪到 B 车的请求上

签名算法支持四种密钥类型:

1
2
3
4
5
6
7
8
9
def _sign_message(self, message):
if isinstance(self._private_key, rsa.RSAPrivateKey):
return self._private_key.sign(message, padding.PKCS1v15(), hashes.SHA256())
if isinstance(self._private_key, ec.EllipticCurvePrivateKey):
return self._private_key.sign(message, ec.ECDSA(hashes.SHA256()))
if isinstance(self._private_key, ed25519.Ed25519PrivateKey):
return self._private_key.sign(message)
if isinstance(self._private_key, ed448.Ed448PrivateKey):
return self._private_key.sign(message)

RSA + PKCS1v15 是最通用的,Ed25519 是最快的。现场部署哪种都行,代码自动适配。

WebSocket:断了自动重连,消息不丢

WebSocket 是小车和后端的双向通道——后端下发任务、控制指令,小车上报状态。WiFi 断了是常态,重连必须自动。

1
2
3
4
5
6
7
def _ws_loop(self):
while not self._stop_event.is_set():
self._run_single_ws_session()
if self._stop_event.is_set():
break
self._connected.clear()
time.sleep(2.0) # 断了等 2 秒再连

外层循环管重连,内层 _run_single_ws_session 管单次会话。

发送端用有界队列(maxsize=300),单独线程消费:

1
2
3
4
5
6
7
8
9
10
11
12
def _ws_sender_loop(self, ws_app, session_stop_event):
while not session_stop_event.is_set():
payload = self._send_queue.get(timeout=0.3)
# 如果连接断了或者换了新 session,把消息塞回队列
if self._ws_app is not ws_app:
self._send_queue.put_nowait(payload)
break
if not self._connected.is_set():
self._send_queue.put_nowait(payload) # 塞回去,等重连
time.sleep(0.5)
continue
ws_app.send(json.dumps(payload))

关键设计:发送失败的消息塞回队列。不是丢了,是等重连成功后再发。这意味着短暂的断线不会丢失心跳和状态上报。

WebSocket 连接也带签名——在握手 header 里加 X-Rover-Signature,跟 HTTP 请求同一个签名逻辑:

1
2
3
4
def _build_ws_headers(self):
ws_path = urlparse(self.ws_url).path or "/"
headers = self._auth_headers("WS", ws_path, {})
return [f"{k}: {v}" for k, v in headers.items()]

mTLS 支持:如果后端要求双向证书

签名和 mTLS 不是二选一。签名是应用层防护,mTLS 是传输层防护。两个都开的话,即使网络被中间人劫持,没有客户端证书 TLS 握手就过不去;即使证书被偷了,没有私钥也签不出合法的请求。

1
2
3
4
5
if self.security_enabled:
self._http_client_cert = (self.client_cert_file, self.client_key_file)
if ws_secure:
self._ws_sslopt["certfile"] = self.client_cert_file
self._ws_sslopt["keyfile"] = self.client_key_file

HTTP 用 requestscert 参数,WebSocket 用 sslopt 字典,都是标准接口。

一个踩过的坑:路径尾部斜杠

后端有个路由是 /api/rover/map/upload,某些部署(Nginx 反代)会 301 重定向到 /api/rover/map/upload/(带尾斜杠)。重定向后请求路径变了,但签名里还是原路径——后端验签失败,返回 403。

解法:如果 403 且错误信息包含”签名校验失败”,自动用带尾斜杠的路径重试一次:

1
2
3
4
5
6
7
8
9
10
11
def upload_map(self, payload):
response = self._request("POST", "/api/rover/map/upload", payload)
if response is not None:
return response

last_error = (self._last_error or "").lower()
if ("http 403" in last_error) and ("签名校验失败" in last_error or "signature" in last_error):
retry = self._request("POST", "/api/rover/map/upload/", payload)
if retry is not None:
self._logger.info("地图上传重试(带尾斜杠)成功")
return retry

丑,但管用。根本解法是 Nginx 配置里关掉路径规范化,或者后端路由同时注册两个路径。

欢迎关注我的其它发布渠道