Agent 流式输出的 SSE 事件设计

最早搞流式的时候图省事,只丢 content 增量出去,前端那边就是个慢慢变长的文本框。结果上线第一天,工单就开始飘——“是不是卡了?”“怎么没反应?”

其实后端忙得脚不沾地,正跑工具呢,可前端两眼一抹黑啥都不知道。这事儿一查就明白了:用户需要的不是”最终结果什么时候来”,而是”你现在到底在干嘛”。

后来把事件类型一拆四:

1
2
3
4
EVENT_ROUTING = "routing"           # 路由分类结果
EVENT_TOOL_STATUS = "tool_status" # 工具调用状态
EVENT_TOOL_RESULT = "tool_result" # 工具执行结果
EVENT_CONTENT = "content" # 正文内容

routing:先吼一嗓子,我知道你要啥了

请求一进门,小模型一分类完,立马给前端吐一个 routing 事件。前端拿到就能甩一行”正在查询设备…“上去,至少屏幕上动起来了:

1
2
3
4
5
6
7
8
9
yield {
"type": "routing",
"data": {
"category": "device_management",
"confidence": 0.92,
"reasoning": "用户询问设备状态",
"secondary_category": None,
},
}

这事儿技术上没什么了不起的,但心理效果立竿见影——首字节延迟从”等大模型出第一个 token”变成”等小模型分类完”,一个数量级的差距。用户那种”是不是死机了”的焦虑,立马就没了。

tool_status:工具的开始和收工

工具调用拆成两个事件,前端拿去切 loading 动画:

1
2
3
4
5
6
7
8
9
yield {
"type": "tool_status",
"data": {"name": "list_devices", "status": "calling"},
}

yield {
"type": "tool_status",
"data": {"name": "list_devices", "status": "completed"},
}

为啥不合成一个事件、带个执行时长?因为工具有时候真的会跑很久。比如说让它”统计过去 7 天所有设备的趋势”——这一拉就是好几秒,前端总得在 calling 状态下持续转圈,不然用户又得开始疑神疑鬼。

tool_result:原料给你,自己看着办

工具跑完,原始结果直接甩出去。前端要是想自己渲染(比如把设备列表搞成表格),完全不用等模型用人话复述一遍:

1
2
3
4
5
6
7
8
yield {
"type": "tool_result",
"data": {
"name": "list_devices",
"success": True,
"data": {"devices": [...]},
},
}

content:大家最熟悉的逐字流

最后才是大家常见的那个——一个 token 一个 token 往外吐:

1
2
3
4
5
6
7
8
yield {
"type": "content",
"data": {"content": "当前系统中共有"},
}
yield {
"type": "content",
"data": {"content": " 12 个设备"},
}

一次完整对话长啥样

来个例子,用户问”现在有哪些设备在线”,SSE 流大概是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
POST /v1/chat/completions
{
"model": "smart-agent",
"messages": [{"role": "user", "content": "现在有哪些设备在线?"}],
"stream": true
}

data: {"type":"routing","data":{"category":"device_management","confidence":0.95}}

data: {"type":"tool_status","data":{"name":"list_devices","status":"calling"}}

data: {"type":"tool_result","data":{"name":"list_devices","success":true,"data":{...}}}

data: {"type":"tool_status","data":{"name":"list_devices","status":"completed"}}

data: {"type":"content","data":{"content":"当前系统中共有"}}

data: {"type":"content","data":{"content":" 12 个设备,其中 8 个在线"}}

data: {"type":"content","data":{"content":",4 个离线。"}}

整个过程像看比赛实况——分类、调工具、出结果、播报,每一步都看得见。

思考链怎么薅出来

Qwen3、DeepSeek 这一类模型都支持思考链,但 LangChain 的封装就有点折腾人了——有时候挂在 content 列表里某个 reasoning block,有时候又躲在 additional_kwargs.reasoning_content 里。

