Skip to content

单 Agent 开发

架构设计完成后,开始逐个实现各个 Agent。本章采用"先跑通再优化"的策略,每个 Agent 独立开发、独立测试。

开发顺序

按数据流顺序开发,每完成一个就能看到阶段性成果:

  1. 搜索 Agent → 能搜到东西
  2. 阅读 Agent → 能读懂网页
  3. 分析 Agent → 能分析数据
  4. 总结 Agent → 能生成报告

搜索 Agent 实现

搜索 Agent 负责根据研究主题生成搜索查询,并调用搜索 API 获取相关链接。

核心代码

python
# agents/search.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.tools.tavily_search import TavilySearchResults
from typing import List, Dict
import os

class SearchAgent:
    """Generates search queries for a research topic and runs them via Tavily.

    ``search(topic)`` returns a dict with the generated queries, the
    URL-deduplicated results, and a result count.
    """

    def __init__(self):
        # temperature=0: query generation should be as deterministic as possible.
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        self.search_tool = TavilySearchResults(max_results=5)

        self.query_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个搜索策略专家。根据研究主题,生成 3-5 个搜索查询。

要求:
- 查询要具体,避免太宽泛
- 覆盖不同角度(定义、对比、趋势、案例)
- 使用中英文混合查询提高覆盖率

输出格式:每行一个查询,不要编号"""),
            ("human", "研究主题:{topic}")
        ])

    @staticmethod
    def _parse_queries(text: str) -> List[str]:
        """Split raw LLM output into clean, non-empty query strings.

        Models sometimes emit bullets or numbering ("-", "*", "1.", "2)")
        despite the prompt forbidding them; strip such markers defensively.
        """
        import re
        queries = []
        for line in text.split('\n'):
            query = re.sub(r'^(?:[-*•]|\d+[.、)])\s*', '', line.strip()).strip()
            if query:
                queries.append(query)
        return queries

    @staticmethod
    def _dedupe_results(results: List[Dict]) -> List[Dict]:
        """Drop duplicate URLs (keep first occurrence); skip malformed items.

        The original indexed ``r['url']`` directly, which raised KeyError on
        tool results missing a 'url' field.
        """
        seen_urls = set()
        unique_results = []
        for r in results:
            url = r.get('url') if isinstance(r, dict) else None
            if url and url not in seen_urls:
                seen_urls.add(url)
                unique_results.append(r)
        return unique_results

    def generate_queries(self, topic: str) -> List[str]:
        """Generate up to 5 search queries for *topic* via the LLM."""
        response = self.llm.invoke(
            self.query_prompt.format_messages(topic=topic)
        )
        return self._parse_queries(response.content)[:5]  # cap at 5 queries

    def search(self, topic: str) -> Dict:
        """Run every generated query and return de-duplicated results.

        Individual query failures are logged and skipped so one bad query
        does not abort the whole search.
        """
        queries = self.generate_queries(topic)
        all_results = []

        for query in queries:
            try:
                results = self.search_tool.invoke(query)
                # Tavily can return an error *string* instead of a list;
                # extending with a string would add single characters.
                if isinstance(results, list):
                    all_results.extend(results)
            except Exception as e:
                print(f"搜索失败: {query}, 错误: {e}")

        unique_results = self._dedupe_results(all_results)
        return {
            "queries": queries,
            "results": unique_results,
            "count": len(unique_results)
        }

测试搜索 Agent

python
# tests/test_search.py
from agents.search import SearchAgent

def test_search_agent():
    """Smoke-test query generation and live search (requires network + API keys)."""
    agent = SearchAgent()

    # Query generation should yield at least 3 candidate queries.
    generated = agent.generate_queries("AI 编程工具市场")
    assert len(generated) >= 3
    print(f"生成查询: {generated}")

    # A live search should return at least one de-duplicated result.
    outcome = agent.search("AI 编程工具市场")
    assert outcome["count"] > 0
    print(f"搜索到 {outcome['count']} 个结果")

    for item in outcome["results"][:3]:
        print(f"- {item['url']}")

if __name__ == "__main__":
    test_search_agent()

阅读 Agent 实现

阅读 Agent 负责访问 URL,提取网页正文,并生成结构化摘要。

核心代码

python
# agents/reader.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.document_loaders import WebBaseLoader
from typing import List, Dict, Optional
import asyncio

class ReaderAgent:
    """Loads web pages and condenses them into structured JSON summaries."""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

        self.summary_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个内容分析师。阅读网页内容并提取关键信息。

输出 JSON 格式:
{{
    "title": "文章标题",
    "main_points": ["要点1", "要点2", "要点3"],
    "data": ["具体数据或引用"],
    "credibility": "high/medium/low",
    "source_type": "官方/媒体/博客/论坛"
}}"""),
            ("human", "网页内容:\n{content}")
        ])

    @staticmethod
    def _extract_json(text: str) -> Dict:
        """Parse a JSON object from raw LLM output.

        Models frequently wrap JSON in ```json fences even when asked for
        bare JSON; strip an opening/closing fence before parsing.
        Raises json.JSONDecodeError if the payload still isn't valid JSON.
        """
        import json
        cleaned = text.strip()
        if cleaned.startswith("```"):
            # Drop the opening fence line (with optional language tag).
            cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
            if cleaned.rstrip().endswith("```"):
                cleaned = cleaned.rstrip()[:-3]
        return json.loads(cleaned)

    def load_url(self, url: str) -> Optional[str]:
        """Fetch *url* and return up to 8000 chars of page text, or None on failure."""
        try:
            loader = WebBaseLoader(url)
            docs = loader.load()
            if docs:
                # Truncate so the downstream prompt stays within token limits.
                return docs[0].page_content[:8000]
        except Exception as e:
            print(f"加载失败: {url}, 错误: {e}")
        return None

    def summarize(self, url: str, content: str) -> Dict:
        """Summarize *content* into structured fields; never raises.

        On any failure returns a placeholder dict carrying the error, so a
        single bad page does not abort a batch run.
        """
        try:
            response = self.llm.invoke(
                self.summary_prompt.format_messages(content=content)
            )
            # Fence-tolerant parse: the original json.loads(response.content)
            # failed whenever the model wrapped its answer in ```json fences.
            summary = self._extract_json(response.content)
            summary["url"] = url
            return summary
        except Exception as e:
            return {
                "url": url,
                "title": "解析失败",
                "main_points": [],
                "error": str(e)
            }

    def read_urls(self, urls: List[str]) -> List[Dict]:
        """Sequentially load and summarize each URL, skipping failed loads."""
        summaries = []
        for url in urls:
            content = self.load_url(url)
            if content:
                summaries.append(self.summarize(url, content))
        return summaries

