使用异步ORM SQLAlchemy提升web服务性能

news/2024/11/17 18:47:06/

介绍

对于一个web服务,性能的瓶颈最终基本上都会出现在数据库读取的这一步上,如果能够在数据库读取数据的这一段时间自动切换去处理其他请求的话,服务的性能会得到非常显著的提升,因此需要选择一个合适的异步驱动和工具包

SQLAlchemy是一个python中发展比较成熟的ORM数据库工具包,在比较早期的时候它只是一个同步ORM,在1.4版本的时候引入了协程并支持了异步的功能,下面以最通用和常用为出发点,选择aiomysql + sqlalchemy介绍一些比较好的实践

本篇文章内容主要包含:

  1. SQLAlchemy的一些用法和最佳实践介绍
  2. 一个使用SQLAlchemy连接实现pandas的异步read_sql_query方法的实现(也是对上一篇文章末尾提的建议做一个结束)

创建异步引擎并执行SQL查询

from sqlalchemy.engine import URL
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngineengine: AsyncEngine = create_async_engine(URL.create("mysql+aiomysql", "root", "root", "localhost", 3306, "mysql"),
)

创建方式和同步的create_engine()方法没有很大的差别,参数都是一样的

创建好查询引擎之后,接下来使用创建的引擎操作数据库读取数据

# -*- coding: utf-8 -*-
import asynciofrom sqlalchemy import text
from sqlalchemy.engine import URL
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngineloop = asyncio.get_event_loop()
engine: AsyncEngine = create_async_engine(URL.create("mysql+aiomysql", "root", "root", "localhost", 3306, "services"),
)# test_table: 
#    id   name value
# 0   1    abc   123
# 1   2    def   456
# 2   3    fgi   789
# 3   4  fdsas   654
# 4   5   asda   111async def get_data():async with engine.connect() as conn:# query = text("select * from test_table where id > :id or id = :id2").bindparams(id=3, id2=1)query = text("select * from test_table").bindparams(**{})result = await conn.execute(query)ttl = result.rowcountprint(f"总行数:{ttl}")data = result.fetchone()print(data, type(data))  # 获取到的data是一个Row对象print(dict(data))  # 可以直接转成字典length = 1print(f"进度:{length / ttl}")# 继续fetchone()会从下一条数据开始获取data = result.fetchone()print(data)length += 1print(f"进度:{length / ttl}")data = result.fetchmany(2)print(data)# 将数据转成比较通用的"records"格式print(list(map(dict, data)))length += len(data)print(f"进度:{length / ttl}")# fetchmany()也会按继续执行,没有数据之后会返回空列表[]data = result.fetchmany(5)print(data)print(list(map(dict, data)))length += len(data)print(f"进度:{length / ttl}")data = result.fetchall()print(data)print(list(map(dict, data)))length += len(data)print(f"进度:{length / ttl}")

使用engin.connect()从实例中维护的连接池中获取一个连接,然后调用其execute方法执行查询即可,注意这里的sql需要用SQL alchemy的text包起来才能查询,然后就是这里的execute得到的对象和单独使用aiomysql框架的连接时稍有不同,execute结果会返回一个CursorResult对象,然后使用同步的方式对他执行fetchone(), fetchmany(), fetchall()等方法,而且调用多次fetchone()会逐行返回而调用fetchmany()方法也是会从上一次的fetch之后继续读取,相当于再调用一个生成器的next方法

前面说了sql传入执行之前要用SQL alchemy的text方法包起来,这样子做稍微显得有点麻烦,不过这样子做可以让我们很方便地使用占位符:

