目录
- 1 为什么需要链接池
- 2 实现原理
- 3 代码
- 4 编译
- 5 参考
1 为什么需要链接池
- 可以复用已经建立好的链接,节约数据库建立链接的时间。原理上,和线程池类似。
- 我们项目中的一个实际需求:可能有多个线程同时访问数据库,而每个线程都需要使用独立的数据库链接才能保证线程安全。有了链接池,既可以复用已有的链接,又能保证线程安全。
2 实现原理
db_connection.h和 db_connection.cpp是对数据库链接的封装
db_connection_pool.h和db_connection_pool.cpp是链接池的实现。在初始化的时候,会提前构造一些链接放到容器中;当池子空了之后,采用生产者-消费者模式构造新的链接。如果已构造的链接个数达到指定上限,并且池中没有空闲链接,从链接池里获取链接就会在等待超时后失败。另外,还有一个单独的线程负责删除长时间不用的多余链接,让链接池的大小回落到初始大小。
3 代码
// db_connection.h#pragma once#include <string>
#include <chrono>
#include "mysql/mysql.h"class DBConnection {
public:DBConnection();~DBConnection();bool connect(std::string host, int port, std::string user, std::string password, std::string dbname);bool update(std::string sql);MYSQL_RES* query(std::string sql);// 如果这个链接被使用了,就刷新一下时间。因为链接池会把好久不用的多于链接删除void refreshAliveTime() { m_alivetime = std::chrono::system_clock::now(); }int getAliveeTime() const { return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - m_alivetime).count(); }private:MYSQL* m_conn;std::chrono::system_clock::time_point m_alivetime;
};
// db_connection.cpp#include "db_connection.h"DBConnection::DBConnection() {m_conn = mysql_init(nullptr);
}DBConnection::~DBConnection() {if (m_conn != nullptr)mysql_close(m_conn);
}bool DBConnection::connect(std::string host, int port, std::string user, std::string password, std::string dbname) {if (mysql_real_connect(m_conn, host.c_str(), user.c_str(), password.c_str(), dbname.c_str(), port, nullptr, 0) == nullptr) {return false;}// mysql_query(m_conn, "set names utf8mb4");mysql_set_character_set(m_conn, "utf8");return true;
}bool DBConnection::update(std::string sql) {if (mysql_query(m_conn, sql.c_str())) {return false;}return true;
}MYSQL_RES* DBConnection::query(std::string sql) {if (mysql_query(m_conn, sql.c_str())) {return nullptr;}return mysql_use_result(m_conn);
}
// db_connection_pool.h#pragma once#include "db_connection.h"#include <queue>
#include <mutex>
#include <thread>
#include <atomic>
#include <functional>
#include <condition_variable>class DBConnectionPool {
public:static DBConnectionPool* getInstance();bool init(const std::string& config);std::shared_ptr<DBConnection> getConnection();std::string getTable() const { return m_table; }private:DBConnectionPool() = default;bool loadConfigFile(const std::string& config);void produceConnectionTask();void scannerConnectionTask();bool createTable();private:std::string m_host; // 数据库ipint m_port; // 数据端口号std::string m_user; // 用户名std::string m_password; // 密码std::string m_database; // 数据库名std::string m_table; // 表名int m_initSize; // 初始链接数int m_maxSize; // 最大链接数int m_maxIdleTime; // 单位: sint m_connectionTimeout; // 单位: sstd::queue<DBConnection*> m_connectionQueue; // 存放链接的容器std::mutex m_connectionQueueMutex; // 保证容器操作线程安全std::atomic_int m_connectionCnt; // 容器大小std::condition_variable m_cv; // 用作线程通讯的信号
};
// db_connection_pool.cpp
#include "db_connection_pool.h"

#include <chrono>
#include <cstdio>
#include <memory>
#include <thread>
#include <vector>

#include "rapidjson/document.h"  // used to parse the JSON configuration
#include "rapidjson/filereadstream.h"
#include "rapidjson/error/en.h"

