项目做到中期,设备数据在 DeviceManager 里进进出出,自动化引擎要监听变化,前端要实时推送,InfluxDB 要存历史——所有人都在直接读 DeviceManager 的字典,耦合得一塌糊涂。
有天我翻 Home Assistant 的源码,发现它有个很优雅的抽象:一切皆实体。温湿度传感器是实体,继电器开关是实体,连”今天的日出时间”都是实体。上层逻辑只认实体 ID 和状态值,不关心底层是 Modbus 还是 MQTT。
我想了想,这个东西我得有一个。
四个单例撑起的骨架
实体系统拆成四个核心组件,全部单例:
- StateMachine:存所有实体的当前状态,是整个系统的单一真相源
- EventBus:事件发布/订阅,状态变了就广播
- EntityRegistry:实体注册表,管生命周期和索引
- ServiceRegistry:服务注册表,
turn_on、turn_off这类动作走这里调
数据流长这样:
1 | |
所有消费者通过 EventBus 订阅,谁也不直接调谁。
StateMachine:那个 NaN 陷阱
StateMachine 的核心就一个 set() 方法——设状态、判变化、发事件。但”判变化”这三个字,坑比想象的深。
1 | |
第一个坑:Modbus 传感器偶尔返回 NaN(比如断线时寄存器值是 0x7FFF)。Python 里 NaN != NaN 返回 True,如果直接拿 == 判,系统会认为状态每秒都在变——事件风暴,前端疯狂刷新,InfluxDB 疯狂写入。
第二个坑:继电器状态是 True/False,某个寄存器原始值恰好是 1/0。Python 觉得 True == 1,但语义上一个表示”开”,一个表示”1 摄氏度”。如果判等通过了,状态变化事件就不触发,前端就卡在那儿不变。
set() 方法里还有个细节——状态没变时不发事件,只更新 last_reported 时间戳。这避免了轮询场景下每秒重复广播相同值的问题:
1 | |
STATE_REPORTED 和 STATE_CHANGED 是两个事件。前端可以只订阅 STATE_CHANGED(值变了才更新 UI),InfluxDB 可以订阅 STATE_REPORTED(每次轮询都记录,不管变没变)。
EventBus:线程池分发,别让慢监听器拖死发布者
EventBus 的 fire() 方法在触发监听器时,不是直接调,而是扔进线程池:
1 | |
为什么?因为有个血泪教训:早期 InfluxDB 写入卡了 3 秒,直接把 StateMachine 的 set() 堵住,导致所有设备的轮询数据排着队进不来,整个系统假死。
扔进线程池之后,InfluxDB 写入再慢也不影响状态更新和事件广播。监听器里抛异常也不怕,_safe_call_listener 会兜住。
另外支持三种监听模式:
listen(event_type, fn):常规订阅listen_once(event_type, fn):触发一次自动注销(启动时等SYSTEM_STARTED事件特别好用)listen_all(fn):所有事件都收(调试和审计用)
还有个 100 条的事件历史缓冲,出问题时可以回查最后 100 条事件。
EntityRegistry:三层索引
实体注册表不只是个字典,它维护了三层索引:
1 | |
unique_id_index:同一设备重新注册时,通过(platform, unique_id)找到已有条目,更新而不是重复创建device_entities:设备下线时一键清理所有关联实体platform_entities:卸载某个协议驱动时,清理该平台的所有实体
ServiceRegistry:bridge 兜底写入
服务注册表内置了 turn_on、turn_off、toggle 三个通用服务。但有个问题:有些实体只在 StateMachine 里有状态(从数据库预加载的),没有在 EntityRegistry 里注册对应的 Entity 实例——也就没有 turn_on() 方法。
怎么办?两层写入策略:
1 | |
_is_writable_via_bridge 会检查状态机里该实体的 read_only 属性。不是只读的,就交给 bridge 的 CALL_SERVICE 事件监听器去兜底写。
这套双路径的设计,让新实体系统和旧 DeviceManager 可以和平共处。新注册的实体走 ServiceRegistry 的标准路径,老数据走 bridge 兜底,谁也不碍着谁。
Bridge:连接新旧世界的胶水
EntitySystemBridge 是整个实体系统最难写的部分——它要把新系统的事件翻译成旧系统听得懂的语言。
1 | |
_on_state_changed 里有个关键操作:从实体 ID device_3_temperature 中解析出 device_id=3,再从状态属性的 extra 字段取 entity_db_id,最后调用 automation_engine.on_entity_changed()。这样自动化引擎根本不需要知道实体系统的存在。
启动预热:InfluxDB 回填
系统重启后,实体状态全是 None,前端一看满屏空白——这体验太差了。
解法分两步:
- 从 SQLite 加载所有实体定义,在 StateMachine 中创建初始状态(值为
None) - 从 InfluxDB 查最近 24 小时的数据,把最新值填回去
1 | |
重启后前端立刻能看到上次的值,等新数据轮询进来再自然更新。用户感知不到重启。
回过头看
这个实体系统花了大概三周,但省下来的调试时间远不止三周。之前出了问题要翻 DeviceManager、自动化引擎、SSE 推送三个地方找数据流,现在顺着 EventBus 的事件链一路跟下去就行。
最大的教训是那个 True == 1 的坑——找了两天才定位到。Python 的动态类型在 IoT 场景里真是个定时炸弹,传感器值和布尔状态在底层都是数字,但语义完全不同。类型严格一点,bug 少一堆。