照着 Home Assistant 搓一个 IoT 实体系统——状态机、事件总线、还有那个 NaN 陷阱

项目做到中期,设备数据在 DeviceManager 里进进出出,自动化引擎要监听变化,前端要实时推送,InfluxDB 要存历史——所有人都在直接读 DeviceManager 的字典,耦合得一塌糊涂。

有天我翻 Home Assistant 的源码,发现它有个很优雅的抽象:一切皆实体。温湿度传感器是实体,继电器开关是实体,连”今天的日出时间”都是实体。上层逻辑只认实体 ID 和状态值,不关心底层是 Modbus 还是 MQTT。

我想了想,这个东西我得有一个。

四个单例撑起的骨架

实体系统拆成四个核心组件,全部单例:

  • StateMachine:存所有实体的当前状态,是整个系统的单一真相源
  • EventBus:事件发布/订阅,状态变了就广播
  • EntityRegistry:实体注册表,管生命周期和索引
  • ServiceRegistry:服务注册表,turn_onturn_off 这类动作走这里调

数据流长这样:

1
2
3
4
5
驱动层读到数据 → DeviceManager → StateMachine.set() → EventBus.fire(STATE_CHANGED)

┌────────┼────────┐
↓ ↓ ↓
自动化引擎 SSE推送 InfluxDB存储

所有消费者通过 EventBus 订阅,谁也不直接调谁。

StateMachine:那个 NaN 陷阱

StateMachine 的核心就一个 set() 方法——设状态、判变化、发事件。但”判变化”这三个字,坑比想象的深。

1
2
3
4
5
6
7
8
9
10
11
12
13
def _states_equal(a, b):
# 坑1:float('nan') != float('nan') 是 True
# 但语义上,两个 NaN 应该视为"相等"
if isinstance(a, float) and isinstance(b, float):
if math.isnan(a) and math.isnan(b):
return True

# 坑2:True == 1 是 True(Python 的"贴心"设计)
# 但开关状态 True 和数值 1 完全是两码事
if type(a) is bool or type(b) is bool:
return type(a) is type(b) and a == b

return a == b

第一个坑:Modbus 传感器偶尔返回 NaN(比如断线时寄存器值是 0x7FFF)。Python 里 NaN != NaN 返回 True,如果直接拿 == 判,系统会认为状态每秒都在变——事件风暴,前端疯狂刷新,InfluxDB 疯狂写入。

第二个坑:继电器状态是 True/False,某个寄存器原始值恰好是 1/0。Python 觉得 True == 1,但语义上一个表示”开”,一个表示”1 摄氏度”。如果判等通过了,状态变化事件就不触发,前端就卡在那儿不变。

set() 方法里还有个细节——状态没变时不发事件,只更新 last_reported 时间戳。这避免了轮询场景下每秒重复广播相同值的问题:

1
2
3
4
5
6
7
state_changed = old_state is None or not _states_equal(old_state.state, new_state)
attributes_changed = old_state is None or old_state.attributes.to_dict() != new_attributes.to_dict()

if not state_changed and not attributes_changed and not force_update:
old_state.last_reported = now
self._event_bus.fire(EventType.STATE_REPORTED, ...) # 轻量级事件
return old_state

STATE_REPORTEDSTATE_CHANGED 是两个事件。前端可以只订阅 STATE_CHANGED(值变了才更新 UI),InfluxDB 可以订阅 STATE_REPORTED(每次轮询都记录,不管变没变)。

EventBus:线程池分发,别让慢监听器拖死发布者

EventBus 的 fire() 方法在触发监听器时,不是直接调,而是扔进线程池:

1
2
3
4
5
6
7
8
9
10
11
12
class EventBus:
def __init__(self):
self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="event_bus_")

def _call_listeners(self, event):
with self._lock:
listeners = list(self._listeners.get(event_type, []))
once_listeners = list(self._once_listeners.get(event_type, []))
all_listeners = list(self._all_listeners)

for listener in listeners + once_listeners + all_listeners:
self._executor.submit(self._safe_call_listener, listener, event)

为什么?因为有个血泪教训:早期 InfluxDB 写入卡了 3 秒,直接把 StateMachine 的 set() 堵住,导致所有设备的轮询数据排着队进不来,整个系统假死。

扔进线程池之后,InfluxDB 写入再慢也不影响状态更新和事件广播。监听器里抛异常也不怕,_safe_call_listener 会兜住。

另外支持三种监听模式:

  • listen(event_type, fn):常规订阅
  • listen_once(event_type, fn):触发一次自动注销(启动时等 SYSTEM_STARTED 事件特别好用)
  • listen_all(fn):所有事件都收(调试和审计用)

还有个 100 条的事件历史缓冲,出问题时可以回查最后 100 条事件。

EntityRegistry:三层索引

