XBSTACK Tech Image - XBSTACK

AI 财报助手任务队列实战:PDF 解析、LLM 调用和进度回传怎么设计?

Release Date
2026-06-24
Reading Time
14分钟
Impact Factor
4,080
ai-workflow
task-queue
pdf-parsing
system-design
Xiaobai's Note / 实验室笔记

这篇文章记录了我在贵阳实验室的实战过程。我坚信,在技术下行的时代,程序员唯一的护城河就是通过 AI 建立属于自己的数字资产。

本文解决的问题

  • 为什么大模型财报助手坚决不能使用传统的同步 HTTP 接口进行 PDF 解析?
  • 如何科学地将一份财报分析任务拆分为多阶段的状态机,避免处理流程成为黑盒?
  • 生产级的 Job 任务数据表该如何设计?字段该如何配置?
  • 如何优雅地在前端与后端之间实现处理进度的低延迟回传?
  • 面对 PDF 损坏、LLM 超时、限流(429)和 Schema 校验失败,该如何设计差异化的指数退避重试策略?
  • 如何防止用户重复上传相同财报文件造成的 API 算力浪费?
  • 哪些低置信度的异常结果必须被物理拦截并路由至人工复核队列?

适合谁读

  • 后端架构师:正在设计高吞吐、长耗时的 AI Agent 工作流后端处理系统。
  • AI 系统研发人员:急需解决 RAG 系统或长文本抽取管道中的超时与队列积压故障。
  • 全栈工程师:计划开发面向投研、审计的 serious AI 工具,寻求架构闭环方案。
  • 独立开发者:寻找降低大模型 API 费用、提升 RAG 响应体感的系统级缓存与去重设计。

一、 为什么 AI 财报助手不能同步处理 PDF

在贵阳花溪的十里河滩骑完行回来,我坐在数字万物实验室里,刚喝了一口冰可乐,就开始着手重构这个 AI 财报助手的后端任务管道。在很多同学写的 Demo 里,都是直接建个同步接口,前端把 PDF 上传,后端拿去调大模型,最后返回结果。这种直觉的做法在生产环境是灾难。

做 AI 财报助手,如果接口保持同步状态,客户端几乎 100% 会遇到 504 Gateway Timeout 错误。

因为一份标准的财报 PDF(例如 NVIDIA 的 10-K 年报)通常有上百页,包含密密麻麻的财务表格和备注说明。后端系统在接收到文件后,需要先调用 PDF 解析器进行物理定位,把里面的复杂多栏表格提取出来;接着还要进行文档分块(Chunking),然后将这些上下文分批次喂给大模型;大模型需要生成长达数页的结构化 JSON。在这期间,网络抖动、模型 API 限流(Rate Limit)、以及复杂的 JSON Schema 强校验,都会让整个处理过程耗时几秒、几十秒,甚至几分钟。

更让人抓狂的是,如果采用同步接口,用户在那看着网页转圈圈,心里完全没底,不知道后台是卡在 PDF 解析了,还是在调大模型,还是网络断开已经崩溃了。因此,我们必须把同步调用彻底重构为「异步任务队列模式」,将上传与分析物理剥离,给用户提供秒级的状态感知。

二、 一个财报分析任务应该拆成哪些阶段

异步设计的核心是建立精细的状态机。

我们不能简单地把任务状态设为「处理中」和「已完成」,而是必须把财报处理流水线拆解为详细的状态节点,让用户和监控系统随时掌握任务所处的物理位置。在我的工程实践中,我将一个财报分析 Job 划分为了十个状态,并在数据库中定义了如下的枚举类型:

CREATE TYPE job_status AS ENUM (
    'uploaded',                  -- 文件上传成功,记录已写入数据库
    'queued',                    -- 任务已进入 Redis/RabbitMQ 消息队列排队
    'parsing_pdf',               -- Worker 节点已领任务,正在提取 PDF 纯文本
    'extracting_tables',         -- 正在进行物理表格定位与结构化提取
    'chunking_document',         -- 正在执行文本语义分块与向量检索准备
    'running_llm',               -- 正在调用大模型(LLM)提取指标与风险因素
    'validating_schema',         -- 正在对模型输出的 JSON 进行 Schema 格式强校验
    'generating_review_questions',-- 正在根据异常提取结果,生成人工复核问题
    'needs_human_review',        -- 数据置信度低,已物理熔断,等待分析师人工复核
    'completed',                 -- 任务全部完成,结果已持久化
    'failed'                     -- 任务失败,已记录异常日志
);

