《Python实战进阶》No17: 数据库连接与 ORM(SQLAlchemy 实战)

news/2025/3/13 17:56:42/

No17: 数据库连接与 ORM(SQLAlchemy 实战)


摘要
本文深入探讨SQLAlchemy在复杂场景下的高级应用,涵盖四大核心主题:

  1. 会话生命周期管理:通过事件钩子实现事务监控与审计追踪
  2. 混合继承映射:结合单表/连接表继承优势实现多态模型
  3. 高性能操作:批量插入与核心SQL构造优化大数据处理
  4. 异步支持:基于asyncmy的MySQL异步引擎实践

实战案例包含电商系统的分库分表实现策略与基于ORM的审计日志系统设计。扩展部分解析多租户架构的三种隔离方案及SQL注入防御的ORM层最佳实践。配套代码演示了从分片路由到异步操作的完整电商系统实现,帮助开发者掌握构建高并发、高安全性的企业级数据库应用能力。

在这里插入图片描述

核心概念

1. 会话生命周期管理(Session事件钩子)

通过事件钩子实现精细化的会话控制,典型场景包括事务边界管理、变更追踪和审计日志。

from sqlalchemy import event
from sqlalchemy.orm import sessionmakerSession = sessionmaker()# 事务提交前事件
@event.listens_for(Session, 'before_commit')
def before_commit(session):print("即将提交事务,当前待处理变更:", session.dirty)# 事务回滚后事件
@event.listens_for(Session, 'after_rollback')
def after_rollback(session):print("事务已回滚,清理临时状态")

2. 混合继承映射策略

结合单表继承和连接表继承的优势,实现灵活的多态查询:

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationshipBase = declarative_base()class Product(Base):__tablename__ = 'product'id = Column(Integer, primary_key=True)type = Column(String(20))  # 单表继承标识__mapper_args__ = {'polymorphic_on': type}class Book(Product):__tablename__ = 'book'id = Column(Integer, ForeignKey('product.id'), primary_key=True)isbn = Column(String(13))__mapper_args__ = {'polymorphic_identity': 'book'}class Electronic(Product):__tablename__ = 'electronic'id = Column(Integer, ForeignKey('product.id'), primary_key=True)warranty = Column(Integer)__mapper_args__ = {'polymorphic_identity': 'electronic'}

3. 批量操作与核心SQL构造

高效处理大数据量操作的两种方式:

# ORM批量插入
session.bulk_insert_mappings(User, [{"name": "Alice", "age": 30},{"name": "Bob", "age": 25}
])# 核心SQL构造
from sqlalchemy import select, table, column
users = table('users', column('id'), column('name'), column('age')
)
stmt = users.insert().values([{'name': 'Charlie', 'age': 35},{'name': 'David', 'age': 40}
])
connection.execute(stmt)

4. 异步引擎与greenlet整合

使用asyncmy驱动实现MySQL异步操作:

from sqlalchemy.ext.asyncio import create_async_engine
import asyncioasync_engine = create_async_engine("mysql+asyncmy://user:pass@localhost/db",pool_size=5, max_overflow=10
)async def fetch_users():async with async_engine.connect() as conn:result = await conn.execute("SELECT * FROM users")return result.fetchall()asyncio.run(fetch_users())

实战案例

1. 电商系统的分表分库实现

使用ShardedSession实现用户表水平分片:

from sqlalchemy.ext.horizontal_shard import ShardedSessionshards = {'shard1': create_engine('sqlite:///./shard1.db'),'shard2': create_engine('sqlite:///./shard2.db')
}def shard_chooser(mapper, instance, clause=None):if instance:return "shard%d" % (instance.id % 2 + 1)else:return 'shard1'session = ShardedSession(shards=shards,shard_chooser=shard_chooser,id_chooser=lambda *args: list(shards.keys())
)# 自动路由到对应分片
user = User(id=101, name="Alice")
session.add(user)  # 自动路由到shard2

2. 版本控制与审计日志实现

通过事件监听实现变更追踪:

from sqlalchemy import inspectaudit_logs = []@event.listens_for(Session, 'before_flush')
def track_changes(session, context, instances):for obj in session.dirty:state = inspect(obj)for attr in state.attrs:hist = attr.load_history()if hist.has_changes():audit_logs.append({'object': obj,'attribute': attr.key,'old': hist.deleted,'new': hist.added})

扩展思考

1. 多租户架构的数据库隔离方案

三种常见实现方式:

# 方案1:schema隔离
class TenantSpecificQuery(Query):def get(self, ident):return super().filter_by(tenant_id=current_tenant.id).get(ident)# 方案2:行级隔离
class BaseModel(Base):__abstract__ = Truetenant_id = Column(Integer, nullable=False)# 方案3:动态schema切换
@event.listens_for(Pool, 'connect')
def connect(dbapi_con, record):dbapi_con.execute(f"SET search_path TO tenant_{current_tenant.id}")

2. SQL注入防御的ORM层实践

安全操作示例:

# 错误方式(存在注入风险)
query = User.query.filter(f"username = '{input_username}'")# 正确方式
query = User.query.filter(User.username == input_username)# 动态查询安全构造
from sqlalchemy import text
stmt = text("SELECT * FROM users WHERE username = :username")
session.execute(stmt, {"username": input_username})

完整代码示例

包含分表分库、审计日志、异步操作的完整电商系统示例:

# 安装依赖
# pip install sqlalchemy aiosqlitefrom sqlalchemy import Column, Integer, String, Float, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import asyncio# 定义模型
Base = declarative_base()class Product(Base):__tablename__ = 'products'id = Column(Integer, primary_key=True)name = Column(String(50))price = Column(Float)shard_id = Column(Integer, nullable=False)# 创建异步引擎
engines = {'shard1': create_async_engine('sqlite+aiosqlite:///./shard1.db', echo=True),'shard2': create_async_engine('sqlite+aiosqlite:///./shard2.db', echo=True)
}# 创建异步会话工厂
async_sessions = {shard_id: sessionmaker(bind=engine,class_=AsyncSession,expire_on_commit=False)for shard_id, engine in engines.items()
}# 根据 shard_id 选择分片
def get_shard_id(shard_id):return f'shard{shard_id % 2 + 1}'# 异步操作示例
async def main():# 创建表for engine in engines.values():async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)# 插入数据product = Product(name="Laptop", price=999.99, shard_id=101)shard_id = get_shard_id(product.shard_id)async with async_sessions[shard_id]() as session:session.add(product)await session.commit()print(f"产品已添加到 {shard_id}")# 插入数据product = Product(name="手机", price=10000, shard_id=102)shard_id = get_shard_id(product.shard_id)async with async_sessions[shard_id]() as session:session.add(product)await session.commit()print(f"产品已添加到 {shard_id}")# 查询所有分片的数据all_products = []for shard_id, session_factory in async_sessions.items():async with session_factory() as session:result = await session.execute(select(Product))products = result.scalars().all()all_products.extend(products)print(f"从 {shard_id} 查询到 {len(products)} 个产品")# 显示所有产品print("\n所有产品:")for product in all_products:print(f"产品: {product.name}, 价格: {product.price}, 分片ID: {product.shard_id}")# 运行异步主函数
if __name__ == "__main__":asyncio.run(main())

输出:

d:\python_projects\jupyter_demo\sqlalchemy_demo.py:11: MovedIn20Warning: The ``declarative_base()`` function is now available as sqlalchemy.orm.declarative_base(). (deprecated since: 2.0) (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)Base = declarative_base()
2025-03-09 19:50:49,625 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,626 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("products")    
2025-03-09 19:50:49,626 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,627 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("products")    
2025-03-09 19:50:49,627 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,628 INFO sqlalchemy.engine.Engine
CREATE TABLE products (id INTEGER NOT NULL,name VARCHAR(50),price FLOAT,shard_id INTEGER NOT NULL,PRIMARY KEY (id)
)
2025-03-09 19:50:49,629 INFO sqlalchemy.engine.Engine [no key 0.00072s] ()
2025-03-09 19:50:49,636 INFO sqlalchemy.engine.Engine COMMIT
2025-03-09 19:50:49,638 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,639 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("products")    
2025-03-09 19:50:49,639 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,640 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("products")
2025-03-09 19:50:49,640 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,641 INFO sqlalchemy.engine.Engine
CREATE TABLE products (id INTEGER NOT NULL,name VARCHAR(50),price FLOAT,shard_id INTEGER NOT NULL,PRIMARY KEY (id)
)
2025-03-09 19:50:49,642 INFO sqlalchemy.engine.Engine [no key 0.00067s] ()
2025-03-09 19:50:49,647 INFO sqlalchemy.engine.Engine COMMIT
2025-03-09 19:50:49,649 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,650 INFO sqlalchemy.engine.Engine INSERT INTO products (name, price, shard_id) VALUES (?, ?, ?)
2025-03-09 19:50:49,650 INFO sqlalchemy.engine.Engine [generated in 0.00035s] ('Laptop', 999.99, 101)
2025-03-09 19:50:49,652 INFO sqlalchemy.engine.Engine COMMIT
产品已添加到 shard2
2025-03-09 19:50:49,657 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,658 INFO sqlalchemy.engine.Engine INSERT INTO products (name, price, shard_id) VALUES (?, ?, ?)
2025-03-09 19:50:49,659 INFO sqlalchemy.engine.Engine [generated in 0.00077s] ('手机', 10000.0, 102)
2025-03-09 19:50:49,661 INFO sqlalchemy.engine.Engine COMMIT
产品已添加到 shard1
2025-03-09 19:50:49,665 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,665 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
ducts.price, products.shard_id
FROM products
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
FROM products
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, pro从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
ducts.price, products.shard_id
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
所有产品:
产品: 手机, 价格: 10000.0, 分片ID: 102
所有产品:
产品: 手机, 价格: 10000.0, 分片ID: 102
产品: Laptop, 价格: 999.99, 分片ID: 101
产品: 手机, 价格: 10000.0, 分片ID: 102
产品: Laptop, 价格: 999.99, 分片ID: 101
产品: Laptop, 价格: 999.99, 分片ID: 101

