架构设计
系统概览
多 Agent 研究助手采用 状态机 + 消息传递 的架构模式,使用 LangGraph 作为编排框架。
┌─────────────────────────────────────────────────────────┐
│ 协调器 (Orchestrator) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 搜索 │→│ 阅读 │→│ 分析 │→│ 总结 │ │
│ │ Agent │ │ Agent │ │ Agent │ │ Agent │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────┘核心设计原则
1. 单一职责
每个 Agent 只做一件事:
| Agent | 职责 | 输入 | 输出 |
|---|---|---|---|
| 搜索 Agent | 搜索相关信息 | 研究主题 | URL 列表 |
| 阅读 Agent | 提取网页内容 | URL 列表 | 结构化摘要 |
| 分析 Agent | 分析数据趋势 | 摘要列表 | 分析报告 |
| 总结 Agent | 生成最终报告 | 所有信息 | 研究报告 |
2. 状态共享
所有 Agent 共享一个状态对象:
python
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph
class ResearchState(TypedDict):
# 输入
topic: str # 研究主题
research_type: str # 研究类型:market/academic/tech
# 中间状态
search_queries: List[str] # 搜索查询
urls: List[str] # 搜索到的 URL
summaries: List[dict] # 网页摘要
analysis: Optional[dict] # 分析结果
# 输出
report: Optional[str] # 最终报告
# 元数据
current_step: str # 当前步骤
errors: List[str] # 错误信息
progress: int # 进度百分比3. 可观测性
每个步骤都记录日志,方便调试:
python
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("research_agent")
def log_step(step_name: str, state: ResearchState):
logger.info(f"[{step_name}] Progress: {state['progress']}%")
logger.info(f"[{step_name}] Current data: {len(state.get('urls', []))} URLs")LangGraph 状态图
节点定义
python
from langgraph.graph import StateGraph, END
# 创建状态图
workflow = StateGraph(ResearchState)
# 添加节点
workflow.add_node("search", search_node)
workflow.add_node("read", read_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("summarize", summarize_node)
# 设置入口
workflow.set_entry_point("search")
# 添加边
workflow.add_edge("search", "read")
workflow.add_edge("read", "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", END)
# 编译
app = workflow.compile()条件路由
根据研究类型选择不同路径:
python
def route_by_type(state: ResearchState) -> str:
"""根据研究类型决定下一步"""
research_type = state.get("research_type", "general")
if research_type == "quick":
# 快速研究:跳过分析
return "summarize"
else:
# 完整研究:包含分析
return "analyze"
# 添加条件边
workflow.add_conditional_edges(
"read",
route_by_type,
{
"analyze": "analyze",
"summarize": "summarize"
}
)数据流详解
阶段 1:搜索
输入: { topic: "AI 编程工具市场" }
↓
搜索 Agent 生成查询:
- "AI 编程工具 2024"
- "GitHub Copilot vs Cursor"
- "AI 代码助手市场份额"
↓
输出: { urls: ["url1", "url2", ...] }阶段 2:阅读
输入: { urls: ["url1", "url2", ...] }
↓
阅读 Agent 并行处理每个 URL:
- 提取标题
- 提取正文
- 提取关键数据
↓
输出: { summaries: [{title, content, data}, ...] }阶段 3:分析
输入: { summaries: [...] }
↓
分析 Agent:
- 识别共同主题
- 对比不同观点
- 提取数据趋势
↓
输出: { analysis: {themes, comparisons, trends} }阶段 4:总结
输入: { summaries, analysis }
↓
总结 Agent:
- 生成执行摘要
- 整理主要发现
- 给出建议
↓
输出: { report: "完整研究报告" }错误处理
重试机制
python
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def fetch_url(url: str) -> str:
"""带重试的 URL 获取"""
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.text降级策略
python
def read_node(state: ResearchState) -> ResearchState:
"""阅读节点,带降级处理"""
summaries = []
errors = []
for url in state["urls"]:
try:
content = fetch_url(url)
summary = extract_summary(content)
summaries.append(summary)
except Exception as e:
errors.append(f"Failed to read {url}: {str(e)}")
# 继续处理其他 URL,不中断流程
return {
**state,
"summaries": summaries,
"errors": state.get("errors", []) + errors
}项目结构
research-agent/
├── .env # 环境变量
├── .env.example # 环境变量示例
├── requirements.txt # 依赖
├── config.py # 配置
├── main.py # 入口
├── agents/ # Agent 实现
│ ├── __init__.py
│ ├── search.py # 搜索 Agent
│ ├── reader.py # 阅读 Agent
│ ├── analyst.py # 分析 Agent
│ └── summarizer.py # 总结 Agent
├── graph/ # LangGraph 编排
│ ├── __init__.py
│ ├── state.py # 状态定义
│ └── workflow.py # 工作流定义
├── tools/ # 工具函数
│ ├── __init__.py
│ ├── search_tools.py # 搜索工具
│ └── web_tools.py # 网页工具
├── ui/ # 前端界面
│ └── app.py # Streamlit 应用
└── tests/ # 测试
├── test_agents.py
└── test_workflow.py验收标准
完成架构设计后,你应该有:
- [ ] 清晰的状态定义(ResearchState)
- [ ] LangGraph 工作流草图
- [ ] 每个 Agent 的输入输出定义
- [ ] 错误处理策略
- [ ] 项目目录结构
下一步
架构设计完成,开始实现单个 Agent。