并行读取优化

网页读取是 IO 密集型操作,使用异步提升效率:

python
# agents/reader.py (异步版本)
import aiohttp
from bs4 import BeautifulSoup

class AsyncReaderAgent(ReaderAgent):
    """ReaderAgent variant that fetches pages concurrently with aiohttp."""

    async def load_url_async(self, url: str) -> Optional[str]:
        """Fetch *url* asynchronously and return up to 8000 chars of text.

        Returns None on any network or parse failure so callers can skip it.
        """
        try:
            # Explicit ClientTimeout: passing a bare int as timeout to get()
            # is deprecated in recent aiohttp releases.
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as response:
                    # The original summarized 404/500 error pages as content;
                    # raise so they fall through to the except branch instead.
                    response.raise_for_status()
                    html = await response.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    # Remove non-content tags before extracting text.
                    for tag in soup(['script', 'style', 'nav', 'footer']):
                        tag.decompose()
                    return soup.get_text()[:8000]
        except Exception as e:
            # The original bound `e` but never printed it; include the detail.
            print(f"异步加载失败: {url}, 错误: {e}")
            return None

    async def read_urls_async(self, urls: List[str]) -> List[Dict]:
        """Fetch all *urls* in parallel, then summarize the successes in order."""
        tasks = [self.load_url_async(url) for url in urls]
        contents = await asyncio.gather(*tasks)

        summaries = []
        for url, content in zip(urls, contents):
            if content:
                summaries.append(self.summarize(url, content))
        return summaries

分析 Agent 实现

分析 Agent 负责对收集的信息进行深度分析,识别趋势和模式。

核心代码

python
# agents/analyst.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from typing import List, Dict

class AnalystAgent:
    """Analyzes collected summaries to surface themes, trends, and insights."""

    def __init__(self):
        # Slight temperature: analysis benefits from some interpretive latitude.
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

        self.analysis_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个数据分析师。分析收集的信息,识别模式和趋势。

分析维度:
1. 共同主题:多个来源都提到的观点
2. 分歧观点:不同来源的不同看法
3. 数据趋势:数字和统计的变化
4. 关键洞察:值得注意的发现

