msgqueue.hpp队列模块

news/2024/10/19 15:45:33/

目录

一.MsgQueue模块介绍

二.MsgQueue类的实现

成员变量

构造函数与析构函数

成员函数

参数设置函数 setArgs

参数获取函数 getArgs

三.MsgQueueMapper类的实现

成员变量

构造函数

成员函数

创建表格函数 createTable

删除表格函数 dropTable

插入数据函数 insert

删除数据函数 remove

数据恢复函数 recover

msgQueueMapCb

四.MsgQueueMapper类的实现

成员变量

构造函数

成员函数

声明队列函数 declareQueue

删除队列函数 removeQueue

查询队列函数 selectQueue

查询所有队列函数 selectAll

队列是否存在函数 exists

队列数量函数 size

清空所有队列函数 clear

一.MsgQueue模块介绍

二.MsgQueue类的实现

成员变量

MsgQueue 结构体用于描述一个消息队列的基本属性。

std::string _name; // 队列名称
bool _durable;     // 队列是否持久化
bool _exclusive;   // 队列是否独占
bool _auto_del;    // 队列是否自动删除
google::protobuf::Map<std::string, std::string> _args; // 队列的附加参数

构造函数与析构函数

结构体提供了一个带参数的构造函数用于初始化各成员变量。同时提供了一个默认构造函数以支持无参初始化。

MsgQueue(const std::string &name, bool durable, bool exclusive, bool auto_del, const google::protobuf::Map<std::string, std::string> &_args): _name(name), _durable(durable), _exclusive(exclusive), _auto_del(auto_del), _args(_args) {}

成员函数

MsgQueue 提供了两个主要的成员函数,用于设置和获取队列的附加参数。

参数设置函数 setArgs

setArgs 方法接收一个格式化的字符串参数,并将其解析为键值对,存储在 _args 中。
该方法使用了 StrHelper::split 函数来分割字符串,通过 = 符号区分键和值,并存储在 _args 中。

参数获取函数 getArgs

getArgs 方法用于将 _args 中的键值对组合成格式化字符串,返回给调用者。

std::string getArgs()
{std::string ret;for (auto &kv : _args){ret += kv.first + "=" + kv.second + "&";}return ret;
}

三.MsgQueueMapper类的实现

成员变量

SqliteHelper sql_helper; //数据库管理句柄
  • sql_helperSqliteHelper 是一个辅助类,封装了SQLite的基本操作,如执行SQL语句和管理数据库连接。

构造函数

MsgQueueMapper(const std::string &dbname) : sql_helper(dbname)
{std::string path = FileHelper::getParentDirName(dbname);FileHelper::createDir(path);if (!sql_helper.open()){ELOG("MsgQueueMapper open db failed:%s", dbname.c_str());assert(0);}createTable();
}

构造函数接受一个数据库名称参数,初始化 sql_helper,并在必要时创建数据库目录和表。

成员函数

创建表格函数 createTable
void createTable()
{std::stringstream sql;sql << "create table if not exists msg_queue(";sql << "name varchar(64) primary key,";sql << "durable int,";sql << "exclusive int,";sql << "auto_del int,";sql << "args varchar(64));";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper create table failed:%s", sql.str().c_str());assert(0);}
}
删除表格函数 dropTable

dropTable 方法用于删除现有的消息队列表格。该方法执行删除表格的SQL语句,如果表格存在,它将被移除。

void dropTable()
{std::stringstream sql;sql << "drop table if exists msg_queue;";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper drop table failed:%s", sql.str().c_str());assert(0);}
}
插入数据函数 insert

insert 方法将 MsgQueue 对象插入到数据库中。

void insert(MsgQueue::ptr &msg_queue_ptr)
{std::stringstream sql;sql << "insert into msg_queue(name, durable, exclusive, auto_del, args) values(";sql << "'" << msg_queue_ptr->_name << "',";sql << msg_queue_ptr->_durable << ",";sql << msg_queue_ptr->_exclusive << ",";sql << msg_queue_ptr->_auto_del << ",";sql << "'" << msg_queue_ptr->getArgs() << "');";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper insert msg_queue failed:%s", sql.str().c_str());assert(0);}
}
删除数据函数 remove

remove 方法用于从数据库中删除指定名称的消息队列。

void remove(const std::string &name)
{std::stringstream sql;sql << "delete from msg_queue where name = '" << name << "';";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper remove msg_queue failed:%s", sql.str().c_str());assert(0);}
}
数据恢复函数 recover