async def get_data2():async with engine.connect() as conn:query = text("select * from test_table where id > :id or id = :id2").bindparams(id=2, id2=1)# query = text("select * from test_table").bindparams(**{})result = await conn.execute(query)# await conn.commit()   # 如果修改数据库记得要提交data = result.fetchall()print(data)print(list(map(dict, data)))[(1, 'abc', '123'), (3, 'fgi', '789'), (4, 'fdsas', '654'), (5, 'asda', '111')]
[{'id': 1, 'name': 'abc', 'value': '123'}, {'id': 3, 'name': 'fgi', 'value': '789'}, {'id': 4, 'name': 'fdsas', 'value': '654'}, {'id': 5, 'name': 'asda', 'value': '111'}]

其他删除更新等操作可以通过写sql然后放到text中去执行,也可以使用SQL alchemy支持的相关方法,可以访问官网介绍进行更多学习
下面用SQLAlchemy结合pandas的read_sql_query方法修改成异步方式的实践作为结尾

异步SQLAlchemy引擎实现异步的pandas read_sql_query方法

# -*- coding:utf-8 -*-
from __future__ import annotationsimport asyncio
import loggingimport pandas as pd
from pandas._typing import DtypeArg
from pandas.io.sql import _convert_params, _wrap_result
from sqlalchemy import text
from sqlalchemy.engine import URL
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.ext.asyncio import create_async_engineloop_ = asyncio.get_event_loop()class AsyncEngineSQLDatabase:def __init__(self, engine):self.conn: AsyncEngine = engineself.progress = Noneasync def execute(self, *args, **kwargs):async with self.conn.connect() as conn:result = await conn.execute(*args, **kwargs)await conn.commit()return resultasync def async_read_query(self,sql,index_col=None,coerce_float: bool = True,params=None,parse_dates=None,chunksize: int | None = None,dtype: DtypeArg | None = None,):args = _convert_params(sql, params)result = await self.execute(*args)columns = result.keys()if chunksize is not None:return self._query_iterator(result,chunksize,columns,index_col=index_col,coerce_float=coerce_float,parse_dates=parse_dates,dtype=dtype,)else:data = result.fetchall()frame = _wrap_result(data,columns,index_col=index_col,coerce_float=coerce_float,parse_dates=parse_dates,dtype=dtype,)return frameasync_read_sql = async_read_queryasync def _query_iterator(self,result,chunksize: int,columns,index_col=None,coerce_float=True,parse_dates=None,dtype: DtypeArg | None = None,):"""Return generator through chunked result set"""has_read_data = Falsetotal_size = result.rowcountread_size = 0while True:# 本地读取太快了,sleep来方便调试data = result.fetchmany(chunksize)read_size += len(data)progress = f"{round(read_size / total_size, 4): .2%}"self.progress = progressif not data:if not has_read_data:yield _wrap_result([],columns,index_col=index_col,coerce_float=coerce_float,parse_dates=parse_dates,)breakelse:has_read_data = Trueyield _wrap_result(data,columns,index_col=index_col,coerce_float=coerce_float,parse_dates=parse_dates,dtype=dtype,)class AsyncMysqlEngine:def __init__(self, **kwargs):self.engine: AsyncEngine = create_async_engine(URL.create("mysql+aiomysql", "root", "root", "localhost", 3306, "services"))async def execute(self, sql, bind_params=None, is_fetchone=False, fetchmany=None):if hasattr(sql, "get_sql"):sql = sql.get_sql()bind_params = bind_params or {}async with self.engine.connect() as conn:try:sql = text(sql).bindparams(**bind_params)result = await conn.execute(sql)rowcount = result.rowcountif is_fetchone:data = result.fetchone()data = dict(data)else:data = result.fetchall()data = list(map(dict, data))await conn.commit()  # update时需要commit,但其他类型查询例如select时commit也无妨return rowcount, dataexcept Exception as e:logging.error(f"SQL execution met an unexpected error, "f"hint:\n {e}\n, "f"the error query sql is: \n{sql}\n")await conn.rollback()raise easync def async_read_query(self,sql,index_col=None,coerce_float: bool = True,params=None,parse_dates=None,chunksize: int | None = None,dtype: DtypeArg | None = None,):conn = self.engineasync_engine_database = AsyncEngineSQLDatabase(conn)params = params or {}sql = text(sql).bindparams(**params)frame_or_async_generator = await async_engine_database.async_read_query(sql,index_col,coerce_float,params,parse_dates,chunksize,dtype)if chunksize:frames = [pd.DataFrame()]async for frame_part in frame_or_async_generator:progress = async_engine_database.progressprint(progress)frames.append(frame_part)df = pd.concat(frames, ignore_index=True)return dfreturn frame_or_async_generatorasync_engine = AsyncMysqlEngine()async def run_test2():sql = "select * from test_table"df = await async_engine.async_read_query(sql)print(df)async def run_test3():sql = "select * from test_table where id > :id"df = await async_engine.async_read_query(sql, params={"id": 1}, chunksize=3)print(df)if __name__ == '__main__':# loop_.run_until_complete(run_test2())loop_.run_until_complete(run_test3())pass

