Modbus RTU 自适应批量读取——从一次串口阻塞事故说起

事情是这样的:现场一台 Modbus 温湿度传感器,地址 0~5 六个寄存器,一帧 read_holding_registers 全读完,多舒服。

结果隔壁厂家塞进来个奇葩从机,地址 0 和地址 100 之间隔了一百个空位,批量读直接超时——更绝的是,同一串口上还挂着另外三台正常的设备,那台超时一卡就是好几秒,别的设备跟着一块儿断线。

那之后我花了一周重写了 Modbus 驱动的读取逻辑,从”一刀切批量”变成了”自适应批量+优雅降级”。这篇把这几个机制拆开讲讲。

先说问题:批量读取不是万能药

Modbus RTU 的 Read Holding Registers 最多一次读 125 个寄存器,看起来很爽。但现实比协议文档残酷:

  1. 地址不连续:有的设备寄存器散落在 0、100、200,中间全是空洞,读 200 个地址只用了 3 个
  2. 部分从机不支持批量:便宜的表只认单寄存器读取,批量请求直接返回异常码
  3. 串口是共享的:同一串口多个从机,一台设备卡住,别的全得等
  4. 写入被读取挡道:用户点”开灯”,后台还在慢悠悠轮询二十组寄存器,等拿到锁黄花菜都凉了

所以核心矛盾就一个:怎么在”尽量批量”和”随时能降级”之间找平衡?

智能寄存器分组

第一步,把散落的寄存器按地址排序,能合并的合并,该拆开的拆开。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def _build_smart_groups(self, registers, read_actions, batchable_actions, device_id):
groups = []
sorted_regs = sorted(list(registers), key=lambda r: r.address)

for reg in sorted_regs:
action = _get_read_action(reg)
if not action:
continue

# 设备已经被标记为不支持批量?直接单寄存器
if action not in batchable_actions or not self._check_batch_support(device_id, action):
groups.append({"action": action, "start": reg.address, "count": reg.register_count, "registers": [reg], "single": True})
continue

# 尝试合并到上一组
if groups:
last = groups[-1]
if last["action"] == action and not last.get("single"):
gap = reg.address - (last["start"] + last["count"])
new_count = (reg.address + reg.register_count) - last["start"]
# 间隙 <= 10 且合并后不超过 125,就合并
if 0 <= gap <= self.MAX_ADDRESS_GAP and new_count <= self.MAX_BATCH_REGISTERS:
last["registers"].append(reg)
last["count"] = new_count
merged = True
# 合不了就开新组
if not merged:
groups.append({"action": action, "start": reg.address, "count": reg.register_count, "registers": [reg]})

三个关键参数:

  • MAX_BATCH_REGISTERS = 125:协议上限
  • MAX_ADDRESS_GAP = 10:地址间隙小于这个值就填充合并,省一次请求
  • BATCH_FAIL_THRESHOLD = 3:连续失败这么多次就降级

举个例子,寄存器地址是 0、1、2、5、6、100、101,分组结果就是:

1
2
组1: addr=0, count=7  (0~6 合并,gap=3 填充)
组2: addr=100, count=2 (100~101)

地址 5 和 6 之间没有 gap,0~2 和 5~6 之间 gap 是 2(地址 3、4 是空的),小于 10 所以合并。但 6 和 100 之间 gap 是 94,超了,必须拆开。

自适应降级:失败了就老实一个个读

分组策略再好,也挡不住有些从机就是不给面子。核心思路:先试试批量,失败就当场回退单寄存器,连续失败三次就记住”这家伙不支持批量”,下次直接单读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _check_batch_support(self, device_id, action):
with self._lock:
support = self._device_batch_support.get(device_id, {}).get(action)
return support is not False # None=未知,先试批量

def _mark_batch_fail(self, device_id, action):
with self._lock:
count = self._device_batch_fail_count.get(device_id, {}).get(action, 0) + 1
self._device_batch_fail_count[device_id][action] = count
if count >= self.BATCH_FAIL_THRESHOLD:
self._device_batch_support[device_id][action] = False
logger.warning(f"设备 {device_id} 批量读取({action})连续失败{count}次,已切换为单寄存器模式")
return True # 到达阈值
return False

读取逻辑里的关键分支:

1
2
3
4
5
6
7
8
9
10
result = execute_modbus_action(client, action=group["action"], address=group["start"], count=group["count"], slave=slave)

if result.isError():
# 批量失败,回退到单寄存器
threshold_reached = self._mark_batch_fail(device.id, group["action"])
fallback_data = self._read_single_registers(client, group["registers"], action, slave, device_name, port)
data.update(fallback_data)
else:
# 批量成功,重置失败计数
self._mark_batch_success(device.id, group["action"])

三态设计:None(未知,默认试批量)、True(确认支持)、False(确认不支持)。新设备进来默认 None,先试一次批量,成了就标 True,败了也只败一次——因为回退逻辑保证这一轮数据不会丢。

还有个细节:降级是按 action 类型的。有些设备读保持寄存器支持批量,但读输入寄存器不行。不能一刀切。

写优先:别让轮询挡了用户的指令