通过这套精细的状态划分,前端可以渲染出包含「正在解析 PDF」、「正在提取财务表格」、「正在调用 AI 深度分析」等具体步骤的进度条,用户的心理安全感会大幅提升。

三、 Job 数据表设计

一个高可用的任务队列系统,必须依赖一张设计严密的数据库底表来进行任务持久化和状态追踪。这不仅是为了在系统宕机时能够恢复任务,更是为了支持后续的性能审计与费用结算。

下面是我在 PostgreSQL 中设计的 financial_analysis_jobs 任务表 DDL:

CREATE TABLE financial_analysis_jobs (
    job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id VARCHAR(100) NOT NULL,                                          -- 提交任务的用户 ID
    file_id UUID NOT NULL,                                                  -- 关联的物理文件 ID
    status job_status NOT NULL DEFAULT 'uploaded',                           -- 任务当前状态
    progress INT NOT NULL DEFAULT 0,                                        -- 0 到 100 的物理百分比进度
    current_step VARCHAR(50) NOT NULL DEFAULT 'upload_success',             -- 当前执行的具体原子步骤
    error_code VARCHAR(50),                                                 -- 失败时的标准错误码
    retry_count INT NOT NULL DEFAULT 0,                                     -- 当前已重试次数
    model_version VARCHAR(50) NOT NULL,                                     -- 运行该任务的大模型版本
    prompt_version VARCHAR(50) NOT NULL,                                    -- 运行该任务的 Prompt 版本
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP WITH TIME ZONE
);

-- 为常用查询字段建立索引,保证大并发下的查询速度
CREATE INDEX idx_jobs_user_id ON financial_analysis_jobs(user_id);
CREATE INDEX idx_jobs_status ON financial_analysis_jobs(status);
CREATE INDEX idx_jobs_file_id ON financial_analysis_jobs(file_id);

四、 为什么要把 PDF 解析和 LLM 调用拆开

在系统架构层面,必须把「PDF 解析器」与「大模型调用器」设计为独立的物理微服务或不同的任务队列。

解耦的根本原因在于,PDF 解析与 LLM 调用是两类截然不同的计算任务,且它们的故障率和错误特征完全不同:

  • PDF 解析是 CPU 密集型任务,主要消耗服务器本地计算资源,出错通常是因为 PDF 损坏、格式不兼容或者混入了大面积的扫描图片;
  • LLM 调用是网络与 I/O 密集型任务,主要消耗外部 API 额度(Tokens),出错通常是因为大模型提供商限流、网络连接超时、或者是输出格式不符合 JSON Schema 约束。

如果把它们捆绑在一个同步任务脚本里,一旦在大模型步骤失败,我们就不得不把前面耗时极长的 PDF 物理解析步骤也重跑一遍,这会造成服务器计算资源和时间的极大浪费。通过物理拆开,我们可以将解析出来的纯文本和表格数据暂存到临时介质(如 Object Storage 或 Redis),在大模型调用失败时只重新读取缓存的文本去请求 LLM 即可,从而实现秒级重试。

小白实验室自研 / TOOL CONVERSION

如果你想直接测试 AI 财报助手,可一键跳转试用

支持 PDF 批量上传、管理层 Guidance 情绪审计、核心 KPI 指标抽取,免费免登录。

五、 进度回传方案对比:Polling vs SSE vs WebSocket

当任务进入异步处理后,前端需要实时获取处理状态。在实际工程落地中,我们有三种常见的通信方案。以下是它们在开发成本、确定性和实时体感上的对比。

评估维度轮询 (Short Polling)服务器发送事件 (SSE)双向通信 (WebSocket)
开发成本极低(前端写个简单的 setInterval 即可)中等(后端需要支持流式输出响应)高(需要管理长连接生命周期和心跳检测)
网络开销较高(频繁发起 HTTP 三次握手与头部传输)极低(建立单向长连接,按需推送数据)极低(建立持久双向信道,低开销)
服务器负载中等(在并发访问量大时容易压垮数据库)低(连接数多时需要优化文件描述符上限)较高(维持长连接需要消耗内存,需分布式网关)
单页路由适配简单(跳转时直接销毁 timer 即可)简单(路由切换时关闭 EventSource)复杂(跳转时需要处理连接断开与自动重连)
首选建议适合第一版快速上线,对并发要求不高的场景适合单向进度推送,是最推荐的平替方案适合需要复杂双向交互的交互式 AI 聊天台

