python中单例模式应用

server/2025/3/3 2:59:55/

数据库连接池单例模式
 

1. 为什么使用单例模式

创建数据库连接是一个昂贵的过程(涉及网络通信、认证等)。单例模式的连接池可以在程序启动时初始化一组连接,并在整个生命周期中重用这些连接,而不是每次请求都新建连接。同时还可以控制连接数量,防止资源耗尽。


2.代码

下面这段代码实现了一个数据库连接池,并且通过单例模式确保整个程序中只有一个连接池实例,包含扩容机制。避免频繁创建和销毁数据库连接,通过单例复用连接池:

通过单例模式确保全局只有一个连接池实例。
使用线程锁和条件变量实现线程安全,处理多线程环境下的并发访问。
提供连接的创建、获取、归还和关闭功能。
通过多线程测试验证连接池的并发性能。

import pymysql
import threading
from pymysql import Errorclass DatabasePool:_instance = Nonedef __new__(cls):if not cls._instance:cls._instance = super().__new__(cls)# 数据库配置cls._instance.config = {'host': 'localhost','user': 'root','password': '111111','database': 'test','charset': 'utf8mb4','cursorclass': pymysql.cursors.DictCursor}# 使用可重入锁和条件变量cls._instance.lock = threading.RLock()cls._instance.condition = threading.Condition(cls._instance.lock)cls._instance.max_connections = 10cls._instance.connections = []  # 未使用的连接cls._instance.in_use = set()   # 正在使用的连接cls._instance.init_pool()return cls._instance    # 返回唯一的连接池实例def init_pool(self):"""初始化连接池(线程安全)"""with self.lock:for _ in range(self.max_connections):self.add_connection()def create_connection(self):"""创建单个数据库连接(无需加锁)"""try:conn = pymysql.connect(**self.config)print(f"成功创建连接:{conn._sock.getsockname()}")return connexcept Error as e:print(f"连接创建失败: {e}")return Nonedef add_connection(self):"""向连接池添加连接(线程安全)"""if len(self.connections) + len(self.in_use) < self.max_connections:conn = self.create_connection()if conn:self.connections.append(conn)def get_connection(self):"""获取连接(线程安全)"""with self.condition:while not self.connections:if len(self.in_use) < self.max_connections:self.add_connection()else:print("连接池已满,等待连接归还...")self.condition.wait()  # 等待连接归还conn = self.connections.pop()self.in_use.add(conn)print(f"获取连接:{conn._sock.getsockname()}")return conndef release_connection(self, conn):"""归还连接(线程安全)"""with self.condition:if conn in self.in_use:self.in_use.remove(conn)if conn.open:self.connections.append(conn)print(f"归还连接:{conn._sock.getsockname()}")self.condition.notify()  # 通知等待的线程else:print("警告:连接已关闭,直接丢弃")def close_pool(self):"""关闭所有连接(线程安全)"""with self.lock:for conn in self.connections + list(self.in_use):if conn.open:conn.close()self.connections.clear()self.in_use.clear()print("所有数据库连接已关闭")# 多线程测试示例
if __name__ == "__main__":import concurrent.futuresdef worker(thread_id):pool = DatabasePool()conn = pool.get_connection()try:with conn.cursor() as cursor:cursor.execute("SELECT SLEEP(1)")  # 模拟耗时操作print(f"线程 {thread_id} 执行查询")finally:pool.release_connection(conn)# 创建连接池pool = DatabasePool()# 使用15个线程并发测试,超过最大连接数with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:futures = [executor.submit(worker, i) for i in range(15)]for future in concurrent.futures.as_completed(futures):future.result()pool.close_pool()

3. 核心:__new__ 方法

class DatabasePool:_instance = None  # 类变量,用于保存唯一的实例def __new__(cls):if not cls._instance:cls._instance = super().__new__(cls)# 数据库配置cls._instance.config = {'host': 'localhost','user': 'root','password': '111111','database': 'test','charset': 'utf8mb4','cursorclass': pymysql.cursors.DictCursor}# 使用可重入锁和条件变量cls._instance.lock = threading.RLock()cls._instance.condition = threading.Condition(cls._instance.lock)cls._instance.max_connections = 10cls._instance.connections = []  # 未使用的连接cls._instance.in_use = set()   # 正在使用的连接cls._instance.init_pool()return cls._instance    # 返回唯一的连接池实例
  • 作用:确保无论创建多少次 DatabasePool(),都只会生成同一个实例
  • 示例
    pool1 = DatabasePool()  # 第一次创建,初始化连接池
    pool2 = DatabasePool()  # 直接返回 pool1 的实例
    print(pool1 is pool2)   # 输出 True

