IoT 自动化规则的边沿触发和分支执行

最早的规则引擎,就直白的 if 条件 then 动作。演示环境下挺好,一上真设备就开始翻车:

  • 温度 31℃,规则每秒都在判断”大于 30”,风扇一秒一开一关——你听过风扇打电报的声音吗?我听过。
  • 服务重启之后,温度还是 31℃,可规则就是不触发,因为没有”变化”事件
  • 规则 A 执行动作改了某个设备状态,刚好命中规则 B 的条件,规则 B 又改回去——俩规则互相戳着死循环

这篇记一下后来逐步打上去的几个补丁:边沿触发、分支路由、启动补评估、防重入和防抖。

边沿触发

最朴素的修法:只在条件从 False 变 True 的那个瞬间触发,持续 True 不再重复发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class AutomationEngine:
def __init__(self):
self._condition_last_results: Dict[str, bool] = {}

def _evaluate_conditions(self, rule: AutomationRule, event: TriggerEvent) -> bool:
if not rule.conditions:
return True

enabled_conditions = [c for c in rule.conditions if c.enabled]
if not enabled_conditions:
return True

all_pass = True
for condition in enabled_conditions:
if condition.condition_type == ConditionType.RANGE:
if not self._check_range_condition(condition, event):
all_pass = False
break
elif condition.condition_type == ConditionType.COMPARE:
if not self._check_compare_condition(condition, event):
all_pass = False
break

last_result = self._condition_last_results.get(rule.rule_id, False)
self._condition_last_results[rule.rule_id] = all_pass

if not all_pass:
return False

if last_result:
# 持续满足,跳过
return False

# False → True,触发
return True

对应回温度那个例子:

1
2
3
4
5
时间 1: 29℃ → False        → 不触发
时间 2: 31℃ → False→True → 触发
时间 3: 32℃ → True→True → 不触发
时间 4: 29℃ → True→False → 重置(下次再过 30 又能触发)
时间 5: 31℃ → False→True → 触发

边沿触发还藏了一层语义:要想再触发一次”开风扇”,温度必须先掉回去。要是用户想要”持续超过 30 就每隔 10 分钟通知一次”,那是另一种东西——周期触发,应该在 trigger 那一侧实现,别塞进条件评估里搅浑。

分支:不只是 if,还有 else

某些条件天然就有两种结果。比如”温度在 25-30 之间”——超出范围其实也是有意义的事件,可能要触发”开警报”,而不是简单的”开制冷”。

所以条件类型里加了几个支持双分支输出的类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ConditionType(str, Enum):
RANGE = "range"
COMPARE = "compare"
TIME_RANGE = "time_range"
LOGIC = "logic"
EXPRESSION = "expression"
IF_ELSE = "if_else"

# True / False 各自走哪个输出端口
CONDITION_TRUE_OUTPUT = {
ConditionType.RANGE: "inRange",
ConditionType.COMPARE: "result",
ConditionType.IF_ELSE: "true",
}
CONDITION_FALSE_OUTPUT = {
ConditionType.RANGE: "outRange",
ConditionType.IF_ELSE: "false",
}

分支路由

执行时按条件结果挑出”当前活跃分支上的动作”——其他分支上挂的动作这次就不执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def _resolve_branched_actions(self, rule, event, enabled_conditions):
condition_results = {}
for cond in enabled_conditions:
if cond.condition_type == ConditionType.RANGE:
condition_results[cond.condition_id] = self._check_range_condition(cond, event)
elif cond.condition_type == ConditionType.COMPARE:
condition_results[cond.condition_id] = self._check_compare_condition(cond, event)

branch_map = {}
for cid, result in condition_results.items():
cond = cond_by_id.get(cid)
ctype = cond.condition_type if cond else ConditionType.EXPRESSION
if result:
branch_map[cid] = CONDITION_TRUE_OUTPUT.get(ctype, "result")
else:
false_output = CONDITION_FALSE_OUTPUT.get(ctype)
if false_output:
branch_map[cid] = false_output

# 分支也要做边沿检测,否则会持续触发
state_key = str(sorted(branch_map.items()))
cache_key = f"branch:{rule.rule_id}"
last_state = self._condition_last_results.get(cache_key)
self._condition_last_results[cache_key] = state_key

if state_key == last_state:
return None

executable = []
for action in rule.actions:
branch_info = action.config.get("_condition_branch")
if not branch_info:
executable.append(action) # 不挂分支的动作总是执行
continue
cid = branch_info.get("condition_id")
output = branch_info.get("output")
if cid in branch_map and branch_map[cid] == output:
executable.append(action)

return executable

注意分支也得做边沿检测——不然就算条件没变,每次触发器一到都会重新发一遍动作。

启动补评估

刚重启完,规则引擎里 _condition_last_results 是空的。这时候要是设备状态本来就已经超阈值了,因为没有”变化”事件,规则不会触发。

结果就是个挺尴尬的场景:你重启服务,本来开着的风扇被关掉之后再也开不回来——除非温度先掉下来再升上去。

补丁是启动后做一次全量扫描:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def _bootstrap_startup_entity_evaluation(self):
if not self.device_manager:
return

