Skip to content

Agent 编排

单个 Agent 开发完成后,需要将它们组织成一个协调工作的系统。本章使用 LangGraph 实现 Agent 编排。

为什么用 LangGraph

LangGraph 相比其他方案的优势:

特性LangGraph纯 LangChain自己写
状态管理内置需手动需手动
可视化支持不支持不支持
条件路由简单复杂复杂
错误恢复内置需手动需手动
调试工具LangSmithLangSmith

状态定义

首先定义整个工作流共享的状态:

python
# graph/state.py
from typing import TypedDict, List, Optional, Annotated
from operator import add

class ResearchState(TypedDict):
    """研究工作流状态"""
    # 输入
    topic: str
    research_type: str  # quick / full

    # 搜索阶段
    queries: List[str]
    urls: Annotated[List[str], add]  # 可累加

    # 阅读阶段
    summaries: Annotated[List[dict], add]

    # 分析阶段
    analysis: Optional[dict]

    # 输出
    report: Optional[str]

    # 元数据
    current_step: str
    errors: Annotated[List[str], add]
    progress: int

Annotated[List[str], add] 表示这个字段可以累加,多次更新会合并而不是覆盖。

节点函数

每个节点对应一个 Agent 的执行:

python
# graph/nodes.py
from graph.state import ResearchState
from agents.search import SearchAgent
from agents.reader import ReaderAgent
from agents.analyst import AnalystAgent
from agents.summarizer import SummarizerAgent

search_agent = SearchAgent()
reader_agent = ReaderAgent()
analyst_agent = AnalystAgent()
summarizer_agent = SummarizerAgent()

def search_node(state: ResearchState) -> dict:
    """搜索节点"""
    topic = state["topic"]
    result = search_agent.search(topic)

    return {
        "queries": result["queries"],
        "urls": [r["url"] for r in result["results"]],
        "current_step": "search",
        "progress": 25
    }

def read_node(state: ResearchState) -> dict:
    """阅读节点"""
    urls = state["urls"][:10]  # 限制数量
    summaries = reader_agent.read_urls(urls)

    return {
        "summaries": summaries,
        "current_step": "read",
        "progress": 50
    }

def analyze_node(state: ResearchState) -> dict:
    """分析节点"""
    summaries = state["summaries"]
    analysis = analyst_agent.analyze(summaries)

    return {
        "analysis": analysis,
        "current_step": "analyze",
        "progress": 75
    }

def summarize_node(state: ResearchState) -> dict:
    """总结节点"""
    topic = state["topic"]
    summaries = state["summaries"]
    analysis = state.get("analysis", {})

    report = summarizer_agent.generate_report(topic, summaries, analysis)

    return {
        "report": report,
        "current_step": "complete",
        "progress": 100
    }

构建工作流

使用 LangGraph 的 StateGraph 构建工作流:

python
# graph/workflow.py
from langgraph.graph import StateGraph, END
from graph.state import ResearchState
from graph.nodes import search_node, read_node, analyze_node, summarize_node

def create_workflow():
    """创建研究工作流"""
    # 初始化状态图
    workflow = StateGraph(ResearchState)

    # 添加节点
    workflow.add_node("search", search_node)
    workflow.add_node("read", read_node)
    workflow.add_node("analyze", analyze_node)
    workflow.add_node("summarize", summarize_node)

    # 设置入口点
    workflow.set_entry_point("search")

    # 添加边(线性流程)
    workflow.add_edge("search", "read")
    workflow.add_edge("read", "analyze")
    workflow.add_edge("analyze", "summarize")
    workflow.add_edge("summarize", END)

    # 编译
    return workflow.compile()

# 创建应用实例
app = create_workflow()

条件路由

根据研究类型选择不同路径:

python
# graph/workflow.py (带条件路由)
def route_after_read(state: ResearchState) -> str:
    """读取后的路由决策"""
    research_type = state.get("research_type", "full")

    if research_type == "quick":
        # 快速研究跳过分析
        return "summarize"
    elif len(state.get("summaries", [])) < 3:
        # 数据太少,直接总结
        return "summarize"
    else:
        return "analyze"