讲究的方法是几个位置都翻一遍,谁有就要谁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@classmethod
def _extract_stream_chunk_deltas(cls, chunk) -> Dict[str, str]:
content_delta = cls._message_content_to_text(getattr(chunk, "content", ""))
thinking_parts = []

raw_content = getattr(chunk, "content", None)
if isinstance(raw_content, list):
for block in raw_content:
if not isinstance(block, dict):
continue
block_type = str(block.get("type", "")).lower()
if block_type in {"reasoning", "thinking", "reasoning_content"}:
text = cls._extract_reasoning_text(block)
if text:
thinking_parts.append(text)

for container in (
getattr(chunk, "additional_kwargs", None),
getattr(chunk, "response_metadata", None),
):
if not isinstance(container, dict):
continue
for key in ("reasoning_content", "thinking", "reasoning"):
if key in container:
text = cls._extract_reasoning_text(container.get(key))
if text:
thinking_parts.append(text)

return {
"content_delta": content_delta,
"thinking_delta": "".join(thinking_parts),
}

薅出来之后单独发一个 thinking 事件:

1
2
3
4
yield {
"type": "thinking",
"data": {"content": "用户询问设备状态,我需要调用 list_devices 工具..."},
}

前端可以折叠成一个小抽屉,不混进正文里——不然模型一句话还没说完,思考过程已经把屏幕填满了。

并发别忘了管

不管的话很容易出事。几个用户一起来一个大任务,本地模型服务直接吃完显存。所以前面挡了个 Semaphore,超了就排队等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
from enum import Enum

class RequestStatus(str, Enum):
QUEUED = "queued"
PROCESSING = "processing"
TOOL_CALLING = "tool_calling"
COMPLETED = "completed"
FAILED = "failed"

class QueueManager:
def __init__(self, max_concurrent: int = 10):
self._semaphore = asyncio.Semaphore(max_concurrent)
self._active_requests: Dict[str, RequestTask] = {}

async def submit(self, request_id: str, handler) -> RequestTask:
task = RequestTask(request_id=request_id)
self._active_requests[request_id] = task

async def _execute():
async with self._semaphore:
task.status = RequestStatus.PROCESSING
try:
result = await handler(task)
task.result = result
task.status = RequestStatus.COMPLETED
except Exception as e:
task.error = str(e)
task.status = RequestStatus.FAILED

asyncio.create_task(_execute())
return task

排队过程其实也可以单独发个事件给前端,告诉用户”你排第 3 位呢”。还没做。

FastAPI 这边

接口本身没啥花活。几个 header 倒是要盯紧了:

  • Cache-Control: no-cache:别被中间层缓存了
  • X-Accel-Buffering: no:部署在 nginx 后面的必须加,不然 SSE 流会被缓冲住,一卡半天
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
if request.stream:
return StreamingResponse(
stream_generator(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
result = await agent.chat(request.messages)
return result

async def stream_generator(request):
async for event in agent.chat_stream(request.messages):
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"

X-Accel-Buffering: no 这条我反复栽过——本地开发好好的,一上 nginx 反代就开始诡异卡顿。每次都得花十分钟想起来,再补上。

前端解析

事件类型一多,前端就得做分发。最朴素的写法就是一个 switch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const eventSource = new EventSource('/v1/chat/completions');

eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}

const data = JSON.parse(event.data);

switch (data.type) {
case 'routing':
showStatus(`正在${data.data.category}...`);
break;
case 'tool_status':
if (data.data.status === 'calling') {
showLoading(`正在调用 ${data.data.name}...`);
}
break;
case 'tool_result':
if (data.data.success) {
showToolResult(data.data.name, data.data.data);
}
break;
case 'content':
appendContent(data.data.content);
break;
case 'thinking':
showThinking(data.data.content);
break;
}
};

这块代码以后必胖——因为每种工具的结果展示方式都不一样,最后多半会演化出一个”工具名 → 渲染组件”的注册表。

不过那是以后的事。第一版能跑就行,等真的有十来种工具结果再去抽象也来得及。

欢迎关注我的其它发布渠道