输出 JSON 格式:
{{
    "themes": ["主题1", "主题2"],
    "disagreements": ["分歧1"],
    "trends": ["趋势1", "趋势2"],
    "insights": ["洞察1", "洞察2"],
    "confidence": "high/medium/low"
}}"""),
            ("human", "信息摘要:\n{summaries}")
        ])

    @staticmethod
    def _format_summaries(summaries: List[Dict]) -> str:
        """Render summary dicts as a readable text block for the prompt.

        Uses .get() throughout so partially-filled summaries (e.g. reader
        failures) don't raise.
        """
        return "\n\n".join([
            f"来源: {s.get('url', 'unknown')}\n"
            f"标题: {s.get('title', 'unknown')}\n"
            f"要点: {', '.join(s.get('main_points', []))}\n"
            f"数据: {', '.join(s.get('data', []))}"
            for s in summaries
        ])

    @staticmethod
    def _parse_json(text: str) -> Dict:
        """Parse JSON from LLM output, tolerating ``` code fences.

        The original json.loads(response.content) failed whenever the model
        wrapped its answer in ```json fences.
        """
        import json
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
            if cleaned.rstrip().endswith("```"):
                cleaned = cleaned.rstrip()[:-3]
        return json.loads(cleaned)

    def analyze(self, summaries: List[Dict]) -> Dict:
        """Analyze *summaries*; returns an {"error", "confidence"} dict on failure."""
        if not summaries:
            # Nothing to analyze — skip the LLM call entirely.
            return {"error": "no summaries provided", "confidence": "low"}

        try:
            response = self.llm.invoke(
                self.analysis_prompt.format_messages(
                    summaries=self._format_summaries(summaries)
                )
            )
            return self._parse_json(response.content)
        except Exception as e:
            return {"error": str(e), "confidence": "low"}

总结 Agent 实现

总结 Agent 负责整合所有信息,生成最终的研究报告。

核心代码

python
# agents/summarizer.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from typing import Dict

class SummarizerAgent:
    """Integrates summaries and analysis into a final structured research report."""

    def __init__(self):
        # Higher temperature: report prose benefits from varied phrasing.
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.5)

        self.report_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个专业的研究报告撰写者。根据收集和分析的信息,生成结构化报告。

报告结构:
# 执行摘要
(200字概述)

# 主要发现
## 发现1
## 发现2

# 详细分析
(每个发现的深入讨论)

# 结论与建议
(基于分析的建议)

# 参考来源
(列出所有引用的 URL)

要求:
- 客观中立,避免主观判断
- 数据支撑,引用具体来源
- 逻辑清晰,层次分明"""),
            ("human", """研究主题:{topic}

信息摘要:
{summaries}

分析结果:
{analysis}""")
        ])

    @staticmethod
    def _format_summaries(summaries: list) -> str:
        """Render each summary as a one-line bullet for the prompt."""
        return "\n".join([
            f"- {s.get('title', 'unknown')}: {', '.join(s.get('main_points', []))}"
            for s in summaries
        ])

    @staticmethod
    def _format_analysis(analysis: dict) -> str:
        """Render the analysis dict as the text block the prompt expects.

        Missing keys are tolerated via .get() so a failed analysis
        ({"error": ...}) still produces a (mostly empty) section.
        """
        return f"""
主题: {', '.join(analysis.get('themes', []))}
趋势: {', '.join(analysis.get('trends', []))}
洞察: {', '.join(analysis.get('insights', []))}
"""

    def generate_report(self, topic: str, summaries: list, analysis: dict) -> str:
        """Generate the final markdown research report for *topic*."""
        response = self.llm.invoke(
            self.report_prompt.format_messages(
                topic=topic,
                summaries=self._format_summaries(summaries),
                analysis=self._format_analysis(analysis)
            )
        )
        return response.content

单元测试

每个 Agent 都需要独立测试,确保功能正常:

python
# tests/test_agents.py
import pytest
from agents.search import SearchAgent
from agents.reader import ReaderAgent
from agents.analyst import AnalystAgent
from agents.summarizer import SummarizerAgent

class TestSearchAgent:
    def test_generate_queries(self):
        """Query generation yields at least 3 string queries (live LLM call)."""
        search_agent = SearchAgent()
        generated = search_agent.generate_queries("Python 异步编程")
        assert len(generated) >= 3
        for query in generated:
            assert isinstance(query, str)

class TestReaderAgent:
    def test_load_url(self):
        """Loading a known-good URL returns non-trivial page text (live fetch)."""
        reader = ReaderAgent()
        page_text = reader.load_url("https://python.org")
        assert page_text is not None
        assert len(page_text) > 100

class TestAnalystAgent:
    def test_analyze(self):
        """Analysis of a mock summary yields themes or a handled error (live LLM call)."""
        analyst = AnalystAgent()
        fake_summaries = [
            {"title": "Test", "main_points": ["point1"], "data": ["data1"]}
        ]
        outcome = analyst.analyze(fake_summaries)
        assert "themes" in outcome or "error" in outcome

class TestSummarizerAgent:
    def test_generate_report(self):
        """Report generation returns substantial text (live LLM call)."""
        summarizer = SummarizerAgent()
        report_text = summarizer.generate_report(
            topic="测试主题",
            summaries=[{"title": "Test", "main_points": ["point"]}],
            analysis={"themes": ["theme1"], "trends": [], "insights": []}
        )
        assert len(report_text) > 100

验收标准

完成单 Agent 开发后,你应该有:

  • [ ] 4 个独立可运行的 Agent
  • [ ] 每个 Agent 的单元测试通过
  • [ ] 搜索 Agent 能返回相关结果
  • [ ] 阅读 Agent 能提取网页内容
  • [ ] 分析 Agent 能识别主题和趋势
  • [ ] 总结 Agent 能生成结构化报告

下一步

单个 Agent 开发完成,开始将它们编排成完整的工作流。

继续:Agent 编排 →


← 返回架构设计 | 返回项目二

最近更新

基于 Apache 2.0 许可发布