def create_workflow_with_routing():
    workflow = StateGraph(ResearchState)

    workflow.add_node("search", search_node)
    workflow.add_node("read", read_node)
    workflow.add_node("analyze", analyze_node)
    workflow.add_node("summarize", summarize_node)

    workflow.set_entry_point("search")
    workflow.add_edge("search", "read")

    # 条件路由
    workflow.add_conditional_edges(
        "read",
        route_after_read,
        {
            "analyze": "analyze",
            "summarize": "summarize"
        }
    )

    workflow.add_edge("analyze", "summarize")
    workflow.add_edge("summarize", END)

    return workflow.compile()

错误处理

添加错误处理节点,确保流程健壮:

python
# graph/nodes.py (带错误处理)
def search_node_safe(state: ResearchState) -> dict:
    """带错误处理的搜索节点"""
    try:
        topic = state["topic"]
        result = search_agent.search(topic)

        if not result["results"]:
            return {
                "errors": [f"搜索无结果: {topic}"],
                "current_step": "search_failed",
                "progress": 25
            }

        return {
            "queries": result["queries"],
            "urls": [r["url"] for r in result["results"]],
            "current_step": "search",
            "progress": 25
        }
    except Exception as e:
        return {
            "errors": [f"搜索异常: {str(e)}"],
            "current_step": "search_error",
            "progress": 25
        }

def should_continue(state: ResearchState) -> str:
    """检查是否应该继续"""
    errors = state.get("errors", [])
    if len(errors) > 3:
        return "abort"

    urls = state.get("urls", [])
    if not urls:
        return "abort"

    return "continue"

运行工作流

python
# main.py
from graph.workflow import create_workflow_with_routing

def run_research(topic: str, research_type: str = "full"):
    """运行研究"""
    app = create_workflow_with_routing()

    # 初始状态
    initial_state = {
        "topic": topic,
        "research_type": research_type,
        "queries": [],
        "urls": [],
        "summaries": [],
        "analysis": None,
        "report": None,
        "current_step": "init",
        "errors": [],
        "progress": 0
    }

    # 执行
    result = app.invoke(initial_state)

    return result

if __name__ == "__main__":
    result = run_research("AI 编程工具市场分析", "full")
    print(result["report"])

流式输出

支持实时查看进度:

python
# main.py (流式版本)
def run_research_stream(topic: str, research_type: str = "full"):
    """流式运行研究"""
    app = create_workflow_with_routing()

    initial_state = {
        "topic": topic,
        "research_type": research_type,
        "queries": [],
        "urls": [],
        "summaries": [],
        "analysis": None,
        "report": None,
        "current_step": "init",
        "errors": [],
        "progress": 0
    }

    # 流式执行
    for event in app.stream(initial_state):
        # event 是 {node_name: output} 格式
        for node, output in event.items():
            print(f"[{node}] 进度: {output.get('progress', 0)}%")
            if output.get("errors"):
                print(f"  错误: {output['errors']}")

    return event

可视化调试

使用 LangGraph Studio 可视化调试:

python
# langgraph.json
{
    "graphs": {
        "research": {
            "path": "./graph/workflow.py:app"
        }
    },
    "env": ".env"
}

启动 Studio:

bash
# 安装 CLI
pip install langgraph-cli

# 启动开发服务器
langgraph dev

验收标准

完成 Agent 编排后,你应该有:

  • [ ] 完整的 LangGraph 工作流
  • [ ] 状态在各节点间正确传递
  • [ ] 条件路由正常工作
  • [ ] 错误处理机制完善
  • [ ] 流式输出可用
  • [ ] 能在 LangGraph Studio 中可视化

下一步

工作流编排完成,开始构建用户界面。

继续:前端界面 →


← 返回单 Agent 开发 | 返回项目二

最近更新

基于 Apache 2.0 许可发布