recover 方法用于从数据库中恢复所有消息队列。该方法执行SQL查询语句,遍历结果集,并将每个消息队列恢复到内存中的 _msgQueues 容器中。

   msgqueue_map recover(){msgqueue_map ret;std::string sql = "select * from msg_queue;";if (!sql_helper.exec(sql, MsgQueueMapper::msgQueueMapCb, &ret)){ELOG("MsgQueueMapper recover failed:%s", sql.c_str());assert(0);}return ret;}
msgQueueMapCb
  • 该静态回调函数在 recover 函数中被调用,用于将数据库查询结果中的每一行转化为一个 MsgQueue对象,并将其存储到 msgqueue_map中。
      static int msgQueueMapCb(void *arg, int col_count, char **col_values, char **col_names){msgqueue_map *ret = (msgqueue_map *)arg;MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>();msg_queue_ptr->_name = col_values[0];msg_queue_ptr->_durable = std::stoi(col_values[1]);msg_queue_ptr->_exclusive = std::stoi(col_values[2]);msg_queue_ptr->_auto_del = std::stoi(col_values[3]);if (col_values[4]){msg_queue_ptr->setArgs((std::string)col_values[4]);}elseELOG("没有其它参数");ret->insert(std::make_pair(msg_queue_ptr->_name, msg_queue_ptr));return 0;}

四.MsgQueueMapper类的实现

MsgQueueManager 类用于管理内存中的消息队列对象,并与 MsgQueueMapper 协作实现消息队列的持久化。

成员变量

  • _msgQueues:一个键值对映射,用于存储当前内存中的所有消息队列。
  • _mapperMsgQueueMapper 对象,用于与数据库进行持久化操作。
  • _mutex:用于保护消息队列操作的线程安全。

构造函数

构造函数在初始化时会调用 MsgQueueMapperrecover 方法,从数据库恢复所有消息队列。

MsgQueueManager(const std::string &dbname) : _mapper(dbname)
{_mapper.recover(_msgQueues);
}

成员函数

声明队列函数 declareQueue

declareQueue 方法用于声明一个新的消息队列,并将其加入到内存和数据库中。

该方法首先检查队列是否已经存在,若不存在,则创建并插入新的队列。如果队列需要持久化,则还会将其插入到数据库中。

bool declareQueue(const std::string &name, bool durable, bool exclusive, bool auto_del, const google::protobuf::Map<std::string, std::string> &args)
{std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it != _msgQueues.end()){ILOG("MsgQueueManager declareQueue:%s already exists", name.c_str())return true;}MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>(name, durable, exclusive, auto_del, args);_msgQueues.insert(std::make_pair(name, msg_queue_ptr));if (msg_queue_ptr->_durable)_mapper.insert(msg_queue_ptr);return true;
}
删除队列函数 removeQueue

removeQueue 方法用于删除指定名称的消息队列。

bool removeQueue(const std::string &name)
{std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ELOG("MsgQueueManager removeQueue:%s not exists", name.c_str());return false;}if (it->second->_durable)_mapper.remove(name);_msgQueues.erase(it);return true;
}
查询队列函数 selectQueue

selectQueue 方法根据队列名称查询并返回消息队列对象

该方法用于在内存中查找指定名称的队列,并返回指向该队列的智能指针。

MsgQueue::ptr selectQueue(const std::string &name)
{std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ELOG("MsgQueueManager selectQueue:%s not exists", name.c_str());return nullptr;}return it->second;
}
查询所有队列函数 selectAll
  msgqueue_map selectAll(){return _msgQueues;}
队列是否存在函数 exists

exists 方法用于判断指定名称的消息队列是否存在。


bool exists(const std::string &name) { std::lock_guard<std::mutex> lock(_mutex); return _msgQueues.find(name) != _msgQueues.end(); }

该方法返回一个布尔值,指示队列是否存在。

队列数量函数 size

size 方法用于返回当前存在的消息队列数量。

size_t size() { std::lock_guard<std::mutex> lock(_mutex); return _msgQueues.size(); }

该方法返回一个整数值,表示内存中队列的数量。

清空所有队列函数 clear

clear 方法用于清空所有当前存在的消息队列。


void clear() { std::lock_guard<std::mutex> lock(_mutex); _msgQueues.clear(); _mapper.dropTable(); _mapper.createTable(); }

该方法清空内存中的所有队列,并在数据库中删除和重建消息队列表格。

五.MsgQueue.hpp所有代码

