fastapi 调用ollama之下的sqlcoder模式进行对话操作数据库

devtools/2024/11/17 22:44:37/
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import ollama
import mysql.connector
from mysql.connector.cursor import MySQLCursor
import jsonapp = FastAPI()# 数据库连接配置
DB_CONFIG = {"database": "web",        # 您的数据库,用于存储业务数据"user": "root",          # 数据库用户名,需要有读写权限"password": "XXXXXX",    # 数据库密码,建议使用强密码"host": "127.0.0.1",    # 数据库主机地址,本地开发环境使用localhost"port": "3306"          # MySQL 默认端口,可根据实际配置修改
}# 数据库连接函数
def get_db_connection():try:conn = mysql.connector.connect(**DB_CONFIG)return connexcept Exception as e:raise HTTPException(status_code=500, detail=f"数据库连接失败: {str(e)}")class SQLRequest(BaseModel):question: strdef get_table_relationships():"""动态获取表之间的关联关系"""conn = get_db_connection()cur = conn.cursor()try:# 获取当前数据库名cur.execute("SELECT DATABASE()")db_name = cur.fetchone()[0]# 获取外键关系cur.execute("""SELECT TABLE_NAME,COLUMN_NAME,REFERENCED_TABLE_NAME,REFERENCED_COLUMN_NAMEFROM INFORMATION_SCHEMA.KEY_COLUMN_USAGEWHERE TABLE_SCHEMA = %sAND REFERENCED_TABLE_NAME IS NOT NULLORDER BY TABLE_NAME, COLUMN_NAME""", (db_name,))relationships = []for row in rows:table_name, column_name, ref_table, ref_column = rowrelationships.append(f"-- {table_name}.{column_name} can be joined with {ref_table}.{ref_column}")return "\n".join(relationships) if relationships else "-- No foreign key relationships found"finally:cur.close()conn.close()def get_database_schema():"""获取MySQL数据库表结构,以CREATE TABLE格式返回"""conn = get_db_connection()cur = conn.cursor()try:# 获取当前数据库名cur.execute("SELECT DATABASE()")db_name = cur.fetchone()[0]# 获取所有表的结构信息cur.execute("""SELECT t.TABLE_NAME,c.COLUMN_NAME,c.COLUMN_TYPE,c.IS_NULLABLE,c.COLUMN_KEY,c.COLUMN_COMMENTFROM INFORMATION_SCHEMA.TABLES tJOIN INFORMATION_SCHEMA.COLUMNS c ON t.TABLE_NAME = c.TABLE_NAMEWHERE t.TABLE_SCHEMA = %sAND t.TABLE_TYPE = 'BASE TABLE'ORDER BY t.TABLE_NAME, c.ORDINAL_POSITION""", (db_name,))rows = cur.fetchall()schema = []current_table = Nonetable_columns = []for row in rows:table_name, column_name, column_type, nullable, key, comment = rowif current_table != table_name:if current_table is not None:schema.append(f"CREATE TABLE {current_table} (\n" + ",\n".join(table_columns) + "\n);\n")current_table = table_nametable_columns = []# 构建列定义column_def = f"  {column_name} {column_type.upper()}"if key == "PRI":column_def += " PRIMARY KEY"elif nullable == "NO":column_def += " NOT NULL"if comment:column_def += f" -- {comment}"table_columns.append(column_def)# 添加最后一个表if current_table is not None:schema.append(f"CREATE TABLE {current_table} (\n" + ",\n".join(table_columns) + "\n);\n")return "\n".join(schema)finally:cur.close()conn.close()def get_chinese_table_mapping():"""动态生成表名的中文映射"""conn = get_db_connection()cur = conn.cursor()try:# 获取所有表的注释信息cur.execute("""SELECT t.TABLE_NAME,t.TABLE_COMMENTFROM information_schema.TABLES tWHERE t.TABLE_SCHEMA = DATABASE()ORDER BY t.TABLE_NAME""")mappings = []for table_name, table_comment in cur.fetchall():# 生成表的中文名称chinese_name = table_nameif table_name.startswith('web_'):chinese_name = table_name.replace('web_', '').replace('_', '')if table_comment:chinese_name = table_comment.split('--')[0].strip()# 如果中文名称以"表"结尾,则去掉"表"if chinese_name.endswith('表'):chinese_name = chinese_name[:-1]mappings.append(f'           - "{chinese_name}" -> {table_name} table')return "\n".join(mappings)finally:cur.close()conn.close()@app.post("/query")
async def query_database(request: Request):try:# 获取请求体数据并确保正确处理中文body = await request.body()try:request_data = json.loads(body.decode('utf-8'))except UnicodeDecodeError:request_data = json.loads(body.decode('gbk'))question = request_data.get('question')print(f"收到问题: {question}")  # 调试日志if not question:raise HTTPException(status_code=400, detail="缺少 question 参数")# 获取数据库结构db_schema = get_database_schema()#print(f"数据库结构: {db_schema}")  # 调试日志# 获取中文映射并打印chinese_mapping = get_chinese_table_mapping()#print(f"表映射关系:\n{chinese_mapping}")  # 添加这行来打印映射# 修改 prompt 使用更严格的指导prompt = f"""### Instructions:Convert Chinese question to MySQL query. Follow these rules strictly:1. ONLY return a valid SELECT SQL query2. Use EXACT table names from the mapping below3. DO NOT use any table that's not in the mapping4. For Chinese terms, use these exact mappings:
{chinese_mapping}### Examples:Question: 所有装修记录SQL: SELECT * FROM web_decoration ORDER BY id;Question: 查询装修SQL: SELECT * FROM web_decoration ORDER BY id;### Database Schema:{db_schema}### Question:{question}### SQL Query:"""# 获取 SQL 查询response = ollama.chat(model='sqlcoder:latest', messages=[{'role': 'user', 'content': prompt}])sql_query = response['message']['content'].strip()print(f"生成的SQL: {sql_query}")  # 调试日志# 验证 SQL 查询if "装修" in question and "web_decoration" not in sql_query:sql_query = "SELECT * FROM web_decoration ORDER BY id"if not sql_query.upper().startswith('SELECT'):raise HTTPException(status_code=400, detail="无效的SQL查询格式")conn = get_db_connection()cur = conn.cursor(dictionary=True)try:cur.execute(sql_query)results = cur.fetchall()return {"sql_query": sql_query,"results": results}except mysql.connector.Error as e:raise HTTPException(status_code=400, detail=f"SQL 执行错误: {str(e)}")finally:cur.close()conn.close()except json.JSONDecodeError as e:raise HTTPException(status_code=400, detail=f"JSON 解析错误: {str(e)}")except Exception as e:raise HTTPException(status_code=500, detail=f"处理请求时生错误: {str(e)}")if __name__ == "__main__":import uvicornport = 8666print(f"Starting server on port {port}")uvicorn.run("main:app",host="0.0.0.0", port=port,reload=True)