实体注册表不只是个字典,它维护了三层索引:

1
2
3
4
self._entries: Dict[str, EntityEntry] = {}           # entity_id → 条目
self._unique_id_index: Dict[tuple, str] = {} # (platform, unique_id) → entity_id
self._device_entities: Dict[int, List[str]] = {} # device_id → [entity_id]
self._platform_entities: Dict[str, List[str]] = {} # platform → [entity_id]
  • unique_id_index:同一设备重新注册时,通过 (platform, unique_id) 找到已有条目,更新而不是重复创建
  • device_entities:设备下线时一键清理所有关联实体
  • platform_entities:卸载某个协议驱动时,清理该平台的所有实体

ServiceRegistry:bridge 兜底写入

服务注册表内置了 turn_onturn_offtoggle 三个通用服务。但有个问题:有些实体只在 StateMachine 里有状态(从数据库预加载的),没有在 EntityRegistry 里注册对应的 Entity 实例——也就没有 turn_on() 方法。

怎么办?两层写入策略

1
2
3
4
5
6
7
8
9
10
11
def _handle_generic_turn_on(self, call):
for entity_id in entity_ids:
entity = self._entity_registry.get_entity(entity_id)
if entity and hasattr(entity, "turn_on"):
entity.turn_on(**call.data) # 第一层:实体自己的方法
elif self._is_writable_via_bridge(entity_id):
# 第二层:bridge 兜底写设备
# EntitySystemBridge 监听了 CALL_SERVICE 事件,
# 从状态属性中提取 device_id 和 entity_db_id,
# 直接调 DeviceManager.set_entity_state()
results.append({"entity_id": entity_id, "success": True})

_is_writable_via_bridge 会检查状态机里该实体的 read_only 属性。不是只读的,就交给 bridge 的 CALL_SERVICE 事件监听器去兜底写。

这套双路径的设计,让新实体系统和旧 DeviceManager 可以和平共处。新注册的实体走 ServiceRegistry 的标准路径,老数据走 bridge 兜底,谁也不碍着谁。

Bridge:连接新旧世界的胶水

EntitySystemBridge 是整个实体系统最难写的部分——它要把新系统的事件翻译成旧系统听得懂的语言。

1
2
3
4
5
6
7
8
9
10
class EntitySystemBridge:
def attach(self, device_manager):
# 新 → 旧:状态变化触发自动化引擎
self._event_bus.listen(EventType.STATE_CHANGED, self._on_state_changed)

# 新 → 旧:服务调用转发到设备驱动
self._event_bus.listen(EventType.CALL_SERVICE, self._on_call_service)

# 旧 → 新:设备数据同步到实体状态
# DeviceManager 调 bridge.sync_device_to_entities()

_on_state_changed 里有个关键操作:从实体 ID device_3_temperature 中解析出 device_id=3,再从状态属性的 extra 字段取 entity_db_id,最后调用 automation_engine.on_entity_changed()。这样自动化引擎根本不需要知道实体系统的存在。

启动预热:InfluxDB 回填

系统重启后,实体状态全是 None,前端一看满屏空白——这体验太差了。

解法分两步:

  1. 从 SQLite 加载所有实体定义,在 StateMachine 中创建初始状态(值为 None
  2. 从 InfluxDB 查最近 24 小时的数据,把最新值填回去
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def _fill_initial_values_from_influxdb(state_machine, context, influx_service=None):
all_states = state_machine.get_all()
device_entities = {} # device_id → [(entity_id, entity_db_id)]
for entity_id, state in all_states.items():
if state.state is not None:
continue # 已有值,跳过
device_id = state.attributes.extra.get("device_id")
entity_db_id = state.attributes.extra.get("entity_db_id")
if device_id and entity_db_id:
device_entities.setdefault(device_id, []).append((entity_id, str(entity_db_id)))

for device_id, entities in device_entities.items():
raw_data = influx.query_device_data(device_id=device_id, start_time=start_time, limit=len(entities) * 2)
for entity_id, entity_db_id_str in entities:
records = raw_data.get(entity_db_id_str)
if records:
_, latest_value = records[0]
state_machine.set(entity_id=entity_id, new_state=latest_value, ...)

重启后前端立刻能看到上次的值,等新数据轮询进来再自然更新。用户感知不到重启。

回过头看

这个实体系统花了大概三周,但省下来的调试时间远不止三周。之前出了问题要翻 DeviceManager、自动化引擎、SSE 推送三个地方找数据流,现在顺着 EventBus 的事件链一路跟下去就行。

最大的教训是那个 True == 1 的坑——找了两天才定位到。Python 的动态类型在 IoT 场景里真是个定时炸弹,传感器值和布尔状态在底层都是数字,但语义完全不同。类型严格一点,bug 少一堆。

欢迎关注我的其它发布渠道