LangGraph构建【NL2SQL】Agent:从零搭建可解释的数据库查询工作流

张开发
2026/4/21 18:09:14 15 分钟阅读

分享文章

LangGraph构建【NL2SQL】Agent:从零搭建可解释的数据库查询工作流
1. 为什么需要NL2SQL Agent想象一下你是一个不会写SQL的业务人员但需要从公司数据库里查询上个月华东地区销售额最高的客户。传统方式需要找技术同事帮忙写查询语句一来一回可能半天就过去了。而NL2SQLNatural Language to SQL技术可以直接把你的自然语言问题转换成SQL查询就像有个懂技术的翻译官。但现实中的数据库查询往往更复杂可能需要先查表结构需要验证SQL语法可能要多次查询才能得到最终结果这就是为什么我们需要智能体Agent——它不只是简单翻译而是能像人类分析师一样思考先做什么、后做什么遇到错误怎么修正。而LangGraph提供的图结构工作流正好能清晰定义这种复杂查询逻辑。2. LangGraph vs 传统LangChain方案2.1 传统Agent的工作方式在LangChain中实现NL2SQL时典型的流程是这样的from langchain_community.agent_toolkits import create_sql_agent agent create_sql_agent( llmChatOpenAI(modelgpt-4), dbdb, agent_typeopenai-tools, verboseTrue ) agent.invoke(找出学分最高的课程)这种方式的优点是简单但缺点也很明显黑箱操作你不知道Agent内部具体调用了哪些工具调试困难出错时难以定位问题环节流程固定无法自定义复杂的查询逻辑2.2 LangGraph的解决方案LangGraph通过图结构将工作流可视化用户提问 → LLM分析 → 工具调用 → 结果处理 → (循环或结束)每个节点都是一个明确的操作单元边代表流转逻辑。我们来看个实际对比特性LangChain AgentLangGraph方案流程可视化❌✅自定义节点有限完全自定义错误处理统一处理按节点处理调试难度高低适合场景简单查询复杂工作流3. 从零构建NL2SQL工作流3.1 准备数据库连接首先建立数据库连接以MySQL为例from sqlalchemy import create_engine from langchain_community.utilities import SQLDatabase # 替换为你的数据库信息 engine create_engine(mysqlmysqlconnector://user:passwordhost:port/dbname) db SQLDatabase(engine) # 测试连接 print(db.run(SELECT * FROM courses LIMIT 1))3.2 定义工具集LangGraph可以和LangChain的工具集无缝集成from langchain_community.agent_toolkits import SQLDatabaseToolkit toolkit SQLDatabaseToolkit(dbdb, llmChatOpenAI(temperature0)) tools toolkit.get_tools() # 工具包括 # - QuerySQLDataBaseTool执行SQL查询 # - InfoSQLDatabaseTool查看表结构 # - QuerySQLCheckerTool语法检查 # - ListSQLDatabaseTool列出所有表3.3 构建工作流图这是最核心的部分我们分步骤实现3.3.1 定义状态结构from typing import TypedDict, Annotated from langgraph.graph.message import add_messages class State(TypedDict): messages: Annotated[list, add_messages] # 消息历史记录3.3.2 添加LLM节点from langgraph.graph import StateGraph builder StateGraph(State) # 绑定工具的LLM llm_with_tools ChatOpenAI(modelgpt-4).bind_tools(tools) def llm_node(state: State): response llm_with_tools.invoke(state[messages]) return {messages: [response]} builder.add_node(llm, llm_node)3.3.3 添加工具节点from langchain_core.messages import ToolMessage class ToolNode: def __init__(self, tools): self.tools {tool.name: tool for tool in tools} def __call__(self, state: State): last_msg state[messages][-1] results [] for tool_call in last_msg.tool_calls: tool self.tools[tool_call[name]] result tool.invoke(tool_call[args]) results.append(ToolMessage( contentstr(result), nametool_call[name] )) return {messages: results} builder.add_node(tools, ToolNode(tools))3.3.4 设置路由逻辑from typing import Literal def router(state: State) - Literal[tools, __end__]: last_msg state[messages][-1] return tools if hasattr(last_msg, tool_calls) else __end__ builder.add_conditional_edges( llm, router, {tools: tools, __end__: __end__} ) builder.add_edge(tools, llm) # 工具执行后返回LLM builder.set_entry_point(llm) graph builder.compile()4. 实战可视化与调试4.1 查看工作流图LangGraph支持输出可视化流程图需要安装额外依赖from IPython.display import Image Image(graph.get_graph().draw_mermaid_png())你会看到一个清晰的流程图开始 → LLM节点LLM节点 → 工具节点如果需要工具工具节点 → LLM节点LLM节点 → 结束如果不需要进一步操作4.2 运行查询现在可以像聊天一样使用from langchain_core.messages import HumanMessage inputs {messages: [HumanMessage(content找出学分最高的课程)]} for output in graph.stream(inputs): for node, value in output.items(): print(f【{node}】输出) print(value[messages][-1].content)典型执行过程LLM决定先调用ListSQLDatabaseTool查看所有表然后调用InfoSQLDatabaseTool查看courses表结构最后生成并执行SQL查询4.3 错误处理实战当查询出错时工作流会自动修正inputs {messages: [HumanMessage(content找出没有学生的课程)]} # LLM可能会先尝试错误查询 # SELECT * FROM courses WHERE student_count 0 # 但通过QuerySQLCheckerTool检查后 # 发现表里没有student_count字段 # 会自动调整为正确的JOIN查询5. 高级技巧与优化建议5.1 性能优化缓存表结构避免重复查询schemafrom functools import lru_cache lru_cache(maxsize32) def get_table_schema(table_name): return db.get_table_info(table_name)限制查询范围防止生成过于耗时的查询def safe_query(query): if DELETE in query.upper(): raise ValueError(不允许执行删除操作) return db.run(query)5.2 准确度提升Few-shot示例提供优质查询样本few_shots [ (列出计算机系的学生, SELECT * FROM students WHERE department 计算机系), (找出平均分超过90的课程, SELECT course_id, AVG(score) FROM scores GROUP BY course_id HAVING AVG(score) 90) ]结果后处理让LLM优化最终输出def format_results(state: State): raw_result state[messages][-1].content return llm.invoke(f将以下SQL结果转换为Markdown表格:\n{raw_result})5.3 生产环境建议添加权限控制不同用户设置不同查询权限实现查询审计记录所有生成的SQL语句设置超时机制防止长时间运行的查询from concurrent.futures import ThreadPoolExecutor, TimeoutError def run_with_timeout(query, timeout30): with ThreadPoolExecutor() as executor: future executor.submit(db.run, query) try: return future.result(timeouttimeout) except TimeoutError: return 查询超时请简化查询条件我在实际项目中遇到过这样的场景市场团队需要实时查询客户数据但每次都要等技术人员写SQL。通过部署这个LangGraph方案后他们的自助查询效率提升了80%而且错误查询的数量反而比人工编写时减少了——因为系统会自动进行语法检查和结果验证。

更多文章