XBSTACK Tech Image - XBSTACK

LangGraph Observability 实战:如何追踪每个 Agent 的决策路径?

Release Date
2026-06-14
Reading Time
23分钟
Impact Factor
2,611
langgraph
observability
trace
debugging
ai-agent
multi-agent
tool-call
Xiaobai's Note / 实验室笔记

这篇文章记录了我在贵阳实验室的实战过程。我坚信,在技术下行的时代,程序员唯一的护城河就是通过 AI 建立属于自己的数字资产。

本文解决的问题

  • LangGraph 多智能体协同在生产环境中陷入局部死循环或死锁时,该如何快速定位是哪个节点在作祟?
  • 如何区分与设计 thread_id、run_id、trace_id 的生命周期,并在日志框架中实现全链路打通?
  • 在 Supervisor 节点中,如何留存其路由的真实决策概率、理由和风险评估,避免决策过程成为完全不可审计的黑盒?
  • 工具调用日志要如何兼顾排障需要与数据合规,既防泄密又防审计信息丢失?
  • 当遭遇长延迟时,如何将运行耗时精确拆解到 LLM、原子工具、网络队列和人工审核四个层面?

适合谁读

  • 正在为多智能体系统排查模型跑偏、工具报错未捕获以及状态混乱的后端开发团队。
  • 需要满足数据合规与审计要求,无法直接接入 LangSmith 商业云服务的全栈开发工程师。
  • 负责大规模 AI Agent 稳定运营、资源消耗监控和性能调优的 SRE 人员与架构师。

一、为什么 LangGraph 多智能体系统必须做 Observability?

Demo 阶段跑通只需靠运气,生产环境下的多智能体系统必须拥有绝对透明的执行决策追踪。

写这篇文的时候,外面雷声隆隆,暴雨已经下了一整天,我桌上的这台蜗牛星际改造的飞牛 NAS 风扇发出轻微的振动声,搞得水杯里的水一晃一晃的。刚刚跑这段代码时,后台又因为三方 API 的 rate limit 报了 429 报错,搞得我脑仁疼。今天就彻底把这套 Agent 可观测性的设计摊开,聊聊我踩过的那些血泪坑。

在多智能体系统进入生产环境后,最难排查的问题往往不是程序直接崩溃,而是你不知道它为什么会这么跑。以下这几种真实物理场景,我相信每个折腾过 Agent 落地的人都遇到过:

  • 为什么 Supervisor 突然选了金融 Worker,而不是日常查询 Worker?模型在想什么?
  • 为什么 Worker 生成的工具参数少了一个必填项?这个参数是从哪一步的历史消息里幻觉出来的?
  • 本次请求调用了三次外部搜索工具,究竟是哪一次调用发生了网络抖动超时?
  • 为什么一个简单的财务问题,Agent 跑了足足三十秒才吐出答案?它在哪个图节点里死循环了?
  • 用户 A 的历史对话上下文,有没有因为图的状态泄露或串到了用户 B 的 thread 里?

如果你的系统里只有简单的 print 打印或者默认的 python 日志,当多个并发请求涌入时,所有的日志会交错在一起,变成一坨毫无逻辑的乱码。没有 trace_id、run_id 和 thread_id,你就像在一个完全漆黑的房间里调试电路,只能靠猜。

下午我刚去羽毛球馆打了两小时双打,扣杀杀得手腕发酸,回来看到控制台上满屏没有结构化的 debug log,突然发现,打球接不准好歹能看清球的物理轨迹,而没有可观测性的多智能体执行过程,连轨迹都看不见。多智能体系统一旦上线,必须拥有完整的链路追踪,才能在用户报障的第一时间,用一个 trace_id 定位到具体出错的物理节点。

二、一次 LangGraph 执行链路应该追踪什么?

规范的 trace 日志应当是一个结构化的实体,其最小元数据字段集必须能将用户的单次会话与底层的每个原子节点、工具调用以及网络耗时建立唯一绑定。