在 AI 财报助手的架构设计中,第一版我建议采用「轮询(Polling)」或者「服务器发送事件(SSE)」。因为财报分析的进度是完全单向的(从 0% 到 100%),SSE 能够提供非常通畅的进度条跳动体验,且不需要像 WebSocket 那样维护沉重的双向连接管理和复杂的网关集群。

六、 失败重试策略设计

异步任务如果不设计容错,在生产环境会死得非常难看。我们必须根据不同的错误类型,设计针对性的指数退避(Exponential Backoff)重试策略,而不是盲目地死循环重试。

以下是我为财报分析 Worker 设计的 Python 容错重试模块:

import time
import random
from typing import Callable, Any

def execute_with_exponential_backoff(
    task_fn: Callable[[], Any],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0
) -> Any:
    """
    针对 LLM 限流与超时等暂时性错误执行指数退避重试,并对硬性物理故障进行快速熔断。
    """
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return task_fn()
        except Exception as e:
            err_msg = str(e).lower()
            current_attempt = attempt + 1

            # 1. 物理不可逆故障:直接熔断,坚决不进行无谓的重试
            if "pdfsyntaxerror" in err_msg or "corrupted" in err_msg:
                print(f"[-] 物理级故障检测: {e},触发快速熔断,任务直接宣告失败。")
                raise ValueError("ERR_PDF_CORRUPTED") from e

            # 2. JSON Schema 强校验失败:可能是 Prompt 缺陷,重试最大 1 次即可
            if "validationerror" in err_msg or "json_schema" in err_msg:
                if current_attempt >= 2:
                    print(f"[-] Schema 校验连续失败,已达重试上限,路由至人工复核。")
                    raise ValueError("ERR_SCHEMA_VALIDATION_FAILED") from e
                # 尝试稍微等待后重试,给模型一次纠偏机会
                time.sleep(1.0)
                continue

            # 3. 暂时性网络故障或大模型限流(429 / Timeout):执行指数退避 + 随机抖动 (Jitter)
            if "429" in err_msg or "ratelimit" in err_msg or "timeout" in err_msg or "deadline" in err_msg:
                if current_attempt == max_retries:
                    print(f"[-] 网络/限流重试已达最大次数 {max_retries},任务失败。")
                    raise e

                # 加上随机抖动,防止多个并发 Worker 在同一时间重新发起请求,压垮大模型网关
                jitter = random.uniform(0.1, 1.0)
                sleep_time = (delay * pow(backoff_factor, attempt)) + jitter
                print(f"[!] 遭遇暂时性错误: {e}。正在进行第 {current_attempt} 次退避等待,等待时长: {sleep_time:.2f}秒...")
                time.sleep(sleep_time)
            else:
                # 4. 未知业务异常:直接抛出,不进行重试
                raise e

    raise TimeoutError("ERR_MAX_RETRIES_EXCEEDED")

七、 结果缓存和重复上传怎么处理

在严肃的生产环境中,大文件财报的 PDF 解析和 LLM 调用的费用是非常高昂的。如果用户重复上传同一个 PDF,或者多个用户在同一天上传同一份 NVIDIA 2024 年报,系统如果每次都重新解析,那就是在给大模型厂商送钱。

我们需要在架构上实现「物理去重与结果缓存」。

当用户上传 PDF 时,我们首先在前端或 API 网关层读取文件的二进制流,计算出其物理 SHA-256 哈希值。然后,我们去数据库的 financial_reports_cache 表中检索:

