IoT 设备协议抽象层——让 Modbus、MQTT、ESPHome 长一个样

这个农业 IoT 项目最早只接 Modbus 传感器,业务代码到处都是 client.read_holding_registers(...)——那时候挺潇洒。

后来加了 MQTT 智能插座,又来了 ESPHome 的 DIY 板子,再后来还得支持 OpenMQTTGateway 转出来的 RF 设备……每多一种协议,业务层就得跟着改一圈。改到第三种我就受不了了——再这么下去,业务代码迟早成一锅粥。

最后还是绕回去,老老实实做了一层设备抽象。这篇记一下这层是怎么一点点长出来的,以及踩到的两个比较重要的坑:Modbus 串口阻塞,还有启动时的全量预读取。

Driver 注册表

不同协议各写一个 Driver,外面统一拿。注册表本身就是个字典:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from enum import Enum
from typing import Dict, Type

class DeviceConfigType(str, Enum):
MODBUS = "modbus"
MQTT = "mqtt"
ESPHOME_NATIVE = "esphome_native"
ESPHOME_MQTT = "esphome_mqtt"
OMG_GATEWAY = "omg_gateway"

class DeviceManager:
def __init__(self):
self.drivers: Dict[DeviceConfigType, BaseDriver] = {
DeviceConfigType.MODBUS: ModbusDriver(),
DeviceConfigType.MQTT: MQTTDriver(),
DeviceConfigType.ESPHOME_NATIVE: ESPHomeNativeDriver(),
DeviceConfigType.ESPHOME_MQTT: ESPHomeMQTTDriver(),
DeviceConfigType.OMG_GATEWAY: OMGDriver(),
}

每个 Driver 实现一组最小接口:

1
2
3
4
5
6
7
8
9
10
11
12
class BaseDriver(ABC):
@abstractmethod
async def connect(self, config: dict) -> bool: ...

@abstractmethod
async def read_entities(self, config: dict) -> list: ...

@abstractmethod
async def write_entity(self, config: dict, entity_id: str, value: any) -> bool: ...

@abstractmethod
async def disconnect(self): ...

新协议加进来,就是写一个新 Driver、在注册表里登记一下——业务层一行不用改。这种感觉爽到没朋友。

统一实体模型

每个 Driver 内部怎么访问设备,它自己的事。但对外吐出来的,必须是统一的 Entity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class Entity:
entity_id: str
device_id: int
name: str
entity_type: str # sensor / switch / light / button
value: Any
unit: Optional[str]
readable: bool
writable: bool
last_update: float

业务代码只跟 Entity 打交道,根本不知道底下是 Modbus 寄存器还是 MQTT topic:

1
2
3
4
5
temp_entity = device_manager.get_entity("greenhouse_1.temperature")
print(f"当前温度: {temp_entity.value} {temp_entity.unit}")

relay_entity = device_manager.get_entity("greenhouse_1.relay_1")
await device_manager.write_entity(relay_entity.entity_id, True)

这套抽象后来接 Home Assistant 的时候也省了大事——HA 本身就是 Entity 概念,对得上号。

映射缓存

设备的协议地址映射(哪个 entity 对应哪个寄存器 / topic / key)属于典型的”读 1000 次、改 1 次”——每次都查数据库太亏。带个 5 分钟 TTL 的缓存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from dataclasses import dataclass
import time

@dataclass
class CachedMappingInfo:
entity_id: int
entity_type: str
scale: float
register_address: Optional[int] # Modbus
mqtt_topic: Optional[str] # MQTT
esphome_key: Optional[str] # ESPHome

class DeviceManager:
def __init__(self):
self.mapping_cache: Dict[int, Dict[int, CachedMappingInfo]] = {}
self.config_cache: Dict[int, CachedDeviceConfig] = {}
self._cache_ttl = 300
self._cache_timestamps: Dict[str, float] = {}

def get_mapping(self, device_id: int, entity_id: int):
cache_key = f"mapping_{device_id}"
if self._is_cache_expired(cache_key):
self._refresh_mapping_cache(device_id)
return self.mapping_cache.get(device_id, {}).get(entity_id)

def _is_cache_expired(self, cache_key: str) -> bool:
timestamp = self._cache_timestamps.get(cache_key, 0)
return time.time() - timestamp > self._cache_ttl