我们在做可观测性设计时,首先要定义一个标准的数据结构。一次完整的 LangGraph 运行链条中,无论你使用的是 ClickHouse、Elasticsearch 还是本地的结构化 JSON 文件,每一条 Trace 记录都必须包含以下物理字段:

  • trace_id: 唯一标识一次完整调用链路,横跨所有节点。
  • run_id: 标识一次具体的执行尝试。如果系统在某处发生超时并触发重试,重试的那一次调用将生成新的 run_id,但它们共享同一个 trace_id。
  • thread_id: 对应用户的一条可恢复会话或任务链,由 Checkpointer 持久化。
  • user_id / session_id: 关联业务层面的用户与会话,用于安全合规与多租户数据隔离审计。
  • node_name: 明确当前日志是由哪一个图节点产生的。
  • worker_name: 如果当前节点是一个 Worker Agent,记录其智能体标识。
  • tool_name: 如果在节点中触发了具体的原子工具,记录该工具名称。
  • input_summary: 经过脱敏的输入摘要,严禁直接记录大段敏感原文。
  • output_summary: 经过脱敏的输出摘要或执行结果状态哈希。
  • status: 执行状态,必须是 success、failed 或 pending 之一。
  • error_code: 结构化的自定义系统错误码,如 TOOL_TIMEOUT、RATE_LIMITED。
  • latency_ms: 该单元操作的绝对耗时,以毫秒为单位。
  • created_at: 精确到毫秒的物理时间戳,使用 UTC 时间。

上个月我在排查一个复杂的财报审计 Agent 时,就是因为日志里少记了 run_id,结果系统因为网络波动触发自动重试时,两次执行的工具日志在 Elasticsearch 里完全错乱,我花了整整一个下午才定位到原来是第一次执行的残留长连接在搞鬼。有了这套结构化字段,你就可以直接在 Kibana 里画出树状的执行调用链。

三、thread_id、run_id、trace_id 怎么区分?

厘清 thread_id、run_id 与 trace_id 的生命周期是构建 Agent 追踪链的物理基石,它们分别对应长周期任务流、单次执行尝试以及单次模型或工具调用的层级关系。

很多刚上手 LangGraph 的开发者容易混淆这三个 ID,把它们乱填一气,导致后续的恢复和审计工作彻底抓瞎。我们必须从物理层面严格划清它们的边界:

thread_id (会话级,生命周期最长)

└─ run_id (执行级,例如用户点击一次发送或者执行引擎被唤醒一次)

     ├─ trace_id (链路级,本次执行中所有的图节点执行记录)
     │    ├─ node_name (节点级,例如 supervisor, finance_worker)
     │    │    └─ request_id (原子级,例如调用某个具体 Tool Call)
  • thread_id:它是持久化会话的唯一标识。一个 thread_id 对应着一个持久化的 Checkpoint 存储桶。只要用户的任务没有彻底完结,或者需要在未来某个时间点进行状态恢复, thread_id 就必须保持不变。它的生命周期通常是几天、几周甚至永久。
  • run_id:它是状态图执行引擎单次触发的物理实例。用户发送一条新消息,或者后台脚本手动调用一次 graph.invoke(),就会产生一个 run_id。如果当前的 thread 包含多次历史对话,那么在这个 thread 里会产生多个不同的 run_id。
  • trace_id:它是我们在可观测性链路中用于追踪底层调用链的 ID。通常,我们在 run_id 产生的瞬间,会将其作为 trace_id 的根节点往下传递。但如果你的系统中有多个并发的后台旁路任务,一个 run_id 可能会分叉出多个独立的 trace_id。
  • request_id:这是针对单个外部服务或 Tool 调用的物理请求 ID。当 Worker 智能体向第三方 API 发起网络请求时,必须生成或透传该 ID,以便在对方系统的日志里进行双向关联排障。

我把打字的手指从大 F 红轴键盘上移开,喝了口有些凉掉的咖啡。理解了这层物理包含关系后,你在编写 LangGraph 的 node 函数时,就可以通过 context 或 config 轻松透传这三个 ID,而不需要在全局变量里轮转。

四、Supervisor 决策路径怎么记录?