3. 总结

我们可以把连接池想象成一个共享铅笔盒(全班共享)

  • :管理员(锁)确保一次只有一个人能拿铅笔。
  • 连接:铅笔盒里的铅笔(初始10支)。
  • 动态扩容:当铅笔用完时,管理员临时制作新铅笔。
  • 归还机制:用完后必须归还,否则其他人无法使用。

http://www.ppmy.cn/server/171961.html

相关文章

uniapp中页面跳转及encodeURIComponent转码的使用详解

文章目录 一、uniapp页面跳转方法汇总1. uni.navigateTo2. uni.redirectTo3. uni.reLaunch4. uni.switchTab5. uni.navigateBack 二、encodeURIComponent转码的使用场景1. 参数中有特殊字符时2. 参数值可能变化时 一、uniapp页面跳转方法汇总 1. uni.navigateTo 保留当前页面…

DeepSeek R1 + 飞书机器人实现AI智能助手

效果 TFChat项目地址 https://github.com/fish2018/TFChat 腾讯大模型知识引擎用的是DeepSeek R1&#xff0c;项目为sanic和redis实现&#xff0c;利用httpx异步处理流式响应&#xff0c;同时使用buffer来避免频繁调用飞书接口更新卡片的网络耗时。为了进一步减少网络IO消耗&…

Python正则

1.正则表达式 1.1含义&#xff1a;记录文本规则的代码&#xff0c;字符串处理工具 注意&#xff1a;需要导入re模块 1.2特点&#xff1a; 1.语法比较负杂&#xff0c;可读性较差 2.通用性很强&#xff0c;适用于多种编程语言 1.3步骤&#xff1a; 1.导入re模块 import…

docker创建nginx

docker run -d -p 8080:80 --name my-nginx-container nginx docker&#xff1a;命令 run&#xff1a;命令 -d&#xff1a;在后台运行容器 -p&#xff1a;8080:80&#xff1a;将容器内部的80端口映射到宿主机的8080端口。 --name my-nginx-container&#xff1a;为容器指定一个…

Spring MVC 的执行流程

Spring MVC 是一个基于 Java 的请求驱动型 Web 框架&#xff0c;其核心设计围绕 前端控制器模式&#xff0c;通过 DispatcherServlet 协调各个组件处理 HTTP 请求。以下是其完整的执行流程&#xff0c;共分为 8 个核心步骤&#xff1a; 1. HTTP请求到达DispatcherServlet 入口…

Kafka 赋能高效消息队列管理:从原理到实战

Kafka 赋能高效消息队列管理:从原理到实战 作者:Echo_Wish 引言:为什么选择 Kafka? 在现代分布式系统中,消息队列已经成为微服务架构、实时数据处理、日志采集等场景的基石。而 Apache Kafka 以其高吞吐、低延迟、分布式存储的特性,成为众多大厂的首选。 那么,Kafka 究…

ES scroll=1m:表示快照的有效时间为1分钟。怎么理解

在Elasticsearch中&#xff0c;scroll1m 表示你创建的 scroll 上下文 的有效时间为 1分钟。这个参数控制了你可以在多长时间内继续使用这个 scroll_id 来获取更多的数据。 什么是 Scroll 上下文&#xff1f; 当你使用 scroll API 时&#xff0c;Elasticsearch 会为你的查询创…

【Python爬虫(84)】当强化学习邂逅Python爬虫:解锁高效抓取新姿势

【Python爬虫】专栏简介:本专栏是 Python 爬虫领域的集大成之作,共 100 章节。从 Python 基础语法、爬虫入门知识讲起,深入探讨反爬虫、多线程、分布式等进阶技术。以大量实例为支撑,覆盖网页、图片、音频等各类数据爬取,还涉及数据处理与分析。无论是新手小白还是进阶开发…