LangGraph实战:从零构建数据分析助手智能体

一、项目背景与目标

在数字化转型浪潮中,企业需要快速从海量数据中提取价值。传统数据分析工具依赖人工操作,而基于LangGraph框架的AI智能体可通过自然语言交互自动完成数据清洗、可视化生成、趋势解读等任务。本教程以电商销售数据分析场景为例,实现一个能理解用户意图、调用分析工具并生成结构化报告的智能体。

二、核心架构设计

1. 状态机拓扑结构

采用LangGraph的FiniteStateMachine模式,定义6个核心状态节点:

  • 初始意图识别:通过大语言模型(LLM)解析用户原始输入
  • 数据源验证:检查用户指定的数据表是否存在
  • 分析类型选择:提供趋势分析、对比分析、异常检测等选项
  • 可视化配置:确定图表类型、维度、指标等参数
  • 报告生成:整合分析结果生成Markdown格式报告
  • 异常恢复:处理数据缺失、权限不足等异常场景
  1. from langgraph.prebuilt import FiniteStateMachine
  2. class DataAnalysisAgent(FiniteStateMachine):
  3. def __init__(self):
  4. states = [
  5. "parse_intent",
  6. "validate_datasource",
  7. "select_analysis_type",
  8. "configure_visualization",
  9. "generate_report",
  10. "handle_exception"
  11. ]
  12. super().__init__(states=states)

2. 工具链集成方案

构建三级工具体系:

  • 基础工具:数据库查询(SQLAlchemy)、文件解析(Pandas)
  • 分析工具:统计分析(SciPy)、可视化(Matplotlib/Plotly)
  • 领域工具:电商指标计算(转化率、客单价等专用函数)
  1. from langchain_community.tools import Tool
  2. def create_sql_query_tool(db_engine):
  3. async def _run(query: str) -> str:
  4. try:
  5. df = pd.read_sql(query, db_engine)
  6. return df.to_csv(index=False)
  7. except Exception as e:
  8. return f"Query failed: {str(e)}"
  9. return Tool(
  10. name="SQLQueryTool",
  11. func=_run,
  12. description="Execute SQL queries on analytical database"
  13. )

三、关键实现步骤

1. 意图解析模块开发

采用”提示词工程+关键词匹配”双模式解析:

  1. from langchain.prompts import PromptTemplate
  2. INTENT_TEMPLATE = """
  3. 用户输入: {user_input}
  4. 请从以下类别中选择最匹配的意图:
  5. 1. 销售趋势分析
  6. 2. 用户行为对比
  7. 3. 异常交易检测
  8. 4. 数据质量检查
  9. 5. 其他请求
  10. 选择结果(数字):"""
  11. def parse_intent(input_text):
  12. prompt = PromptTemplate(template=INTENT_TEMPLATE, input_variables=["user_input"])
  13. llm_response = llm(prompt.format(user_input=input_text))
  14. try:
  15. return int(llm_response.split()[-1])
  16. except:
  17. return 5 # 默认其他请求

2. 动态可视化生成

通过配置驱动图表生成,支持用户自定义修改:

  1. class VisualizationConfig:
  2. def __init__(self):
  3. self.chart_type = None # bar/line/pie
  4. self.x_axis = None
  5. self.y_axis = []
  6. self.filters = {}
  7. def generate_plotly_chart(config: VisualizationConfig, data_df):
  8. fig = go.Figure()
  9. if config.chart_type == "bar":
  10. fig.add_bar(x=data_df[config.x_axis], y=data_df[config.y_axis[0]])
  11. elif config.chart_type == "line":
  12. for col in config.y_axis:
  13. fig.add_scatter(x=data_df[config.x_axis], y=data_df[col], name=col)
  14. # 其他图表类型...
  15. return fig.to_html()

3. 异常处理机制

实现三级异常恢复策略:

  1. 数据层:自动检测缺失值并提示补全
  2. 权限层:检查数据访问权限,引导用户切换数据源
  3. 逻辑层:当分析结果不合理时,建议调整参数
  1. class ExceptionHandler:
  2. @staticmethod
  3. async def handle_data_error(error):
  4. if "column not found" in str(error):
  5. return "检测到指定列不存在,请检查数据字段名或选择其他分析维度"
  6. elif "permission denied" in str(error):
  7. return "当前账号无权访问该数据表,请联系管理员或切换数据源"
  8. return "数据处理异常,建议简化查询条件重试"

四、性能优化实践

1. 缓存策略设计

  • 查询结果缓存:对相同参数的SQL查询结果缓存24小时
  • 意图解析缓存:存储常见问题的标准解析结果
  • 可视化模板缓存:复用高频图表配置
  1. from functools import lru_cache
  2. @lru_cache(maxsize=100)
  3. def cached_query(query_hash: str, params: dict):
  4. # 实际执行数据库查询
  5. pass

2. 异步处理优化

采用asyncio实现IO密集型操作的并发处理:

  1. import asyncio
  2. async def run_analysis_pipeline(config):
  3. tasks = [
  4. asyncio.create_task(fetch_data(config)),
  5. asyncio.create_task(preprocess_data(config)),
  6. asyncio.create_task(generate_visuals(config))
  7. ]
  8. results = await asyncio.gather(*tasks, return_exceptions=True)
  9. # 处理结果...

五、部署与扩展建议

1. 容器化部署方案

  1. FROM python:3.9-slim
  2. WORKDIR /app
  3. COPY requirements.txt .
  4. RUN pip install -r requirements.txt --no-cache-dir
  5. COPY . .
  6. CMD ["python", "agent_server.py"]

2. 水平扩展策略

  • 状态机实例隔离:不同分析类型运行独立实例
  • 工具服务解耦:将数据库查询、可视化生成等拆分为微服务
  • 动态资源分配:根据负载自动调整worker数量

3. 安全增强措施

  • 实施数据脱敏:在日志中隐藏敏感字段
  • 访问控制:集成OAuth2.0进行用户认证
  • 审计追踪:记录所有分析操作的完整链路

六、完整项目示例

访问GitHub示例仓库(示例链接)可获取:

  1. 完整的main.py启动脚本
  2. 预配置的prompts/目录
  3. 测试用例集合
  4. 部署配置模板

本教程通过实际项目演示了LangGraph框架在数据分析场景的强大能力。开发者可基于此架构快速构建垂直领域的数据分析助手,建议从简单场景入手,逐步增加复杂工具和异常处理逻辑。后续可探索多智能体协作、实时数据流分析等高级特性。