记录 Supervisor 的每一次路由决策是拆解 Agent 幻觉的黄金钥匙,必须完整保留候选节点、最终选定的 Worker、路由依据以及路由的置信度。

在 Supervisor-Worker 协同架构中,Supervisor 扮演着大脑和调度员的角色。如果它做出了错误的路由选择,后面的 Worker 干得再卖力也只能是南辕北辙。因此,Supervisor 的日志绝对不能只有一个简单的 next_node。

为了能够随时审计 Supervisor 的决策合理性,我们需要在全局状态中设计一个决策记录结构体:

{
  "trace_id": "t-987654321-xyz",
  "run_id": "r-11223344-abc",
  "thread_id": "th-user-009",
  "node_name": "supervisor",
  "decision": {
    "candidate_workers": ["finance_worker", "search_worker", "email_worker"],
    "selected_worker": "finance_worker",
    "route_reason": "用户要求分析 AltStack 2026年第一季度的复利回报及基金走势,涉及财务分析工具",
    "confidence": 0.95,
    "risk_level": "low",
    "requires_human_review": false,
    "next_node": "finance_worker"
  },
  "latency_ms": 340,
  "status": "success"
}

注意看这里的 route_reason 字段,它是从 LLM 返回的结构化 JSON 中提取出来的。在写 Prompt 时,我们要强制模型输出其路由的逻辑推理过程。这样当发生路由跑偏时,你可以直接拿着这个 reason 去调优 Prompt,或者给少样本提示喂新数据,而不需要盲人摸象。

五、Worker 执行过程怎么记录?

Worker 节点的执行过程追踪不能仅看其最终输出,还必须完整回溯其接收的任务类型、执行状态的流转以及中间产生的工具调用请求。

Worker 作为具体的执行单元,在其生命周期内通常会经历多个内部步骤:接收任务 -> 构造 Prompt -> 调用 LLM -> 提取 Tool Call -> 执行 Tool -> 返回 LLM -> 产出最终文本。如果这些中间步骤全被封装在节点内部而不做上报,那么一旦 Worker 吐出一个空回复,你根本无法判定是 LLM 的问题,还是 Tool 执行失败后 Worker 自己吞掉了报错。

标准的 Worker 执行追踪日志,需要以结构化的形式记录它在这一步所做的事情。不要只让它返回最终文本,必须把它的执行路径作为 intermediate_steps 记录进 Graph State:

{
  "trace_id": "t-987654321-xyz",
  "run_id": "r-11223344-abc",
  "node_name": "finance_worker_node",
  "worker_name": "finance_worker",
  "task_type": "read_financial_report",
  "input_summary": "Ticker: AltStack, Year: 2026, Q1",
  "tool_calls": [
    {
      "request_id": "req-tool-001",
      "tool_name": "read_financial_report_tool",
      "status": "success",
      "latency_ms": 1200
    }
  ],
  "intermediate_status": "completed",
  "final_status": "success",
  "safe_summary": "提取到 AltStack 2026 Q1 的净资产回报率为 15%,各项财务指标正常"
}

通过这种形式,外部监控系统可以清晰地绘制出这个 Worker 节点耗时 1.2 秒是因为调用了 read_financial_report_tool。如果 Worker 执行失败,它的 final_status 会标记为 failed,并携带 error_context。

六、Tool Call 日志怎么设计?

工具调用的日志设计应当聚焦于网络请求与输入输出摘要,而严禁将其设计成无选择的敏感数据全量存储库。

Tool Call 是多智能体系统与外部物理世界交互的唯一管道。正因为如此,它也是数据泄露的重灾区。很多新手在写日志时,喜欢直接把工具返回的 raw_data 全量打出来。如果工具返回的是一个 10MB 的 PDF 财报内容,或者包含用户明文密码、银行卡号的敏感 JSON,这样做不仅会导致你的日志数据库瞬间被撑爆,而且在安全审计中直接就是判定为违规。

在设计工具调用日志时,必须遵守数据脱敏红线。我们应该记录输入参数的哈希值、关键控制字段以及非敏感的输出摘要:

{
  "request_id": "req-tool-001",
  "trace_id": "t-987654321-xyz",
  "run_id": "r-11223344-abc",
  "thread_id": "th-user-009",
  "tool_name": "read_financial_report_tool",
  "tool_version": "v1.2",
  "input_hash": "a8f9c2d1e03b4...",
  "input_summary": {
    "ticker": "AltStack",
    "section": "summary"
  },
  "output_summary": {
    "status_code": 200,
    "payload_bytes": 1024,
    "data_keys": ["revenue", "net_income", "roe"]
  },
  "latency_ms": 1200,
  "status": "success",
  "error_code": null,
  "retryable": false
}

不要记录完整 Token,不要记录完整数据库返回,不要记录未经脱敏的用户个人财务和隐私数据。你只需要知道“工具以什么参数被调用”、“调用了多长时间”以及“返回的数据结构键名是否完整”。这就足够用来做调试和性能分析了。

七、错误追踪与异常诊断怎么做?

对多智能体系统的错误追踪必须实现精确定位到人、节点和具体工具的下钻分析能力,并且必须配备全局唯一的错误码以便于监控警报。

在 LangGraph 中,错误往往是跨节点级联发生的。一个工具报错,可能会导致 Worker 抛出异常,进而导致 Supervisor 崩溃。如果在排障时,你只看到了最外层的报错堆栈,你会非常困惑。

我们必须在日志中实现下钻分析。当异常发生时,通过 try-except 捕获,并映射为全局唯一的错误码。以下是常见的错误码规范分类:

  • tool_timeout: 底层三方 API 响应超时。
  • schema_mismatch: 智能体生成的参数无法通过 Pydantic 校验。
  • permission_denied: 智能体越权请求敏感数据或高危动作。
  • rate_limited: 三方服务触发频次限制。
  • worker_empty_result: 智能体没有返回有效载荷。
  • invalid_state: 图的状态在恢复时遭遇数据损坏或版本不一致。
  • supervisor_route_error: 路由分配节点不存在或非法。
  • checkpoint_restore_failed: Checkpointer 读取物理存储发生异常。

当你的 SRE 系统检测到这些特定的错误码时,应该立刻拉取对应的 trace_id。通过 trace_id 检索该次运行中所有节点的执行顺序,定位到是哪个用户、在哪个线程、执行哪一次运行、处于哪个节点、调用哪一个工具时发生的问题。这才是生产级高可用系统的排障姿势。

八、耗时和性能怎么追踪?

耗时统计是定位 Agent 响应迟钝的直观指标,必须将整体运行耗时精细拆解为 LLM 推理、工具执行、网络排队以及人工审批这四个物理维度。

用户在使用 AI 系统时,最直观的体感就是慢。但多智能体系统的慢是一个复杂的物理问题。你必须在每一个关键节点记录绝对耗时。

一次总运行耗时应该被物理拆解为以下子项之和:

  • llm_latency: 大模型生成文本与 Token 的耗时,受模型参数规模和网络层级影响。
  • tool_latency: 工具执行本地逻辑或网络请求的耗时。
  • queue_wait_time: 任务在排队引擎中的等待耗时。
  • retry_count / fallback_count: 由于重试和降级引发的额外耗时。
  • human_wait_time: 等待人工审批挂起的耗时。

如果发现耗时超标,根据这四个维度进行物理定位:

  • 慢在 LLM:考虑精简历史消息,减少上下文 Token;或者采用轻量级小模型,比如将通用路由工作从 70B/100B 模型下放到 8B 模型。
  • 慢在 Tool:对三方接口加入本地 Redis 缓存;优化数据库 SQL 索引;对于超大文件扫描,将同步 Tool 改为分片并行异步轮询。
  • 慢在 Worker:智能体逻辑过于臃肿,尝试拆分成更细粒度的单职责子节点。
  • 慢在人审:优化前台的 WebSocket 即时推送或邮件短信提醒机制,缩短人工响应周期。

九、State Snapshot 怎么保存?

保存状态快照并不是全量备份所有的敏感会话数据,而是提取状态的版本、最后一个成功执行的节点以及挂起的工具调用来构建最小可恢复点。