我来为您解释这段代码的主要功能和结构:

主要功能

这是一个使用 FastAPI 框架开发的 API 服务,主要用于将中文问题转换为 SQL 查询语句并执行。

核心组件说明

  1. 数据库配置与连接
DB_CONFIG = {"database": "web","user": "root","password": "XXXXXX","host": "127.0.0.1","port": "3306"
}

提供了 MySQL 数据库的基本连接配置。

  1. 主要工具函数
  • get_table_relationships(): 获取数据库表之间的外键关系
  • get_database_schema(): 获取数据库表结构
  • get_chinese_table_mapping(): 生成表名的中文映射关系
  1. 核心 API 端点
@app.post("/query")

这个端点接收中文问题,主要处理流程:

  • 接收并解析用户的中文问题
  • 获取数据库结构和表映射
  • 使用 ollama 模型将中文转换为 SQL 查询
  • 执行 SQL 查询并返回结果
  1. 智能转换功能
    使用 ollamasqlcoder 模型将中文问题转换为 SQL 查询,包含:
  • 严格的表名映射
  • SQL 查询验证
  • 错误处理机制

特点

  1. 支持中文输入处理
  2. 自动获取数据库结构
  3. 动态生成中文表名映射
  4. 完善的错误处理机制
  5. 支持热重载的开发模式