通过本章学习,您应该能够:

  1. 熟练使用SQLAlchemy事件系统管理会话生命周期
  2. 实现复杂的继承映射策略
  3. 处理大规模数据操作的性能优化
  4. 构建高并发的异步数据库应用
  5. 设计企业级的数据库架构方案

http://www.ppmy.cn/news/1578845.html

相关文章

Linux软件包管理与Vim编辑器指南

一、Linux软件包管理 1. 什么是软件包? 在Linux下安装软件,一种常见的方法是下载程序的源代码,并进行编译,得到可执行程序。然而,这种方法既耗时又繁琐。为了方便,人们将一些常用的软件提前编译好&#x…

仅仅使用pytorch来手撕transformer架构(3):编码器模块和编码器类的实现和向前传播

仅仅使用pytorch来手撕transformer架构(2):编码器模块和编码器类的实现和向前传播 往期文章: 仅仅使用pytorch来手撕transformer架构(1):位置编码的类的实现和向前传播 最适合小白入门的Transformer介绍 仅仅使用pytorch来手撕transformer…

wordpress禁止用户在不同地点同时登录

wordpress禁止用户在不同地点同时登录,管理员除外。 function pcl_user_has_concurrent_sessions() {return (is_user_logged_in() && count(wp_get_all_sessions()) > 2); }add_action("init", function () {// 除了管理员,其他人…

C语言_数据结构总结7:顺序队列(循环队列)

纯C语言实现,不涉及C 队列 简称队,也是一种操作受限的线性表。只允许表的一端进行插入,表的另一端进行删除 特性:先进先出 针对顺序队列存在的“假溢出”问题,引出的循环队列概念。 循环队列 将顺序队列臆造为一个…

【git】 贮藏 stash

贮藏是我在sourcetree上看到的名词。之前只是浅浅的用来收藏一下修改的文件,没有完整的使用过。今天有幸使用了一次就来展开说说。 使用原因就不赘述了,错误的操作少提为好,操作步骤如下: 查看贮藏列表git stash list #输出&…

机器学习(吴恩达)

一, 机器学习 机器学习定义: 计算机能够在没有明确的编程情况下学习 特征: 特征是描述样本的属性或变量,是模型用来学习和预测的基础。如: 房屋面积, 地理位置 标签: 监督学习中需要预测的目标变量,是模型的输出目标。如: 房屋价格 样本: 如: {面积100㎡…

OpenCV开发笔记(八十三):图像remap实现哈哈镜效果

若该文为原创文章,转载请注明原文出处 本文章博客地址:https://hpzwl.blog.csdn.net/article/details/146213444 长沙红胖子Qt(长沙创微智科)博文大全:开发技术集合(包含Qt实用技术、树莓派、三维、OpenCV…

Windows CMD 命令大全(综合开发整理版)

CMD Windows CMD 命令大全(综合整理版)基础操作与文件管理类系统维护与配置类网络与连接类开发者常用命令CMD 黑窗口使用技巧1. **效率操作**2. **高级功能**3. **开发者高效技巧**注意事项**微软官方文档****其他实用资源****如何高效使用官方文档**Windows CMD 命令大全(综…