Modbus 串口是半双工的,读和写不能同时进行。我们系统里读取是周期轮询,写入是用户触发的。如果读取正在扫二十组寄存器,用户这时候点”开灯”,等读完才能写,体验就是——灯过了好几秒才亮。

解法:写请求打断读循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 读取循环中,每组之间检查
for group in groups:
# 检查有没有写请求在等
write_pending = self._port_write_pending.get(port)
if write_pending and write_pending.is_set():
break # 立即让出锁

io_lock.acquire(timeout=1)
try:
# ... 读取当前组 ...
finally:
io_lock.release() # 每组读完立即释放

# 释放后再检查一次
if write_pending and write_pending.is_set():
break

写入端:

1
2
3
4
5
6
7
8
9
10
11
12
13
def write(self, device, target_register, value):
# 通知读循环让路
write_pending.set()

# 等待 IO 锁,最坏情况 = 一组读取的超时
_write_lock_timeout = self.SERIAL_TIMEOUT * (self.MODBUS_RETRIES + 1) + 0.5
acquired = io_lock.acquire(timeout=_write_lock_timeout)

try:
_do_write(client)
finally:
write_pending.clear() # 写完清除标记
io_lock.release()

关键点:

  1. 读取是逐组获取/释放锁的,不是一把锁锁到底
  2. write_pending 是个 Event,设置后读循环会在组间检查到并主动退出
  3. 写操作的超时上限是 SERIAL_TIMEOUT × (retries+1) + 0.5s,也就是最坏等一组读取完成

用户体验从”等 5 秒”变成了”最多等 0.5 秒”。

串口引用计数:多设备共用一个口

Modbus RTU 的物理现实:一个 RS485 串口可以挂 247 个从机。所以同一个 /dev/ttyUSB0 上可能连着温湿度、继电器、电表,它们共享同一个 ModbusSerialClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def connect(self, device):
port = cached_config.modbus_config.serial
if port in self._clients:
# 串口已打开,直接复用
self._port_ref_count[port] += 1
return True

# 新建客户端
client = ModbusSerialClient(port=port, ...)
if client.connect():
self._clients[port] = client
self._port_ref_count[port] = 1

def disconnect(self, device):
self._port_ref_count[port] -= 1
if self._port_ref_count[port] <= 0:
self._clients[port].close() # 最后一个设备断开才关串口

听起来简单,但配合 IO 锁就有个坑:同一串口上的设备轮询是串行的(Modbus 协议要求),但不同串口的设备可以并行。所以 IO 锁是按端口的,不是全局的。

写去抖:别让继电器打电报

前端有时候手抖,按钮连点两下;或者自动化规则快速重复触发。对于继电器来说,0.5 秒内同一寄存器写同一个值,完全是浪费——更糟的是,有些继电器会真的吸合-断开-再吸合,发出”哒哒哒”的声音。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
WRITE_DEBOUNCE_TIME = 0.5  # 秒

def write(self, device, target_register, value):
write_key = (device.id, target_register.address)
value_hash = hash(str(value))
current_time = time.time()

with self._lock:
if write_key in self._write_debounce_cache:
last_time, last_value_hash = self._write_debounce_cache[write_key]
if current_time - last_time < self.WRITE_DEBOUNCE_TIME and value_hash == last_value_hash:
return True # 重复请求,静默吸收

self._write_debounce_cache[write_key] = (current_time, value_hash)

只去重”同值”的写请求,不同值还是要正常下发。0.5 秒内同一个继电器先写 ON 再写 OFF?那得正常执行,不能吞掉。

离线检测与写复活

设备连续读取失败 5 次,标记为离线。但有意思的是——写成功一次就复活

为什么?因为读取失败可能是寄存器地址不对、从机只支持写不支持读,或者只是响应慢。但如果写命令成功返回了,说明从机在线、通信正常、地址也没错。这时候再把读失败计数清零,让下一轮轮询重新尝试读取,比傻等靠谱得多。

1
2
3
4
5
6
7
if success:
with self._lock:
old_fail = self._device_read_fail_count.get(device.id, 0)
if old_fail > 0:
self._device_read_fail_count[device.id] = 0
self._device_timeout_count[device.id] = 0
logger.info(f"Modbus设备 {device.name} 写入成功,重置离线状态(原连续读取失败 {old_fail} 次)")

几个调参建议

跑了几个月下来,这几个参数比较关键:

参数默认值调参思路
SERIAL_TIMEOUT0.3s9600 波特率下偏紧,115200 可以再压
DEVICE_READ_TIMEOUT15s单设备总超时,超了跳过剩余组
MAX_ADDRESS_GAP10间隙里读到的垃圾值会被丢弃,10 是个安全值
BATCH_FAIL_THRESHOLD3太小容易误判,太大降级太慢
WRITE_DEBOUNCE_TIME0.5s继电器场景够用,电机控制可能要更长

这套机制跑下来,最直观的感受是:现场那种”时好时坏”的从机再也不用人工切模式了。好的时候批量读,快;坏的时候自动降级,稳;后来自己又恢复了,下次重启又会重新试批量——因为状态只在内存里,不落盘。

说白了,工业现场你得假设每个设备都可能不守规矩,然后让系统自己去适应它。

欢迎关注我的其它发布渠道