Skip to content

架构设计

系统概览

多 Agent 研究助手采用 状态机 + 消息传递 的架构模式,使用 LangGraph 作为编排框架。

┌─────────────────────────────────────────────────────────┐
│                      协调器 (Orchestrator)               │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐    │
│  │ 搜索    │→│ 阅读    │→│ 分析    │→│ 总结    │    │
│  │ Agent   │  │ Agent   │  │ Agent   │  │ Agent   │    │
│  └─────────┘  └─────────┘  └─────────┘  └─────────┘    │
└─────────────────────────────────────────────────────────┘

核心设计原则

1. 单一职责

每个 Agent 只做一件事:

Agent职责输入输出
搜索 Agent搜索相关信息研究主题URL 列表
阅读 Agent提取网页内容URL 列表结构化摘要
分析 Agent分析数据趋势摘要列表分析报告
总结 Agent生成最终报告所有信息研究报告

2. 状态共享

所有 Agent 共享一个状态对象:

python
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph

class ResearchState(TypedDict):
    # 输入
    topic: str                          # 研究主题
    research_type: str                  # 研究类型:market/academic/tech

    # 中间状态
    search_queries: List[str]           # 搜索查询
    urls: List[str]                     # 搜索到的 URL
    summaries: List[dict]               # 网页摘要
    analysis: Optional[dict]            # 分析结果

    # 输出
    report: Optional[str]               # 最终报告

    # 元数据
    current_step: str                   # 当前步骤
    errors: List[str]                   # 错误信息
    progress: int                       # 进度百分比

3. 可观测性

每个步骤都记录日志,方便调试:

python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("research_agent")

def log_step(step_name: str, state: ResearchState):
    logger.info(f"[{step_name}] Progress: {state['progress']}%")
    logger.info(f"[{step_name}] Current data: {len(state.get('urls', []))} URLs")

LangGraph 状态图

节点定义

python
from langgraph.graph import StateGraph, END

# 创建状态图
workflow = StateGraph(ResearchState)

# 添加节点
workflow.add_node("search", search_node)
workflow.add_node("read", read_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("summarize", summarize_node)

# 设置入口
workflow.set_entry_point("search")

# 添加边
workflow.add_edge("search", "read")
workflow.add_edge("read", "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", END)

# 编译
app = workflow.compile()

条件路由

根据研究类型选择不同路径:

python
def route_by_type(state: ResearchState) -> str:
    """根据研究类型决定下一步"""
    research_type = state.get("research_type", "general")

    if research_type == "quick":
        # 快速研究:跳过分析
        return "summarize"
    else:
        # 完整研究:包含分析
        return "analyze"

# 添加条件边
workflow.add_conditional_edges(
    "read",
    route_by_type,
    {
        "analyze": "analyze",
        "summarize": "summarize"
    }
)

数据流详解

阶段 1:搜索

输入: { topic: "AI 编程工具市场" }

搜索 Agent 生成查询:
  - "AI 编程工具 2024"
  - "GitHub Copilot vs Cursor"
  - "AI 代码助手市场份额"

输出: { urls: ["url1", "url2", ...] }

阶段 2:阅读

输入: { urls: ["url1", "url2", ...] }

阅读 Agent 并行处理每个 URL:
  - 提取标题
  - 提取正文
  - 提取关键数据

输出: { summaries: [{title, content, data}, ...] }

阶段 3:分析

输入: { summaries: [...] }

分析 Agent:
  - 识别共同主题
  - 对比不同观点
  - 提取数据趋势

输出: { analysis: {themes, comparisons, trends} }

阶段 4:总结

输入: { summaries, analysis }

总结 Agent:
  - 生成执行摘要
  - 整理主要发现
  - 给出建议

输出: { report: "完整研究报告" }

错误处理

重试机制

python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def fetch_url(url: str) -> str:
    """带重试的 URL 获取"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.text

降级策略

python
def read_node(state: ResearchState) -> ResearchState:
    """阅读节点,带降级处理"""
    summaries = []
    errors = []

    for url in state["urls"]:
        try:
            content = fetch_url(url)
            summary = extract_summary(content)
            summaries.append(summary)
        except Exception as e:
            errors.append(f"Failed to read {url}: {str(e)}")
            # 继续处理其他 URL,不中断流程

    return {
        **state,
        "summaries": summaries,
        "errors": state.get("errors", []) + errors
    }

项目结构

research-agent/
├── .env                    # 环境变量
├── .env.example           # 环境变量示例
├── requirements.txt       # 依赖
├── config.py              # 配置
├── main.py                # 入口
├── agents/                # Agent 实现
│   ├── __init__.py
│   ├── search.py          # 搜索 Agent
│   ├── reader.py          # 阅读 Agent
│   ├── analyst.py         # 分析 Agent
│   └── summarizer.py      # 总结 Agent
├── graph/                 # LangGraph 编排
│   ├── __init__.py
│   ├── state.py           # 状态定义
│   └── workflow.py        # 工作流定义
├── tools/                 # 工具函数
│   ├── __init__.py
│   ├── search_tools.py    # 搜索工具
│   └── web_tools.py       # 网页工具
├── ui/                    # 前端界面
│   └── app.py             # Streamlit 应用
└── tests/                 # 测试
    ├── test_agents.py
    └── test_workflow.py

验收标准

完成架构设计后,你应该有:

  • [ ] 清晰的状态定义(ResearchState)
  • [ ] LangGraph 工作流草图
  • [ ] 每个 Agent 的输入输出定义
  • [ ] 错误处理策略
  • [ ] 项目目录结构

下一步

架构设计完成,开始实现单个 Agent。

继续:单 Agent 开发 →


← 返回需求调研 | 返回项目二

最近更新

基于 Apache 2.0 许可发布