【已上线】C++ mysql连接池

devtools/2024/10/18 3:33:29/

目录

  • 1 为什么需要链接池
  • 2 实现原理
  • 3 代码
  • 4 编译
  • 5 参考

1 为什么需要链接池

  1. 可以复用已经建立好的链接,节约数据库建立链接的时间。原理上,和线程池类似。
  2. 我们项目中的一个实际需求,同时可能有多个线程同时访问数据库。这样每个线程都需要和数据库建立链接才能保证线程安全。有了链接池,即可以服用已有的链接,有保证的线程安全。

2 实现原理

db_connection.h和 db_connection.cpp是对数据库链接的封装
db_connection_pool.h和db_connection_pool.cpp是链接池的实现。在初始化的时候,会提前构造一些链接放到容器中,当池子空了之后,采用生产者-消费者模式构造链接。如果构造的链接个数到达指定上限,从链接池里获取链接的时候,就会失败。另外,我们也会有一个单独的线程,删除长时间不用的链接,让链接池的大小保持在初始大小。

3 代码

// db_connection.h#pragma once#include <string>
#include <chrono>
#include "mysql/mysql.h"class DBConnection {
public:DBConnection();~DBConnection();bool connect(std::string host, int port, std::string user, std::string password, std::string dbname);bool update(std::string sql);MYSQL_RES* query(std::string sql);// 如果这个链接被使用了,就刷新一下时间。因为链接池会把好久不用的多于链接删除void refreshAliveTime() { m_alivetime = std::chrono::system_clock::now(); }int getAliveeTime() const { return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - m_alivetime).count(); }private:MYSQL* m_conn;std::chrono::system_clock::time_point m_alivetime;
};
// db_connection.cpp#include "db_connection.h"DBConnection::DBConnection() {m_conn = mysql_init(nullptr);
}DBConnection::~DBConnection() {if (m_conn != nullptr)mysql_close(m_conn);
}bool DBConnection::connect(std::string host, int port, std::string user, std::string password, std::string dbname) {if (mysql_real_connect(m_conn, host.c_str(), user.c_str(), password.c_str(), dbname.c_str(), port, nullptr, 0) == nullptr) {return false;}// mysql_query(m_conn, "set names utf8mb4");mysql_set_character_set(m_conn, "utf8");return true;
}bool DBConnection::update(std::string sql) {if (mysql_query(m_conn, sql.c_str())) {return false;}return true;
}MYSQL_RES* DBConnection::query(std::string sql) {if (mysql_query(m_conn, sql.c_str())) {return nullptr;}return mysql_use_result(m_conn);
}
// db_connection_pool.h#pragma once#include "db_connection.h"#include <queue>
#include <mutex>
#include <thread>
#include <atomic>
#include <functional>
#include <condition_variable>class DBConnectionPool {
public:static DBConnectionPool* getInstance();bool init(const std::string& config);std::shared_ptr<DBConnection> getConnection();std::string getTable() const { return m_table; }private:DBConnectionPool() = default;bool loadConfigFile(const std::string& config);void produceConnectionTask();void scannerConnectionTask();bool createTable();private:std::string m_host;         // 数据库ipint m_port;                 // 数据端口号std::string m_user;         // 用户名std::string m_password;     // 密码std::string m_database;     // 数据库名std::string m_table;        // 表名int m_initSize;              // 初始链接数int m_maxSize;               // 最大链接数int m_maxIdleTime;           // 单位: sint m_connectionTimeout;     // 单位: sstd::queue<DBConnection*> m_connectionQueue;    // 存放链接的容器std::mutex m_connectionQueueMutex;              // 保证容器操作线程安全std::atomic_int m_connectionCnt;                // 容器大小std::condition_variable m_cv;                   // 用作线程通讯的信号
};
// db_connection_pool.cpp#include "db_connection_pool.h"#include <thread>#include "rapidjson/document.h"             // 用来解析json
#include "rapidjson/filereadstream.h"
#include "rapidjson/error/en.h"namespace handler_process {DBConnectionPool* DBConnectionPool::getInstance() {static DBConnectionPool pool;return &pool;
}bool DBConnectionPool::init(const std::string& config) {if (!loadConfigFile(config)) {ASYNC_LOG_ERROR("[DBConnectionPool::init] loadConfigFile failure");return false;}ASYNC_LOG_INFO("[DBConnectionPool::init] init success, create db connection pool, size: {}", m_initSize);m_connectionCnt = 0;for (int i = 0; i < m_initSize; ++i) {DBConnection* conn = new DBConnection();if (!conn->connect(m_host, m_port, m_user, m_password, m_database)) {ASYNC_LOG_ERROR("[DBConnectionPool::init] connect failure, host: {}, port: {}, user: {}, dbname: {}", m_host, m_port, m_user, m_database);delete conn;continue;}conn->refreshAliveTime();m_connectionQueue.push(conn);m_connectionCnt++;}if (m_connectionCnt.load() == 0) {ASYNC_LOG_ERROR("[DBConnectionPool::init] init failure, create db connection pool failure");return false;}ASYNC_LOG_INFO("[DBConnectionPool::init] init success, create db connection pool succ, size: {}", m_connectionCnt.load());// 如果池子空了,这个线程负责构造新的链接std::thread produce(std::bind(&DBConnectionPool::produceConnectionTask, this));produce.detach();// 这个线程负责删除多于的链接std::thread scanner(std::bind(&DBConnectionPool::scannerConnectionTask, this));scanner.detach();if (!createTable()) {ASYNC_LOG_ERROR("[DBConnectionPool::init] create table failure");return false;}return true;
}bool DBConnectionPool::loadConfigFile(const std::string& config) {ASYNC_LOG_INFO("[DBConnectionPool::loadConfigFile] config_file: {}", config);FILE* fp = fopen(config.c_str(), "r");if (!fp) {ASYNC_LOG_ERROR("[DBConnectionPool::loadConfigFile] unable to open file: {}", config);return false;}// 创建文件读取流char readBuffer[65536];rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));// 解析JSON数据rapidjson::Document document;if (document.ParseStream(is).HasParseError()) {ASYNC_LOG_ERROR("[DBConnectionPool::loadConfigFile] parse error");fclose(fp);return false;}fclose(fp);m_table = std::string(document["table"].GetString());m_host = std::string(document["host"].GetString());m_user = std::string(document["user"].GetString());m_password = std::string(document["password"].GetString());m_database = std::string(document["database"].GetString());m_port = document["port"].GetInt();m_initSize = document["connection_pool_init_size"].GetInt();m_maxSize = document["connection_pool_max_size"].GetInt();m_maxIdleTime = document["connection_pool_max_idle_time"].GetInt();m_connectionTimeout = document["connection_pool_max_wait_time"].GetInt();ASYNC_LOG_INFO("[DBConnectionPool::loadConfigFile] m_manualInterventionInfoTableName: {}, m_dynamicDBInfoTableName: {}, " \"m_host: {}, m_user: {}, m_database: {}, m_port: {}, m_initSize: {}, m_maxSize: {}, m_maxIdleTime: {}, m_connectionTimeout: {}",m_manualInterventionInfoTableName, m_dynamicDBInfoTableName,m_host, m_user, m_database, m_port, m_initSize, m_maxSize, m_maxIdleTime, m_connectionTimeout);return true;
}bool DBConnectionPool::createTable() {std::shared_ptr<DBConnection> dbConnection = getConnection();if (!dbConnection) {ASYNC_LOG_ERROR("[DBConnectionPool::createTable] get connection failure");return false;}std::string create_table_query_1 = "CREATE TABLE IF NOT EXISTS " + m_table + "(id INT AUTO_INCREMENT PRIMARY KEY)";return dbConnection->update(create_table_query_1);
}void DBConnectionPool::produceConnectionTask() {while (true) {std::unique_lock<std::mutex> lock(m_connectionQueueMutex);while (!m_connectionQueue.empty()) {m_cv.wait(lock);}if (m_connectionCnt.load() < m_maxSize) {DBConnection* conn = new DBConnection();if (!conn->connect(m_host, m_port, m_user, m_password, m_database)) {ASYNC_LOG_ERROR("[DBConnectionPool::produceConnectionTask] connect failure, host: {}, port: {}, user: {}, dbname: {}", m_host, m_port, m_user, m_database);delete conn;std::this_thread::sleep_for(std::chrono::seconds(3));continue;}conn->refreshAliveTime();m_connectionQueue.push(conn);m_connectionCnt++;ASYNC_LOG_INFO("[DBConnectionPool::produceConnectionTask] produce connection succ, m_connectionCnt: {}", m_connectionCnt.load());}m_cv.notify_all();}
}void DBConnectionPool::scannerConnectionTask() {while (true) {ASYNC_LOG_INFO("[DBConnectionPool::scannerConnectionTask] scanner connection, m_connectionCnt: {}, m_maxIdleTime: {}s", m_connectionCnt.load(), m_maxIdleTime);std::this_thread::sleep_for(std::chrono::seconds(m_maxIdleTime));std::unique_lock<std::mutex> lock(m_connectionQueueMutex);while (m_connectionCnt.load() > m_initSize) {DBConnection* conn = m_connectionQueue.front();if (conn->getAliveeTime() >= m_maxIdleTime) {m_connectionQueue.pop();m_connectionCnt--;delete conn;} else {break;  // 队头的连接没有超过m_maxIdleTime,其它连接肯定没有}}}
}std::shared_ptr<DBConnection> DBConnectionPool::getConnection() {std::unique_lock<std::mutex> lock(m_connectionQueueMutex);while (m_connectionQueue.empty()) {if (m_cv.wait_for(lock, std::chrono::seconds(m_connectionTimeout)) == std::cv_status::timeout) {if (m_connectionQueue.empty()) {ASYNC_LOG_ERROR("[DBConnectionPool::getConnection] get connection timeout: {}", m_connectionTimeout);return nullptr;}}}std::shared_ptr<DBConnection> conn(m_connectionQueue.front(), [&](DBConnection* pconn) {std::unique_lock<std::mutex> lock(m_connectionQueueMutex);pconn->refreshAliveTime();m_connectionQueue.push(pconn);m_connectionCnt++;});m_connectionQueue.pop();m_connectionCnt--;m_cv.notify_all();ASYNC_LOG_INFO("[DBConnectionPool::getConnection] get connection success, residue m_connectionCnt: {}", m_connectionCnt.load());return conn;
}

配置文件:

// db_conf.json
{"table": "test_table","host": "10.10.10.123","user": "root","password": "123456","database": "test_db","port": 3306,"connection_pool_init_size": 5,"connection_pool_max_size" : 50,"connection_pool_max_idle_time": 600,"connection_pool_max_wait_time": 5
}

使用方式:

// main.cpp#include <iostream>
#include "db_connection_pool.h"int main(int argc, char** argv) {if (!handler_process::DBConnectionPool::getInstance()->init("db_conf.json")) {std::cerr << "init db connection pool failure" << std::endl;return -1;}std::shared_ptr<DBConnection> dbConnection = DBConnectionPool::getInstance()->getConnection();std::string  querySql = "SELECT * FROM " + DBConnectionPool::getInstance()->getTable() + " WHERE id = 1"; MYSQL_RES* res = dbConnection->query(querySql);if (res == nullptr) {std::cerr << "query err" << std::endl;return -1;}MYSQL_ROW column;while((column = mysql_fetch_row(res)) != NULL) {try {int id = std::stoi(column[0]);} catch (const std::exception& e) {std::cerr << "exception: " << e.what() << std::endl;continue;}}mysql_free_result(res);return 0;
}

4 编译

如果是centos,使用yum安装mysql客户端

yum install -y mysql-devel mysql

CMakeLists.txt

...
link_directories("/usr/lib64/mysql")
set(MYSQL_LIBS mysqlclient)
add_executable(main main.cpp db_connection.cpp db_connection_pool.cpp)
target_link_libraries(main pthread ${MYSQL_LIBS})

5 参考

C++ mysql连接池 附带讲解与详细注释源码


http://www.ppmy.cn/devtools/99515.html

相关文章

js跳出循环方法

跳出一层循环&#xff0c;用return&#xff0c;break。continue&#xff0c;结束当前迭代 注意 foreach forEach() 和 map() 用不了 break/continue 原因 forEach 接受一个 回调函数(callback) 作为必要的参数 &#xff1b; 而 回调函数 又会接受以下三个参数&#xff1a;curre…

C++ 模板

模版收尾 模版的声明和定义不能分离&#xff0c;否则会报错. 写下面三个文件&#xff1a; Stack.h#pragma once #include<iostream> using namespace std; template <class T> T Add(const T& left, const T& right);Stack.cpp#include"Stack.h&qu…

React+Vis.js(07):vis.js设置关系网络图禁止缩放和设置关系的指向和名称

文章目录 设置关系图禁止缩放设置关系图的关系指向和关系名称设置关系图的关系指向:方式一设置关系图的关系指向:方式二设置关系显示的标签设置关系图禁止缩放 在 vis.js 中,你可以通过设置 interaction 配置项中的 zoomView 属性为 false 来禁止缩放功能。这样,用户将无法…

在Excel中“直接引用”字符串地址

indirect是Excel唯一可以拥有直接解析字符串引用地址参数能力的函数&#xff0c;是绝无仅有的宝贝疙瘩。 (笔记模板由python脚本于2024年08月21日 12:45:49创建&#xff0c;本篇笔记适合喜欢用Excel处理数据的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;ht…

大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…

Pytorch升级之旅——基础概念

目录 一、人工智能简史 三次浪潮 DL,ML,AI三者之间的关系 二、模型评价指标 混淆矩阵 Overall Accuracy ​编辑 Average accuracy Kappa系数 Recall Precision F1 PR曲线 置信度 IOU AP mAP 三、常用包Numpy、pandas、matplotlib Numpy pandas matplotlib…

校友林小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;树木管理管理&#xff0c;所属科管理&#xff0c;树木领取管理&#xff0c;树跟踪状态管理&#xff0c;用户信息统计管理&#xff0c;树木捐款管理&#xff0c;留言板管理 微信端…

springboot admin监控

服务端搭建 maven的依赖&#xff0c;包括服务端和客户端&#xff0c;以及注册到nacos上面 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XML…