CREATE TABLE financial_reports_cache (
    file_hash CHAR(64) PRIMARY KEY,                                         -- 文件物理 SHA-256 哈希值
    file_name VARCHAR(255) NOT NULL,
    parsed_text_url VARCHAR(512),                                           -- 暂存在对象存储中的纯文本路径
    extracted_kpis JSONB,                                                   -- 缓存的提取 KPI 结果
    model_version VARCHAR(50) NOT NULL,
    prompt_version VARCHAR(50) NOT NULL,
    cached_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

去重处理逻辑如下:

  1. 计算上传文件的 SHA-256;
  2. 如果哈希值匹配,并且模型版本与 Prompt 版本与当前系统配置一致,则跳过排队和分析步骤,直接克隆该缓存结果生成一条新的 Job 记录,状态设为 completed
  3. 前端可以在 1 秒内弹窗提示「检测到已有解析结果,正在为您加载…」,用户体验会从「等待几分钟」瞬间飙升至「秒级加载」,同时也为我们的系统省下了大量的 token 费用。

八、 哪些任务要进入人工复核队列

在投研和审计等严肃的财务场景下,我们不能完全寄希望于大模型输出 100% 的准确度。如果大模型输出的结果触发了我们设定的异常红线,系统必须执行物理熔断,把任务状态设为 needs_human_review,并路由到专门的人工复核后台。

在我的队列逻辑中,以下六类情况会被评测引擎判定为低置信度并拦截,流转到人工队列:

  • 缺失证据页码:返回的财务 KPI 指标中,source_page 字段为 null 或小于等于 0;
  • 证据校验失败:模型返回了 evidence 原文,但我们去 PDF 对应的 source_page 中搜索该文本时,返回了 contains 校验失败;
  • 财务指标极端异常:同比数据暴涨暴跌超过 500%,或者出现「收入为负值」等违背基本财务常识的数据;
  • 大模型输出置信度评分低于 0.85;
  • JSON Schema 强校验失败,且在退避重试后依然无法生成合规格式;
  • 提取出的风险因素(Risk Factors)中,出现了系统黑名单里的敏感词或存在严重的语义冲突。

关于人工复核的具体 UI 交互和队列拦截机制,可以参考我之前在实验室里总结的:👉 人工复核队列设计 —— 了解如何在 AI 与人类分析师之间建立物理级的双盲校验安全链。

九、 接入 AI Financial Report Analyzer

这套异步任务队列方案是 AI 财报助手的底座,我们在 XBSTACK 实验室中正是基于这套系统构建了完整的技术闭环:

下一步 / NEXT READING

准备好分析你的第一份财报了吗?

你可以立刻上传一份 PDF 财报(如 NVIDIA 10-K),体验本工具自动生成的核心 KPI 报表、风险因素和复核清单。

常见问题解答

为什么不推荐用 Redis + Celery 的默认队列直接处理 PDF 解析?

因为 Celery 默认的 prefetch 机制在面对这种耗时长、资源消耗极度不均的任务时,容易导致 Worker 节点发生严重的饥饿或内存崩溃。PDF 解析是 CPU 密集型的,大模型调用是 I/O 密集型的。如果这两种任务混在一个 Celery 队列里,几个大 PDF 解析任务就会彻底占满所有 Worker 并引起内存泄漏,导致轻量的大模型调用请求在后面排队饿死。强烈建议使用独立命名的 RabbitMQ 队列进行物理隔离。

处理进度(progress 字段)在后台应该如何计算并更新?

我们不需要搞非常精确的动态时间估算,那在物理上是不准确的。推荐采用「里程碑式进度更新」。例如:文件上传成功为 10%,PDF 解析完成为 30%,表格抽取结束为 50%,LLM 推理中为 80%,Schema 校验通过为 95%,完全存入数据库后归档为 100%。这种分阶段的离散步长,实现起来最稳定,也最符合用户的直观体感。

如何防止在高并发下,长连接(SSE / WebSocket)把后端的 Express 或 Node 服务的文件描述符撑爆?

在使用 SSE 回传进度时,如果有很多用户同时在线挂起网页,每个用户都会占用一个持久 the TCP 连接。在 Node.js 后端,我们需要对操作系统执行 ulimit -n 65535 提高最大文件描述符限制。同时,可以配置连接超时时间,例如当任务处于 queued 状态超过 10 分钟没有状态更新,或者用户离开页面时,前端主动调用 eventSource.close() 释放服务端物理连接,防止连接被无谓占满。

继续阅读

喜欢这篇文章?
加入小白实验室的周刊

每周我都会分享最新的 AI 实战、产品构建心得以及程序员视角的投资笔记。不发废话,只发干货。已有 5000+ 开发者在此共同进化。

Comments