IoT 设备跟后端通信,最怕三件事:别人冒充你的设备发假数据、数据在半路被改了、连着连着断了没人管。
我们的巡检车跑在果园里,通过 WiFi 连后端。WiFi 不靠谱,网络随时断;果园不是机房,谁都能插根网线进来。所以通信层得自己管安全。
这套通信客户端大概 600 行,但把证书生命周期、请求签名、WebSocket 自动重连全包了。这篇拆开讲。
自签证书:首次开机自动生成
巡检车出厂时没有证书。如果要求人工配证书,部署 50 台车得配 50 次——不现实。
解法:首次启动自动生成 RSA 2048 密钥 + 自签证书。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| def _ensure_certificate_material(self): cert_path = Path(self.client_cert_file) key_path = Path(self.client_key_file)
if cert_path.is_file() and key_path.is_file(): return
if not self.cert_auto_bootstrap: raise RuntimeError("巡检车证书材料缺失")
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) key_pem = private_key.private_bytes( encoding=Encoding.PEM, format=PrivateFormat.TraditionalOpenSSL, encryption_algorithm=NoEncryption(), ) self._write_bytes(str(key_path), key_pem)
cert_pem = self._generate_bootstrap_certificate(private_key) self._write_text(str(cert_path), cert_pem)
|
证书的 CN(Common Name)设为 rover_code,比如 cane_rover_001。后端收到注册请求时,用证书里的公钥验签名——如果签名对得上,说明请求确实来自持有私钥的设备。
证书自动续签:到期前 15 天换新
证书有效期 365 天。如果等过期了再换,车就再也连不上后端了——因为签名验不过去。
1 2 3 4 5 6 7 8 9 10 11 12
| def _maybe_renew_certificate(self): current_cert = x509.load_pem_x509_certificate(self._client_cert_pem.encode("utf-8")) now = datetime.datetime.now(datetime.timezone.utc) deadline = now + datetime.timedelta(days=15) if current_cert.not_valid_after_utc > deadline: return False
renewed_pem = self._generate_self_signed_certificate(current_cert) self._write_text(self.client_cert_file, renewed_pem) self._cert_sync_required = True return True
|
续签后设 _cert_sync_required = True,下次注册/心跳时把新证书的 PEM 和指纹上报给后端,后端更新数据库里的记录。全程自动,不需要人介入。
请求签名:不是 OAuth,不是 mTLS,是自己搓的
每个 HTTP 请求都带签名,签的内容覆盖五个字段:
1 2 3 4 5 6 7 8 9 10
| def _auth_headers(self, method, path, payload): ts = int(time.time()) payload_hash = hashlib.sha256(self._payload_bytes(payload)).hexdigest() message = f"{method.upper()}\n{path}\n{payload_hash}\n{ts}\n{self.rover_code}".encode("utf-8") signature = base64.b64encode(self._sign_message(message)).decode("ascii") return { "X-Rover-Code": self.rover_code, "X-Rover-Timestamp": str(ts), "X-Rover-Signature": signature, }
|
签名消息的格式:METHOD\nPATH\nSHA256(PAYLOAD)\nTIMESTAMP\nROVER_CODE,换行分隔。
为什么签这么多东西?
- METHOD + PATH:防止把 POST 请求的签名挪到 DELETE 上用
- SHA256(PAYLOAD):防止篡改请求体(payload 可能很大,签 hash 而不是原文)
- TIMESTAMP:防重放攻击——后端检查时间戳,超过 5 分钟的请求直接拒
- ROVER_CODE:防止 A 车的签名被挪到 B 车的请求上
签名算法支持四种密钥类型:
1 2 3 4 5 6 7 8 9
| def _sign_message(self, message): if isinstance(self._private_key, rsa.RSAPrivateKey): return self._private_key.sign(message, padding.PKCS1v15(), hashes.SHA256()) if isinstance(self._private_key, ec.EllipticCurvePrivateKey): return self._private_key.sign(message, ec.ECDSA(hashes.SHA256())) if isinstance(self._private_key, ed25519.Ed25519PrivateKey): return self._private_key.sign(message) if isinstance(self._private_key, ed448.Ed448PrivateKey): return self._private_key.sign(message)
|
RSA + PKCS1v15 是最通用的,Ed25519 是最快的。现场部署哪种都行,代码自动适配。
WebSocket:断了自动重连,消息不丢
WebSocket 是小车和后端的双向通道——后端下发任务、控制指令,小车上报状态。WiFi 断了是常态,重连必须自动。
1 2 3 4 5 6 7
| def _ws_loop(self): while not self._stop_event.is_set(): self._run_single_ws_session() if self._stop_event.is_set(): break self._connected.clear() time.sleep(2.0)
|
外层循环管重连,内层 _run_single_ws_session 管单次会话。
发送端用有界队列(maxsize=300),单独线程消费:
1 2 3 4 5 6 7 8 9 10 11 12
| def _ws_sender_loop(self, ws_app, session_stop_event): while not session_stop_event.is_set(): payload = self._send_queue.get(timeout=0.3) if self._ws_app is not ws_app: self._send_queue.put_nowait(payload) break if not self._connected.is_set(): self._send_queue.put_nowait(payload) time.sleep(0.5) continue ws_app.send(json.dumps(payload))
|
关键设计:发送失败的消息塞回队列。不是丢了,是等重连成功后再发。这意味着短暂的断线不会丢失心跳和状态上报。
WebSocket 连接也带签名——在握手 header 里加 X-Rover-Signature,跟 HTTP 请求同一个签名逻辑:
1 2 3 4
| def _build_ws_headers(self): ws_path = urlparse(self.ws_url).path or "/" headers = self._auth_headers("WS", ws_path, {}) return [f"{k}: {v}" for k, v in headers.items()]
|
mTLS 支持:如果后端要求双向证书
签名和 mTLS 不是二选一。签名是应用层防护,mTLS 是传输层防护。两个都开的话,即使网络被中间人劫持,没有客户端证书 TLS 握手就过不去;即使证书被偷了,没有私钥也签不出合法的请求。
1 2 3 4 5
| if self.security_enabled: self._http_client_cert = (self.client_cert_file, self.client_key_file) if ws_secure: self._ws_sslopt["certfile"] = self.client_cert_file self._ws_sslopt["keyfile"] = self.client_key_file
|
HTTP 用 requests 的 cert 参数,WebSocket 用 sslopt 字典,都是标准接口。
一个踩过的坑:路径尾部斜杠
后端有个路由是 /api/rover/map/upload,某些部署(Nginx 反代)会 301 重定向到 /api/rover/map/upload/(带尾斜杠)。重定向后请求路径变了,但签名里还是原路径——后端验签失败,返回 403。
解法:如果 403 且错误信息包含”签名校验失败”,自动用带尾斜杠的路径重试一次:
1 2 3 4 5 6 7 8 9 10 11
| def upload_map(self, payload): response = self._request("POST", "/api/rover/map/upload", payload) if response is not None: return response
last_error = (self._last_error or "").lower() if ("http 403" in last_error) and ("签名校验失败" in last_error or "signature" in last_error): retry = self._request("POST", "/api/rover/map/upload/", payload) if retry is not None: self._logger.info("地图上传重试(带尾斜杠)成功") return retry
|
丑,但管用。根本解法是 Nginx 配置里关掉路径规范化,或者后端路由同时注册两个路径。