Agent 编排
单个 Agent 开发完成后,需要将它们组织成一个协调工作的系统。本章使用 LangGraph 实现 Agent 编排。
为什么用 LangGraph
LangGraph 相比其他方案的优势:
| 特性 | LangGraph | 纯 LangChain | 自己写 |
|---|---|---|---|
| 状态管理 | 内置 | 需手动 | 需手动 |
| 可视化 | 支持 | 不支持 | 不支持 |
| 条件路由 | 简单 | 复杂 | 复杂 |
| 错误恢复 | 内置 | 需手动 | 需手动 |
| 调试工具 | LangSmith | LangSmith | 无 |
状态定义
首先定义整个工作流共享的状态:
python
# graph/state.py
from typing import TypedDict, List, Optional, Annotated
from operator import add
class ResearchState(TypedDict):
"""研究工作流状态"""
# 输入
topic: str
research_type: str # quick / full
# 搜索阶段
queries: List[str]
urls: Annotated[List[str], add] # 可累加
# 阅读阶段
summaries: Annotated[List[dict], add]
# 分析阶段
analysis: Optional[dict]
# 输出
report: Optional[str]
# 元数据
current_step: str
errors: Annotated[List[str], add]
progress: intAnnotated[List[str], add] 表示这个字段可以累加,多次更新会合并而不是覆盖。
节点函数
每个节点对应一个 Agent 的执行:
python
# graph/nodes.py
from graph.state import ResearchState
from agents.search import SearchAgent
from agents.reader import ReaderAgent
from agents.analyst import AnalystAgent
from agents.summarizer import SummarizerAgent
search_agent = SearchAgent()
reader_agent = ReaderAgent()
analyst_agent = AnalystAgent()
summarizer_agent = SummarizerAgent()
def search_node(state: ResearchState) -> dict:
"""搜索节点"""
topic = state["topic"]
result = search_agent.search(topic)
return {
"queries": result["queries"],
"urls": [r["url"] for r in result["results"]],
"current_step": "search",
"progress": 25
}
def read_node(state: ResearchState) -> dict:
"""阅读节点"""
urls = state["urls"][:10] # 限制数量
summaries = reader_agent.read_urls(urls)
return {
"summaries": summaries,
"current_step": "read",
"progress": 50
}
def analyze_node(state: ResearchState) -> dict:
"""分析节点"""
summaries = state["summaries"]
analysis = analyst_agent.analyze(summaries)
return {
"analysis": analysis,
"current_step": "analyze",
"progress": 75
}
def summarize_node(state: ResearchState) -> dict:
"""总结节点"""
topic = state["topic"]
summaries = state["summaries"]
analysis = state.get("analysis", {})
report = summarizer_agent.generate_report(topic, summaries, analysis)
return {
"report": report,
"current_step": "complete",
"progress": 100
}构建工作流
使用 LangGraph 的 StateGraph 构建工作流:
python
# graph/workflow.py
from langgraph.graph import StateGraph, END
from graph.state import ResearchState
from graph.nodes import search_node, read_node, analyze_node, summarize_node
def create_workflow():
"""创建研究工作流"""
# 初始化状态图
workflow = StateGraph(ResearchState)
# 添加节点
workflow.add_node("search", search_node)
workflow.add_node("read", read_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("summarize", summarize_node)
# 设置入口点
workflow.set_entry_point("search")
# 添加边(线性流程)
workflow.add_edge("search", "read")
workflow.add_edge("read", "analyze")
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", END)
# 编译
return workflow.compile()
# 创建应用实例
app = create_workflow()条件路由
根据研究类型选择不同路径:
python
# graph/workflow.py (带条件路由)
def route_after_read(state: ResearchState) -> str:
"""读取后的路由决策"""
research_type = state.get("research_type", "full")
if research_type == "quick":
# 快速研究跳过分析
return "summarize"
elif len(state.get("summaries", [])) < 3:
# 数据太少,直接总结
return "summarize"
else:
return "analyze"
def create_workflow_with_routing():
workflow = StateGraph(ResearchState)
workflow.add_node("search", search_node)
workflow.add_node("read", read_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("summarize", summarize_node)
workflow.set_entry_point("search")
workflow.add_edge("search", "read")
# 条件路由
workflow.add_conditional_edges(
"read",
route_after_read,
{
"analyze": "analyze",
"summarize": "summarize"
}
)
workflow.add_edge("analyze", "summarize")
workflow.add_edge("summarize", END)
return workflow.compile()错误处理
添加错误处理节点,确保流程健壮:
python
# graph/nodes.py (带错误处理)
def search_node_safe(state: ResearchState) -> dict:
"""带错误处理的搜索节点"""
try:
topic = state["topic"]
result = search_agent.search(topic)
if not result["results"]:
return {
"errors": [f"搜索无结果: {topic}"],
"current_step": "search_failed",
"progress": 25
}
return {
"queries": result["queries"],
"urls": [r["url"] for r in result["results"]],
"current_step": "search",
"progress": 25
}
except Exception as e:
return {
"errors": [f"搜索异常: {str(e)}"],
"current_step": "search_error",
"progress": 25
}
def should_continue(state: ResearchState) -> str:
"""检查是否应该继续"""
errors = state.get("errors", [])
if len(errors) > 3:
return "abort"
urls = state.get("urls", [])
if not urls:
return "abort"
return "continue"运行工作流
python
# main.py
from graph.workflow import create_workflow_with_routing
def run_research(topic: str, research_type: str = "full"):
"""运行研究"""
app = create_workflow_with_routing()
# 初始状态
initial_state = {
"topic": topic,
"research_type": research_type,
"queries": [],
"urls": [],
"summaries": [],
"analysis": None,
"report": None,
"current_step": "init",
"errors": [],
"progress": 0
}
# 执行
result = app.invoke(initial_state)
return result
if __name__ == "__main__":
result = run_research("AI 编程工具市场分析", "full")
print(result["report"])流式输出
支持实时查看进度:
python
# main.py (流式版本)
def run_research_stream(topic: str, research_type: str = "full"):
"""流式运行研究"""
app = create_workflow_with_routing()
initial_state = {
"topic": topic,
"research_type": research_type,
"queries": [],
"urls": [],
"summaries": [],
"analysis": None,
"report": None,
"current_step": "init",
"errors": [],
"progress": 0
}
# 流式执行
for event in app.stream(initial_state):
# event 是 {node_name: output} 格式
for node, output in event.items():
print(f"[{node}] 进度: {output.get('progress', 0)}%")
if output.get("errors"):
print(f" 错误: {output['errors']}")
return event可视化调试
使用 LangGraph Studio 可视化调试:
python
# langgraph.json
{
"graphs": {
"research": {
"path": "./graph/workflow.py:app"
}
},
"env": ".env"
}启动 Studio:
bash
# 安装 CLI
pip install langgraph-cli
# 启动开发服务器
langgraph dev验收标准
完成 Agent 编排后,你应该有:
- [ ] 完整的 LangGraph 工作流
- [ ] 状态在各节点间正确传递
- [ ] 条件路由正常工作
- [ ] 错误处理机制完善
- [ ] 流式输出可用
- [ ] 能在 LangGraph Studio 中可视化
下一步
工作流编排完成,开始构建用户界面。