state_machine = getattr(self.device_manager, "_state_machine", None)
if not state_machine:
return

watch_entity_ids = self._collect_startup_watch_entity_ids()
if not watch_entity_ids:
return

all_states = state_machine.get_all()

for state_entity_id, state in all_states.items():
if not state or not getattr(state, "attributes", None):
continue

extra = getattr(state.attributes, "extra", None) or {}
raw_entity_db_id = extra.get("entity_db_id")
if raw_entity_db_id is None:
continue

entity_db_id = int(raw_entity_db_id)
if entity_db_id not in watch_entity_ids:
continue

new_value = getattr(state, "state", None)
scale = extra.get("scale", 1.0)

self._on_trigger(TriggerEvent(
event_type="entity_value_changed",
entity_id=entity_db_id,
old_value=None, # 启动时没有旧值
new_value=new_value,
scale=scale,
additional_data={"bootstrap": True},
))

additional_data={"bootstrap": True} 这个标记给下游一个信号——这条事件是补评估出来的,不是真的状态变化。

举个例子:某些行为(比如发钉钉通知)可能不希望在启动补评估时触发,毕竟没人想半夜被一堆”温度已恢复正常”的通知吵醒——根据这个标记跳过即可。

防重入

规则 A 触发后写了设备 X,X 的状态变化又满足规则 A 自己的条件——经典自激振荡,俩规则你来我往,CPU 开始冒烟。

修法很俗,但有效:执行中的规则 ID 进一个 set,在 set 里的规则跳过新触发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class AutomationEngine:
def __init__(self):
self._executing_rules: set = set()

def _on_trigger(self, event: TriggerEvent):
with self._lock:
for rule in self.rules.values():
if not rule.enabled:
continue
if rule.rule_id in self._executing_rules:
continue
# ... 匹配逻辑

def _execute_rule_async(self, rule, context, event):
rule_id = rule.rule_id
with self._lock:
self._executing_rules.add(rule_id)
try:
for action in actions:
await self._execute_action(action, context)
finally:
with self._lock:
self._executing_rules.discard(rule_id)

这招只防”规则触发自己”。规则 A → 规则 B → 规则 A 这种隔了一层的循环挡不住——要是真担心,得把执行栈传下去。目前没遇到这种规则配置,先在心里记着这个边界。

防抖和冷却

传感器抖一下,瞬间穿过阈值再回来,就能给你触发一次没意义的开关。Trigger 这一层加个防抖窗口:

1
2
3
4
5
6
7
8
9
10
11
class BaseTrigger:
def __init__(self):
self._last_trigger_time: float = 0
self._debounce_until: float = 0

def check_debounce(self) -> bool:
now = time.time()
if now < self._debounce_until:
return False
self._debounce_until = now + self.debounce_seconds
return True

规则级再加一个冷却时间和每日次数上限,给用户兜底——这样就算逻辑写错了,也不至于一天给某个设备来个上千次开关:

1
2
3
4
5
6
7
8
9
10
11
12
13
class AutomationEngine:
def _check_cooldown(self, rule: AutomationRule) -> bool:
stats = self.execution_stats.get(rule.rule_id, {})
last_execution = stats.get('last_execution_at')
if last_execution is None:
return True
return time.time() - last_execution >= rule.cooldown_seconds

def _check_daily_limit(self, rule: AutomationRule) -> bool:
if rule.daily_limit is None:
return True
stats = self.execution_stats.get(rule.rule_id, {})
return stats.get('total_executions', 0) < rule.daily_limit

防抖和冷却语义不一样:防抖是”短时间内只算一次”,冷却是”一次执行完之后多久不再执行”。两者都得有。

图编译器

前端有个可视化的节点编辑器——条件节点、动作节点、连接线,用户拖来拖去。后端规则用的是另一套模型(rule、condition、action 三张表)。所以中间需要把节点图”编译”成规则。

主要的活儿,是从连接线里反推每个动作挂在哪个条件的哪个输出端口上,写回 action.config 里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def _rebuild_condition_branches(self, rule: AutomationRule):
nodes_config = rule.nodes_config
if not nodes_config:
return

connections = nodes_config.get('connections', [])

node_to_condition = {}
for cond in rule.conditions:
node_id = cond.config.get("_node_id") if cond.config else None
if node_id:
node_to_condition[node_id] = cond.condition_id

node_to_action = {}
for action in rule.actions:
node_id = action.config.get("_node_id") if action.config else None
if node_id:
node_to_action[node_id] = action.action_id

for conn in connections:
from_node = conn.get('source')
from_output = conn.get('sourceOutput')
to_node = conn.get('target')

if from_node in condition_node_set and to_node in action_node_set:
backend_action_id = node_to_action[to_node]
backend_condition_id = node_to_condition[from_node]
for action in rule.actions:
if action.action_id == backend_action_id:
action.config["_condition_branch"] = {
"condition_id": backend_condition_id,
"output": from_output,
}

_condition_branch_node_id 这类带下划线前缀的字段,是后端内部用的标记,前端不会展示——这种约定哪天破了会让你查半天,最好早早写进文档里,免得自己挖坑自己跳。

欢迎关注我的其它发布渠道