文章目录
- 同步操作
- 同步连Mysql
- 同步连redis
- 同步连mongodb
- 异步操作
- 异步连mysql
- 异步连redis
- 异步连mongodb
同步操作
同步连Mysql
python 连接mysql可以使用pymysql、mysqlclient等。
安装:
# win
pip install pymysql
连接mysql:
# __author__ = "laufing"
import pymysql# 连接
conn = pymysql.connect(host="localhost", port=3306, user="lauf", password="xxx", # 太长直接换行database="test_lauf", charset="utf8")
# 开启事务
conn.begin()# 获取游标
cursor = conn.cursor()
# 执行操作
cursor.execute("create table if not exists stu(id int primary key auto_increment, name varchar(50) unique , age int, price decimal, ""birth date)engine=innodb;") # innodb 支持事务、外键;行级锁
cursor.execute("insert into stu(id, name, age, price, birth) values (1, 'jack', 12, 20.4, '2000-01-20'),""(2, 'lucy', 18, 300.5, '1990-05-23');")# ...# 尝试回滚 不会删除已创建的表
# conn.rollback()# 提交
conn.commit()# 最后关闭
cursor.close()
conn.close()
以上cursor操作部分,可以使用with来操作。
自定义上下文管理器,实现数据库的连接:
- 进入with作用域前,连接db、开启事务、 并获取游标;
- with作用域实现sql操作;
- with作用域结束退出时
# __author__ = "laufing"
import pymysql# 自定义上下文管理器
class LaufConnectMysql():def __init__(self, **kwargs):# kwargs 收集 pymysql连接db的参数 + transaction: booltransaction = kwargs.pop("transaction")self.conn = pymysql.connect(**kwargs)self.cursor = Noneif transaction:# 开启事务self.conn.begin()def __enter__(self):""" 进入with作用域前 即执行"""print("进入with作用域前的准备工作(as)...")# 返回什么,with xxx as xxx, as 后面拿到的就是什么# 返回cursor游标对象self.cursor = self.conn.cursor()self.cursor.conn = self.connreturn self.cursordef __exit__(self, exc_type, exc_val, exc_tb):print("退出with作用域的收尾工作...")# 关闭游标self.cursor.close()# 关闭连接self.conn.close()# 防止内存泄漏if __name__ == '__main__':# 连接db的参数kw_args = {"host": "localhost","port": 3306,"user": "lauf","password": "xxx","database": "test_lauf","charset": "utf8","transaction": True}# with 操作with LaufConnectMysql(**kw_args) as cursor: #print("with作用域:", cursor, type(cursor))# 执行操作try:cursor.execute("create table if not exists stu(id int primary key auto_increment, name varchar(50) unique , age int, price decimal, ""birth date)engine=innodb;") # innodb 支持事务、外键;行级锁cursor.execute("insert into stu(id, name, age, price, birth) values (1, 'jack', 12, 20.4, '2000-01-20'),""(1, 'lucy', 18, 300.5, '1990-05-23');")except Exception as e:print("sql操作异常:", e.args)cursor.conn.rollback()finally:cursor.conn.commit()
同步连redis
python 同步连接redis可以使用redis包。
安装
# win
pip install redis
连接redis
import redisconn = redis.Redis(host="localhost", port=6379, db=0)
print(conn.exists("user_1"))
使用连接池:
import redis# 创建连接池
pool = redis.ConnectionPool(host="localhost", port=6379, db=0, max_connections=30)# 创建一个连接
conn = redis.Redis(connection_pool=pool)
print(conn.type("user_1"))
# 创建第二个连接
conn1 = redis.Redis(connection_pool=pool)
print(conn1.exists("count1"))
同步连mongodb
python同步连接mongodb可以使用pymongo包。
异步操作
- 基于协程的异步;
- 基于线程池、进程池的异步;
异步连mysql
异步连接mysql使用aiomysql(基于协程);
若没有对应的异步操作模块,则考虑使用线程池&进程池实现。
安装:
pip install aiomysql
异步连接mysql:
import asyncio
import aiomysql # 基于pymysql实现async def main():# 异步连接mysqlconn = await aiomysql.connect(host="localhost", port=3306, user="lauf",password="xxx", db="test_lauf")# 开启事务await conn.begin()# 获取游标cur = await conn.cursor()# 执行sqltry:await cur.execute("create table if not exists asyncstu(id int primary key auto_increment, ""name varchar(50));")await cur.execute("insert into asyncStu(id, name) values(1, '666');")except Exception as e:print("sql操作异常:", e.args)# 回滚await conn.rollback()finally:# 提交await conn.commit()# 查询await cur.execute("select * from asyncstu;")result = await cur.fetchall()print("查询的结果:", result)# 关闭游标await cur.close()conn.close()# 普通的阻塞式函数
def func():# 开启事件循环 asyncio.run(main())if __name__ == '__main__':func()
使用async def定义的协程函数,必须执行才返回协程对象;
协程对象必须在asyncio的事件循环中执行;
异步连redis
# pip install aioredisimport asyncio
import aioredisasync def connect(host, port):# 连接是IO操作conn = await aioredis.Redis(host=host, port=port)# 读写是IO操作result = await conn.keys("*")print("all keys:", result)# 断开连接是IO操作await conn.close()if __name__ == '__main__':asyncio.run(connect("localhost", 6379))
异步连mongodb
python中无aiopymongo,可以考虑使用线程池完成异步连接mongodb。