Skip to content

前端界面

后端工作流完成后,需要一个用户友好的界面。本章使用 Streamlit 快速构建前端。

技术选型

方案优点缺点适用场景
Streamlit纯 Python、快速开发定制性有限MVP、内部工具
Gradio机器学习友好布局受限模型演示
FastAPI + React灵活、可扩展开发成本高生产级应用

本项目选择 Streamlit,原因:

  • 与 LangGraph 集成简单
  • 内置流式输出支持
  • 零前端经验也能上手

基础界面

python
# ui/app.py
import streamlit as st
from graph.workflow import create_workflow_with_routing

st.set_page_config(
    page_title="研究助手",
    page_icon="🔍",
    layout="wide"
)

st.title("🔍 多 Agent 研究助手")
st.markdown("输入研究主题,AI 将自动搜索、阅读、分析并生成报告。")

# 侧边栏配置
with st.sidebar:
    st.header("设置")
    research_type = st.selectbox(
        "研究类型",
        ["full", "quick"],
        format_func=lambda x: "完整研究" if x == "full" else "快速研究"
    )
    max_urls = st.slider("最大 URL 数量", 5, 20, 10)

# 主界面
topic = st.text_input("研究主题", placeholder="例如:AI 编程工具市场分析")

if st.button("开始研究", type="primary"):
    if not topic:
        st.warning("请输入研究主题")
    else:
        run_research(topic, research_type)

进度展示

使用 Streamlit 的状态管理展示实时进度:

python
# ui/app.py (进度展示部分)
def run_research(topic: str, research_type: str):
    """运行研究并展示进度"""
    app = create_workflow_with_routing()

    # 进度容器
    progress_bar = st.progress(0)
    status_text = st.empty()

    # 各阶段展示区
    col1, col2 = st.columns(2)
    with col1:
        search_container = st.container()
        read_container = st.container()
    with col2:
        analyze_container = st.container()
        report_container = st.container()

    initial_state = {
        "topic": topic,
        "research_type": research_type,
        "queries": [],
        "urls": [],
        "summaries": [],
        "analysis": None,
        "report": None,
        "current_step": "init",
        "errors": [],
        "progress": 0
    }

    # 流式执行
    for event in app.stream(initial_state):
        for node, output in event.items():
            progress = output.get("progress", 0)
            progress_bar.progress(progress / 100)
            status_text.text(f"当前步骤: {node}")

            # 更新各区域
            if node == "search":
                with search_container:
                    st.subheader("🔍 搜索结果")
                    st.write(f"生成 {len(output.get('queries', []))} 个查询")
                    st.write(f"找到 {len(output.get('urls', []))} 个链接")

            elif node == "read":
                with read_container:
                    st.subheader("📖 阅读摘要")
                    for s in output.get("summaries", [])[:5]:
                        st.markdown(f"**{s.get('title', 'Unknown')}**")

            elif node == "analyze":
                with analyze_container:
                    st.subheader("📊 分析结果")
                    analysis = output.get("analysis", {})
                    if analysis.get("themes"):
                        st.write("主题:", ", ".join(analysis["themes"]))

            elif node == "summarize":
                with report_container:
                    st.subheader("📝 研究报告")
                    st.markdown(output.get("report", ""))

    st.success("研究完成!")

流式输出报告

让报告像打字机一样逐字显示:

python
# ui/app.py (流式报告)
def stream_report(report: str):
    """流式显示报告"""
    placeholder = st.empty()
    displayed = ""

    for char in report:
        displayed += char
        placeholder.markdown(displayed)
        time.sleep(0.01)  # 控制速度

会话历史

保存研究历史,方便回顾:

python
# ui/app.py (会话历史)
import json
from datetime import datetime

# 初始化会话状态
if "history" not in st.session_state:
    st.session_state.history = []

def save_research(topic: str, report: str):
    """保存研究记录"""
    st.session_state.history.append({
        "topic": topic,
        "report": report,
        "timestamp": datetime.now().isoformat()
    })

