Streaming 作用:实时反馈 → 提升 UX
- 模式
- updates:Agent 步骤事件(模型请求、工具返回、模型总结)
- messages:LLM token 级流(text/tool_call_chunk)
- custom:工具或业务自定义信息
- 组合:<code>stream_mode=["updates","custom"]</code>
- 实现
- <code>agent.stream(input, stream_mode=...)</code>
- <code>get_stream_writer()</code> 在工具中写自定义流
- 应用
- 进度条、打字机文本、工具执行日志
- 风险
- 需要 LangGraph 执行上下文
- 敏感数据泄露控制
- 多 Agent 时要配置禁用流节点
概述
流式传输对于Agent响应性至关重要。通过逐步显示输出,甚至在完整响应准备好之前就开始显示,流式传输显著改善了用户体验(UX),尤其是在处理大语言模型的延迟问题时。
LangChain Streaming 提供统一接口,将 Agent 运行过程中的模型 tokens、工具输出、用户自定义事件以「边生成边展示」方式推送给前端,显著降低感知延迟 [^stream-doc].核心能力:<code>stream/astream</code> 方法、<code>stream_mode</code>(updates/messages/custom/复合)、<code>get_stream_writer</code> 自定义信号。
PRD 必须指定:需要的流式模式、UI 呈现策略、Tool/模型的可见级别以及在多 Agent 系统中哪些节点可以/需要 streaming。
流式输出的能力
LangChain 的流式传输体系围绕 “实时反馈” 核心目标,设计了 Agent Progress(智能体进度)、LLM Tokens(语言模型令牌)、Custom Updates(自定义更新) 三种模式。三者既相互独立(针对不同输出粒度与场景),又可协同工作(混合模式满足复杂需求),共同构成了覆盖 “流程追踪 - 实时交互 - 业务定制” 的完整流式能力。
按Agent progress进度输出
流式输出支持进度信息的反馈,实时捕获智能体执行过程中每一步的状态更新,让开发者和用户清晰感知任务推进流程,尤其适用于需要追踪工具调用、LLM 思考步骤的场景。
通过 <code>stream</code> 或 <code>astream</code> 方法,并设置<code>stream_mode="updates"</code>,在每个步骤后触发一个事件,以 “事件” 形式输出智能体每一步的结果,从而实现透明化、问题快速定位等。
通过指定 <code>stream_mode="updates"</code>,告诉 LangChain 按 “步骤” 输出更新。立即依然抽象,可以查看代码辅助理解:
from langchain.agents import create_agent
# 1. 定义工具:查询天气的简单函数
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
# 2. 创建智能体:绑定 LLM(示例用 gpt-5-nano)和工具
agent = create_agent(
model="gpt-5-nano", # 需替换为实际可用的 LLM 模型(如 gpt-3.5-turbo)
tools=[get_weather], # 绑定天气查询工具
)
# 3. 启动智能体进度流式传输:stream_mode="updates"
for chunk in agent.stream(
# 用户问题:查询旧金山天气
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="updates", # 关键:启用智能体进度流式传输
):
# 遍历每一步的更新数据
for step, data in chunk.items():
print(f"step: {step}") # 步骤类型(model/tool)
print(f"content: {data['messages'][-1].content_blocks}") # 步骤结果
print("-" * 50)
在设置了<code>stream_mode="updates"</code> 后,可以打印的输出结果如下:
step: model
content: [{'type': 'tool_call', 'name': 'get_weather', 'args': {'city': 'San Francisco'}, 'id': 'call_OW2NYNsNSKhRZpjW0wm2Aszd'}]
step: tools
content: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]
step: model
content: [{'type': 'text', 'text': 'It's always sunny in San Francisco!'}]
按tokens粒度输出
按照Token输出是目前生成内容的最小单位,类似于模拟 “打字机逐字生成” 的效果,从根本上解决 LLM 响应 latency 导致的用户等待焦虑,显著提升交互体验。
这一处理方式和我们目前常见使用的豆包、DeepSeek类似,当有结果返回的时候即可在页面呈现。LLM Tokens 流式输出分为工具调用阶段和最终回答生成阶段,每个阶段的 Token 格式与含义不同。
Token 拼接逻辑:LangChain 仅输出单个 Token,需开发者在应用层手动拼接 Token 以形成完整内容(如前端需将 <code>Here</code> + <code>'s</code> + <code>what</code> 拼接为 <code>Here's what</code>);部分 LLM 会生成空 Token,需在代码中过滤,避免前端显示空白内容。
Custom updates 自定义粒度输出
Custom Updates(自定义更新) 是自由度最高的流式模式,允许开发者通过代码注入 “自定义信号”(如任务进度、数据加载状态、业务日志等),突破智能体默认步骤(Agent Progress)和 LLM Token(LLM Tokens)的输出限制,适配多样化的业务场景需求。
<code>get_stream_writer</code> 是 LangChain 提供的 “自定义信号发射器”,从 <code>langgraph.config</code> 导入;<code>stream_mode="custom"</code> 是触发 “自定义更新流式传输” 的开关,告诉 LangChain 仅输出用户通过 <code>get_stream_writer</code> 发送的自定义信号。
还是抽象,从代码了解开始:
from langchain.agents import create_agent
# 导入核心工具:get_stream_writer(用于发送自定义信号)
from langgraph.config import get_stream_writer
# 1. 定义工具:在工具内部注入 Custom Updates 信号
def get_weather(city: str) -> str:
"""Get weather for a given city."""
# 步骤1:创建自定义信号写入器
writer = get_stream_writer()
# 步骤2:发送自定义信号(可多次调用 writer,每次发送一个信号)
writer(f"[进度] 开始查询城市 '{city}' 的天气数据...") # 信号1:任务启动
writer(f"[状态] 已连接天气API,等待返回结果...") # 信号2:中间状态
writer(f"[完成] 成功获取 '{city}' 的天气数据!") # 信号3:数据获取完成
# 步骤3:工具核心逻辑(返回天气结果)
return f"It's always sunny in {city}!"
# 2. 创建智能体:绑定 LLM(示例用 claude-sonnet-4)与工具
agent = create_agent(
model="claude-sonnet-4-5-20250929", # 需替换为实际可用的 LLM
tools=[get_weather],
)
# 3. 启动 Custom Updates 流式传输:stream_mode="custom"
for chunk in agent.stream(
# 用户问题:查询旧金山天气
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="custom", # 关键:启用自定义更新流式传输
):
# 输出捕获到的自定义信号(chunk 即 writer 发送的内容)
print(chunk)
[进度] 开始查询城市 'San Francisco' 的天气数据...
[状态] 已连接天气API,等待返回结果...
[完成] 成功获取 'San Francisco' 的天气数据!
该方式比较难理解,PM可更多关注其场景和能力:
- 数据处理进度展示:如 “CSV 文件解析:已处理 200/1000 行”“数据库查询:已返回 50/200 条记录”;
- 外部 API 调用日志:如 “API 请求发送成功(URL: xxx)”“API 响应接收(耗时: 1.8s)”“API 数据解析完成”;
- 业务逻辑状态提示:如 “正在过滤无效订单数据”“已生成报表初稿,正在优化格式”;
- 调试与监控:如 “工具参数校验通过(city: Beijing)”“LLM 调用前准备完成(prompt 长度: 256 tokens)”。
流式输出三种模式的对比
对比和关系
| 对比维度 | Agent Progress(<code>updates</code>) | LLM Tokens(<code>messages</code>) | Custom Updates(<code>custom</code>) |
|---|---|---|---|
| 输出内容 | 智能体每一步的完整结果(如 LLM 工具调用请求、工具返回结果、最终回答) | LLM 生成的单个 Token(如 “San”“ Francisco”“Here's”) | 用户自定义信号(如 “查询中”“50% 进度”“API 调用成功”) |
| 内容粒度 | 粗粒度(完整步骤结果,一次输出一个步骤) | 细粒度(最小语义单位,一次输出一个 Token) | 灵活粒度(可粗可细,如单个进度提示、结构化数据) |
| 内容来源 | 系统自动捕获(无需用户编码) | 系统自动捕获(无需用户编码) | 用户手动注入(需通过 <code>get_stream_writer</code> 编码) |
| 灵活性 | 低(输出格式、内容固定,由系统定义) | 低(仅输出 Token 与元数据,格式固定) | 高(支持文本、数字、字典等任意格式,完全自定义) |
| 关键参数 | <code>stream_mode="updates"</code> | <code>stream_mode="messages"</code> | <code>stream_mode="custom"</code> + <code>get_stream_writer</code> |
| 依赖组件 | 需智能体绑定工具(否则仅输出 LLM 步骤,价值有限) | 需支持流式的 LLM(如 GPT-3.5/4、Claude) | 无强制依赖,仅需在工具 / 智能体钩子中注入代码 |
| 典型场景 | 1. 调试智能体(定位工具调用失败环节)2. 展示任务流程(如 “调用天气工具→获取结果→生成回答”) | 1. 聊天机器人(打字机效果提升交互感)2. 长文本生成(如报告、代码,实时看到内容) | 1. 业务进度展示(如 “数据解析 30/100 行”)2. 自定义日志(如 “API 请求耗时 2.3s”)3. 错误提示(如 “天气 API 调用失败”) |
| 输出示例 | <code>step: tools, content: "It's always sunny in SF"</code> | <code>node: model, content: [{'type': 'text', 'text': 'Here'}]</code> | <code>[进度] 已获取 SF 天气数据,正在生成回答...</code> |
| 优缺点 | ✅ 无需编码,开箱即用❌ 无法定制业务信息 | ✅ 实时交互感强,用户体验好❌ 仅能输出 LLM Token,场景单一 | ✅ 完全贴合业务需求,灵活度最高❌ 需手动编码,有学习成本 |
PM的关注点
在产品设计过程中,需要PM判断模型支持流式输出,然后才是对流式模式的确定。
- Agent Progress 是 “基础监控工具”,无需编码即可让智能体流程可视化;
- LLM Tokens 是 “用户体验增强器”,通过打字机效果消除等待焦虑;
- Custom Updates 是 “业务适配核心”,通过手动编码让流式输出贴合具体场景。
所以,
第一步:明确核心需求
- 若需实时看到 LLM 生成过程(如聊天、长文本)→ 优先选 <code>messages</code>;
- 若需追踪智能体流程 / 调试(如工具调用、步骤故障)→ 优先选 <code>updates</code>;
- 若需展示业务专属状态(如进度、日志)→ 必须选 <code>custom</code>。
第二步:判断是否需要混合模式
- 需求单一(如仅打字机效果)→ 单一模式;
- 需求复杂(如 “流程 + 进度”“交互 + 提示”)→ 混合模式(如 <code>["updates", "custom"]</code>)。
第三步:评估开发成本
- 无编码成本 → <code>updates</code> 或 <code>messages</code>;
- 可接受编码成本(需写 <code>get_stream_writer</code> 逻辑)→ <code>custom</code> 或混合模式。
知识点拆解
| 模块 | 关键信息 | 产品关注 |
|---|---|---|
| Agent Progress(updates) | 每个 Agent 节点执行完就触发事件,内容包括 <code>AIMessage</code>/<code>ToolMessage</code> | 适合展示“模型→工具→模型”流水线,定义 UI 节点标签 |
| LLM tokens(messages) | 按 token 推送模型输出,含 <code>tool_call_chunk</code>、文本块 | 指定打字机效果、token 粒度的最大等待时间 |
| Custom updates | 工具通过 <code>get_stream_writer</code> 推送任意文本,需在 LangGraph 中运行 | 规划工具日志格式,如“正在查询 CRM…”,并与后端约定 throttle |
| 多模式 streaming | <code>stream_mode</code> 接受列表,可同时监听 updates+custom | 需要在前端匹配事件类型,合并展示顺序 |
| stream vs astream | 同步 <code>stream</code> 返回迭代器;<code>astream</code> 适合 async 环境 | 根据部署(同步 API/异步服务器)设计调用方式 |
| 禁用 streaming | 某些模型或节点可关闭 token streaming,避免暴露内部信息 | 在多 Agent 拓扑中说明哪些节点要静默输出 |
| 依赖 | <code>create_agent(...).stream(...)</code>;工具内 streaming 必须在 LangGraph 执行上下文 | 需求需声明是否允许独立调用工具(无 streaming) |
| 日志与审计 | streaming 输出可能包含敏感数据,需要记录/脱敏策略 | PRD 覆盖合规要求,如仅对某些角色开启实时日志 |
[^stream-doc]: LangChain Streaming 官方文档:https://docs.langchain.com/oss/python/langchain/streaming
适用场景(问题 → 方案 → 价值)
- 长文案生成的打字机体验
- 问题:生成千字稿时等待时间长,用户以为卡住。
- 方案:使用 <code>stream_mode="messages"</code> 获取 token 级文本,前端逐条渲染;遇到 Tool 调用则切换提示文案。
- 价值:显著降低感知延迟,用户能提前审阅已生成段落。
- 工具执行进度追踪
- 问题:Agent 调用外部 API(如报表查询)需数秒,需向业务用户解释进度。
- 方案:在工具中调用 <code>get_stream_writer()</code> 推送“查询中/完成”消息,客户端同时监听 <code>stream_mode=["updates","custom"]</code>。
- 价值:透明化执行链路,便于排障,减少重复提问。
常见问题(FAQ)
- <code>updates</code> 与 <code>messages</code> 有何区别?→ <code>updates</code> 强调节点事件,<code>messages</code> 是 token 级;可按需组合。
- <code>astream</code> 必须要 async 环境吗?→ 是,用于异步框架(FastAPI/Quart);同步环境用 <code>stream</code>。
- 工具里使用 <code>get_stream_writer</code> 有限制吗?→ 需在 LangGraph 运行态内,否则无法获取 writer。
- 如何区分不同 stream_mode 的输出?→ 迭代器会返回 (mode,chunk) 或 metadata,前端需根据类型路由。
- 会泄露敏感信息吗?→ streaming 内容与普通响应一致,需在 PRD 中定义屏蔽策略或敏感字段过滤。
- 可以为不同 Agent 设置不同 streaming 吗?→ 可以,某些节点可禁用 token streaming,只保留 updates。
- 多模式 streaming 的顺序如何保证?→ LangChain 按事件发生顺序推送,前端需按 timestamp 排序。
- 如何停止 streaming?→ 在模型配置中禁用 streaming,或在 <code>stream_mode</code> 省略 <code>messages</code>。
- Tool 报错会 streaming 吗?→ 会以 ToolMessage 或 custom 更新形式推送,需求中需说明错误展示方式。
- Streaming 与 logging 冲突吗?→ 不冲突,但需注意重复信息:实时流展示给用户,日志仍需持久化以便追踪。
六、扩展信息
- LangChain Streaming 文档:https://docs.langchain.com/oss/python/langchain/streaming
- LangGraph state/stream API:https://reference.langchain.com/python/langgraph/graphs/#langgraph.graph.state.CompiledStateGraph.stream
- 多 Agent Streaming 控制:https://docs.langchain.com/oss/python/langchain/multi-agent