使用示例

可以通过 POST 请求访问 /query 端点:

{"question": "查询所有装修记录"
}

服务会返回:

{"sql_query": "SELECT * FROM web_decoration ORDER BY id","results": [...]
}

安全特性

  1. 数据库连接错误处理
  2. SQL 注入防护
  3. 请求体编码自适应(支持 UTF-8 和 GBK)
  4. 查询结果的安全封装

查看效果:
在这里插入图片描述


http://www.ppmy.cn/devtools/134809.html

相关文章

AI技术对软件开发带来的发展

AI 重塑软件开发:流程、优势、挑战与展望 一、流程与模式介绍【传统软件开发 VS AI 参与的软件开发】 传统软件开发流程与模式 需求分析阶段:开发团队与客户进行深入沟通,通过面谈、问卷调查、文档分析等方式收集需求信息。例如,开…

深入理解 source 和 sh、bash 的区别

1 引言 在日常使用 Linux 的过程中,脚本的执行是不可避免的需求之一,而 source、sh、bash 等命令则是执行脚本的常用方式。尽管这些命令都能运行脚本,但它们之间的执行方式和效果却有着显著的区别。这些区别可能会影响到脚本的环境变量、工作…

YOLOv8改进,YOLOv8通过RFAConv卷积创新空间注意力和标准卷积,包括RFCAConv, RFCBAMConv,二次创新C2f结构,助力涨点

摘要 空间注意力已广泛应用于提升卷积神经网络(CNN)的性能,但它存在一定的局限性。作者提出了一个新的视角,认为空间注意力机制本质上解决了卷积核参数共享的问题。然而,空间注意力生成的注意力图信息对于大尺寸卷积核来说是不足够的。因此,提出了一种新型的注意力机制—…

速盾:cdn 支持 php 吗?

在网络开发中,PHP 是一种广泛使用的服务器端脚本语言,用于创建动态网页和 web 应用程序。CDN(Content Delivery Network,内容分发网络)在内容分发方面具有强大的功能,那么它是否支持 PHP 呢? C…

3D Gaussian Splatting 代码层理解之Part2

现在让我们来谈谈高斯分布。我们已经在Part1介绍了如何根据相机的位置获取 3D 点并将其转换为 2D。在本文中,我们将继续处理高斯泼溅的高斯部分。这里用到的是 GitHub 中part2. 我们在这里要做的一个小改动是,我们将使用透视投影,它利用与上一篇文章中所示的不同内部矩阵。…

No Module named pytorchvideo.losses问题解决

问题描述 最近在跑X3D的源码时发现,在conda powershell prompt中安装了pytorchvideo,但是仍然报错:No Module named pytorchvideo.losses 解决方案: 直接去https://gitcode.com/gh_mirrors/py/pytorchvideo/overview?utm_sour…

24. 正则表达式

一、什么是正则表达式 正则表达式(regular expression)又称 规则表达式,是一种文本模式(pattern)。正则表达式使用一个字符串来描述、匹配具有相同规格的字符串,通常被用来检索、替换那些符合某个模式&…

golang对日期格式化

1.对日期格式化为 YYYY-mm-dd, 并且没有数据时,返回空 import ("encoding/json""time" )type DateTime time.Timetype SysRole struct {RoleId int64 gorm:"type:bigint(20);primary_key;auto_increment;角色ID;" json:&quo…