namespace handler_process {

/// Returns the process-wide pool (function-local static, created on first use).
DBConnectionPool* DBConnectionPool::getInstance() {
    static DBConnectionPool pool;
    return &pool;
}

/// Loads the configuration, opens the initial connections, creates the table
/// and finally starts the two background threads.
///
/// m_connectionCnt counts every live connection owned by the pool, whether it
/// is idle in the queue or currently borrowed; that is what makes the
/// m_maxSize limit enforceable.
///
/// @param config path of the JSON configuration file.
/// @return false when the configuration cannot be loaded, when no initial
///         connection could be opened, or when the table cannot be created.
bool DBConnectionPool::init(const std::string& config) {
    if (!loadConfigFile(config)) {
        ASYNC_LOG_ERROR("[DBConnectionPool::init] loadConfigFile failure");
        return false;
    }
    ASYNC_LOG_INFO("[DBConnectionPool::init] load config success, create db connection pool, size: {}", m_initSize);

    m_connectionCnt = 0;
    for (int i = 0; i < m_initSize; ++i) {
        std::unique_ptr<DBConnection> conn(new DBConnection());
        if (!conn->connect(m_host, m_port, m_user, m_password, m_database)) {
            ASYNC_LOG_ERROR("[DBConnectionPool::init] connect failure, host: {}, port: {}, user: {}, dbname: {}", m_host, m_port, m_user, m_database);
            continue;  // conn is released by unique_ptr
        }
        conn->refreshAliveTime();
        m_connectionQueue.push(conn.release());
        m_connectionCnt++;
    }
    if (m_connectionCnt.load() == 0) {
        ASYNC_LOG_ERROR("[DBConnectionPool::init] init failure, create db connection pool failure");
        return false;
    }
    ASYNC_LOG_INFO("[DBConnectionPool::init] init success, create db connection pool succ, size: {}", m_connectionCnt.load());

    // Create the table before any background thread exists, so that a failure
    // here does not leave detached threads running behind a failed init().
    if (!createTable()) {
        ASYNC_LOG_ERROR("[DBConnectionPool::init] create table failure");
        return false;
    }

    // This thread creates new connections when the pool runs empty.
    std::thread produce(std::bind(&DBConnectionPool::produceConnectionTask, this));
    produce.detach();
    // This thread removes surplus connections that have been idle too long.
    std::thread scanner(std::bind(&DBConnectionPool::scannerConnectionTask, this));
    scanner.detach();
    return true;
}

/// Reads the pool settings from the JSON file at `config`.
///
/// Every expected member is checked for presence and type before it is read,
/// and the numeric settings are sanity-checked (a non-positive idle time would
/// make the scanner thread spin).
///
/// @return true when the file was parsed and all settings are usable.
bool DBConnectionPool::loadConfigFile(const std::string& config) {
    ASYNC_LOG_INFO("[DBConnectionPool::loadConfigFile] config_file: {}", config);
    // unique_ptr closes the file on every return path.
    std::unique_ptr<FILE, decltype(&fclose)> fp(fopen(config.c_str(), "r"), &fclose);
    if (!fp) {
        ASYNC_LOG_ERROR("[DBConnectionPool::loadConfigFile] unable to open file: {}", config);
        return false;
    }

    // Stream the file through rapidjson.
    char readBuffer[65536];
    rapidjson::FileReadStream is(fp.get(), readBuffer, sizeof(readBuffer));
    rapidjson::Document document;
    if (document.ParseStream(is).HasParseError()) {
        ASYNC_LOG_ERROR("[DBConnectionPool::loadConfigFile] parse error");
        return false;
    }
    if (!document.IsObject()) {
        ASYNC_LOG_ERROR("[DBConnectionPool::loadConfigFile] config root is not a json object");
        return false;
    }

    for (const char* key : {"table", "host", "user", "password", "database"}) {
        if (!document.HasMember(key) || !document[key].IsString()) {
            ASYNC_LOG_ERROR("[DBConnectionPool::loadConfigFile] missing or non-string field: {}", key);
            return false;
        }
    }
    for (const char* key : {"port", "connection_pool_init_size", "connection_pool_max_size",
                            "connection_pool_max_idle_time", "connection_pool_max_wait_time"}) {
        if (!document.HasMember(key) || !document[key].IsInt()) {
            ASYNC_LOG_ERROR("[DBConnectionPool::loadConfigFile] missing or non-integer field: {}", key);
            return false;
        }
    }

    m_table = std::string(document["table"].GetString());
    m_host = std::string(document["host"].GetString());
    m_user = std::string(document["user"].GetString());
    m_password = std::string(document["password"].GetString());
    m_database = std::string(document["database"].GetString());
    m_port = document["port"].GetInt();
    m_initSize = document["connection_pool_init_size"].GetInt();
    m_maxSize = document["connection_pool_max_size"].GetInt();
    m_maxIdleTime = document["connection_pool_max_idle_time"].GetInt();
    m_connectionTimeout = document["connection_pool_max_wait_time"].GetInt();

    if (m_initSize <= 0 || m_maxSize < m_initSize || m_maxIdleTime <= 0 || m_connectionTimeout < 0) {
        ASYNC_LOG_ERROR("[DBConnectionPool::loadConfigFile] invalid pool settings, m_initSize: {}, m_maxSize: {}, m_maxIdleTime: {}, m_connectionTimeout: {}",
                        m_initSize, m_maxSize, m_maxIdleTime, m_connectionTimeout);
        return false;
    }

    ASYNC_LOG_INFO("[DBConnectionPool::loadConfigFile] m_table: {}, "
                   "m_host: {}, m_user: {}, m_database: {}, m_port: {}, m_initSize: {}, m_maxSize: {}, m_maxIdleTime: {}, m_connectionTimeout: {}",
                   m_table,
                   m_host, m_user, m_database, m_port, m_initSize, m_maxSize, m_maxIdleTime, m_connectionTimeout);
    return true;
}

/// Creates the configured table if it does not exist yet.
/// The table name comes from the configuration file and is concatenated into
/// the statement, so the configuration must be trusted.
/// @return false when no connection is available or the statement fails.
bool DBConnectionPool::createTable() {
    std::shared_ptr<DBConnection> dbConnection = getConnection();
    if (!dbConnection) {
        ASYNC_LOG_ERROR("[DBConnectionPool::createTable] get connection failure");
        return false;
    }
    std::string create_table_query_1 = "CREATE TABLE IF NOT EXISTS " + m_table + "(id INT AUTO_INCREMENT PRIMARY KEY)";
    return dbConnection->update(create_table_query_1);
}

/// Producer thread: whenever the idle queue is empty and fewer than m_maxSize
/// connections exist, opens one more connection and puts it into the queue.
///
/// The mutex is released while connecting and while backing off after a
/// failure, so borrowers and returning connections are never blocked by
/// network I/O or by the 3 second retry delay. This is the only thread that
/// increases m_connectionCnt after init(), so the limit cannot be overshot
/// while the lock is released.
void DBConnectionPool::produceConnectionTask() {
    while (true) {
        {
            std::unique_lock<std::mutex> lock(m_connectionQueueMutex);
            // Sleep until a connection is needed AND allowed; waiting on both
            // conditions avoids spinning when the pool is at its limit.
            m_cv.wait(lock, [this] {
                return m_connectionQueue.empty() && m_connectionCnt.load() < m_maxSize;
            });
        }

        std::unique_ptr<DBConnection> conn(new DBConnection());
        if (!conn->connect(m_host, m_port, m_user, m_password, m_database)) {
            ASYNC_LOG_ERROR("[DBConnectionPool::produceConnectionTask] connect failure, host: {}, port: {}, user: {}, dbname: {}", m_host, m_port, m_user, m_database);
            std::this_thread::sleep_for(std::chrono::seconds(3));
            continue;
        }
        conn->refreshAliveTime();
        {
            std::lock_guard<std::mutex> lock(m_connectionQueueMutex);
            m_connectionQueue.push(conn.release());
            m_connectionCnt++;
        }
        ASYNC_LOG_INFO("[DBConnectionPool::produceConnectionTask] produce connection succ, m_connectionCnt: {}", m_connectionCnt.load());
        m_cv.notify_all();
    }
}

/// Scanner thread: every m_maxIdleTime seconds removes idle connections that
/// exceed m_initSize and have not been used for at least m_maxIdleTime seconds.
void DBConnectionPool::scannerConnectionTask() {
    while (true) {
        ASYNC_LOG_INFO("[DBConnectionPool::scannerConnectionTask] scanner connection, m_connectionCnt: {}, m_maxIdleTime: {}s", m_connectionCnt.load(), m_maxIdleTime);
        std::this_thread::sleep_for(std::chrono::seconds(m_maxIdleTime));

        // Expired connections are collected here and closed after the lock is
        // released, so closing sockets does not stall borrowers.
        std::vector<std::unique_ptr<DBConnection>> expired;
        {
            std::lock_guard<std::mutex> lock(m_connectionQueueMutex);
            // The counter includes borrowed connections, so the queue itself
            // must also be checked before front() is used.
            while (m_connectionCnt.load() > m_initSize && !m_connectionQueue.empty()) {
                DBConnection* conn = m_connectionQueue.front();
                if (conn->getAliveeTime() < m_maxIdleTime) {
                    // Connections are refreshed when pushed, so the front one
                    // is the oldest; if it has not expired, none has.
                    break;
                }
                m_connectionQueue.pop();
                m_connectionCnt--;
                expired.emplace_back(conn);
            }
        }
        if (!expired.empty()) {
            // The total dropped; the producer may be allowed to create again.
            m_cv.notify_all();
        }
    }
}

/// Borrows a connection, waiting up to m_connectionTimeout seconds in total
/// for one to become available.
///
/// The returned shared_ptr does not delete the connection: its deleter
/// refreshes the idle timestamp, puts the connection back into the queue and
/// wakes up waiting borrowers.
///
/// @return the connection, or nullptr on timeout.
std::shared_ptr<DBConnection> DBConnectionPool::getConnection() {
    std::unique_lock<std::mutex> lock(m_connectionQueueMutex);
    const bool available = m_cv.wait_for(lock, std::chrono::seconds(m_connectionTimeout), [this] {
        return !m_connectionQueue.empty();
    });
    if (!available) {
        ASYNC_LOG_ERROR("[DBConnectionPool::getConnection] get connection timeout: {}", m_connectionTimeout);
        return nullptr;
    }

    DBConnection* raw = m_connectionQueue.front();
    m_connectionQueue.pop();
    const std::size_t idle = m_connectionQueue.size();
    // Unlock before building the shared_ptr: its deleter takes the same mutex
    // and would deadlock if it ran while the lock was still held.
    lock.unlock();

    std::shared_ptr<DBConnection> conn(raw, [this](DBConnection* pconn) {
        {
            std::lock_guard<std::mutex> guard(m_connectionQueueMutex);
            pconn->refreshAliveTime();
            m_connectionQueue.push(pconn);
        }
        m_cv.notify_all();  // wake borrowers waiting for a returned connection
    });

    m_cv.notify_all();  // the queue may be empty now; let the producer check
    ASYNC_LOG_INFO("[DBConnectionPool::getConnection] get connection success, idle: {}, total: {}", idle, m_connectionCnt.load());
    return conn;
}

}  // namespace handler_process
配置文件:
// db_conf.json
{"table": "test_table","host": "10.10.10.123","user": "root","password": "123456","database": "test_db","port": 3306,"connection_pool_init_size": 5,"connection_pool_max_size" : 50,"connection_pool_max_idle_time": 600,"connection_pool_max_wait_time": 5
}
使用方式:
// main.cpp#include <iostream>
#include "db_connection_pool.h"int main(int argc, char** argv) {if (!handler_process::DBConnectionPool::getInstance()->init("db_conf.json")) {std::cerr << "init db connection pool failure" << std::endl;return -1;}std::shared_ptr<DBConnection> dbConnection = DBConnectionPool::getInstance()->getConnection();std::string querySql = "SELECT * FROM " + DBConnectionPool::getInstance()->getTable() + " WHERE id = 1"; MYSQL_RES* res = dbConnection->query(querySql);if (res == nullptr) {std::cerr << "query err" << std::endl;return -1;}MYSQL_ROW column;while((column = mysql_fetch_row(res)) != NULL) {try {int id = std::stoi(column[0]);} catch (const std::exception& e) {std::cerr << "exception: " << e.what() << std::endl;continue;}}mysql_free_result(res);return 0;
}
4 编译
如果是centos,使用yum安装mysql客户端
yum install -y mysql-devel mysql
CMakeLists.txt
...
# Extra directory the linker searches for libraries.
# NOTE(review): presumably where the yum mysql-devel package puts libmysqlclient - confirm on the target system.
link_directories("/usr/lib64/mysql")
# Name of the MySQL client library to link against.
set(MYSQL_LIBS mysqlclient)
# Build the demo executable from the entry point and the two connection-pool sources.
add_executable(main main.cpp db_connection.cpp db_connection_pool.cpp)
# Link pthread (the pool starts std::thread workers) and the MySQL client library.
target_link_libraries(main pthread ${MYSQL_LIBS})
5 参考
C++ mysql连接池 附带讲解与详细注释源码