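When I first built streaming, I took the lazy route and only pushed content deltas to the client, so the frontend was just a text box that slowly grew longer. On day one in production the tickets started rolling in: "Is it frozen?" "Why isn't anything happening?"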
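In reality the backend was flat out running tools, but the frontend had no idea. Once we looked into it, the cause was obvious: users don't need to know "when will the final answer arrive", they need to know "what are you doing right now".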
So we split the events into four types:
```python
EVENT_ROUTING = "routing"
EVENT_TOOL_STATUS = "tool_status"
EVENT_TOOL_RESULT = "tool_result"
EVENT_CONTENT = "content"
```
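routing: speak up first, "I know what you want"

As soon as a request comes in and the small model has classified it, we emit a routing event. With that, the frontend can immediately show a line like "Querying devices…", so at least something is moving on screen: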
```python
yield {
    "type": "routing",
    "data": {
        "category": "device_management",
        "confidence": 0.92,
        "reasoning": "User is asking about device status",
        "secondary_category": None,
    },
}
```
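Technically there's nothing special here, but the psychological effect is immediate. Time to first byte goes from "wait for the large model's first token" to "wait for the small model to finish classifying", which is about an order of magnitude apart. The "did it crash?" anxiety disappears right away.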
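To make the ordering concrete, here is a minimal sketch. It assumes a hypothetical `classify` coroutine standing in for the small model and a hypothetical `generate` async generator standing in for the large one. Neither name comes from the original code. The only thing it demonstrates is that routing is yielded as soon as classification returns, before any content.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict

Event = Dict[str, Any]


async def chat_stream(
    question: str,
    classify: Callable[[str], Awaitable[Dict[str, Any]]],  # hypothetical small-model classifier
    generate: Callable[[str, str], AsyncIterator[str]],    # hypothetical large-model token stream
) -> AsyncIterator[Event]:
    # Step 1: the cheap classification finishes first, so the client gets a routing
    # event long before the large model emits its first token.
    route = await classify(question)
    yield {"type": "routing", "data": route}

    # Step 2: only now start the slow generation and forward each token.
    async for token in generate(question, route["category"]):
        yield {"type": "content", "data": {"content": token}}


async def demo() -> list:
    async def classify(q: str) -> Dict[str, Any]:
        return {"category": "device_management", "confidence": 0.92}

    async def generate(q: str, category: str) -> AsyncIterator[str]:
        for token in ["There are", " 12 devices"]:
            await asyncio.sleep(0.01)  # stand-in for model latency
            yield token

    return [event async for event in chat_stream("Which devices are online?", classify, generate)]


print([e["type"] for e in asyncio.run(demo())])  # ['routing', 'content', 'content']
```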
Each tool call is split into two events, which the frontend uses to toggle its loading animation:
```python
yield {
    "type": "tool_status",
    "data": {"name": "list_devices", "status": "calling"},
}
yield {
    "type": "tool_status",
    "data": {"name": "list_devices", "status": "completed"},
}
```
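Why not merge them into a single event that carries the execution time? Because some tools really do run for a long time. Ask for something like "trends for every device over the past 7 days" and the call can take several seconds. The frontend needs to keep spinning in the calling state the whole time, or users start getting suspicious again.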
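When a tool finishes, its raw result goes straight out. If the frontend wants to render it itself (say, turning the device list into a table), it doesn't have to wait for the model to restate it in plain language: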
```python
yield {
    "type": "tool_result",
    "data": {
        "name": "list_devices",
        "success": True,
        "data": {"devices": [...]},
    },
}
```
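The three tool events can be produced by one wrapper around the tool call. This is a sketch that assumes tools are async callables. The wrapper name `run_tool_with_events` and the `error` field on failure are hypothetical and not part of the original protocol. It emits events in the same order as the example stream in this post (calling, result, completed). It also still sends completed when the tool raises, so the frontend's spinner always stops.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict

Event = Dict[str, Any]


async def run_tool_with_events(
    name: str,
    tool: Callable[..., Awaitable[Any]],  # hypothetical async tool function
    **kwargs: Any,
) -> AsyncIterator[Event]:
    # Start the spinner before the (possibly slow) call.
    yield {"type": "tool_status", "data": {"name": name, "status": "calling"}}
    try:
        result = await tool(**kwargs)
        # Raw result goes out as-is so the frontend can render it directly.
        yield {"type": "tool_result", "data": {"name": name, "success": True, "data": result}}
    except Exception as exc:
        # Assumption: report failures as success=False instead of killing the stream.
        yield {"type": "tool_result", "data": {"name": name, "success": False, "error": str(exc)}}
    # Stop the spinner regardless of the outcome.
    yield {"type": "tool_status", "data": {"name": name, "status": "completed"}}
```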
content: the familiar token-by-token stream

Last comes the one everyone already knows, pushed out one token at a time:
```python
yield {
    "type": "content",
    "data": {"content": "There are currently"},
}
yield {
    "type": "content",
    "data": {"content": " 12 devices in the system"},
}
```
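What a complete exchange looks like

For example, when the user asks "Which devices are online right now?", the SSE stream looks roughly like this: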
```text
POST /v1/chat/completions

{
  "model": "smart-agent",
  "messages": [{"role": "user", "content": "Which devices are online right now?"}],
  "stream": true
}

data: {"type":"routing","data":{"category":"device_management","confidence":0.95}}

data: {"type":"tool_status","data":{"name":"list_devices","status":"calling"}}

data: {"type":"tool_result","data":{"name":"list_devices","success":true,"data":{...}}}

data: {"type":"tool_status","data":{"name":"list_devices","status":"completed"}}

data: {"type":"content","data":{"content":"There are currently"}}

data: {"type":"content","data":{"content":" 12 devices in the system, 8 of them online"}}

data: {"type":"content","data":{"content":" and 4 offline."}}

data: [DONE]
```
The whole thing plays like live sports commentary: classify, call the tool, get the result, announce it. Every step is visible.
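For an integration test or a quick script, you can check the stream without a browser. This standard-library sketch parses raw SSE lines, stops at the [DONE] sentinel the server sends, and rebuilds the answer from the content deltas. It assumes every event fits on a single `data:` line. That holds when events are serialized with `json.dumps`, which escapes newlines inside strings. The function names are hypothetical.

```python
import json
from typing import Any, Dict, Iterable, List


def parse_sse_lines(lines: Iterable[str]) -> List[Dict[str, Any]]:
    """Turn raw SSE lines into event dicts, stopping at the [DONE] sentinel."""
    events = []
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank separator lines and non-data fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break
        events.append(json.loads(payload))
    return events


def final_answer(events: List[Dict[str, Any]]) -> str:
    """Concatenate the content deltas; the other event types are status-only."""
    return "".join(e["data"]["content"] for e in events if e["type"] == "content")
```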
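Extracting the chain of thought

Models like Qwen3 and DeepSeek can emit their reasoning, but LangChain's wrapping of it is a bit of a pain. Sometimes it sits in a reasoning block inside the content list, and sometimes it hides in additional_kwargs.reasoning_content.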
The robust approach is to check every possible location and take whatever is there:
```python
from typing import Dict


# Method excerpted from a class; _message_content_to_text and
# _extract_reasoning_text are helpers defined elsewhere on that class.
@classmethod
def _extract_stream_chunk_deltas(cls, chunk) -> Dict[str, str]:
    content_delta = cls._message_content_to_text(getattr(chunk, "content", ""))
    thinking_parts = []

    # Location 1: reasoning blocks inside a list-valued content
    raw_content = getattr(chunk, "content", None)
    if isinstance(raw_content, list):
        for block in raw_content:
            if not isinstance(block, dict):
                continue
            block_type = str(block.get("type", "")).lower()
            if block_type in {"reasoning", "thinking", "reasoning_content"}:
                text = cls._extract_reasoning_text(block)
                if text:
                    thinking_parts.append(text)

    # Location 2: provider-specific fields in additional_kwargs / response_metadata
    for container in (
        getattr(chunk, "additional_kwargs", None),
        getattr(chunk, "response_metadata", None),
    ):
        if not isinstance(container, dict):
            continue
        for key in ("reasoning_content", "thinking", "reasoning"):
            if key in container:
                text = cls._extract_reasoning_text(container.get(key))
                if text:
                    thinking_parts.append(text)

    return {
        "content_delta": content_delta,
        "thinking_delta": "".join(thinking_parts),
    }
```
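The method above relies on a `_extract_reasoning_text` helper that the post doesn't show. Below is one plausible shape, written as a standalone function. It is an assumption, not the original implementation, and the key names it probes are guesses at common provider formats. It accepts a plain string, a dict (looking for a text-bearing key), or a list of either.

```python
from typing import Any


def extract_reasoning_text(value: Any) -> str:
    """Hypothetical helper: pull plain text out of the shapes reasoning data tends to take."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, dict):
        # Probe likely text-bearing keys in order; these key names are assumptions.
        for key in ("text", "reasoning_content", "reasoning", "thinking", "content"):
            if key in value:
                return extract_reasoning_text(value[key])
        return ""
    if isinstance(value, (list, tuple)):
        # Concatenate the text of every element, skipping ones with no text.
        return "".join(extract_reasoning_text(v) for v in value)
    return ""  # unknown types contribute nothing
```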
Once extracted, the reasoning goes out as its own thinking event:
```python
yield {
    "type": "thinking",
    "data": {"content": "The user is asking about device status; I need to call the list_devices tool..."},
}
```
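The frontend can collapse this into a small drawer instead of mixing it into the answer text. Otherwise the reasoning fills the screen before the model has finished its first sentence.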
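Turning the per-chunk deltas into the two event types is then a small loop. This sketch assumes the input dicts have the `content_delta` / `thinking_delta` shape returned by `_extract_stream_chunk_deltas`. The function name `deltas_to_events` is hypothetical. Empty deltas are dropped so the frontend never receives blank events.

```python
from typing import Any, Dict, Iterable, Iterator


def deltas_to_events(deltas: Iterable[Dict[str, str]]) -> Iterator[Dict[str, Any]]:
    """Split each {content_delta, thinking_delta} pair into separate SSE events."""
    for d in deltas:
        # Within one chunk, thinking is emitted before content.
        if d.get("thinking_delta"):
            yield {"type": "thinking", "data": {"content": d["thinking_delta"]}}
        if d.get("content_delta"):
            yield {"type": "content", "data": {"content": d["content_delta"]}}
```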
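Don't forget concurrency control

Skip it and things break easily. A few users kick off big tasks at the same time, and the local model server runs out of GPU memory. So there's a Semaphore in front, and anything over the limit waits in line: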
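```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, Optional


class RequestStatus(str, Enum):
    QUEUED = "queued"
    PROCESSING = "processing"
    TOOL_CALLING = "tool_calling"
    COMPLETED = "completed"
    FAILED = "failed"


# RequestTask isn't shown in the original; this minimal dataclass is an assumption.
@dataclass
class RequestTask:
    request_id: str
    status: RequestStatus = RequestStatus.QUEUED
    result: Any = None
    error: Optional[str] = None
    runner: Optional[asyncio.Task] = field(default=None, repr=False)


class QueueManager:
    def __init__(self, max_concurrent: int = 10):
        # At most max_concurrent handlers run at once; the rest wait on the semaphore.
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._active_requests: Dict[str, RequestTask] = {}

    async def submit(
        self, request_id: str, handler: Callable[[RequestTask], Awaitable[Any]]
    ) -> RequestTask:
        task = RequestTask(request_id=request_id)  # starts out QUEUED
        self._active_requests[request_id] = task

        async def _execute():
            async with self._semaphore:
                task.status = RequestStatus.PROCESSING
                try:
                    task.result = await handler(task)
                    task.status = RequestStatus.COMPLETED
                except Exception as e:
                    task.error = str(e)
                    task.status = RequestStatus.FAILED

        # Keep a reference: the event loop holds only weak references to tasks,
        # so an unreferenced background task can be garbage-collected mid-flight.
        task.runner = asyncio.create_task(_execute())
        return task
```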
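The queueing itself could also be surfaced to the frontend as an event, telling the user "you're number 3 in line". I haven't done that yet.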
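If you do want queue positions, one wrinkle is that asyncio.Semaphore doesn't expose who is waiting, so the position has to be tracked separately. The sketch below is hypothetical: the class `PositionedQueue` and the `queued` event type are not part of the original code. It reports the position once, and only when the request actually has to wait. Refreshing the number while waiting is left out for brevity.

```python
import asyncio
from collections import deque
from typing import Any, AsyncIterator, Callable, Deque, Dict

Event = Dict[str, Any]


class PositionedQueue:
    """Hypothetical: a semaphore plus an explicit FIFO of waiting request ids."""

    def __init__(self, max_concurrent: int = 10):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._waiting: Deque[str] = deque()

    def position(self, request_id: str) -> int:
        # 1-based place in line; 0 if the request is not waiting.
        try:
            return self._waiting.index(request_id) + 1
        except ValueError:
            return 0

    async def run(
        self, request_id: str, handler: Callable[[], AsyncIterator[Event]]
    ) -> AsyncIterator[Event]:
        self._waiting.append(request_id)
        try:
            if self._semaphore.locked():
                # Only report a position when the request really has to wait.
                yield {"type": "queued", "data": {"position": self.position(request_id)}}
            await self._semaphore.acquire()
        finally:
            # Leave the line whether we got a slot or were cancelled while waiting.
            self._waiting.remove(request_id)
        try:
            async for event in handler():
                yield event
        finally:
            self._semaphore.release()
```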
The FastAPI side

The endpoint itself is nothing fancy, but a few headers need close attention:
- Cache-Control: no-cache: keeps intermediate layers from caching the stream
- X-Accel-Buffering: no: required when deployed behind nginx; without it nginx buffers the SSE stream and output stalls for long stretches

```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

# ChatRequest (the request model) and agent are defined elsewhere in the project.
app = FastAPI()


@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
    if request.stream:
        return StreamingResponse(
            stream_generator(request),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
            },
        )
    result = await agent.chat(request.messages)
    return result


async def stream_generator(request):
    # Each event becomes one SSE "data:" line followed by a blank line.
    async for event in agent.chat_stream(request.messages):
        yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
    yield "data: [DONE]\n\n"
```
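X-Accel-Buffering: no is the one I've tripped over again and again. Everything works in local development, then behind an nginx reverse proxy the stream starts stalling mysteriously. Every time, it takes me ten minutes to remember this header and add it back.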
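The framing in stream_generator quietly depends on one property. `json.dumps` escapes newlines inside strings, so each event is exactly one `data:` line, and a model answer containing line breaks can't end an event early. The small helper below makes this explicit; the name `format_sse` is hypothetical. As in the original, `ensure_ascii=False` sends Chinese text as real characters rather than `\uXXXX` escapes.

```python
import json
from typing import Any, Dict


def format_sse(event: Dict[str, Any]) -> str:
    # One event = one "data:" line + a blank line. json.dumps escapes "\n" inside
    # strings, so multi-line model output cannot break the framing.
    return f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
```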
Frontend parsing

With several event types, the frontend has to dispatch on the type. The most basic approach is a single switch:
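```javascript
// EventSource can only send GET requests, but this endpoint is a POST,
// so the stream is read with fetch + ReadableStream instead.
async function streamChat(messages) {
  const resp = await fetch('/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ model: 'smart-agent', messages, stream: true }),
  });
  const reader = resp.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // Events are separated by a blank line; keep any incomplete tail in the buffer.
    const chunks = buffer.split('\n\n');
    buffer = chunks.pop();
    for (const chunk of chunks) {
      if (!chunk.startsWith('data: ')) continue;
      const payload = chunk.slice('data: '.length);
      if (payload === '[DONE]') return;
      handleEvent(JSON.parse(payload));
    }
  }
}

function handleEvent(data) {
  switch (data.type) {
    case 'routing':
      showStatus(`Working on ${data.data.category}...`);
      break;
    case 'tool_status':
      if (data.data.status === 'calling') {
        showLoading(`Calling ${data.data.name}...`);
      }
      break;
    case 'tool_result':
      if (data.data.success) {
        showToolResult(data.data.name, data.data.data);
      }
      break;
    case 'content':
      appendContent(data.data.content);
      break;
    case 'thinking':
      showThinking(data.data.content);
      break;
  }
}
```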
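This code is bound to grow, because every tool displays its results differently. It will most likely evolve into a "tool name → render component" registry.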
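But that's for later. The first version just needs to work; there's still time to abstract once there really are a dozen or so kinds of tool results.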