事情是这样的:现场一台 Modbus 温湿度传感器,地址 0~5 六个寄存器,一帧 read_holding_registers 全读完,多舒服。
结果隔壁厂家塞进来个奇葩从机,地址 0 和地址 100 之间隔了一百个空位,批量读直接超时——更绝的是,同一串口上还挂着另外三台正常的设备,那台超时一卡就是好几秒,别的设备跟着一块儿断线。
那之后我花了一周重写了 Modbus 驱动的读取逻辑,从”一刀切批量”变成了”自适应批量+优雅降级”。这篇把这几个机制拆开讲讲。
先说问题:批量读取不是万能药 Modbus RTU 的 Read Holding Registers 最多一次读 125 个寄存器,看起来很爽。但现实比协议文档残酷:
地址不连续 :有的设备寄存器散落在 0、100、200,中间全是空洞,读 200 个地址只用了 3 个部分从机不支持批量 :便宜的表只认单寄存器读取,批量请求直接返回异常码串口是共享的 :同一串口多个从机,一台设备卡住,别的全得等写入被读取挡道 :用户点”开灯”,后台还在慢悠悠轮询二十组寄存器,等拿到锁黄花菜都凉了所以核心矛盾就一个:怎么在”尽量批量”和”随时能降级”之间找平衡?
智能寄存器分组 第一步,把散落的寄存器按地址排序,能合并的合并,该拆开的拆开。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def _build_smart_groups (self, registers, read_actions, batchable_actions, device_id ): groups = [] sorted_regs = sorted (list (registers), key=lambda r: r.address) for reg in sorted_regs: action = _get_read_action(reg) if not action: continue if action not in batchable_actions or not self ._check_batch_support(device_id, action): groups.append({"action" : action, "start" : reg.address, "count" : reg.register_count, "registers" : [reg], "single" : True }) continue if groups: last = groups[-1 ] if last["action" ] == action and not last.get("single" ): gap = reg.address - (last["start" ] + last["count" ]) new_count = (reg.address + reg.register_count) - last["start" ] if 0 <= gap <= self .MAX_ADDRESS_GAP and new_count <= self .MAX_BATCH_REGISTERS: last["registers" ].append(reg) last["count" ] = new_count merged = True if not merged: groups.append({"action" : action, "start" : reg.address, "count" : reg.register_count, "registers" : [reg]})
三个关键参数:
MAX_BATCH_REGISTERS = 125:协议上限MAX_ADDRESS_GAP = 10:地址间隙小于这个值就填充合并,省一次请求BATCH_FAIL_THRESHOLD = 3:连续失败这么多次就降级举个例子,寄存器地址是 0、1、2、5、6、100、101,分组结果就是:
1 2 组1: addr=0, count=7 (0~6 合并,gap=3 填充) 组2: addr=100, count=2 (100~101)
地址 5 和 6 之间没有 gap,0~2 和 5~6 之间 gap 是 2(地址 3、4 是空的),小于 10 所以合并。但 6 和 100 之间 gap 是 94,超了,必须拆开。
自适应降级:失败了就老实一个个读 分组策略再好,也挡不住有些从机就是不给面子。核心思路:先试试批量,失败就当场回退单寄存器,连续失败三次就记住”这家伙不支持批量”,下次直接单读 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def _check_batch_support (self, device_id, action ): with self ._lock: support = self ._device_batch_support.get(device_id, {}).get(action) return support is not False def _mark_batch_fail (self, device_id, action ): with self ._lock: count = self ._device_batch_fail_count.get(device_id, {}).get(action, 0 ) + 1 self ._device_batch_fail_count[device_id][action] = count if count >= self .BATCH_FAIL_THRESHOLD: self ._device_batch_support[device_id][action] = False logger.warning(f"设备 {device_id} 批量读取({action} )连续失败{count} 次,已切换为单寄存器模式" ) return True return False
读取逻辑里的关键分支:
1 2 3 4 5 6 7 8 9 10 result = execute_modbus_action(client, action=group["action" ], address=group["start" ], count=group["count" ], slave=slave)if result.isError(): threshold_reached = self ._mark_batch_fail(device.id , group["action" ]) fallback_data = self ._read_single_registers(client, group["registers" ], action, slave, device_name, port) data.update(fallback_data)else : self ._mark_batch_success(device.id , group["action" ])
三态设计:None(未知,默认试批量)、True(确认支持)、False(确认不支持)。新设备进来默认 None,先试一次批量,成了就标 True,败了也只败一次——因为回退逻辑保证这一轮数据不会丢。
还有个细节:降级是按 action 类型 的。有些设备读保持寄存器支持批量,但读输入寄存器不行。不能一刀切。
写优先:别让轮询挡了用户的指令 Modbus 串口是半双工的,读和写不能同时进行。我们系统里读取是周期轮询,写入是用户触发的。如果读取正在扫二十组寄存器,用户这时候点”开灯”,等读完才能写,体验就是——灯过了好几秒才亮。
解法:写请求打断读循环 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 for group in groups: write_pending = self ._port_write_pending.get(port) if write_pending and write_pending.is_set(): break io_lock.acquire(timeout=1 ) try : finally : io_lock.release() if write_pending and write_pending.is_set(): break
写入端:
1 2 3 4 5 6 7 8 9 10 11 12 13 def write (self, device, target_register, value ): write_pending.set () _write_lock_timeout = self .SERIAL_TIMEOUT * (self .MODBUS_RETRIES + 1 ) + 0.5 acquired = io_lock.acquire(timeout=_write_lock_timeout) try : _do_write(client) finally : write_pending.clear() io_lock.release()
关键点:
读取是逐组 获取/释放锁的,不是一把锁锁到底 write_pending 是个 Event,设置后读循环会在组间 检查到并主动退出写操作的超时上限是 SERIAL_TIMEOUT × (retries+1) + 0.5s,也就是最坏等一组读取完成 用户体验从”等 5 秒”变成了”最多等 0.5 秒”。
串口引用计数:多设备共用一个口 Modbus RTU 的物理现实:一个 RS485 串口可以挂 247 个从机。所以同一个 /dev/ttyUSB0 上可能连着温湿度、继电器、电表,它们共享同一个 ModbusSerialClient。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def connect (self, device ): port = cached_config.modbus_config.serial if port in self ._clients: self ._port_ref_count[port] += 1 return True client = ModbusSerialClient(port=port, ...) if client.connect(): self ._clients[port] = client self ._port_ref_count[port] = 1 def disconnect (self, device ): self ._port_ref_count[port] -= 1 if self ._port_ref_count[port] <= 0 : self ._clients[port].close()
听起来简单,但配合 IO 锁就有个坑:同一串口上的设备轮询是串行的(Modbus 协议要求),但不同串口的设备可以并行。所以 IO 锁是按端口 的,不是全局的。
写去抖:别让继电器打电报 前端有时候手抖,按钮连点两下;或者自动化规则快速重复触发。对于继电器来说,0.5 秒内同一寄存器写同一个值,完全是浪费——更糟的是,有些继电器会真的吸合-断开-再吸合,发出”哒哒哒”的声音。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 WRITE_DEBOUNCE_TIME = 0.5 def write (self, device, target_register, value ): write_key = (device.id , target_register.address) value_hash = hash (str (value)) current_time = time.time() with self ._lock: if write_key in self ._write_debounce_cache: last_time, last_value_hash = self ._write_debounce_cache[write_key] if current_time - last_time < self .WRITE_DEBOUNCE_TIME and value_hash == last_value_hash: return True self ._write_debounce_cache[write_key] = (current_time, value_hash)
只去重”同值”的写请求,不同值还是要正常下发。0.5 秒内同一个继电器先写 ON 再写 OFF?那得正常执行,不能吞掉。
离线检测与写复活 设备连续读取失败 5 次,标记为离线。但有意思的是——写成功一次就复活 。
为什么?因为读取失败可能是寄存器地址不对、从机只支持写不支持读,或者只是响应慢。但如果写命令成功返回了,说明从机在线、通信正常、地址也没错。这时候再把读失败计数清零,让下一轮轮询重新尝试读取,比傻等靠谱得多。
1 2 3 4 5 6 7 if success: with self ._lock: old_fail = self ._device_read_fail_count.get(device.id , 0 ) if old_fail > 0 : self ._device_read_fail_count[device.id ] = 0 self ._device_timeout_count[device.id ] = 0 logger.info(f"Modbus设备 {device.name} 写入成功,重置离线状态(原连续读取失败 {old_fail} 次)" )
几个调参建议 跑了几个月下来,这几个参数比较关键:
参数 默认值 调参思路 SERIAL_TIMEOUT0.3s 9600 波特率下偏紧,115200 可以再压 DEVICE_READ_TIMEOUT15s 单设备总超时,超了跳过剩余组 MAX_ADDRESS_GAP10 间隙里读到的垃圾值会被丢弃,10 是个安全值 BATCH_FAIL_THRESHOLD3 太小容易误判,太大降级太慢 WRITE_DEBOUNCE_TIME0.5s 继电器场景够用,电机控制可能要更长
这套机制跑下来,最直观的感受是:现场那种”时好时坏”的从机再也不用人工切模式了。好的时候批量读,快;坏的时候自动降级,稳;后来自己又恢复了,下次重启又会重新试批量——因为状态只在内存里,不落盘。
说白了,工业现场你得假设每个设备都可能不守规矩,然后让系统自己去适应它。