Streaming | 产品经理学Langchian

内容纲要

Streaming 作用:实时反馈 → 提升 UX

  • 模式
    • updates:Agent 步骤事件(模型请求、工具返回、模型总结)
    • messages:LLM token 级流(text/tool_call_chunk)
    • custom:工具或业务自定义信息
    • 组合:<code>stream_mode=["updates","custom"]</code>
  • 实现
    • <code>agent.stream(input, stream_mode=...)</code>
    • <code>get_stream_writer()</code> 在工具中写自定义流
  • 应用
    • 进度条、打字机文本、工具执行日志
  • 风险
    • 需要 LangGraph 执行上下文
    • 敏感数据泄露控制
    • 多 Agent 时要配置禁用流节点

概述

流式传输对于Agent响应性至关重要。通过逐步显示输出,甚至在完整响应准备好之前就开始显示,流式传输显著改善了用户体验(UX),尤其是在处理大语言模型的延迟问题时。

LangChain Streaming 提供统一接口,将 Agent 运行过程中的模型 tokens、工具输出、用户自定义事件以「边生成边展示」方式推送给前端,显著降低感知延迟 [^stream-doc].核心能力:<code>stream/astream</code> 方法、<code>stream_mode</code>(updates/messages/custom/复合)、<code>get_stream_writer</code> 自定义信号。

PRD 必须指定:需要的流式模式、UI 呈现策略、Tool/模型的可见级别以及在多 Agent 系统中哪些节点可以/需要 streaming。

流式输出的能力

LangChain 的流式传输体系围绕 “实时反馈” 核心目标,设计了 Agent Progress(智能体进度)LLM Tokens(语言模型令牌)Custom Updates(自定义更新) 三种模式。三者既相互独立(针对不同输出粒度与场景),又可协同工作(混合模式满足复杂需求),共同构成了覆盖 “流程追踪 - 实时交互 - 业务定制” 的完整流式能力。

按Agent progress进度输出

流式输出支持进度信息的反馈,实时捕获智能体执行过程中每一步的状态更新,让开发者和用户清晰感知任务推进流程,尤其适用于需要追踪工具调用、LLM 思考步骤的场景。

通过 <code>stream</code> 或 <code>astream</code> 方法,并设置<code>stream_mode="updates"</code>,在每个步骤后触发一个事件,以 “事件” 形式输出智能体每一步的结果,从而实现透明化、问题快速定位等。

通过指定 <code>stream_mode="updates"</code>,告诉 LangChain 按 “步骤” 输出更新。立即依然抽象,可以查看代码辅助理解:

from langchain.agents import create_agent

# 1. 定义工具:查询天气的简单函数
def get_weather(city: str) -&gt; str:
    &quot;&quot;&quot;Get weather for a given city.&quot;&quot;&quot;
    return f&quot;It&#039;s always sunny in {city}!&quot;

# 2. 创建智能体:绑定 LLM(示例用 gpt-5-nano)和工具
agent = create_agent(
    model=&quot;gpt-5-nano&quot;,  # 需替换为实际可用的 LLM 模型(如 gpt-3.5-turbo)
    tools=[get_weather],  # 绑定天气查询工具
)

