前端界面
后端工作流完成后,需要一个用户友好的界面。本章使用 Streamlit 快速构建前端。
技术选型
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Streamlit | 纯 Python、快速开发 | 定制性有限 | MVP、内部工具 |
| Gradio | 机器学习友好 | 布局受限 | 模型演示 |
| FastAPI + React | 灵活、可扩展 | 开发成本高 | 生产级应用 |
本项目选择 Streamlit,原因:
- 与 LangGraph 集成简单
- 内置流式输出支持
- 零前端经验也能上手
基础界面
python
# ui/app.py
import streamlit as st
from graph.workflow import create_workflow_with_routing
st.set_page_config(
page_title="研究助手",
page_icon="🔍",
layout="wide"
)
st.title("🔍 多 Agent 研究助手")
st.markdown("输入研究主题,AI 将自动搜索、阅读、分析并生成报告。")
# 侧边栏配置
with st.sidebar:
st.header("设置")
research_type = st.selectbox(
"研究类型",
["full", "quick"],
format_func=lambda x: "完整研究" if x == "full" else "快速研究"
)
max_urls = st.slider("最大 URL 数量", 5, 20, 10)
# 主界面
topic = st.text_input("研究主题", placeholder="例如:AI 编程工具市场分析")
if st.button("开始研究", type="primary"):
if not topic:
st.warning("请输入研究主题")
else:
run_research(topic, research_type)进度展示
使用 Streamlit 的状态管理展示实时进度:
python
# ui/app.py (进度展示部分)
def run_research(topic: str, research_type: str):
"""运行研究并展示进度"""
app = create_workflow_with_routing()
# 进度容器
progress_bar = st.progress(0)
status_text = st.empty()
# 各阶段展示区
col1, col2 = st.columns(2)
with col1:
search_container = st.container()
read_container = st.container()
with col2:
analyze_container = st.container()
report_container = st.container()
initial_state = {
"topic": topic,
"research_type": research_type,
"queries": [],
"urls": [],
"summaries": [],
"analysis": None,
"report": None,
"current_step": "init",
"errors": [],
"progress": 0
}
# 流式执行
for event in app.stream(initial_state):
for node, output in event.items():
progress = output.get("progress", 0)
progress_bar.progress(progress / 100)
status_text.text(f"当前步骤: {node}")
# 更新各区域
if node == "search":
with search_container:
st.subheader("🔍 搜索结果")
st.write(f"生成 {len(output.get('queries', []))} 个查询")
st.write(f"找到 {len(output.get('urls', []))} 个链接")
elif node == "read":
with read_container:
st.subheader("📖 阅读摘要")
for s in output.get("summaries", [])[:5]:
st.markdown(f"**{s.get('title', 'Unknown')}**")
elif node == "analyze":
with analyze_container:
st.subheader("📊 分析结果")
analysis = output.get("analysis", {})
if analysis.get("themes"):
st.write("主题:", ", ".join(analysis["themes"]))
elif node == "summarize":
with report_container:
st.subheader("📝 研究报告")
st.markdown(output.get("report", ""))
st.success("研究完成!")流式输出报告
让报告像打字机一样逐字显示:
python
# ui/app.py (流式报告)
def stream_report(report: str):
"""流式显示报告"""
placeholder = st.empty()
displayed = ""
for char in report:
displayed += char
placeholder.markdown(displayed)
time.sleep(0.01) # 控制速度会话历史
保存研究历史,方便回顾:
python
# ui/app.py (会话历史)
import json
from datetime import datetime
# 初始化会话状态
if "history" not in st.session_state:
st.session_state.history = []
def save_research(topic: str, report: str):
"""保存研究记录"""
st.session_state.history.append({
"topic": topic,
"report": report,
"timestamp": datetime.now().isoformat()
})
# 侧边栏显示历史
with st.sidebar:
st.header("历史记录")
for i, item in enumerate(st.session_state.history[-5:]):
if st.button(item["topic"][:20], key=f"history_{i}"):
st.session_state.selected_report = item["report"]
# 显示选中的历史报告
if "selected_report" in st.session_state:
st.markdown(st.session_state.selected_report)导出功能
支持导出为 Markdown 和 PDF:
python
# ui/app.py (导出功能)
def export_report(report: str, topic: str):
"""导出报告"""
col1, col2 = st.columns(2)
with col1:
# Markdown 导出
st.download_button(
label="📄 下载 Markdown",
data=report,
file_name=f"{topic[:20]}_report.md",
mime="text/markdown"
)
with col2:
# PDF 导出(需要额外依赖)
try:
from weasyprint import HTML
import markdown
html = markdown.markdown(report)
pdf = HTML(string=html).write_pdf()
st.download_button(
label="📑 下载 PDF",
data=pdf,
file_name=f"{topic[:20]}_report.pdf",
mime="application/pdf"
)
except ImportError:
st.info("PDF 导出需要安装 weasyprint")错误处理界面
友好地展示错误信息:
python
# ui/app.py (错误处理)
def display_errors(errors: list):
"""显示错误信息"""
if errors:
with st.expander("⚠️ 遇到一些问题", expanded=False):
for error in errors:
st.error(error)完整代码
python
# ui/app.py
import streamlit as st
import time
from datetime import datetime
from graph.workflow import create_workflow_with_routing
st.set_page_config(page_title="研究助手", page_icon="🔍", layout="wide")
# 初始化
if "history" not in st.session_state:
st.session_state.history = []
# 侧边栏
with st.sidebar:
st.header("⚙️ 设置")
research_type = st.selectbox(
"研究类型",
["full", "quick"],
format_func=lambda x: "完整研究" if x == "full" else "快速研究"
)
st.header("📚 历史记录")
for i, item in enumerate(reversed(st.session_state.history[-5:])):
if st.button(f"📄 {item['topic'][:15]}...", key=f"h_{i}"):
st.session_state.view_report = item["report"]
# 主界面
st.title("🔍 多 Agent 研究助手")
topic = st.text_input("输入研究主题")
if st.button("开始研究", type="primary", disabled=not topic):
app = create_workflow_with_routing()
progress = st.progress(0)
status = st.empty()
initial_state = {
"topic": topic,
"research_type": research_type,
"queries": [], "urls": [], "summaries": [],
"analysis": None, "report": None,
"current_step": "init", "errors": [], "progress": 0
}
final_report = ""
for event in app.stream(initial_state):
for node, output in event.items():
progress.progress(output.get("progress", 0) / 100)
status.text(f"正在执行: {node}")
if node == "summarize":
final_report = output.get("report", "")
st.markdown(final_report)
st.session_state.history.append({
"topic": topic,
"report": final_report,
"time": datetime.now().isoformat()
})
# 查看历史报告
if "view_report" in st.session_state:
st.markdown("---")
st.markdown(st.session_state.view_report)启动应用
bash
# 安装依赖
pip install streamlit
# 启动
streamlit run ui/app.py验收标准
完成前端界面后,你应该有:
- [ ] 可运行的 Streamlit 应用
- [ ] 实时进度展示
- [ ] 研究报告正确显示
- [ ] 历史记录功能
- [ ] 导出功能可用
下一步
界面完成,开始进行集成测试。