Checkpointer 会在每次节点跳转时自动保存图的状态快照。但这并不意味着我们可以在 trace 日志里全量存储这些快照。快照中可能包含大量的历史上下文对话,如果全量塞入日志,会对存储系统带来巨大的负担。

规范的 Observability 设计是只记录状态的元数据摘要,而将真正的物理快照留给专业的存储层(如 PostgreSQL 或 Redis checkpointer):

{
  "trace_id": "t-987654321-xyz",
  "run_id": "r-11223344-abc",
  "state_snapshot_summary": {
    "state_version": 14,
    "node_name": "finance_worker_node",
    "last_success_node": "supervisor",
    "current_worker": "finance_worker",
    "pending_tool_call": {
      "tool_name": "read_financial_report_tool",
      "request_id": "req-tool-001"
    },
    "error_state": null,
    "approval_state": "not_applicable",
    "retry_count": 1
  }
}

有了这一层摘要,运维人员在看日志时,就能一目了然当前状态图卡在第 14 个版本,正在等待 read_financial_report_tool 返回结果。如果程序崩溃,直接拿这个 state_version 去底层数据库读取物理快照,就可以完美复现崩溃前的状态。

十、在 LangGraph 中自建 Observability 追踪系统(Python 实战代码)

不要依赖模型的自觉性来打印状态,而要使用强类型的装饰器在状态机的边缘拦截耗时与错误。

下面是一个完整的、可运行的 Python 代码示例,展示了如何在不引入任何三方云端服务的前提下,在 LangGraph 中构建一套极简但功能完备的结构化可观测性追踪系统。我们定义了一个 trace_node 装饰器,自动捕获每个节点的输入输出摘要、耗时,并且捕获潜在的工具执行异常并生成错误码:

import time
import uuid
import json
from typing import Dict, Any, TypedDict, List
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# 1. 定义带有可观测性日志列表的全局状态
class AgentState(TypedDict):
    task: str
    messages: List[Dict[str, str]]
    trace_id: str
    run_id: str
    thread_id: str
    # 用于记录执行轨迹的结构化日志容器
    observability_logs: List[Dict[str, Any]]
    next_worker: str
    result: str
    status: str

# 2. 节点拦截器:实现自动化耗时度量与安全摘要脱敏
def trace_node(node_name: str):
    def decorator(func):
        def wrapper(state: AgentState, config: Dict[str, Any] = None) -> Dict[str, Any]:
            start_time = time.time()
            trace_id = state.get("trace_id", "t-default-id")
            run_id = state.get("run_id", "r-default-id")

            # 脱敏构建输入摘要,只提取长度和核心字段,严禁泄露完整对话
            input_summary = {
                "task_length": len(state.get("task", "")),
                "message_count": len(state.get("messages", []))
            }

            print(f"[Log Audit] Node '{node_name}' execution started. Trace ID: {trace_id}")

            try:
                # 执行底层的节点业务函数
                if config:
                    output = func(state, config)
                else:
                    output = func(state)

                latency_ms = int((time.time() - start_time) * 1000)

                # 构建成功的观测日志
                log_entry = {
                    "node_name": node_name,
                    "trace_id": trace_id,
                    "run_id": run_id,
                    "input_summary": input_summary,
                    "output_summary": {k: v for k, v in output.items() if k not in ["messages"]},
                    "latency_ms": latency_ms,
                    "status": "success",
                    "error_code": None,
                    "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
                }

                # 将该条日志安全追加至 Graph 状态中
                existing_logs = state.get("observability_logs", []) or []
                output["observability_logs"] = existing_logs + [log_entry]
                return output

            except Exception as e:
                latency_ms = int((time.time() - start_time) * 1000)
                error_code = "NODE_EXECUTION_FAILED"

                if isinstance(e, TimeoutError):
                    error_code = "TOOL_TIMEOUT"
                elif isinstance(e, PermissionError):
                    error_code = "PERMISSION_DENIED"

                print(f"[Log Alert] Node '{node_name}' crashed. Error: {str(e)}")

                log_entry = {
                    "node_name": node_name,
                    "trace_id": trace_id,
                    "run_id": run_id,
                    "input_summary": input_summary,
                    "output_summary": {"raw_exception": str(e)},
                    "latency_ms": latency_ms,
                    "status": "failed",
                    "error_code": error_code,
                    "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
                }

                existing_logs = state.get("observability_logs", []) or []
                return {
                    "status": "failed",
                    "observability_logs": existing_logs + [log_entry]
                }
        return wrapper
    return decorator