# 侧边栏显示历史
with st.sidebar:
    st.header("历史记录")
    for i, item in enumerate(st.session_state.history[-5:]):
        if st.button(item["topic"][:20], key=f"history_{i}"):
            st.session_state.selected_report = item["report"]

# 显示选中的历史报告
if "selected_report" in st.session_state:
    st.markdown(st.session_state.selected_report)

导出功能

支持导出为 Markdown 和 PDF:

python
# ui/app.py (导出功能)
def export_report(report: str, topic: str):
    """导出报告"""
    col1, col2 = st.columns(2)

    with col1:
        # Markdown 导出
        st.download_button(
            label="📄 下载 Markdown",
            data=report,
            file_name=f"{topic[:20]}_report.md",
            mime="text/markdown"
        )

    with col2:
        # PDF 导出(需要额外依赖)
        try:
            from weasyprint import HTML
            import markdown

            html = markdown.markdown(report)
            pdf = HTML(string=html).write_pdf()

            st.download_button(
                label="📑 下载 PDF",
                data=pdf,
                file_name=f"{topic[:20]}_report.pdf",
                mime="application/pdf"
            )
        except ImportError:
            st.info("PDF 导出需要安装 weasyprint")

错误处理界面

友好地展示错误信息:

python
# ui/app.py (错误处理)
def display_errors(errors: list):
    """显示错误信息"""
    if errors:
        with st.expander("⚠️ 遇到一些问题", expanded=False):
            for error in errors:
                st.error(error)

完整代码

python
# ui/app.py
import streamlit as st
import time
from datetime import datetime
from graph.workflow import create_workflow_with_routing

st.set_page_config(page_title="研究助手", page_icon="🔍", layout="wide")

# 初始化
if "history" not in st.session_state:
    st.session_state.history = []

# 侧边栏
with st.sidebar:
    st.header("⚙️ 设置")
    research_type = st.selectbox(
        "研究类型",
        ["full", "quick"],
        format_func=lambda x: "完整研究" if x == "full" else "快速研究"
    )

    st.header("📚 历史记录")
    for i, item in enumerate(reversed(st.session_state.history[-5:])):
        if st.button(f"📄 {item['topic'][:15]}...", key=f"h_{i}"):
            st.session_state.view_report = item["report"]

# 主界面
st.title("🔍 多 Agent 研究助手")

topic = st.text_input("输入研究主题")

if st.button("开始研究", type="primary", disabled=not topic):
    app = create_workflow_with_routing()
    progress = st.progress(0)
    status = st.empty()

    initial_state = {
        "topic": topic,
        "research_type": research_type,
        "queries": [], "urls": [], "summaries": [],
        "analysis": None, "report": None,
        "current_step": "init", "errors": [], "progress": 0
    }

    final_report = ""
    for event in app.stream(initial_state):
        for node, output in event.items():
            progress.progress(output.get("progress", 0) / 100)
            status.text(f"正在执行: {node}")
            if node == "summarize":
                final_report = output.get("report", "")

    st.markdown(final_report)
    st.session_state.history.append({
        "topic": topic,
        "report": final_report,
        "time": datetime.now().isoformat()
    })

# 查看历史报告
if "view_report" in st.session_state:
    st.markdown("---")
    st.markdown(st.session_state.view_report)

启动应用

bash
# 安装依赖
pip install streamlit

# 启动
streamlit run ui/app.py

验收标准

完成前端界面后,你应该有:

  • [ ] 可运行的 Streamlit 应用
  • [ ] 实时进度展示
  • [ ] 研究报告正确显示
  • [ ] 历史记录功能
  • [ ] 导出功能可用

下一步

界面完成,开始进行集成测试。

继续:集成测试 →


← 返回 Agent 编排 | 返回项目二

最近更新

基于 Apache 2.0 许可发布