# 3. 启动智能体进度流式传输:stream_mode=&quot;updates&quot;
for chunk in agent.stream(
    # 用户问题:查询旧金山天气
    {&quot;messages&quot;: [{&quot;role&quot;: &quot;user&quot;, &quot;content&quot;: &quot;What is the weather in SF?&quot;}]},
    stream_mode=&quot;updates&quot;,  # 关键:启用智能体进度流式传输
):
    # 遍历每一步的更新数据
    for step, data in chunk.items():
        print(f&quot;step: {step}&quot;)  # 步骤类型(model/tool)
        print(f&quot;content: {data[&#039;messages&#039;][-1].content_blocks}&quot;)  # 步骤结果
        print(&quot;-&quot; * 50)

在设置了<code>stream_mode="updates"</code> 后,可以打印的输出结果如下:

step: model
content: [{&#039;type&#039;: &#039;tool_call&#039;, &#039;name&#039;: &#039;get_weather&#039;, &#039;args&#039;: {&#039;city&#039;: &#039;San Francisco&#039;}, &#039;id&#039;: &#039;call_OW2NYNsNSKhRZpjW0wm2Aszd&#039;}]

step: tools
content: [{&#039;type&#039;: &#039;text&#039;, &#039;text&#039;: &quot;It&#039;s always sunny in San Francisco!&quot;}]

step: model
content: [{&#039;type&#039;: &#039;text&#039;, &#039;text&#039;: &#039;It&#039;s always sunny in San Francisco!&#039;}]

按tokens粒度输出

按照Token输出是目前生成内容的最小单位,类似于模拟 “打字机逐字生成” 的效果,从根本上解决 LLM 响应 latency 导致的用户等待焦虑,显著提升交互体验。

这一处理方式和我们目前常见使用的豆包、DeepSeek类似,当有结果返回的时候即可在页面呈现。LLM Tokens 流式输出分为工具调用阶段最终回答生成阶段,每个阶段的 Token 格式与含义不同。

Token 拼接逻辑:LangChain 仅输出单个 Token,需开发者在应用层手动拼接 Token 以形成完整内容(如前端需将 <code>Here</code> + <code>'s</code> + <code>what</code> 拼接为 <code>Here's what</code>);部分 LLM 会生成空 Token,需在代码中过滤,避免前端显示空白内容。

Custom updates 自定义粒度输出

Custom Updates(自定义更新) 是自由度最高的流式模式,允许开发者通过代码注入 “自定义信号”(如任务进度、数据加载状态、业务日志等),突破智能体默认步骤(Agent Progress)和 LLM Token(LLM Tokens)的输出限制,适配多样化的业务场景需求。

<code>get_stream_writer</code> 是 LangChain 提供的 “自定义信号发射器”,从 <code>langgraph.config</code> 导入;<code>stream_mode="custom"</code> 是触发 “自定义更新流式传输” 的开关,告诉 LangChain 仅输出用户通过 <code>get_stream_writer</code> 发送的自定义信号。

还是抽象,从代码了解开始:

from langchain.agents import create_agent
# 导入核心工具:get_stream_writer(用于发送自定义信号)
from langgraph.config import get_stream_writer

# 1. 定义工具:在工具内部注入 Custom Updates 信号
def get_weather(city: str) -&gt; str:
    &quot;&quot;&quot;Get weather for a given city.&quot;&quot;&quot;
    # 步骤1:创建自定义信号写入器
    writer = get_stream_writer()

    # 步骤2:发送自定义信号(可多次调用 writer,每次发送一个信号)
    writer(f&quot;[进度] 开始查询城市 &#039;{city}&#039; 的天气数据...&quot;)  # 信号1:任务启动
    writer(f&quot;[状态] 已连接天气API,等待返回结果...&quot;)        # 信号2:中间状态
    writer(f&quot;[完成] 成功获取 &#039;{city}&#039; 的天气数据!&quot;)        # 信号3:数据获取完成

    # 步骤3:工具核心逻辑(返回天气结果)
    return f&quot;It&#039;s always sunny in {city}!&quot;

# 2. 创建智能体:绑定 LLM(示例用 claude-sonnet-4)与工具
agent = create_agent(
    model=&quot;claude-sonnet-4-5-20250929&quot;,  # 需替换为实际可用的 LLM
    tools=[get_weather],
)

# 3. 启动 Custom Updates 流式传输:stream_mode=&quot;custom&quot;
for chunk in agent.stream(
    # 用户问题:查询旧金山天气
    {&quot;messages&quot;: [{&quot;role&quot;: &quot;user&quot;, &quot;content&quot;: &quot;What is the weather in SF?&quot;}]},
    stream_mode=&quot;custom&quot;,  # 关键:启用自定义更新流式传输
):
    # 输出捕获到的自定义信号(chunk 即 writer 发送的内容)
    print(chunk)
[进度] 开始查询城市 &#039;San Francisco&#039; 的天气数据...
[状态] 已连接天气API,等待返回结果...
[完成] 成功获取 &#039;San Francisco&#039; 的天气数据!

该方式比较难理解,PM可更多关注其场景和能力:

  • 数据处理进度展示:如 “CSV 文件解析:已处理 200/1000 行”“数据库查询:已返回 50/200 条记录”;
  • 外部 API 调用日志:如 “API 请求发送成功(URL: xxx)”“API 响应接收(耗时: 1.8s)”“API 数据解析完成”;
  • 业务逻辑状态提示:如 “正在过滤无效订单数据”“已生成报表初稿,正在优化格式”;
  • 调试与监控:如 “工具参数校验通过(city: Beijing)”“LLM 调用前准备完成(prompt 长度: 256 tokens)”。

流式输出三种模式的对比

对比和关系

对比维度 Agent Progress(<code>updates</code>) LLM Tokens(<code>messages</code>) Custom Updates(<code>custom</code>)
输出内容 智能体每一步的完整结果(如 LLM 工具调用请求、工具返回结果、最终回答) LLM 生成的单个 Token(如 “San”“ Francisco”“Here's”) 用户自定义信号(如 “查询中”“50% 进度”“API 调用成功”)
内容粒度 粗粒度(完整步骤结果,一次输出一个步骤) 细粒度(最小语义单位,一次输出一个 Token) 灵活粒度(可粗可细,如单个进度提示、结构化数据)
内容来源 系统自动捕获(无需用户编码) 系统自动捕获(无需用户编码) 用户手动注入(需通过 <code>get_stream_writer</code> 编码)
灵活性 低(输出格式、内容固定,由系统定义) 低(仅输出 Token 与元数据,格式固定) 高(支持文本、数字、字典等任意格式,完全自定义)
关键参数 <code>stream_mode="updates"</code> <code>stream_mode="messages"</code> <code>stream_mode="custom"</code> + <code>get_stream_writer</code>
依赖组件 需智能体绑定工具(否则仅输出 LLM 步骤,价值有限) 需支持流式的 LLM(如 GPT-3.5/4、Claude) 无强制依赖,仅需在工具 / 智能体钩子中注入代码
典型场景 1. 调试智能体(定位工具调用失败环节)2. 展示任务流程(如 “调用天气工具→获取结果→生成回答”) 1. 聊天机器人(打字机效果提升交互感)2. 长文本生成(如报告、代码,实时看到内容) 1. 业务进度展示(如 “数据解析 30/100 行”)2. 自定义日志(如 “API 请求耗时 2.3s”)3. 错误提示(如 “天气 API 调用失败”)
输出示例 <code>step: tools, content: "It's always sunny in SF"</code> <code>node: model, content: [{'type': 'text', 'text': 'Here'}]</code> <code>[进度] 已获取 SF 天气数据,正在生成回答...</code>
优缺点 ✅ 无需编码,开箱即用❌ 无法定制业务信息 ✅ 实时交互感强,用户体验好❌ 仅能输出 LLM Token,场景单一 ✅ 完全贴合业务需求,灵活度最高❌ 需手动编码,有学习成本

PM的关注点

在产品设计过程中,需要PM判断模型支持流式输出,然后才是对流式模式的确定。

  • Agent Progress 是 “基础监控工具”,无需编码即可让智能体流程可视化;
  • LLM Tokens 是 “用户体验增强器”,通过打字机效果消除等待焦虑;
  • Custom Updates 是 “业务适配核心”,通过手动编码让流式输出贴合具体场景。

所以,

第一步:明确核心需求

  • 若需实时看到 LLM 生成过程(如聊天、长文本)→ 优先选 <code>messages</code>;
  • 若需追踪智能体流程 / 调试(如工具调用、步骤故障)→ 优先选 <code>updates</code>;
  • 若需展示业务专属状态(如进度、日志)→ 必须选 <code>custom</code>。

第二步:判断是否需要混合模式

  • 需求单一(如仅打字机效果)→ 单一模式;
  • 需求复杂(如 “流程 + 进度”“交互 + 提示”)→ 混合模式(如 <code>["updates", "custom"]</code>)。

第三步:评估开发成本

  • 无编码成本 → <code>updates</code> 或 <code>messages</code>;
  • 可接受编码成本(需写 <code>get_stream_writer</code> 逻辑)→ <code>custom</code> 或混合模式。

知识点拆解

模块 关键信息 产品关注
Agent Progress(updates) 每个 Agent 节点执行完就触发事件,内容包括 <code>AIMessage</code>/<code>ToolMessage</code> 适合展示“模型→工具→模型”流水线,定义 UI 节点标签
LLM tokens(messages) 按 token 推送模型输出,含 <code>tool_call_chunk</code>、文本块 指定打字机效果、token 粒度的最大等待时间
Custom updates 工具通过 <code>get_stream_writer</code> 推送任意文本,需在 LangGraph 中运行 规划工具日志格式,如“正在查询 CRM…”,并与后端约定 throttle
多模式 streaming <code>stream_mode</code> 接受列表,可同时监听 updates+custom 需要在前端匹配事件类型,合并展示顺序
stream vs astream 同步 <code>stream</code> 返回迭代器;<code>astream</code> 适合 async 环境 根据部署(同步 API/异步服务器)设计调用方式
禁用 streaming 某些模型或节点可关闭 token streaming,避免暴露内部信息 在多 Agent 拓扑中说明哪些节点要静默输出
依赖 <code>create_agent(...).stream(...)</code>;工具内 streaming 必须在 LangGraph 执行上下文 需求需声明是否允许独立调用工具(无 streaming)
日志与审计 streaming 输出可能包含敏感数据,需要记录/脱敏策略 PRD 覆盖合规要求,如仅对某些角色开启实时日志

[^stream-doc]: LangChain Streaming 官方文档:https://docs.langchain.com/oss/python/langchain/streaming

适用场景(问题 → 方案 → 价值)

  1. 长文案生成的打字机体验
    • 问题:生成千字稿时等待时间长,用户以为卡住。
    • 方案:使用 <code>stream_mode="messages"</code> 获取 token 级文本,前端逐条渲染;遇到 Tool 调用则切换提示文案。
    • 价值:显著降低感知延迟,用户能提前审阅已生成段落。
  2. 工具执行进度追踪
    • 问题:Agent 调用外部 API(如报表查询)需数秒,需向业务用户解释进度。
    • 方案:在工具中调用 <code>get_stream_writer()</code> 推送“查询中/完成”消息,客户端同时监听 <code>stream_mode=["updates","custom"]</code>。
    • 价值:透明化执行链路,便于排障,减少重复提问。

常见问题(FAQ)

  1. <code>updates</code> 与 <code>messages</code> 有何区别?→ <code>updates</code> 强调节点事件,<code>messages</code> 是 token 级;可按需组合。
  2. <code>astream</code> 必须要 async 环境吗?→ 是,用于异步框架(FastAPI/Quart);同步环境用 <code>stream</code>。
  3. 工具里使用 <code>get_stream_writer</code> 有限制吗?→ 需在 LangGraph 运行态内,否则无法获取 writer。
  4. 如何区分不同 stream_mode 的输出?→ 迭代器会返回 (mode,chunk) 或 metadata,前端需根据类型路由。
  5. 会泄露敏感信息吗?→ streaming 内容与普通响应一致,需在 PRD 中定义屏蔽策略或敏感字段过滤。
  6. 可以为不同 Agent 设置不同 streaming 吗?→ 可以,某些节点可禁用 token streaming,只保留 updates。
  7. 多模式 streaming 的顺序如何保证?→ LangChain 按事件发生顺序推送,前端需按 timestamp 排序。
  8. 如何停止 streaming?→ 在模型配置中禁用 streaming,或在 <code>stream_mode</code> 省略 <code>messages</code>。
  9. Tool 报错会 streaming 吗?→ 会以 ToolMessage 或 custom 更新形式推送,需求中需说明错误展示方式。
  10. Streaming 与 logging 冲突吗?→ 不冲突,但需注意重复信息:实时流展示给用户,日志仍需持久化以便追踪。

六、扩展信息

滚动至顶部