# 模拟底层原子工具
def mock_financial_api(ticker: str) -> Dict[str, Any]:
    # 模拟网络调用延迟
    time.sleep(0.15)
    if ticker == "BLOCKED_STOCK":
        raise PermissionError("Access forbidden: compliance restriction.")
    return {
        "status_code": 200,
        "ticker": ticker,
        "revenue_usd": 125000000,
        "net_income_usd": 32000000
    }

# 3. 编写 Supervisor 路由决策节点
@trace_node("supervisor")
def supervisor_node(state: AgentState) -> Dict[str, Any]:
    task = state.get("task", "")

    # 模拟简单的路由分发决策
    if "financial" in task or "stock" in task:
        next_worker = "finance_worker"
        reason = "Task queries stock data. Direct to finance_worker."
    else:
        next_worker = "general_worker"
        reason = "General query. Direct to general_worker."

    return {
        "next_worker": next_worker,
        "status": "pending",
        "result": "routing_decision_made",
        "decision_log": {
            "selected_worker": next_worker,
            "route_reason": reason,
            "confidence_score": 0.96
        }
    }

# 4. 编写 Worker 执行节点
@trace_node("finance_worker")
def finance_worker_node(state: AgentState) -> Dict[str, Any]:
    task = state.get("task", "")
    # 从任务文本中提取具体的目标资产标识
    ticker = "BLOCKED_STOCK" if "blocked" in task else "AltStack"

    # 执行原子工具调用
    tool_response = mock_financial_api(ticker)

    return {
        "status": "success",
        "result": f"Analysis finished for {ticker}. Net Income: ${tool_response['net_income_usd']} USD."
    }

# 5. 构建 LangGraph 状态图
workflow = StateGraph(AgentState)

# 注册节点
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("finance_worker", finance_worker_node)

# 设置入口
workflow.set_entry_point("supervisor")

# 定义路由逻辑边
def router_edge(state: AgentState):
    if state.get("status") == "failed":
        return END
    next_node = state.get("next_worker")
    if next_node == "finance_worker":
        return "finance_worker"
    return END

workflow.add_conditional_edges("supervisor", router_edge, {
    "finance_worker": "finance_worker",
    END: END
})
workflow.add_edge("finance_worker", END)

# 使用内存 checkpointer 实例化图
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)

# 6. 测试成功与失败两条不同的物理执行链路
if __name__ == "__main__":
    # 物理配置参数
    config = {"configurable": {"thread_id": "thread-user-009"}}

    # 场景一:正常查询链路
    initial_state_success = {
        "task": "Please analyze financial reports for AltStack.",
        "messages": [{"role": "user", "content": "Analyze AltStack"}],
        "trace_id": "t-success-001",
        "run_id": "r-success-001",
        "thread_id": "thread-user-009",
        "observability_logs": [],
        "next_worker": "",
        "result": "",
        "status": "init"
    }

    print("=== RUNNING SUCCESS PATH ===")
    final_state_success = app.invoke(initial_state_success, config)
    print("\n[Audit Trace Result - Success Path]")
    print(json.dumps(final_state_success["observability_logs"], indent=2, ensure_ascii=False))

    # 场景二:越权受限查询链路
    initial_state_fail = {
        "task": "Please analyze financial reports for blocked asset.",
        "messages": [{"role": "user", "content": "Analyze blocked"}],
        "trace_id": "t-fail-002",
        "run_id": "r-fail-002",
        "thread_id": "thread-user-009",
        "observability_logs": [],
        "next_worker": "",
        "result": "",
        "status": "init"
    }

    print("\n=== RUNNING FAIL PATH ===")
    final_state_fail = app.invoke(initial_state_fail, config)
    print("\n[Audit Trace Result - Fail Path]")
    print(json.dumps(final_state_fail["observability_logs"], indent=2, ensure_ascii=False))

