单 Agent 开发
架构设计完成后,开始逐个实现各个 Agent。本章采用"先跑通再优化"的策略,每个 Agent 独立开发、独立测试。
开发顺序
按数据流顺序开发,每完成一个就能看到阶段性成果:
- 搜索 Agent → 能搜到东西
- 阅读 Agent → 能读懂网页
- 分析 Agent → 能分析数据
- 总结 Agent → 能生成报告
搜索 Agent 实现
搜索 Agent 负责根据研究主题生成搜索查询,并调用搜索 API 获取相关链接。
核心代码
python
# agents/search.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.tools.tavily_search import TavilySearchResults
from typing import List, Dict
import os
class SearchAgent:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
self.search_tool = TavilySearchResults(max_results=5)
self.query_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个搜索策略专家。根据研究主题,生成 3-5 个搜索查询。
要求:
- 查询要具体,避免太宽泛
- 覆盖不同角度(定义、对比、趋势、案例)
- 使用中英文混合查询提高覆盖率
输出格式:每行一个查询,不要编号"""),
("human", "研究主题:{topic}")
])
def generate_queries(self, topic: str) -> List[str]:
"""生成搜索查询"""
response = self.llm.invoke(
self.query_prompt.format_messages(topic=topic)
)
queries = [q.strip() for q in response.content.split('\n') if q.strip()]
return queries[:5] # 最多 5 个查询
def search(self, topic: str) -> Dict:
"""执行搜索"""
queries = self.generate_queries(topic)
all_results = []
for query in queries:
try:
results = self.search_tool.invoke(query)
all_results.extend(results)
except Exception as e:
print(f"搜索失败: {query}, 错误: {e}")
# 去重
seen_urls = set()
unique_results = []
for r in all_results:
if r['url'] not in seen_urls:
seen_urls.add(r['url'])
unique_results.append(r)
return {
"queries": queries,
"results": unique_results,
"count": len(unique_results)
}测试搜索 Agent
python
# tests/test_search.py
from agents.search import SearchAgent
def test_search_agent():
agent = SearchAgent()
# 测试查询生成
queries = agent.generate_queries("AI 编程工具市场")
assert len(queries) >= 3
print(f"生成查询: {queries}")
# 测试搜索
results = agent.search("AI 编程工具市场")
assert results["count"] > 0
print(f"搜索到 {results['count']} 个结果")
for r in results["results"][:3]:
print(f"- {r['url']}")
if __name__ == "__main__":
test_search_agent()阅读 Agent 实现
阅读 Agent 负责访问 URL,提取网页正文,并生成结构化摘要。
核心代码
python
# agents/reader.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.document_loaders import WebBaseLoader
from typing import List, Dict, Optional
import asyncio
class ReaderAgent:
    """Fetches web pages and turns them into structured JSON summaries via an LLM."""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        self.summary_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个内容分析师。阅读网页内容并提取关键信息。
输出 JSON 格式:
{{
"title": "文章标题",
"main_points": ["要点1", "要点2", "要点3"],
"data": ["具体数据或引用"],
"credibility": "high/medium/low",
"source_type": "官方/媒体/博客/论坛"
}}"""),
            ("human", "网页内容:\n{content}")
        ])

    def load_url(self, url: str) -> Optional[str]:
        """Fetch a page's text content, or None when loading fails or yields no docs."""
        try:
            loader = WebBaseLoader(url)
            docs = loader.load()
            if docs:
                # Truncate to keep the prompt within the model's context limit
                content = docs[0].page_content[:8000]
                return content
        except Exception as e:
            print(f"加载失败: {url}, 错误: {e}")
        return None

    def summarize(self, url: str, content: str) -> Dict:
        """Summarize page content into a dict; on failure return an error stub."""
        try:
            response = self.llm.invoke(
                self.summary_prompt.format_messages(content=content)
            )
            import json
            # Models frequently wrap JSON in ```json ... ``` fences even when
            # asked not to — strip them so json.loads doesn't choke.
            raw = response.content.strip()
            if raw.startswith("```"):
                raw = raw.strip("`").strip()
                raw = raw.removeprefix("json")
            summary = json.loads(raw)
            summary["url"] = url
            return summary
        except Exception as e:
            return {
                "url": url,
                "title": "解析失败",
                "main_points": [],
                "error": str(e)
            }

    def read_urls(self, urls: List[str]) -> List[Dict]:
        """Sequentially load and summarize each URL, skipping failed loads."""
        summaries = []
        for url in urls:
            content = self.load_url(url)
            if content:
                summary = self.summarize(url, content)
                summaries.append(summary)
        return summaries
