【精通】SmartWriter v2.3:流式写作引擎 — Streaming 五种模式深度实战 目录前言技术背景与演进逻辑Streaming 五种模式深度解析values:完整状态快照流updates:增量状态变更流messages:LLM Token 级流式输出custom:自定义进度事件流debug:全链路调试追踪高级模式:checkpoints 与 tasks流式写作 UX 架构技术优缺点与适用场景实战落地:SmartWriter v2.3 完整代码生产避坑经验全文总结本期专栏更新说明专栏推荐参考资料前言核心痛点:在前面的文章中,SmartWriter 已经拥有了 Researcher、Writer、Editor、FactChecker 四个专业 Agent,通过 Subgraph + A2A 协议实现了模块化协作。但用户在使用时仍然面对一个尴尬的体验——提交写作任务后,页面"卡死"30 秒,然后一次性吐出整篇文章。这不是现代 AI 产品应有的体验。本文解决的核心问题:如何让多 Agent 写作流程的每一个步骤都实时可见,如何让用户感受到 Token 级逐字生成的流畅体验,以及如何在长流程中实现可控的中断和恢复。前置知识:需要掌握 LangGraph StateGraph 基础、Agent 基本概念,以及本系列 v2.0-v2.1 的多 Agent 架构知识。系列阶段:精通篇第 4 篇(总第 20 篇)。第一季共 24 篇,贯穿案例为 SmartWriter。收获能力:读完本文,你将掌握 LangGraph 五种 Streaming 模式的底层原理与 Best Practice,能独立实现包括 Token 级流式输出、自定义进度事件、多模式混合流、背压控制在内的完整流式写作引擎。依赖版本:langgraph = 1.1.0, langchain = 1.0.0(2026 年 6 月最新稳定版)技术背景与演进逻辑从阻塞到流式:Agent 交互体验的三次跃迁AI 应用的用户体验经历了三个阶段:阶段模式用户感知SmartWriter 版本阻塞式invoke() 等待全部完成白屏 30-60s,一次性出结果v0.1 - v2.0节点级流式stream_mode=“updates”看到 Agent 切换,但 LLM 输出仍是块状v2.0 - v2.2Token 级流式stream_mode=[“messages”,“custom”]逐字生成 + 进度条 + 可中断v2.3阻塞式架构的本质问题不是"慢",而是"信息不对称"——用户不知道系统在做什么,无法判断是正常运行还是卡死,也无法在中间阶段介入修正。LangGraph Streaming 架构全景LangGraph 的流式系统建立在 Pregel/BSP 执行模型之上。每个节点执行完毕后,Pregel Runtime 会触发一次"超级步"(Superstep),将状态更新广播给订阅者。Streaming API 正是这个广播机制的可编程接口:[LangGraph Streaming 架构] [Pregel Runtime] │ ├── 超级步 1:节点 A 执行 │ │ │ ├──→ values emit: 完整 State 快照 │ ├──→ updates emit: {A 产生的变更} │ ├──→ messages emit: LLM Token 序列 │ ├──→ custom emit: 用户自定义事件 │ └──→ debug emit: 执行追踪信息 │ ├── 超级步 2:节点 B 执行 │ └──→ (同上) │ └── 超级步 N:END关键设计原则:Streaming 不是"事后记录",而是"执行过程中同步推送"。每个 emit 都是在节点产生输出的同一瞬间触发,这意味着客户端可以在 LLM 还在生成第一个 token 时就开始渲染。v2 统一格式LangGraph 1.1 引入的version="v2"统一了所有模式的输出格式。无论使用单模式还是多模式,每个 chunk 都是相同的StreamPart结构:{"type":"values"|"updates"|"messages"|"custom"|"checkpoints"|"tasks"|"debug","ns":(),# namespace 元组,子图事件时非空"data":...,# 各模式的具体负载}这解决了 v1 格式的"模式猜谜"问题——v1 中单模式返回裸数据,多模式返回(mode, data)元组,子图模式返回(namespace, mode, data)三元组。v2 统一后,客户端只需if chunk["type"] == "messages"即可,类型推断也完整支持。Streaming 五种模式深度解析values:完整状态快照流stream_mode="values"在每次超级步后推送完整的当前 State。它不是只推送变化的部分,而是整个 State 字典的全部键值对。底层机制:Pregel 每个超级步结束时,Runtime 调用state_snapshot()获取当前 State 的深拷贝,然后通过values通道 emit。适用场景:需要前端展示完整写作状态的面板式 UI(左侧大纲 + 中间正文 + 右侧引用)状态检查点记录(每次 emit 都可以持久化为一个检查点)调试时查看完整上下文数据量特征:每次推送的字节数 = State 中所有字段的序列化大小之和。如果 State 中有messages列表且不断增长,推送量会线性增加。对于长文档写作(State 可能包含数十万 token 的消息历史),values 模式的数据开销需要重点关注。# values 模式:每次拿到完整 Stateforchunkingraph.stream({"topic":"GPU推理安全"},stream_mode="values",version="v2",):ifchunk["type"]=="values":state=chunk["data"]print(f"当前进度:草稿{len(state.get('draft',''))}字符")print(f"引用来源:{len(state.get('sources',[]))}条")updates:增量状态变更流stream_mode="updates"只推送每个节点返回的状态更新字典。相比 values,它只传输变化量。底层机制:节点函数执行完毕后,Runtime 捕获其返回的dict。在 Reducer 合并之前,这个原始返回值先通过updates通道 emit。这意味着你看到的是"这个节点想改什么",而不是"State 最终变成了什么"——如果 Reducer 做了合并/去重/排序,updates 中的值可能与最终 State 不一致。与 values 的对比实验:初始 State: {"counter": 0, "items": []} 节点 A 返回: {"counter": 1, "items": ["a"]} values emit → {"counter": 1, "items": ["a"]} updates emit → {"counter": 1, "items": ["a"]} 节点 B 返回: {"items": ["b"]} # items 使用 add_messages Reducer → ["a", "b"] values emit → {"counter": 1, "items": ["a", "b"]} updates emit → {"items": ["b"]} ← 注意:只看到节点返回的,不是合并后的!适用场景:Dashboard 式的进度指示器(“Writer 刚完成了 500 字的段落”)前端增量 DOM 更新(只替换变化的区域,不重新渲染整个页面)窄带宽环境(移动端、IoT)forchunkingraph.stream({"topic":"A2A协议"},stream_mode="updates",version="v2",):ifchunk["type"]=="updates":fornode_name,updateinchunk["data"].items():forkey,valueinupdate.items():preview=str(value)[:80]print(f"[{node_name}]{key}:{preview}...")messages:LLM Token 级流式输出stream_mode="messages"是最直接影响用户体验的模式。它将 LLM 的输出以 Token 为单位逐个推送,实现"打字机效果"。底层机制:LangChain ChatModel 的.stream()方法产生的 token 迭代器被 LangGraph 捕获,每个 token 封装为(message_chunk, metadata)元组推送到 messages 通道。关键是:即使你使用.invoke()而非.stream()调用 LLM,messages 模式仍然生效——LangGraph 在内部自动将 invoke 转为 stream。metadata 关键字段:字段含义示例值langgraph_node产生该 token 的节点名"writer_section"tagsLLM 实例的标签["joke"]ls_model_name模型名称"claude-sonnet-4-6"thread_id当前 thread"article-001"标签过滤:当图中有多个 LLM 实例时,可以为它们打上不同标签,前端据此区分"正文生成"和"内部推理":# 正文模型:标签 "writer"writer_model=init_chat_model("claude-sonnet-4-6",tags=["writer"])# 内部推理模型:标签 "nostream"(静默,不推送到 messages 流)think_model=init_chat_model("claude-haiku-4-5-20251001",tags=["nostream"])forchunkingraph.stream(inputs,stream_mode="messages",version="v2"):ifchunk["type"]=="messages":msg,metadata=chunk["data"]if"writer"inmetadata.get("tags",[]):print(msg.content,end="",flush=True)# 只显示正文多节点过滤:在多 Agent 写作场景中,你可能只想流式展示 Writer 的输出,而不显示 Researcher 或 Editor 的 LLM 调用:forchunkingraph.stream(inputs,stream_mode="messages",version="v2"):ifchunk["type"]=="messages":msg,metadata=chunk["data"]ifmetadata.get("langgraph_node")=="write_section":# 只流式输出 Writer 的"写正文"节点yieldmsg.contentcustom:自定义进度事件流stream_mode="custom"允许你在节点或工具内部通过get_stream_writer()推送任意结构化数据。这是连接"后端执行状态"与"前端 UI 更新"的桥梁。核心 API:fromlanggraph.configimportget_stream_writerdefwriting_node(state:WriterState):writer=get_stream_writer()writer({"type":"progress","stage":"outline","percent":0})outline=generate_outline(state)writer({"type":"progress","stage":"writing","percent":10})fori,sectioninenumerate(state["sections"]):draft=write_section(section)writer({"type":"progress","stage":"writing","percent":10+int(80*(i+1)/len(state["sections"