这段代码揭示了如何通过 Python 闭包将图的 context 统一收拢在 observability_logs 这一状态槽中。无论状态图执行多少次条件跳转,哪怕因为三方接口崩溃而在中间节点戛然而止,先前所产生的全部 Trace 都会作为 Graph State 的一部分被 Checkpointer 完美物理固化。当运维人员发起状态恢复时,原有的历史轨迹同样能被无缝读出,杜绝了审计线索的断裂。

十一、三种主流观测方案对比

选择何种观测架构取决于团队当前的物理预算与业务规模,自建结构化日志、官方 LangSmith 或是 OpenTelemetry 标准各有其物理边界与适用场景。

在决定如何实施 Observability 时,我们不需要盲目跟风。我们要根据当前的物理预算、团队规模和安全合规限制,选择最适合的观测架构:

评估维度自建 JSON 日志方案官方 LangSmith / LangFuse开放标准 OpenTelemetry
接入成本极低。只需在 Python 节点中嵌入 logger 和装饰器,无需额外依赖。中等。需要配置 API 凭证,导入官方专属的 SDK 进行事件监听。较高。需要配置 OpenTelemetry Collector,理解 spans 与 trace 上下文传递。
隐私与合规性100% 本地完全受控。日志留在内网,极易通过本地过滤器进行物理脱敏。存在数据出境和安全审计风险。LangSmith 默认将 prompt 和返回发至云端。完全私有化部署。通常数据保留在企业自建的集群中,适合金融级系统。
分析体验依赖 Kibana 或 ClickHouse 仪表盘。需要运维人员自行编写查询 SQL。极其丝滑。官方自带开箱即用的图形化树状运行链,支持人工打标和评估。依赖 Grafana、Jaeger 等微服务观测工具。展现形式偏向经典 APM 链路。
适用场景早期原型系统、私有化 NAS 部署、对 Token 脱敏合规要求极严的本地服务。团队处于快速研发迭代期,且允许将非敏感提示词与调用链托管在外部云端的项目。已经具备完善的分布式微服务观测体系,需要将 Agent 作为标准服务接入的企业。

我自己的这台飞牛 NAS 在家跑,由于涉及个人的投资和日记数据,我绝对不会直接接入 LangSmith 这种云端服务。对于我个人而言,自建一套简单的 JSON 日志,配合 Elasticsearch/Kibana 检索,是性价比最高且最安全的物理闭环方案。

十二、生产环境常见坑与真实报错

直接将未脱敏的用户数据灌入日志,或者忽略对成功工具调用的审计,是生产环境中最为普遍的架构灾难。

我们在将多智能体系统推向生产时,经常会遇到由于观测设计不当而引发的次生灾害。以下是三个最常踩到的物理大坑:

1. 忽略成功工具调用的审计

很多开发团队只给 exception 模块写日志,以为只要工具不报错,系统就是完美的。但在实际中,很多模型幻觉会导致其在短时间内连续成功调用几十次无意义的查询工具,或者被恶意用户利用来刷接口。不记录成功的工具调用,你根本无法感知这些悄无声息的 Token 与带宽流失。

2. 状态递归深度溢出 (GraphRecursionError)

当 Supervisor 与 Worker 之间的逻辑路由 Prompt 存在边界模糊时,经常会导致它们互相推诿,在两个节点间死循环。如果你的配置里没有限制递归上限,它会一直跑,直到把你的 Token 额度扣光。

在生产环境中,最经典的死循环报错日志如下:

langgraph.errors.GraphRecursionError: Recursion limit of 25 reached without hitting a terminal node.
[ERROR] 2026-06-14T15:30:12Z - Thread 'th-user-009' reached max recursion limit at node 'supervisor' with trace_id 't-987654321-xyz'.

如果你的日志里没有记录 trace_id 和每次调用的 node_name,当你看到这个报错时,你根本不知道是在哪两个节点之间打架。

3. 三方接口限流报错 (RateLimitError)