并行读取优化
网页读取是 IO 密集型操作,使用异步提升效率:
python
# agents/reader.py (异步版本)
import aiohttp
from bs4 import BeautifulSoup
class AsyncReaderAgent(ReaderAgent):
async def load_url_async(self, url: str) -> Optional[str]:
"""异步加载网页"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=10) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
# 移除脚本和样式
for tag in soup(['script', 'style', 'nav', 'footer']):
tag.decompose()
return soup.get_text()[:8000]
except Exception as e:
print(f"异步加载失败: {url}")
return None
async def read_urls_async(self, urls: List[str]) -> List[Dict]:
"""并行读取多个 URL"""
tasks = [self.load_url_async(url) for url in urls]
contents = await asyncio.gather(*tasks)
summaries = []
for url, content in zip(urls, contents):
if content:
summary = self.summarize(url, content)
summaries.append(summary)
return summaries分析 Agent 实现
分析 Agent 负责对收集的信息进行深度分析,识别趋势和模式。
核心代码
python
# agents/analyst.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from typing import List, Dict
class AnalystAgent:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
self.analysis_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个数据分析师。分析收集的信息,识别模式和趋势。
分析维度:
1. 共同主题:多个来源都提到的观点
2. 分歧观点:不同来源的不同看法
3. 数据趋势:数字和统计的变化
4. 关键洞察:值得注意的发现
输出 JSON 格式:
{{
"themes": ["主题1", "主题2"],
"disagreements": ["分歧1"],
"trends": ["趋势1", "趋势2"],
"insights": ["洞察1", "洞察2"],
"confidence": "high/medium/low"
}}"""),
("human", "信息摘要:\n{summaries}")
])
def analyze(self, summaries: List[Dict]) -> Dict:
"""分析信息"""
# 格式化输入
formatted = "\n\n".join([
f"来源: {s.get('url', 'unknown')}\n"
f"标题: {s.get('title', 'unknown')}\n"
f"要点: {', '.join(s.get('main_points', []))}\n"
f"数据: {', '.join(s.get('data', []))}"
for s in summaries
])
try:
response = self.llm.invoke(
self.analysis_prompt.format_messages(summaries=formatted)
)
import json
return json.loads(response.content)
except Exception as e:
return {"error": str(e), "confidence": "low"}总结 Agent 实现
总结 Agent 负责整合所有信息,生成最终的研究报告。
核心代码
python
# agents/summarizer.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from typing import Dict
class SummarizerAgent:
    """Integrates summaries and analysis into a final structured research report."""

    def __init__(self):
        # Higher temperature: report writing benefits from fluent prose
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.5)
        self.report_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个专业的研究报告撰写者。根据收集和分析的信息,生成结构化报告。
报告结构:
# 执行摘要
(200字概述)
# 主要发现
## 发现1
## 发现2
# 详细分析
(每个发现的深入讨论)
# 结论与建议
(基于分析的建议)
# 参考来源
(列出所有引用的 URL)
要求:
- 客观中立,避免主观判断
- 数据支撑,引用具体来源
- 逻辑清晰,层次分明"""),
            ("human", """研究主题:{topic}
信息摘要:
{summaries}
分析结果:
{analysis}""")
        ])

    def generate_report(self, topic: str, summaries: list, analysis: dict) -> str:
        """Render the final report (markdown string) from the topic, summaries, and analysis."""
        # One bullet per source: title plus its main points
        summaries_text = "\n".join([
            f"- {s.get('title', 'unknown')}: {', '.join(s.get('main_points', []))}"
            for s in summaries
        ])
        # Compact text rendering of the analyst's structured output
        analysis_text = f"""
主题: {', '.join(analysis.get('themes', []))}
趋势: {', '.join(analysis.get('trends', []))}
洞察: {', '.join(analysis.get('insights', []))}
"""
        response = self.llm.invoke(
            self.report_prompt.format_messages(
                topic=topic,
                summaries=summaries_text,
                analysis=analysis_text
            )
        )
        return response.content
单元测试
每个 Agent 都需要独立测试,确保功能正常:
python
# tests/test_agents.py
import pytest
from agents.search import SearchAgent
from agents.reader import ReaderAgent
from agents.analyst import AnalystAgent
from agents.summarizer import SummarizerAgent
class TestSearchAgent:
    """Unit tests for SearchAgent (requires live LLM access)."""

    def test_generate_queries(self):
        agent = SearchAgent()
        queries = agent.generate_queries("Python 异步编程")
        assert len(queries) >= 3
        assert all(isinstance(q, str) for q in queries)


class TestReaderAgent:
    """Unit tests for ReaderAgent (requires network access)."""

    def test_load_url(self):
        agent = ReaderAgent()
        content = agent.load_url("https://python.org")
        assert content is not None
        assert len(content) > 100


class TestAnalystAgent:
    """Unit tests for AnalystAgent using mock summaries."""

    def test_analyze(self):
        agent = AnalystAgent()
        mock_summaries = [
            {"title": "Test", "main_points": ["point1"], "data": ["data1"]}
        ]
        result = agent.analyze(mock_summaries)
        # Either a successful analysis or the documented error fallback
        assert "themes" in result or "error" in result


class TestSummarizerAgent:
    """Unit tests for SummarizerAgent."""

    def test_generate_report(self):
        agent = SummarizerAgent()
        report = agent.generate_report(
            topic="测试主题",
            summaries=[{"title": "Test", "main_points": ["point"]}],
            analysis={"themes": ["theme1"], "trends": [], "insights": []}
        )
        assert len(report) > 100
验收标准
完成单 Agent 开发后,你应该有:
- [ ] 4 个独立可运行的 Agent
- [ ] 每个 Agent 的单元测试通过
- [ ] 搜索 Agent 能返回相关结果
- [ ] 阅读 Agent 能提取网页内容
- [ ] 分析 Agent 能识别主题和趋势
- [ ] 总结 Agent 能生成结构化报告
下一步
单个 Agent 开发完成,开始将它们编排成完整的工作流。