游戏自动化要持续抓屏识别。一开始很自然地写成”主线程里循环抓图 → 推理 → 派发结果”——结果 UI 直接卡成 PPT。
原因不复杂:推理本身要十几毫秒,抓屏在某些情况下能阻塞几十毫秒。主线程一直被这俩活儿占着,UI 哪还有机会响应。
后来拆开了——推理线程在后台跑,主线程只读最新结果。这篇就讲讲这个引擎里几个关键设计的来龙去脉。NMS 相关的内容写在另一篇里了,这里专心聊线程模型。
推理在独立线程
主线程能做的只有一件事:拿一个最新的推理结果引用。所有的截屏、推理、回调,全都丢到后台线程里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| import threading from time import sleep from typing import Callable, List
class YoloInferenceEngine: def __init__(self, device): self._device = device self._engine = None self._latest_results = None
self._infer_callback_list: List[Callable] = [] self._capture_failure_callback_list: List[Callable] = []
self.__flag_loop = False self.__flag_pause = False
self.__action_lock = threading.Lock() self.__result_write_lock = threading.Lock()
def start(self): with self.__action_lock: if self.__flag_loop: return False self.__flag_pause = False self.__flag_loop = True self._capture_thread = threading.Thread( target=self._inference_loop, daemon=True, ) self._capture_thread.start() return True
def stop(self): with self.__action_lock: if not self.__flag_loop: return False self.__flag_pause = False self.__flag_loop = False self._capture_thread.join(timeout=3) return True
def pause(self): with self.__action_lock: if self.__flag_pause: return False self.__flag_pause = True return True
def resume(self): with self.__action_lock: if not self.__flag_pause: return False self.__flag_pause = False return True
|
daemon=True 是 GUI 应用必备——主进程退出的时候,可不希望被推理线程拽住卡半天。join(timeout=3) 是个保底,实际上 daemon 模式下不 join 也没事。
pause/resume 这一对方法,是为了切换模型时不必整个 stop 再 start。后面会用到。
推理循环本体
写得很直白——只在没暂停且能抓到帧的情况下做推理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| def _inference_loop(self): while self.__flag_loop: if self.__flag_pause: sleep(0.1) continue
try: frame = self._device.capture() except Exception as e: self._exec_capture_failure_callback(e) self.__flag_loop = False return
if frame is None or frame.size <= 0: sleep(0.1) continue
results = self._engine( frame, conf_threshold=0.6, agnostic_nms_groups=self._agnostic_nms_groups, )
with self.__result_write_lock: self._latest_results = Yolo_Results(results, frame)
self._exec_infer_callback()
self.__flag_loop = False
|
抓帧失败是个比较严重的事——通常是设备掉了(虚拟摄像头被关、游戏退出)。这种时候直接终止循环比空转更合适,所以走的是 return 而不是 continue。回调那边可以让上层决定要不要重启。
空帧(黑屏的过场动画、设备临时无数据)只是 sleep(0.1) continue——不算异常,等下一帧就行。
回调注册
业务层不需要轮询 latest_results——它只需要在新结果出现时被通知一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| def register_infer_callback(self, func: Callable): with self.__action_lock: if func not in self._infer_callback_list: self._infer_callback_list.append(func)
def register_capture_failure_callback(self, func: Callable): with self.__action_lock: if func not in self._capture_failure_callback_list: self._capture_failure_callback_list.append(func)
def _exec_infer_callback(self): for callback in self._infer_callback_list: try: callback(self.latest_frame, self.latest_results) except Exception as e: logger.error(f"推理回调失败: {e}")
def _exec_capture_failure_callback(self, exc: Exception): for callback in self._capture_failure_callback_list: try: callback(exc) except Exception as callback_exc: logger.error(f"截图失败回调失败: {callback_exc}")
|
每个回调单独 try/except 包起来——一个回调炸了不能拖累其他回调。
这是 callback 列表的常见坑。第一版没包,结果遇到过一次:某个调试用的可视化回调抛了异常,正常业务回调全部不执行。当场一脸黑线。
业务侧用起来:
1 2 3 4 5 6 7 8 9
| engine = YoloInferenceEngine(device)
def on_inference_result(frame, results): for box, score, class_id in zip(results.boxes, results.scores, results.class_ids): if score > 0.8: print(f"检测到: {class_id}, 置信度: {score:.2f}")
engine.register_infer_callback(on_inference_result) engine.start()
|
状态机
四个状态:Stopped、Running、Paused、(Failed)。最后一个其实只是 Stopped + capture_failure callback——没单独定义。
1 2 3 4 5 6 7
| start() Stopped ---------> Running ^ | | stop() | pause() | v +--------- Paused <---+ resume()
|
只读访问器全部加锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @property def running(self) -> bool: with self.__action_lock: return self.__flag_loop
@property def is_pause(self) -> bool: with self.__action_lock: return self.__flag_pause
@property def latest_frame(self): try: return self._latest_results.frame except Exception: return None
@property def latest_results(self): try: return self._latest_results except Exception: return None
|
注意 latest_frame/latest_results 这俩没拿 __result_write_lock——这里有个隐患:推理线程正在写的瞬间被读,可能拿到半成品对象。
不过实际上因为 Python 的引用赋值是原子的,self._latest_results = Yolo_Results(...) 这一行的执行,读端要么拿到旧引用要么拿到新引用,不会出现”半个对象”。
但前提是 Yolo_Results 一旦构造就不可变——这是个约定,靠这个约定不需要加锁。约定要是哪天破了,那就好玩了。
模型热切换
游戏不同界面(剧情 UI、培育 UI、考试 UI)用的 YOLO 模型不同。切换时要把当前推理打断,加载新模型,再恢复:
1 2 3 4 5 6 7 8 9 10 11
| def load_model(self, model_type: str = YoloModelType.BASE_UI): if self.__flag_loop: self.pause()
with self.__action_lock: self._engine = YoloModelFromONNX(config.model_config[model_type]) self._model_type = model_type self._agnostic_nms_groups = self._build_skill_card_nms_group()
if self.__flag_loop: self.resume()
|
要是直接 self._engine = YoloModelFromONNX(...) 不暂停,推理线程可能正在用旧的 _engine 跑——切换瞬间会冒出来一些”半新半旧”的怪结果。pause() → 改 _engine → resume(),把这个空窗堵上。
不过这里还有个小漏洞:pause 内部只是设 flag,真正生效得等推理线程跑到下一次循环检查到 flag。所以 pause 返回后立马去切模型,理论上还能撞上一次正在跑的推理。
讲究的做法是等推理线程进入 pause 状态再切,但目前用户感知不到这个 race,先记着,没动。