fastapi 调用ollama之下的sqlcoder模式进行对话操作数据库

embedded/2024/11/20 21:42:33/
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import ollama
import mysql.connector
from mysql.connector.cursor import MySQLCursor
import jsonapp = FastAPI()# 数据库连接配置
DB_CONFIG = {"database": "web",        # 您的数据库,用于存储业务数据"user": "root",          # 数据库用户名,需要有读写权限"password": "XXXXXX",    # 数据库密码,建议使用强密码"host": "127.0.0.1",    # 数据库主机地址,本地开发环境使用localhost"port": "3306"          # MySQL 默认端口,可根据实际配置修改
}# 数据库连接函数
def get_db_connection():try:conn = mysql.connector.connect(**DB_CONFIG)return connexcept Exception as e:raise HTTPException(status_code=500, detail=f"数据库连接失败: {str(e)}")class SQLRequest(BaseModel):question: strdef get_table_relationships():"""动态获取表之间的关联关系"""conn = get_db_connection()cur = conn.cursor()try:# 获取当前数据库名cur.execute("SELECT DATABASE()")db_name = cur.fetchone()[0]# 获取外键关系cur.execute("""SELECT TABLE_NAME,COLUMN_NAME,REFERENCED_TABLE_NAME,REFERENCED_COLUMN_NAMEFROM INFORMATION_SCHEMA.KEY_COLUMN_USAGEWHERE TABLE_SCHEMA = %sAND REFERENCED_TABLE_NAME IS NOT NULLORDER BY TABLE_NAME, COLUMN_NAME""", (db_name,))relationships = []for row in rows:table_name, column_name, ref_table, ref_column = rowrelationships.append(f"-- {table_name}.{column_name} can be joined with {ref_table}.{ref_column}")return "\n".join(relationships) if relationships else "-- No foreign key relationships found"finally:cur.close()conn.close()def get_database_schema():"""获取MySQL数据库表结构,以CREATE TABLE格式返回"""conn = get_db_connection()cur = conn.cursor()try:# 获取当前数据库名cur.execute("SELECT DATABASE()")db_name = cur.fetchone()[0]# 获取所有表的结构信息cur.execute("""SELECT t.TABLE_NAME,c.COLUMN_NAME,c.COLUMN_TYPE,c.IS_NULLABLE,c.COLUMN_KEY,c.COLUMN_COMMENTFROM INFORMATION_SCHEMA.TABLES tJOIN INFORMATION_SCHEMA.COLUMNS c ON t.TABLE_NAME = c.TABLE_NAMEWHERE t.TABLE_SCHEMA = %sAND t.TABLE_TYPE = 'BASE TABLE'ORDER BY t.TABLE_NAME, c.ORDINAL_POSITION""", (db_name,))rows = cur.fetchall()schema = []current_table = Nonetable_columns = []for row in rows:table_name, column_name, column_type, nullable, key, comment = rowif current_table != table_name:if current_table is not None:schema.append(f"CREATE TABLE {current_table} (\n" + ",\n".join(table_columns) + "\n);\n")current_table = table_nametable_columns = []# 构建列定义column_def = f"  {column_name} {column_type.upper()}"if key == "PRI":column_def += " PRIMARY KEY"elif nullable == "NO":column_def += " NOT NULL"if comment:column_def += f" -- {comment}"table_columns.append(column_def)# 添加最后一个表if current_table is not None:schema.append(f"CREATE TABLE {current_table} (\n" + ",\n".join(table_columns) + "\n);\n")return "\n".join(schema)finally:cur.close()conn.close()def get_chinese_table_mapping():"""动态生成表名的中文映射"""conn = get_db_connection()cur = conn.cursor()try:# 获取所有表的注释信息cur.execute("""SELECT t.TABLE_NAME,t.TABLE_COMMENTFROM information_schema.TABLES tWHERE t.TABLE_SCHEMA = DATABASE()ORDER BY t.TABLE_NAME""")mappings = []for table_name, table_comment in cur.fetchall():# 生成表的中文名称chinese_name = table_nameif table_name.startswith('web_'):chinese_name = table_name.replace('web_', '').replace('_', '')if table_comment:chinese_name = table_comment.split('--')[0].strip()# 如果中文名称以"表"结尾,则去掉"表"if chinese_name.endswith('表'):chinese_name = chinese_name[:-1]mappings.append(f'           - "{chinese_name}" -> {table_name} table')return "\n".join(mappings)finally:cur.close()conn.close()@app.post("/query")
async def query_database(request: Request):try:# 获取请求体数据并确保正确处理中文body = await request.body()try:request_data = json.loads(body.decode('utf-8'))except UnicodeDecodeError:request_data = json.loads(body.decode('gbk'))question = request_data.get('question')print(f"收到问题: {question}")  # 调试日志if not question:raise HTTPException(status_code=400, detail="缺少 question 参数")# 获取数据库结构db_schema = get_database_schema()#print(f"数据库结构: {db_schema}")  # 调试日志# 获取中文映射并打印chinese_mapping = get_chinese_table_mapping()#print(f"表映射关系:\n{chinese_mapping}")  # 添加这行来打印映射# 修改 prompt 使用更严格的指导prompt = f"""### Instructions:Convert Chinese question to MySQL query. Follow these rules strictly:1. ONLY return a valid SELECT SQL query2. Use EXACT table names from the mapping below3. DO NOT use any table that's not in the mapping4. For Chinese terms, use these exact mappings:
{chinese_mapping}### Examples:Question: 所有装修记录SQL: SELECT * FROM web_decoration ORDER BY id;Question: 查询装修SQL: SELECT * FROM web_decoration ORDER BY id;### Database Schema:{db_schema}### Question:{question}### SQL Query:"""# 获取 SQL 查询response = ollama.chat(model='sqlcoder:latest', messages=[{'role': 'user', 'content': prompt}])sql_query = response['message']['content'].strip()print(f"生成的SQL: {sql_query}")  # 调试日志# 验证 SQL 查询if "装修" in question and "web_decoration" not in sql_query:sql_query = "SELECT * FROM web_decoration ORDER BY id"if not sql_query.upper().startswith('SELECT'):raise HTTPException(status_code=400, detail="无效的SQL查询格式")conn = get_db_connection()cur = conn.cursor(dictionary=True)try:cur.execute(sql_query)results = cur.fetchall()return {"sql_query": sql_query,"results": results}except mysql.connector.Error as e:raise HTTPException(status_code=400, detail=f"SQL 执行错误: {str(e)}")finally:cur.close()conn.close()except json.JSONDecodeError as e:raise HTTPException(status_code=400, detail=f"JSON 解析错误: {str(e)}")except Exception as e:raise HTTPException(status_code=500, detail=f"处理请求时生错误: {str(e)}")if __name__ == "__main__":import uvicornport = 8666print(f"Starting server on port {port}")uvicorn.run("main:app",host="0.0.0.0", port=port,reload=True)