#pragma once
#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include <string>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include <google/protobuf/map.h>namespace mq
{struct MsgQueue{std::string _name;bool _durable;   // 是否持久化bool _exclusive; // 是否独占bool _auto_del;  // 是否自动删除google::protobuf::Map<std::string, std::string> _args;using ptr = std::shared_ptr<MsgQueue>;MsgQueue(const std::string &name, bool durable, bool exclusive,bool auto_del, const google::protobuf::Map<std::string, std::string> &_args): _name(name),_durable(durable),_exclusive(exclusive),_auto_del(auto_del),_args(_args){}MsgQueue() {}void setArgs(const std::string &str_args){std::vector<std::string> args;size_t sz = StrHelper::split(str_args, "&", args);for (auto &kv : args){size_t pos = kv.find("=");if (pos == std::string::npos){ELOG("MsgQueue args format error:%s", kv.c_str());assert(0);}std::string key = kv.substr(0, pos);std::string val = kv.substr(pos + 1);_args[key] = val;}}std::string getArgs(){std::string ret;for (auto &kv : _args){ret += kv.first + "=" + kv.second + "&";}return ret;}};using msgqueue_map = std::unordered_map<std::string, std::shared_ptr<MsgQueue>>;class MsgQueueMapper{private:SqliteHelper sql_helper;public:MsgQueueMapper(const std::string &dbname): sql_helper(dbname){// 数据库有path即可,open时自动创建文件std::string path = FileHelper::getParentDirName(dbname);FileHelper::createDir(path);if (!sql_helper.open()){ELOG("MsgQueueMapper open db failed:%s", dbname.c_str());assert(0);}createTable();}// 1.创建,删除表void createTable(){std::stringstream sql;sql << "create table if not exists msg_queue(";sql << "name varchar(64) primary key,";sql << "durable int,";sql << "exclusive int,";sql << "auto_del int,";sql << "args varchar(64));";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper create table failed:%s", sql.str().c_str());assert(0);}}void dropTable(){std::stringstream sql;sql << "drop table if exists msg_queue;";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper drop table failed:%s", sql.str().c_str());assert(0);}}// 2.插入/删除数据void insert(MsgQueue::ptr &msg_queue_ptr){std::stringstream sql;sql << "insert into msg_queue (name,durable,exclusive,auto_del,args) values(";sql << "'" << msg_queue_ptr->_name << "',";sql << msg_queue_ptr->_durable << ",";sql << msg_queue_ptr->_exclusive << ",";sql << msg_queue_ptr->_auto_del << ",";sql << "'" << msg_queue_ptr->getArgs() << "');";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper insert failed:%s", sql.str().c_str());assert(0);}}void remove(const std::string &name){std::stringstream sql;sql << "delete from msg_queue where name='" << name << "';";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("MsgQueueMapper remove failed:%s", sql.str().c_str());assert(0);}}// 3.recovermsgqueue_map recover(){msgqueue_map ret;std::string sql = "select * from msg_queue;";if (!sql_helper.exec(sql, MsgQueueMapper::msgQueueMapCb, &ret)){ELOG("MsgQueueMapper recover failed:%s", sql.c_str());assert(0);}return ret;}private:static int msgQueueMapCb(void *arg, int col_count, char **col_values, char **col_names){msgqueue_map *ret = (msgqueue_map *)arg;MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>();msg_queue_ptr->_name = col_values[0];msg_queue_ptr->_durable = std::stoi(col_values[1]);msg_queue_ptr->_exclusive = std::stoi(col_values[2]);msg_queue_ptr->_auto_del = std::stoi(col_values[3]);if (col_values[4]){msg_queue_ptr->setArgs((std::string)col_values[4]);}elseELOG("没有其它参数");ret->insert(std::make_pair(msg_queue_ptr->_name, msg_queue_ptr));return 0;}};class MsgQueueManager{public:using ptr = std::shared_ptr<MsgQueueManager>;private:msgqueue_map _msgQueues;std::mutex _mutex;MsgQueueMapper _mapper;public:MsgQueueManager(const std::string &dbname): _mapper(dbname){_msgQueues = _mapper.recover();}// 1. 插入/删除数据bool declareQueue(const std::string &name,bool durable,bool exclusive,bool auto_del,const google::protobuf::Map<std::string, std::string> &args){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it != _msgQueues.end()){ILOG("MsgQueueManager declareQueue:%s already exists", name.c_str())return true;}MsgQueue::ptr msg_queue_ptr = std::make_shared<MsgQueue>(name, durable, exclusive, auto_del, args);_msgQueues.insert(std::make_pair(name, msg_queue_ptr));if (msg_queue_ptr->_durable)_mapper.insert(msg_queue_ptr);// std::cout<<"declare队列:"<<msg_queue_ptr->_name<<std::endl;// std::cout<<"队列的个数:"<<_msgQueues.size()<<std::endl;return true;}bool removeQueue(const std::string &name){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ILOG("MsgQueueManager removeQueue:%s not exists", name.c_str());return true;}_msgQueues.erase(it);if (it->second->_durable)_mapper.remove(name);return true;}// 2. 查询一个/所有 queueMsgQueue::ptr selectQueue(const std::string &name){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ILOG("MsgQueueManager select:%s not exists", name.c_str());return MsgQueue::ptr();}return it->second;}msgqueue_map selectAll(){return _msgQueues;}// 3. 其它操作bool exists(const std::string &name){std::lock_guard<std::mutex> lock(_mutex);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){ILOG("MsgQueueManager :%s not exists", name.c_str());return false;}return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _msgQueues.size();}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.dropTable();_msgQueues.clear();}};
};


http://www.ppmy.cn/news/1509396.html

相关文章

Stable Diffusion绘画 | 提示词基础原理

提示词之间使用英文逗号“,”分割 例如&#xff1a;1girl,black long hair, sitting in office 提示词之间允许换行 但换行时&#xff0c;记得在结尾添加英文逗号“,”来进行区分 权重默认为1&#xff0c;越靠前权重越高 每个提示词自身的权重默认值为1&#xff0c;但越靠…

数据预处理和探索性数据分析(上)

目录 数据预处理 数据清洗 处理缺失值&#xff1a; 异常值检测与处理&#xff1a; 类别特征编码&#xff1a; 特征工程 创建新特征&#xff1a; 特征缩放&#xff1a; 探索性数据分析 (EDA) 使用Matplotlib进行可视化 绘制直方图&#xff1a; 绘制箱线图&#xff1…

【网络编程】组播的实现(C语言,linux,Ubuntu)

组播 1> 组播也是实现一对多的通信方式&#xff0c;对于广播而言&#xff0c;网络需要对每个消息进行复制转发&#xff0c;会占用大量的带宽&#xff0c;导致网络拥塞 2> 组播可以实现小范围的数据传播&#xff1a;将需要接收数据的接收端加入多播组&#xff0c;发送端…

LeetCode 45. 跳跃游戏 II 题解

引言 在LeetCode的算法题库中&#xff0c;“跳跃游戏 II”是一个经典的贪心算法问题。这个问题不仅考验了我们对数组操作的理解&#xff0c;还锻炼了我们如何利用贪心策略来优化问题求解。本文将详细解析这个问题&#xff0c;并提供Java语言的解决方案。 问题描述 给定一个非…

Java语言程序设计基础篇_编程练习题16.22(播放、循环播放和停止播放一个音频剪辑)

题目&#xff1a;16.22&#xff08;播放、循环播放和停止播放一个音频剪辑&#xff09; 编写一个满足下面要求的程序&#xff1a; 使用AudioClip获取一个音频文件&#xff0c;该文件存放在类目录下。放置三个标记为Play、Loop和Stop的按钮&#xff0c;如图16-46a所示。单击Pla…

F.Enchanted

https://codeforces.com/gym/105139/problem/F24湖北省赛F 看了一下前面两种操作&#xff0c;做法不是很明显 后面两种操作&#xff0c;一看就是可持久化线段树&#xff0c;单点修改&#xff0c;版本复制 接下来解决前面的两种操作 第一个操作 两个相同的合成一个新的(33-&…

K8S上安装LongHorn(分布式块存储) --use

要在 Kubernetes上安装 LongHorn&#xff0c;您可以按照以下步骤进行操作&#xff1a; 准备工作 参考 官网教程将LongHorn只部署在k8s-worker5节点上。https://github.com/longhorn/longhorn 安装要求 Each node in the Kubernetes cluster where Longhorn is installed must f…

信奥C++,输入一个n*m的矩阵,在输入一个变量s,如果s==0,则水平翻转矩阵,否则垂直翻转矩阵

属于信奥题目&#xff0c;具体解法如下。已经通过调试&#xff0c;欢迎使用。 有更有解法欢迎在评论区沟通&#xff0c;谢谢&#xff01; #include <iostream> using namespace std; int main(){ int m,n; cin>>m>>n; int matrix[100][100]; for(int i0;…