在调用大模型或高频外部 API 时,由于并发激增,经常会遇到以下报错:

openai.RateLimitError: Error code: 429 - {'error': {'message': 'You exceeded your current quota, please check your plan and billing details.'}}
[WARNING] 2026-06-14T15:31:00Z - Worker 'finance_worker' received 429 from OpenAI. Initiating Backoff.

这种错误发生时,我们必须在日志里清晰记录 attempt_count 和下一次重试的时间间隔。如果在 Trace 链路中看不到重试次数的累加,运维团队会误以为程序已经陷入卡死。

十三、上线检查清单

在将 LangGraph 系统推向生产环境 the 最后关头,必须按照可观测性指标逐项核对日志的字段与脱敏逻辑。

  • 是否每次运行都生成并绑定了全局唯一的 trace_id?
  • 系统是否能根据 thread_id 检索到该会话下所有历史运行的 run_id?
  • 日志格式是否采用结构化 JSON,而非零散的普通文本?
  • Supervisor 节点的路由输出中是否包含了 selected_worker 和 route_reason?
  • Worker 智能体是否将其中间的每一轮工具调用明细都写入了 logs 摘要?
  • 每次工具调用是否都统计并记录了精确的 latency_ms 耗时?
  • 日志脱敏过滤器是否配置完毕,严禁向外暴露出明文 API Key、密码及完整 SQL 语句?
  • 当触发 GraphRecursionError 时,日志中是否能清晰呈现出死循环的节点环路?
  • 是否区分了网络抖动重试和参数校验失败,并在日志中予以打标?
  • 耗时分析看板是否搭建完毕,支持按 LLM 耗时与工具耗时进行分流下钻?

十四、关于 LangGraph 可观测性的常见问题解答 (FAQ)

解答在生产环境中落地 LangGraph 可观测性设计时最受开发者关注的核心技术疑问。

LangGraph Observability 和普通日志有什么区别?

普通日志往往是杂乱无章的,只记录了请求的输入和偶发的堆栈崩溃。而 LangGraph Observability 是以状态图的节点跳转为线索,将 trace_id、run_id 和 thread_id 作为脉络,把 Supervisor 的决策逻辑、Worker 的中间流转以及 Tool 的性能参数穿联起来,还原出一次完整的 Agent 决策树。

trace_id 和 thread_id 有什么区别?

trace_id 是一次单次图运行的生命周期标识,用来追踪这一次调用里所有节点和工具的流转;thread_id 是图的状态持久化会话标识,代表一个可以跨越多次对话、支持状态恢复的长生命周期任务链。一个 thread_id 可以包含多次运行,因此会关联多个不同的 trace_id。

Tool Call 的完整输入输出要不要保存?

生产环境下严禁保存完整的敏感输入输出。只建议保存经过脱敏的 input_summary、output_summary 结构体、数据包字节数以及数据的哈希校验码,确保日志系统不会成为潜在的隐私数据泄露源头。

LangGraph 失败后如何快速定位问题?

首先通过用户报障的 thread_id 检索出最后一次出错的 trace_id;接着在链路分析中找到 status 为 failed 的 node_name;查看该节点的 error_code 与具体报错堆栈,确定是模型生成参数错误还是外部接口超时,从而精准排障。

Observability 是否必须接入 LangSmith?

并非必须。对于大部分处于早期迭代阶段、或者有严格私有化合规要求的团队,自建一套基于结构化 JSON 日志的收集清洗方案是性价比最高、最安全的底线设计。等系统规模扩大、微服务链路复杂后,再考虑接入商业化的 LangSmith 或开放的 OpenTelemetry 标准。

系列导航

LangGraph 生产级 Agent 编排实战系列:

十五、继续阅读

若要构建真正稳定且具备金融级稳定性的多智能体应用,还需要在状态隔离、失败重试与手控审批等领域进行配套的设计。

喜欢这篇文章?
加入小白实验室的周刊

每周我都会分享最新的 AI 实战、产品构建心得以及程序员视角的投资笔记。不发废话,只发干货。已有 5000+ 开发者在此共同进化。

Comments