这个农业 IoT 项目最早只接 Modbus 传感器,业务代码到处都是 client.read_holding_registers(...)——那时候挺潇洒。
后来加了 MQTT 智能插座,又来了 ESPHome 的 DIY 板子,再后来还得支持 OpenMQTTGateway 转出来的 RF 设备……每多一种协议,业务层就得跟着改一圈。改到第三种我就受不了了——再这么下去,业务代码迟早成一锅粥。
最后还是绕回去,老老实实做了一层设备抽象。这篇记一下这层是怎么一点点长出来的,以及踩到的两个比较重要的坑:Modbus 串口阻塞,还有启动时的全量预读取。
Driver 注册表
不同协议各写一个 Driver,外面统一拿。注册表本身就是个字典:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from enum import Enum from typing import Dict, Type
class DeviceConfigType(str, Enum): MODBUS = "modbus" MQTT = "mqtt" ESPHOME_NATIVE = "esphome_native" ESPHOME_MQTT = "esphome_mqtt" OMG_GATEWAY = "omg_gateway"
class DeviceManager: def __init__(self): self.drivers: Dict[DeviceConfigType, BaseDriver] = { DeviceConfigType.MODBUS: ModbusDriver(), DeviceConfigType.MQTT: MQTTDriver(), DeviceConfigType.ESPHOME_NATIVE: ESPHomeNativeDriver(), DeviceConfigType.ESPHOME_MQTT: ESPHomeMQTTDriver(), DeviceConfigType.OMG_GATEWAY: OMGDriver(), }
|
每个 Driver 实现一组最小接口:
1 2 3 4 5 6 7 8 9 10 11 12
| class BaseDriver(ABC): @abstractmethod async def connect(self, config: dict) -> bool: ...
@abstractmethod async def read_entities(self, config: dict) -> list: ...
@abstractmethod async def write_entity(self, config: dict, entity_id: str, value: any) -> bool: ...
@abstractmethod async def disconnect(self): ...
|
新协议加进来,就是写一个新 Driver、在注册表里登记一下——业务层一行不用改。这种感觉爽到没朋友。
统一实体模型
每个 Driver 内部怎么访问设备,它自己的事。但对外吐出来的,必须是统一的 Entity:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from dataclasses import dataclass from typing import Any, Optional
@dataclass class Entity: entity_id: str device_id: int name: str entity_type: str value: Any unit: Optional[str] readable: bool writable: bool last_update: float
|
业务代码只跟 Entity 打交道,根本不知道底下是 Modbus 寄存器还是 MQTT topic:
1 2 3 4 5
| temp_entity = device_manager.get_entity("greenhouse_1.temperature") print(f"当前温度: {temp_entity.value} {temp_entity.unit}")
relay_entity = device_manager.get_entity("greenhouse_1.relay_1") await device_manager.write_entity(relay_entity.entity_id, True)
|
这套抽象后来接 Home Assistant 的时候也省了大事——HA 本身就是 Entity 概念,对得上号。
映射缓存
设备的协议地址映射(哪个 entity 对应哪个寄存器 / topic / key)属于典型的”读 1000 次、改 1 次”——每次都查数据库太亏。带个 5 分钟 TTL 的缓存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| from dataclasses import dataclass import time
@dataclass class CachedMappingInfo: entity_id: int entity_type: str scale: float register_address: Optional[int] mqtt_topic: Optional[str] esphome_key: Optional[str]
class DeviceManager: def __init__(self): self.mapping_cache: Dict[int, Dict[int, CachedMappingInfo]] = {} self.config_cache: Dict[int, CachedDeviceConfig] = {} self._cache_ttl = 300 self._cache_timestamps: Dict[str, float] = {}
def get_mapping(self, device_id: int, entity_id: int): cache_key = f"mapping_{device_id}" if self._is_cache_expired(cache_key): self._refresh_mapping_cache(device_id) return self.mapping_cache.get(device_id, {}).get(entity_id)
def _is_cache_expired(self, cache_key: str) -> bool: timestamp = self._cache_timestamps.get(cache_key, 0) return time.time() - timestamp > self._cache_ttl
|
5 分钟是拍脑袋定的——长一点会有”改了配置半天不生效”的体感问题;短一点意义又不大。后来加了主动 invalidate,改配置时直接清缓存,TTL 就只是个保底。
端口级独立轮询
这个坑是上线之后才暴露的。
Modbus 设备共用一根 RS-485 串口。某个设备响应慢(断线超时要等 1 秒),同一串口上的其他设备就全跟着卡。一台慢设备,能把整根串口的节奏拖垮——那一刻血压直接上来。
办法是按”端口”维度做隔离:每根串口/TCP 连接,配一个单线程的 ThreadPoolExecutor,互不干扰:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| from concurrent.futures import ThreadPoolExecutor, Future
class DeviceManager: def __init__(self): self._modbus_port_workers: Dict[str, ThreadPoolExecutor] = {} self._modbus_port_futures: Dict[str, Future] = {}
def _get_port_worker(self, port_key: str) -> ThreadPoolExecutor: if port_key not in self._modbus_port_workers: self._modbus_port_workers[port_key] = ThreadPoolExecutor( max_workers=1, thread_name_prefix=f"modbus-{port_key}", ) return self._modbus_port_workers[port_key]
def poll_device(self, device: Device): if device.config_type == DeviceConfigType.MODBUS: port_key = self._get_modbus_port_key(device) worker = self._get_port_worker(port_key) future = worker.submit(self._poll_modbus_device, device) self._modbus_port_futures[port_key] = future else: self._poll_device(device)
def _get_modbus_port_key(self, device: Device) -> str: config = device.modbus_config if config.serial_port: return f"serial_{config.serial_port}" return f"tcp_{config.host}:{config.port}"
|
max_workers=1 不是手抖——Modbus 在同一根串口上本来就是顺序协议,并行发指令立马乱码。这里说的”并行”,指的是不同串口之间。
启动预读取
启动后所有设备的当前值都是空的,得等第一轮轮询完才能填上。如果设备多,串行连接 + 串行预读取,能让你等几分钟——UI 上看就是满屏的”–“。
8 线程并发拉一下就好了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| from concurrent.futures import ThreadPoolExecutor
class DeviceManager: def __init__(self): self._startup_preload_executor = ThreadPoolExecutor( max_workers=8, thread_name_prefix="startup-preload", ) self._startup_preload_futures: Dict[int, Future] = {}
def connect_all_devices(self): devices = Device.select().where(Device.enabled == True) for device in devices: future = self._startup_preload_executor.submit( self._connect_and_preload, device ) self._startup_preload_futures[device.id] = future
def _connect_and_preload(self, device: Device): try: driver = self.drivers[device.config_type] driver.connect(device.config) entities = driver.read_entities(device.config) for entity in entities: self._update_entity_state(device.id, entity) except Exception as e: logger.error(f"设备 {device.name} 连接失败: {e}")
|
注意,这个预读取池是按”设备”并发的,不是按”端口”。预读取阶段串口的串行约束依然在,会被端口轮询那一层挡住——所以不用担心冲突。
事件总线
设备状态变了之后,下游想做的事不止一种:UI 刷新、自动化规则触发、写历史库、推 MQTT 广播。要是在 Driver 里一个个 hook,耦合就乱成毛线团了。
所以加了个简单的事件总线:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| from enum import Enum from typing import Callable, Dict, List
class EventType(str, Enum): STATE_CHANGED = "state_changed" DEVICE_CONNECTED = "device_connected" DEVICE_DISCONNECTED = "device_disconnected"
class EventBus: def __init__(self): self._listeners: Dict[EventType, List[Callable]] = {}
def listen(self, event_type: EventType, callback: Callable): if event_type not in self._listeners: self._listeners[event_type] = [] self._listeners[event_type].append(callback)
def unsubscribe(): self._listeners[event_type].remove(callback) return unsubscribe
def emit(self, event_type: EventType, data: dict): for callback in self._listeners.get(event_type, []): try: callback(data) except Exception as e: logger.error(f"事件处理失败: {e}")
|
事件总线最容易出的事,是回调里又触发别的事件——好家伙,循环了。这块靠业务约定撑着:回调里只读、不写设备。真有写需求,就走自动化引擎那一层(那边有防重入机制,下一篇细讲)。