我来为您解释这段代码的主要功能和结构:

主要功能

这是一个使用 FastAPI 框架开发的 API 服务,主要用于将中文问题转换为 SQL 查询语句并执行。

核心组件说明

  1. 数据库配置与连接
DB_CONFIG = {"database": "web","user": "root","password": "XXXXXX","host": "127.0.0.1","port": "3306"
}

提供了 MySQL 数据库的基本连接配置。

  1. 主要工具函数
  • get_table_relationships(): 获取数据库表之间的外键关系
  • get_database_schema(): 获取数据库表结构
  • get_chinese_table_mapping(): 生成表名的中文映射关系
  1. 核心 API 端点
@app.post("/query")

这个端点接收中文问题,主要处理流程:

  • 接收并解析用户的中文问题
  • 获取数据库结构和表映射
  • 使用 ollama 模型将中文转换为 SQL 查询
  • 执行 SQL 查询并返回结果
  1. 智能转换功能
    使用 ollamasqlcoder 模型将中文问题转换为 SQL 查询,包含:
  • 严格的表名映射
  • SQL 查询验证
  • 错误处理机制

特点

  1. 支持中文输入处理
  2. 自动获取数据库结构
  3. 动态生成中文表名映射
  4. 完善的错误处理机制
  5. 支持热重载的开发模式

使用示例

可以通过 POST 请求访问 /query 端点:

{"question": "查询所有装修记录"
}

服务会返回:

{"sql_query": "SELECT * FROM web_decoration ORDER BY id","results": [...]
}

安全特性

  1. 数据库连接错误处理
  2. SQL 注入防护
  3. 请求体编码自适应(支持 UTF-8 和 GBK)
  4. 查询结果的安全封装

查看效果:
在这里插入图片描述


http://www.ppmy.cn/embedded/139184.html

相关文章

199. 二叉树的右视图【 力扣(LeetCode) 】

文章目录 零、原题链接一、题目描述二、测试用例三、解题思路四、参考代码 零、原题链接 199. 二叉树的右视图 一、题目描述 给定一个二叉树的 根节点 root,想象自己站在它的右侧,按照从顶部到底部的顺序,返回从右侧所能看到的节点值。 二…

基于 Python Django 的二手房间可视化系统分析

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…

AI工业大模型报告:体系架构、关键技术与典型应用

研究意义 随着新一代人工智能的发展, 大模型(如 GPT-4o 等)凭借大规模训练数据、网络参数和算 力涌现出强大的生成能力、泛化能力和自然交互能力, 展现出改变工业世界的巨大潜力. 尽管大模型 已在自然语言等多个领域取得突破性进展, 但其在工业应用中的…

【Flutter 问题系列第 84 篇】如何清除指定网络图片的缓存

这是【Flutter 问题系列第 84 篇】,如果觉得有用的话,欢迎关注专栏。 博文当前所用 Flutter SDK:3.24.3、Dart SDK:3.5.3,网络图片缓存用的插件 cached_network_image: 3.4.1,缓存的网络图像的存储和检索用…

第T8周:Tensorflow实现猫狗识别(1)

🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 具体实现 (一)环境 语言环境:Python 3.10 编 译 器: PyCharm 框 架: (二)具体步骤 from absl.l…

传奇996_24——变量lua

1. 引擎变量 系统变量也叫全局变量,玩家变量也叫个人变量,个人标识也是个人变量 系统变量A,G,I 介绍: 个数一共1100个,分三种 (1)A字符型系统变量,重启服务器保存&am…

【经典】webpack和vite的区别?

‌Webpack和Vite在构建速度、开发体验和构建结果等方面存在显著区别。‌ Webpack是一个传统的构建工具,它在开发过程中需要对整个应用或大部分应用进行打包,这导致在大型项目中,打包过程非常耗时,尤其是在页面代码更改后&#xf…

K8S资源限制之ResourceQuota

ResourceQuota介绍 在K8S中,大部分资源都可以指定到一个名称空间下,因此可以对一个名称空间的计算资源,存储资源,资源数量等维度做资源限制。 如限制pod数量、svc数量,控制器数量,限制PVC请求的存储量 注…