http://www.ppmy.cn/news/8401.html

相关文章

诞生两年+,三翼鸟的“场景”思维有进化吗?

出品 | 何玺 排版 | 叶媛 2020年9月,三翼鸟品牌正式发布。截止2022年12月,三翼鸟已经走过了2年多的历程。诞生两年,三翼鸟有什么样的发展,它倡导的“场景”思维有进化吗?我们一起来看看。 01 从三翼鸟的“全球首个场…

java接口的静态方法

目前java接口中已经支持定义静态方法 但需要注意一个点 我们先把代码写出来 我们创建一个包 下面创建一个接口 subInterface 接口参考代码如下 public interface subInterface {static void show2() {System.out.println("来自接口的静态方法");} }这里 我们就将…

串口通信协议

同步通信和异步通信 同步通信:需要时钟信号的约束,在时钟信号的驱动下两方进行数据交换,一般会选择在上升沿或者下降沿进行数据的采样,以及时钟极性和时钟相位【eg.SPI,IIC】。 异步通信:不需要时钟信号的同步,通过(…

Go 并发

来自 《Go 语言从入门到实战》 的并发章节学习笔记,欢迎阅读斧正,感觉该专栏整体来说对有些后端编程经验的来说比无后端编程经验的人更友好。。 Thread VS Groutine 创建时默认 Stack 大小:前者默认 1M,Groutint 的 Stack 初始化…

uniCloud云开发----4、uniCloud云开发进阶使用方法

uniCloud云开发进阶使用方法前言1、云对象的importObject的创建和使用(1)创建云对象(2)编辑云对象(3)在.vue文件中调用云对象(4)在.vue文件中调用方法2、客户端直接连接数据库(1)直接在客户端引…

Threejs实现鼠标点击人物行走/镜头跟随人物移动/鼠标点击动画/游戏第三人称/行走动作

1,功能介绍 Threejs获取鼠标点击位置、实现鼠标点击人物行走、人物头顶显示名称标签、镜头跟随人物移动并且镜头围绕人物旋转,类似游戏中第三人称、鼠标点击位置有动画效果,如下效果图 2,功能实现 获取鼠标点击位置,…

《Nuitka打包实战指南》实战打包OpenCV-Python

实战打包OpenCV-Python 打包时解决掉的问题: ModuleNotFoundError: No Module named cv2ImportError: numpy.core.multiarray failed to import打包示例源码: 请看文章末尾 版本信息: opencv-python==4.5.1.48 numpy==1.23.2 Nuitka==0.6.19.1 打包系统: Windows10 64…

【Python】sklearn机器学习之Birch聚类算法

文章目录基本原理sklearn调用基本原理 BIRCH,即Balanced Iterative Reducing and Clustering Using Hierarchies,利用分层的平衡迭代规约和聚类,特点是扫描一次数据就可以实现聚类, 而根据经验,一般这种一遍成功的算…