5 分钟是拍脑袋定的——长一点会有”改了配置半天不生效”的体感问题;短一点意义又不大。后来加了主动 invalidate,改配置时直接清缓存,TTL 就只是个保底。

端口级独立轮询

这个坑是上线之后才暴露的。

Modbus 设备共用一根 RS-485 串口。某个设备响应慢(断线超时要等 1 秒),同一串口上的其他设备就全跟着卡。一台慢设备,能把整根串口的节奏拖垮——那一刻血压直接上来。

办法是按”端口”维度做隔离:每根串口/TCP 连接,配一个单线程的 ThreadPoolExecutor,互不干扰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from concurrent.futures import ThreadPoolExecutor, Future

class DeviceManager:
def __init__(self):
self._modbus_port_workers: Dict[str, ThreadPoolExecutor] = {}
self._modbus_port_futures: Dict[str, Future] = {}

def _get_port_worker(self, port_key: str) -> ThreadPoolExecutor:
if port_key not in self._modbus_port_workers:
self._modbus_port_workers[port_key] = ThreadPoolExecutor(
max_workers=1,
thread_name_prefix=f"modbus-{port_key}",
)
return self._modbus_port_workers[port_key]

def poll_device(self, device: Device):
if device.config_type == DeviceConfigType.MODBUS:
port_key = self._get_modbus_port_key(device)
worker = self._get_port_worker(port_key)
future = worker.submit(self._poll_modbus_device, device)
self._modbus_port_futures[port_key] = future
else:
self._poll_device(device)

def _get_modbus_port_key(self, device: Device) -> str:
config = device.modbus_config
if config.serial_port:
return f"serial_{config.serial_port}"
return f"tcp_{config.host}:{config.port}"

max_workers=1 不是手抖——Modbus 在同一根串口上本来就是顺序协议,并行发指令立马乱码。这里说的”并行”,指的是不同串口之间。

启动预读取

启动后所有设备的当前值都是空的,得等第一轮轮询完才能填上。如果设备多,串行连接 + 串行预读取,能让你等几分钟——UI 上看就是满屏的”–“。

8 线程并发拉一下就好了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from concurrent.futures import ThreadPoolExecutor

class DeviceManager:
def __init__(self):
self._startup_preload_executor = ThreadPoolExecutor(
max_workers=8,
thread_name_prefix="startup-preload",
)
self._startup_preload_futures: Dict[int, Future] = {}

def connect_all_devices(self):
devices = Device.select().where(Device.enabled == True)
for device in devices:
future = self._startup_preload_executor.submit(
self._connect_and_preload, device
)
self._startup_preload_futures[device.id] = future

def _connect_and_preload(self, device: Device):
try:
driver = self.drivers[device.config_type]
driver.connect(device.config)
entities = driver.read_entities(device.config)
for entity in entities:
self._update_entity_state(device.id, entity)
except Exception as e:
logger.error(f"设备 {device.name} 连接失败: {e}")

注意,这个预读取池是按”设备”并发的,不是按”端口”。预读取阶段串口的串行约束依然在,会被端口轮询那一层挡住——所以不用担心冲突。

事件总线

设备状态变了之后,下游想做的事不止一种:UI 刷新、自动化规则触发、写历史库、推 MQTT 广播。要是在 Driver 里一个个 hook,耦合就乱成毛线团了。

所以加了个简单的事件总线:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from enum import Enum
from typing import Callable, Dict, List

class EventType(str, Enum):
STATE_CHANGED = "state_changed"
DEVICE_CONNECTED = "device_connected"
DEVICE_DISCONNECTED = "device_disconnected"

class EventBus:
def __init__(self):
self._listeners: Dict[EventType, List[Callable]] = {}

def listen(self, event_type: EventType, callback: Callable):
if event_type not in self._listeners:
self._listeners[event_type] = []
self._listeners[event_type].append(callback)

def unsubscribe():
self._listeners[event_type].remove(callback)
return unsubscribe

def emit(self, event_type: EventType, data: dict):
for callback in self._listeners.get(event_type, []):
try:
callback(data)
except Exception as e:
logger.error(f"事件处理失败: {e}")

事件总线最容易出的事,是回调里又触发别的事件——好家伙,循环了。这块靠业务约定撑着:回调里只读、不写设备。真有写需求,就走自动化引擎那一层(那边有防重入机制,下一篇细讲)。

欢迎关注我的其它发布渠道