YOLO 实时推理引擎的线程模型

游戏自动化要持续抓屏识别。一开始很自然地写成”主线程里循环抓图 → 推理 → 派发结果”——结果 UI 直接卡成 PPT。

原因不复杂:推理本身要十几毫秒,抓屏在某些情况下能阻塞几十毫秒。主线程一直被这俩活儿占着,UI 哪还有机会响应。

后来拆开了——推理线程在后台跑,主线程只读最新结果。这篇就讲讲这个引擎里几个关键设计的来龙去脉。NMS 相关的内容写在另一篇里了,这里专心聊线程模型。

推理在独立线程

主线程能做的只有一件事:拿一个最新的推理结果引用。所有的截屏、推理、回调,全都丢到后台线程里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import threading
from time import sleep
from typing import Callable, List

class YoloInferenceEngine:
def __init__(self, device):
self._device = device
self._engine = None
self._latest_results = None

self._infer_callback_list: List[Callable] = []
self._capture_failure_callback_list: List[Callable] = []

self.__flag_loop = False
self.__flag_pause = False

self.__action_lock = threading.Lock()
self.__result_write_lock = threading.Lock()

def start(self):
with self.__action_lock:
if self.__flag_loop:
return False
self.__flag_pause = False
self.__flag_loop = True
self._capture_thread = threading.Thread(
target=self._inference_loop,
daemon=True,
)
self._capture_thread.start()
return True

def stop(self):
with self.__action_lock:
if not self.__flag_loop:
return False
self.__flag_pause = False
self.__flag_loop = False
self._capture_thread.join(timeout=3)
return True

def pause(self):
with self.__action_lock:
if self.__flag_pause:
return False
self.__flag_pause = True
return True

def resume(self):
with self.__action_lock:
if not self.__flag_pause:
return False
self.__flag_pause = False
return True

daemon=True 是 GUI 应用必备——主进程退出的时候,可不希望被推理线程拽住卡半天。join(timeout=3) 是个保底,实际上 daemon 模式下不 join 也没事。

pause/resume 这一对方法,是为了切换模型时不必整个 stop 再 start。后面会用到。

推理循环本体

写得很直白——只在没暂停且能抓到帧的情况下做推理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def _inference_loop(self):
while self.__flag_loop:
if self.__flag_pause:
sleep(0.1)
continue

try:
frame = self._device.capture()
except Exception as e:
self._exec_capture_failure_callback(e)
self.__flag_loop = False
return

if frame is None or frame.size <= 0:
sleep(0.1)
continue

results = self._engine(
frame,
conf_threshold=0.6,
agnostic_nms_groups=self._agnostic_nms_groups,
)

with self.__result_write_lock:
self._latest_results = Yolo_Results(results, frame)

self._exec_infer_callback()

self.__flag_loop = False

抓帧失败是个比较严重的事——通常是设备掉了(虚拟摄像头被关、游戏退出)。这种时候直接终止循环比空转更合适,所以走的是 return 而不是 continue。回调那边可以让上层决定要不要重启。

空帧(黑屏的过场动画、设备临时无数据)只是 sleep(0.1) continue——不算异常,等下一帧就行。

回调注册

业务层不需要轮询 latest_results——它只需要在新结果出现时被通知一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def register_infer_callback(self, func: Callable):
with self.__action_lock:
if func not in self._infer_callback_list:
self._infer_callback_list.append(func)

def register_capture_failure_callback(self, func: Callable):
with self.__action_lock:
if func not in self._capture_failure_callback_list:
self._capture_failure_callback_list.append(func)

def _exec_infer_callback(self):
for callback in self._infer_callback_list:
try:
callback(self.latest_frame, self.latest_results)
except Exception as e:
logger.error(f"推理回调失败: {e}")

def _exec_capture_failure_callback(self, exc: Exception):
for callback in self._capture_failure_callback_list:
try:
callback(exc)
except Exception as callback_exc:
logger.error(f"截图失败回调失败: {callback_exc}")

每个回调单独 try/except 包起来——一个回调炸了不能拖累其他回调。

这是 callback 列表的常见坑。第一版没包,结果遇到过一次:某个调试用的可视化回调抛了异常,正常业务回调全部不执行。当场一脸黑线。

业务侧用起来:

1
2
3
4
5
6
7
8
9
engine = YoloInferenceEngine(device)

def on_inference_result(frame, results):
for box, score, class_id in zip(results.boxes, results.scores, results.class_ids):
if score > 0.8:
print(f"检测到: {class_id}, 置信度: {score:.2f}")

engine.register_infer_callback(on_inference_result)
engine.start()

状态机

四个状态:Stopped、Running、Paused、(Failed)。最后一个其实只是 Stopped + capture_failure callback——没单独定义。

1
2
3
4
5
6
7
         start()
Stopped ---------> Running
^ |
| stop() | pause()
| v
+--------- Paused <---+
resume()

只读访问器全部加锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@property
def running(self) -> bool:
with self.__action_lock:
return self.__flag_loop

@property
def is_pause(self) -> bool:
with self.__action_lock:
return self.__flag_pause

@property
def latest_frame(self):
try:
return self._latest_results.frame
except Exception:
return None

@property
def latest_results(self):
try:
return self._latest_results
except Exception:
return None

注意 latest_frame/latest_results 这俩没拿 __result_write_lock——这里有个隐患:推理线程正在写的瞬间被读,可能拿到半成品对象。

不过实际上因为 Python 的引用赋值是原子的,self._latest_results = Yolo_Results(...) 这一行的执行,读端要么拿到旧引用要么拿到新引用,不会出现”半个对象”。

但前提是 Yolo_Results 一旦构造就不可变——这是个约定,靠这个约定不需要加锁。约定要是哪天破了,那就好玩了。

模型热切换

游戏不同界面(剧情 UI、培育 UI、考试 UI)用的 YOLO 模型不同。切换时要把当前推理打断,加载新模型,再恢复:

1
2
3
4
5
6
7
8
9
10
11
def load_model(self, model_type: str = YoloModelType.BASE_UI):
if self.__flag_loop:
self.pause()

with self.__action_lock:
self._engine = YoloModelFromONNX(config.model_config[model_type])
self._model_type = model_type
self._agnostic_nms_groups = self._build_skill_card_nms_group()

if self.__flag_loop:
self.resume()

要是直接 self._engine = YoloModelFromONNX(...) 不暂停,推理线程可能正在用旧的 _engine 跑——切换瞬间会冒出来一些”半新半旧”的怪结果。pause() → 改 _engineresume(),把这个空窗堵上。

不过这里还有个小漏洞:pause 内部只是设 flag,真正生效得等推理线程跑到下一次循环检查到 flag。所以 pause 返回后立马去切模型,理论上还能撞上一次正在跑的推理。

讲究的做法是等推理线程进入 pause 状态再切,但目前用户感知不到这个 race,先记着,没动。

欢迎关注我的其它发布渠道