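The earliest version of the rule engine was a plain `if condition then action`. It looked fine in the demo environment and started falling over as soon as it met real devices:

- At 31℃ the rule evaluates "greater than 30" every second, so the fan switches on and off once a second. Ever heard a fan tapping out Morse code? I have.
- After a service restart the temperature is still 31℃, but the rule never fires, because there is no "change" event.
- Rule A's action changes some device state, which happens to match rule B's condition, and rule B changes it back: two rules poking each other in an endless loop.

This post records the patches that went on one by one afterwards: edge triggering, branch routing, startup re-evaluation, re-entrancy protection, and debouncing.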
Edge triggering

The most basic fix: fire only at the moment the condition goes from False to True, and do not fire again while it stays True:
```python
from typing import Dict

# AutomationRule, TriggerEvent and ConditionType are defined elsewhere in the project.


class AutomationEngine:
    def __init__(self):
        # rule_id -> result of the previous evaluation, used for edge detection
        self._condition_last_results: Dict[str, bool] = {}

    def _evaluate_conditions(self, rule: AutomationRule, event: TriggerEvent) -> bool:
        if not rule.conditions:
            return True

        enabled_conditions = [c for c in rule.conditions if c.enabled]
        if not enabled_conditions:
            return True

        # All enabled conditions must pass (logical AND).
        all_pass = True
        for condition in enabled_conditions:
            if condition.condition_type == ConditionType.RANGE:
                if not self._check_range_condition(condition, event):
                    all_pass = False
                    break
            elif condition.condition_type == ConditionType.COMPARE:
                if not self._check_compare_condition(condition, event):
                    all_pass = False
                    break

        # Remember this result before deciding, so the next call sees it.
        last_result = self._condition_last_results.get(rule.rule_id, False)
        self._condition_last_results[rule.rule_id] = all_pass

        if not all_pass:
            return False
        if last_result:
            # True -> True: the condition is still holding, do not fire again.
            return False
        # False -> True: rising edge, fire.
        return True
```
Applied back to the temperature example:

```text
t1: 29℃ → False        → no trigger
t2: 31℃ → False→True   → trigger
t3: 32℃ → True→True    → no trigger
t4: 29℃ → True→False   → reset (crossing 30 again will trigger)
t5: 31℃ → False→True   → trigger
```
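The timeline above can be replayed with a stripped-down detector. This is only a sketch: `EdgeDetector` and `replay` are names made up for illustration, not part of the engine, and the threshold of 30 is taken from the example.

```python
from typing import Dict, List


class EdgeDetector:
    """Reports True only on a False -> True transition, tracked per key."""

    def __init__(self) -> None:
        self._last: Dict[str, bool] = {}

    def rising(self, key: str, current: bool) -> bool:
        previous = self._last.get(key, False)  # unseen keys start as False
        self._last[key] = current
        return current and not previous


def replay(temperatures: List[float], threshold: float = 30.0) -> List[bool]:
    """Feed readings through one detector and record which ones fire."""
    detector = EdgeDetector()
    return [detector.rising("fan_rule", t > threshold) for t in temperatures]


fired = replay([29, 31, 32, 29, 31])
print(fired)
```

Only the second and fifth readings fire, matching t2 and t5 in the timeline.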
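Edge triggering carries a hidden piece of semantics: for "turn on the fan" to fire a second time, the temperature has to drop back first. If the user wants "while it stays above 30, notify every 10 minutes", that is a different thing, a periodic trigger. It belongs on the trigger side and should not be stuffed into condition evaluation, where it would only muddy things.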
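One way a trigger-side periodic repeat could look, kept separate from condition evaluation. `RepeatWhileTrue` is a hypothetical helper, not something from the engine, and timestamps are passed in explicitly so the behavior is easy to check.

```python
from typing import Optional


class RepeatWhileTrue:
    """Hypothetical trigger-side helper: fires on the rising edge, then again
    every `interval` seconds for as long as the condition keeps holding."""

    def __init__(self, interval: float) -> None:
        self.interval = interval
        self._last_fired: Optional[float] = None  # None means "condition was False"

    def should_fire(self, condition: bool, now: float) -> bool:
        if not condition:
            self._last_fired = None  # reset so the next True fires immediately
            return False
        if self._last_fired is None or now - self._last_fired >= self.interval:
            self._last_fired = now
            return True
        return False


notifier = RepeatWhileTrue(interval=600)  # 10 minutes
samples = [(0, True), (300, True), (600, True), (900, False), (1000, True)]
fired_at = [t for t, hot in samples if notifier.should_fire(hot, t)]
print(fired_at)
```

The condition evaluator keeps its plain edge semantics, and the repeat interval lives entirely in the trigger.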
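Branches: not just if, but else too

Some conditions naturally have two outcomes. Take "temperature between 25 and 30": going out of range is a meaningful event in its own right, and it may need to trigger "raise an alarm" rather than simply "start cooling".

So the condition types gained a few variants that support two output branches: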
```python
from enum import Enum


class ConditionType(str, Enum):
    RANGE = "range"
    COMPARE = "compare"
    TIME_RANGE = "time_range"
    LOGIC = "logic"
    EXPRESSION = "expression"
    IF_ELSE = "if_else"


# Output port name when a condition evaluates to True.
CONDITION_TRUE_OUTPUT = {
    ConditionType.RANGE: "inRange",
    ConditionType.COMPARE: "result",
    ConditionType.IF_ELSE: "true",
}

# Output port name when a condition evaluates to False.
# COMPARE has no entry here, so it has no false-side branch.
CONDITION_FALSE_OUTPUT = {
    ConditionType.RANGE: "outRange",
    ConditionType.IF_ELSE: "false",
}
```
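Branch routing

At execution time, the condition results select "the actions on the currently active branch". Actions attached to other branches are not executed this time: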
```python
def _resolve_branched_actions(self, rule, event, enabled_conditions):
    # Evaluate each enabled condition individually.
    condition_results = {}
    for cond in enabled_conditions:
        if cond.condition_type == ConditionType.RANGE:
            condition_results[cond.condition_id] = self._check_range_condition(cond, event)
        elif cond.condition_type == ConditionType.COMPARE:
            condition_results[cond.condition_id] = self._check_compare_condition(cond, event)

    # Map each condition to the output port that is active right now.
    cond_by_id = {c.condition_id: c for c in enabled_conditions}
    branch_map = {}
    for cid, result in condition_results.items():
        cond = cond_by_id.get(cid)
        ctype = cond.condition_type if cond else ConditionType.EXPRESSION
        if result:
            branch_map[cid] = CONDITION_TRUE_OUTPUT.get(ctype, "result")
        else:
            false_output = CONDITION_FALSE_OUTPUT.get(ctype)
            if false_output:
                branch_map[cid] = false_output

    # Edge detection on the whole branch state: same state as last time, do nothing.
    state_key = str(sorted(branch_map.items()))
    cache_key = f"branch:{rule.rule_id}"
    last_state = self._condition_last_results.get(cache_key)
    self._condition_last_results[cache_key] = state_key
    if state_key == last_state:
        return None

    # Keep unbranched actions plus those attached to an active output port.
    executable = []
    for action in rule.actions:
        branch_info = action.config.get("_condition_branch")
        if not branch_info:
            executable.append(action)
            continue
        cid = branch_info.get("condition_id")
        output = branch_info.get("output")
        if cid in branch_map and branch_map[cid] == output:
            executable.append(action)

    return executable
```
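Note that branches need edge detection as well. Otherwise every incoming trigger would re-run the actions even though the condition has not changed.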
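To see routing and branch-level edge detection together, here is a self-contained toy version with a single range condition. `BranchRouter`, `ACTIONS` and the action names are illustrative assumptions; only the port names `inRange` and `outRange` come from the mapping shown earlier.

```python
from typing import Dict, List, Optional

# Each action names the output port it hangs on; None means "not branched, always run".
ACTIONS: Dict[str, Optional[str]] = {
    "start_cooling": "inRange",
    "sound_alarm": "outRange",
    "write_log": None,
}


class BranchRouter:
    def __init__(self, low: float, high: float) -> None:
        self.low, self.high = low, high
        self._last_branch: Optional[str] = None

    def route(self, value: float) -> Optional[List[str]]:
        branch = "inRange" if self.low <= value <= self.high else "outRange"
        if branch == self._last_branch:
            return None  # same branch as last time: nothing to execute
        self._last_branch = branch
        return [name for name, port in ACTIONS.items() if port in (None, branch)]


router = BranchRouter(25, 30)
steps = [router.route(v) for v in (27, 28, 31, 32, 26)]
for step in steps:
    print(step)
```

Actions are returned only when the active branch changes; repeated readings on the same side of the range yield `None`.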
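Startup re-evaluation

Right after a restart, `_condition_last_results` in the rule engine is empty. If a device's state is already past its threshold at that point, the rule will not fire, because there is no "change" event.

The result is a rather awkward scenario: you restart the service, a fan that was running gets switched off, and it never comes back on unless the temperature first drops and then rises again.

The patch is one full scan after startup: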
```python
def _bootstrap_startup_entity_evaluation(self):
    if not self.device_manager:
        return

    state_machine = getattr(self.device_manager, "_state_machine", None)
    if not state_machine:
        return

    # Only entities that some rule actually watches need a replay.
    watch_entity_ids = self._collect_startup_watch_entity_ids()
    if not watch_entity_ids:
        return

    all_states = state_machine.get_all()

    for state_entity_id, state in all_states.items():
        if not state or not getattr(state, "attributes", None):
            continue

        extra = getattr(state.attributes, "extra", None) or {}
        raw_entity_db_id = extra.get("entity_db_id")
        if raw_entity_db_id is None:
            continue

        entity_db_id = int(raw_entity_db_id)
        if entity_db_id not in watch_entity_ids:
            continue

        new_value = getattr(state, "state", None)
        scale = extra.get("scale", 1.0)

        # Synthesize a "change" event from the current state; old_value is unknown.
        self._on_trigger(TriggerEvent(
            event_type="entity_value_changed",
            entity_id=entity_db_id,
            old_value=None,
            new_value=new_value,
            scale=scale,
            additional_data={"bootstrap": True},
        ))
```
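The `additional_data={"bootstrap": True}` marker is a signal to downstream code: this event was produced by the startup re-evaluation and is not a real state change.

For example, some behaviors (say, sending a DingTalk notification) may not want to fire during startup re-evaluation. Nobody wants to be woken in the middle of the night by a pile of "temperature is back to normal" messages. Skipping based on this marker is enough.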
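A minimal sketch of that skip, assuming actions carry a simple type string. `QUIET_ON_BOOTSTRAP`, `filter_actions` and the action type names are hypothetical; only the `bootstrap` key comes from the event shown above.

```python
from typing import Any, Dict, List, Optional

# Hypothetical set of action types that should stay quiet during bootstrap replay.
QUIET_ON_BOOTSTRAP = {"notify"}


def filter_actions(action_types: List[str],
                   additional_data: Optional[Dict[str, Any]]) -> List[str]:
    """Drop noisy actions when the event is a startup replay, keep everything otherwise."""
    is_bootstrap = bool((additional_data or {}).get("bootstrap"))
    if not is_bootstrap:
        return list(action_types)
    return [a for a in action_types if a not in QUIET_ON_BOOTSTRAP]


normal = filter_actions(["switch_fan", "notify"], None)
replayed = filter_actions(["switch_fan", "notify"], {"bootstrap": True})
print(normal, replayed)
```

Device-facing actions still run on the replayed event, so the fan comes back on; only the notification is suppressed.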
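Re-entrancy protection

Rule A fires and writes to device X, and X's state change satisfies rule A's own condition again. Classic self-oscillation: the rule keeps re-triggering itself and the CPU starts to smoke.

The fix is unglamorous but effective: IDs of rules currently executing go into a set, and any rule in the set skips new triggers: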
```python
class AutomationEngine:
    def __init__(self):
        # IDs of rules whose actions are currently running
        self._executing_rules: set = set()

    def _on_trigger(self, event: TriggerEvent):
        with self._lock:
            for rule in self.rules.values():
                if not rule.enabled:
                    continue
                if rule.rule_id in self._executing_rules:
                    continue  # already running: ignore this trigger
                # (condition evaluation and scheduling omitted in this excerpt)

    async def _execute_rule_async(self, rule, context, event):
        rule_id = rule.rule_id
        with self._lock:
            self._executing_rules.add(rule_id)
        try:
            # `actions` is the list resolved for this run (omitted in this excerpt)
            for action in actions:
                await self._execute_action(action, context)
        finally:
            # Always release the guard, even if an action raises.
            with self._lock:
                self._executing_rules.discard(rule_id)
```
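This trick only guards against "a rule triggering itself". It cannot stop a loop with one hop in between, such as rule A → rule B → rule A. If that is a real concern, the execution stack has to be passed along. I have not run into such a rule configuration yet, so for now this boundary is just something to keep in mind.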
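Passing the execution stack along could look roughly like this. It is a sketch only: the `TRIGGERS` table stands in for "rule X's actions end up triggering rule Y", and `run_rule` is a made-up function, not the engine's API.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical wiring: rule_id -> rules that its actions end up triggering.
TRIGGERS: Dict[str, List[str]] = {"A": ["B"], "B": ["A", "C"], "C": []}


def run_rule(rule_id: str, chain: Tuple[str, ...] = (),
             log: Optional[List[str]] = None) -> List[str]:
    """Run a rule and whatever it triggers, refusing to re-enter a rule
    that is already somewhere up the current chain."""
    log = [] if log is None else log
    if rule_id in chain:
        log.append(f"skip {rule_id} (cycle: {' -> '.join(chain + (rule_id,))})")
        return log
    log.append(f"run {rule_id}")
    for downstream in TRIGGERS[rule_id]:
        run_rule(downstream, chain + (rule_id,), log)
    return log


log = run_rule("A")
for line in log:
    print(line)
```

Because the chain travels with each trigger, the A → B → A hop is caught while B → C still runs normally.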
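Debounce and cooldown

A sensor jitters, crosses the threshold for an instant and comes back, and that alone can give you one pointless switch toggle. The trigger layer gets a debounce window: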
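```python
import time


class BaseTrigger:
    def __init__(self):
        self._last_trigger_time: float = 0
        self._debounce_until: float = 0
        # self.debounce_seconds is not set in this excerpt; it must exist
        # before check_debounce() is called.

    def check_debounce(self) -> bool:
        now = time.time()
        if now < self._debounce_until:
            return False  # still inside the window: swallow this trigger
        # Accept this trigger and open a new window.
        self._debounce_until = now + self.debounce_seconds
        return True
```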
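At the rule level there is also a cooldown time and a daily execution cap as a safety net for users. Even if the logic is written wrong, a device will not get switched a thousand-plus times in one day: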
```python
import time


class AutomationEngine:
    def _check_cooldown(self, rule: AutomationRule) -> bool:
        stats = self.execution_stats.get(rule.rule_id, {})
        last_execution = stats.get('last_execution_at')
        if last_execution is None:
            return True  # never executed: no cooldown to wait for
        return time.time() - last_execution >= rule.cooldown_seconds

    def _check_daily_limit(self, rule: AutomationRule) -> bool:
        if rule.daily_limit is None:
            return True  # no cap configured
        stats = self.execution_stats.get(rule.rule_id, {})
        # Note: this is only a *daily* cap if total_executions is reset each day
        # somewhere else; that reset is not shown in this excerpt.
        return stats.get('total_executions', 0) < rule.daily_limit
```
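Debounce and cooldown have different semantics: debounce means "count it only once within a short window", cooldown means "after one execution finishes, do not execute again for a while". You need both.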
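The difference is easier to see with both guards side by side and an explicit clock. `Debouncer` and `Cooldown` here are simplified stand-ins written for this illustration, with timestamps passed in instead of read from `time.time()`.

```python
from typing import List, Optional


class Debouncer:
    """After accepting a trigger, ignores further triggers for `window` seconds."""

    def __init__(self, window: float) -> None:
        self.window = window
        self._until = 0.0

    def accept(self, now: float) -> bool:
        if now < self._until:
            return False
        self._until = now + self.window
        return True


class Cooldown:
    """Blocks a new execution until `seconds` have passed since the last one."""

    def __init__(self, seconds: float) -> None:
        self.seconds = seconds
        self._last_done: Optional[float] = None

    def ready(self, now: float) -> bool:
        return self._last_done is None or now - self._last_done >= self.seconds

    def mark_done(self, now: float) -> None:
        self._last_done = now


debounce, cooldown = Debouncer(window=2), Cooldown(seconds=60)
executed: List[float] = []
for t in (0.0, 0.5, 1.0, 10.0, 70.0):
    if debounce.accept(t) and cooldown.ready(t):
        executed.append(t)
        cooldown.mark_done(t)
print(executed)
```

The triggers at 0.5 and 1.0 are absorbed by the debounce window, the one at 10.0 passes debounce but is still inside the cooldown, and only 0.0 and 70.0 execute.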
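Graph compiler

The frontend has a visual node editor: condition nodes, action nodes, connection lines, all dragged around by the user. The backend rules use a different model (three tables: rule, condition, action). So something in between has to "compile" the node graph into rules.

The main job is to work backwards from the connections to figure out which output port of which condition each action hangs on, and write that back into `action.config`: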
```python
def _rebuild_condition_branches(self, rule: AutomationRule):
    nodes_config = rule.nodes_config
    if not nodes_config:
        return

    connections = nodes_config.get('connections', [])

    # Frontend node id -> backend condition id
    node_to_condition = {}
    for cond in rule.conditions:
        node_id = cond.config.get("_node_id") if cond.config else None
        if node_id:
            node_to_condition[node_id] = cond.condition_id

    # Frontend node id -> backend action id
    node_to_action = {}
    for action in rule.actions:
        node_id = action.config.get("_node_id") if action.config else None
        if node_id:
            node_to_action[node_id] = action.action_id

    for conn in connections:
        from_node = conn.get('source')
        from_output = conn.get('sourceOutput')
        to_node = conn.get('target')

        # Only condition -> action edges carry branch information.
        if from_node in node_to_condition and to_node in node_to_action:
            backend_action_id = node_to_action[to_node]
            backend_condition_id = node_to_condition[from_node]
            for action in rule.actions:
                if action.action_id == backend_action_id:
                    action.config["_condition_branch"] = {
                        "condition_id": backend_condition_id,
                        "output": from_output,
                    }
```
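Fields with an underscore prefix, such as `_condition_branch` and `_node_id`, are markers for internal backend use, and the frontend does not display them. If this convention ever gets broken you will spend a long time tracking it down, so write it into the docs early rather than digging a hole and falling into it yourself.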
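One cheap way to make the convention explicit in code as well as in the docs is a single helper that splits a config by prefix. `split_config` and the sample keys `device` and `command` are assumptions for illustration; the post does not say the backend has such a helper.

```python
from typing import Any, Dict, Tuple


def split_config(config: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Separate user-facing settings from underscore-prefixed internal markers."""
    public = {k: v for k, v in config.items() if not k.startswith("_")}
    internal = {k: v for k, v in config.items() if k.startswith("_")}
    return public, internal


public, internal = split_config({
    "device": "fan_1",       # hypothetical user-facing setting
    "command": "turn_on",    # hypothetical user-facing setting
    "_node_id": "n42",
    "_condition_branch": {"condition_id": "c1", "output": "outRange"},
})
print(sorted(public), sorted(internal))
```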