exchange.hpp交换机模块

server/2024/10/18 8:23:06/

一.Exchange相关类介绍

二.Exchange类的实现

交换机类型

对于 RabbitMQ 来说, 主要⽀持四种交换机类型: 
• Direct 
• Fanout 
• Topic 
• Header 
其中 Header 这种⽅式⽐较复杂, ⽐较少⻅。常⽤的是前三种交换机类型,项⽬中也主要实现这三种 
• Direct: ⽣产者发送消息时, 指定被该交换机绑定的所有队列中满足(RoutingKey==BindingKey)的
• Fanout: ⽣产者发送的消息会被复制到该交换机绑定的所有队列中  
• Topic: 绑定队列到交换机上时, 指定⼀个字符串为 BindingKey。发送消息指定⼀个字符串为 
RoutingKey。当 RoutingKey 和 BindingKey满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列

交换机类型的定义

// 交换机类型
enum ExchangeType
{UNKNOWNTYPE = 0;DIRECT = 1;FANOUT = 2;TOPIC = 3;
};// 消息投递模式(决定是否持久化)
enum DeliverMode
{NORMAL = 0;UNDURABLE = 1;DURABLE = 2;
};

Exchange类的相关接口

主要成员变量

  • _namestd::string 类型,表示交换机的名称。
  • _typemsg::ExchangeType 类型,表示交换机的类型。
  • _durablebool 类型,表示交换机是否持久化。
  • _auto_delbool 类型,表示交换机是否自动删除。
  • _argsgoogle::protobuf::Map<std::string, std::string> 类型,用于存储交换机的其他参数。这个 Map 用于键值对存储交换机的自定义属性。
std::string _name;                                     // 交换机名称
msg::ExchangeType _type;                               // 交换机类型
bool _durable;                                         // 是否持久化
bool _auto_del;                                        // 是否自动删除
google::protobuf::Map<std::string, std::string> _args; // 其它参数

公有成员函数

setArgs 函数
  • 用于从格式化字符串中解析出参数并存储到 _args 成员变量中。格式化字符串的格式是 key=val&key=val...,多个键值对通过 & 符号分隔。
  • 实现思路:首先将字符串按 & 进行分割,然后对于每个键值对,按 = 分割为键和值,并存入 _args 中。如果格式错误(找不到 =),则记录错误日志并终止程序。
void setArgs(const std::string &str_args)
{std::vector<std::string> args;size_t sz = StrHelper::split(str_args, "&", args);for (auto &kv : args){size_t pos = kv.find("=");if (pos == std::string::npos){ELOG("Exchange args format error:%s", kv.c_str());assert(0);}std::string key = kv.substr(0, pos);std::string val = kv.substr(pos + 1);_args[key] = val;}
}
getArgs 函数
  • _args 成员变量中的kv参数转换为格式化字符串,格式同上。它遍历 _args Map,将每个键值对拼接为字符串并以 & 符号分隔。
std::string getArgs()
{std::string ret;for (auto &kv : _args){ret += kv.first + "=" + kv.second + "&";}return ret;
}

三.ExchangeMapper类的实现

ExchangeMapper 类负责Exchange 对象的属性持久化到数据库中,并提供了对这些数据的插入、删除和查询等操作。它通过 SqliteHelper 类与 SQLite 数据库进行交互,确保 Exchange 对象的信息能够在数据库中保存和恢复

主要成员变量

  • sql_helperSqliteHelper 类型,用于管理与 SQLite 数据库的连接和操作

构造函数

  • ExchangeMapper(const std::string &dbname):构造函数接受一个数据库文件的路径 dbname,并使用 SqliteHelper 进行数据库连接的初始化。如果数据库文件所在路径不存在,则会自动创建。同时,构造函数会调用 createTable 方法创建存储 Exchange 信息的表

公有成员函数

createTable

用于在数据库中创建存储 Exchange 对象信息的表 exchange_table。表中包含以下字段:

args:存储其他参数的字符串。

auto_del:是否自动删除。

durable:是否持久化。

type:交换机类型。

name:交换机名称,作为主键。

void createTable()
{std::stringstream sql;sql << "create table if not exists exchange_table\('name' varchar(64) primary key,\type int,durable int,auto_del int,args varchar(128));";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("ExchangeMapper create table error!");assert(0);}
}
dropTable
  • 用于删除 exchange_table
void dropTable()
{std::stringstream sql;sql << "drop table if exists exchange_table;";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("ExchangeMapper drop table error!");assert(0);}
}
insert
  • 将一个 Exchange 对象插入到 exchange_table 表中。它将对象的各个成员变量转化为 SQL 语句中的对应字段值,并执行插入操作。
void insert(Exchange::ptr &exchange_ptr)
{std::stringstream sql;sql << "insert into exchange_table values('" << exchange_ptr->_name << "'," << exchange_ptr->_type << "," << exchange_ptr->_durable << ","<< exchange_ptr->_auto_del << ",'"<< exchange_ptr->getArgs() << "');";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("ExchangeMapper insert error!");assert(0);}
}
remove
  • exchange_table 表中删除指定名称的 Exchange 对象
void remove(const std::string &name)
{std::stringstream sql;sql << "delete from exchange_table where name='" << name << "';";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("ExchangeMapper remove error!");assert(0);}
}
recover
  • exchange_table 表中查询所有的 Exchange 对象,并将它们恢复为 Exchange 对象存储在一个哈希表(exchange_map)中返回。该函数使用一个回调函数 exchangeMapCb 处理查询结果,将每一行记录转化为 Exchange 对象。

using exchange_map = std::unordered_map<std::string, Exchange::ptr>;

exchange_map recover()
{exchange_map ret;std::stringstream sql;sql << "select * from exchange_table;";if (!sql_helper.exec(sql.str(), exchangeMapCb, &ret)){ELOG("ExchangeMapper recover error!");assert(0);}return ret;
}
exchangeMapCb
  • 该静态回调函数在 recover 函数中被调用,用于将数据库查询结果中的每一行转化为一个 Exchange 对象,并将其存储到 exchange_map 中。
static int exchangeMapCb(void *args, int rows, char **ret, char **fields)
{Exchange::ptr exchange_ptr = std::make_shared<Exchange>();exchange_map *map_ptr = (exchange_map *)args;exchange_ptr->_name = ret[0];exchange_ptr->_type = (msg::ExchangeType)std::stoi(ret[1]);exchange_ptr->_durable = std::stoi(ret[2]);exchange_ptr->_auto_del = std::stoi(ret[3]);if (ret[4]){exchange_ptr->setArgs(ret[4]);}map_ptr->insert(std::make_pair(exchange_ptr->_name, exchange_ptr));return 0;
}

四.ExchangeManager类的实现

ExchangeManager 类用于管理系统中的 Exchange 对象,并提供声明、删除、查询、判断存在性以及清理等功能。该类在内部通过一个 ExchangeMapper 实例与数据库交互,以实现 Exchange 对象的持久化和恢复。

主要成员变量

  • _mutexstd::mutex 类型,用于确保多线程环境下对 _exchanges 数据结构的安全访问。
  • _mapperExchangeMapper 类型,用于与数据库进行交互,负责 Exchange 对象的持久化操作。
  • _exchangesexchange_map 类型,用于在内存中存储所有当前管理的 Exchange 对象

构造函数

  • ExchangeManager(const std::string &dbname):构造函数接受一个数据库文件的路径 dbname,通过 ExchangeMapper 从数据库中恢复已存储的 Exchange 对象到内存中_exchanges 哈希表。

成员函数(增删查)

declareExchange 函数

声明一个 Exchange 对象,如果该名称的 Exchange 已存在则直接返回 true。如果不存在,则创建一个新的 Exchange 对象并插入到 _exchanges 中,并在持久化标志 _durabletrue 时,将其插入到数据库中。

bool declareExchange(const std::string &name, msg::ExchangeType type,bool durable,bool auto_del,google::protobuf::Map<std::string, std::string> args)
{std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it != _exchanges.end()){ILOG("ExchangeManager declareExchange:%s already exists!", name.c_str());return true;}Exchange::ptr exchange_ptr = std::make_shared<Exchange>(name, type, durable, auto_del, args);_exchanges.insert(std::make_pair(name, exchange_ptr));if (exchange_ptr->_durable == true)_mapper.insert(exchange_ptr);return true;
}
removeExchange 函数
  • 删除一个指定名称的 Exchange 对象。如果该对象存在且是持久化的,则同时从数据库中删除该记录。
bool removeExchange(const std::string &name)
{std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end()){ILOG("ExchangeManager removeExchange not found!");return true;}_exchanges.erase(it);if (it->second->_durable)_mapper.remove(name);return true;
}
selectExchange 函数
  • 获取指定名称的 Exchange 对象的智能指针,如果未找到则返回一个空的智能指针
Exchange::ptr selectExchange(const std::string &name)
{std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end())return Exchange::ptr();elsereturn it->second;
}
exists 函数
bool exists(const std::string &name)
{std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);return it != _exchanges.end();
}
clear 函数

清理内存中的 _exchanges 哈希表,并删除数据库中的 exchange_table 表。

void clear()
{std::unique_lock<std::mutex> lock(_mutex);_exchanges.clear();_mapper.dropTable();
}
size 函数
size_t size()
{std::unique_lock<std::mutex> lock(_mutex);return _exchanges.size();
}

五.exchange.hpp所有代码

#pragma once#include "../common_mq/helper.hpp"
#include "../common_mq/logger.hpp"
#include "../common_mq/msg.pb.h"
#include <string>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <cassert>
#include <cstring>
#include <google/protobuf/map.h>
namespace mq
{struct Exchange{using ptr = std::shared_ptr<Exchange>;Exchange() {}Exchange(const std::string &name, msg::ExchangeType type,bool durable,bool auto_del,const google::protobuf::Map<std::string, std::string> &args): _name(name),_type(type),_durable(durable),_auto_del(auto_del),_args(args) {}// args存储键值对,在存储数据库的时候,会组织一个格式字符串进行存储 key=val & key=val....// 内部解析str_args字符串,将内容存储到成员中void setArgs(const std::string &str_args){std::vector<std::string> args;size_t sz = StrHelper::split(str_args, "&", args);for (auto &kv : args){size_t pos = kv.find("=");if (pos == std::string::npos){ELOG("Exchange args format error:%s", kv.c_str());assert(0);}std::string key = kv.substr(0, pos);std::string val = kv.substr(pos + 1);_args[key] = val;// ILOG("Exchange args:%s", kv.c_str());}}std::string getArgs(){std::string ret;for (auto &kv : _args){ret += kv.first + "=" + kv.second + "&";}return ret;}std::string _name;                                     // 交换机名称msg::ExchangeType _type;                               // 交换机类型bool _durable;                                         // 是否持久化bool _auto_del;                                        // 是否自动删除google::protobuf::Map<std::string, std::string> _args; // 其它参数
};using exchange_map = std::unordered_map<std::string, Exchange::ptr>;// 在数据库中 存储exchange信息class ExchangeMapper{public:ExchangeMapper(const std::string &dbname): sql_helper(dbname){// 数据库有path即可,open时自动创建文件std::string path = FileHelper::getParentDirName(dbname);FileHelper::createDir(path);if (!sql_helper.open()){ELOG("ExchangeMapper open error!");assert(0);}// std::cout<<"创建表!!!"<<std::endl;createTable();}~ExchangeMapper() {}// 1.创建/删除Exchange表void createTable(){// std::cout<<"创建表!!!"<<std::endl;std::stringstream sql;sql << "create table if not exists exchange_table\('name' varchar(64) primary key,\type int,durable int,auto_del int,args varchar(128));";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("ExchangeMapper create table error!");assert(0);}}void dropTable(){std::stringstream sql;sql << "drop table if exists exchange_table;";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("ExchangeMapper drop table error!");assert(0);}}// 2.插入/删除Exchangevoid insert(Exchange::ptr &exchange_ptr){std::stringstream sql;sql << "insert into exchange_table values('" << exchange_ptr->_name << "'," << exchange_ptr->_type << "," << exchange_ptr->_durable << ","<< exchange_ptr->_auto_del << ",'"<< exchange_ptr->getArgs() << "');";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("ExchangeMapper insert error!");assert(0);}}void remove(const std::string &name){std::stringstream sql;sql << "delete from exchange_table where name='" << name << "';";if (!sql_helper.exec(sql.str(), nullptr, nullptr)){ELOG("ExchangeMapper remove error!");assert(0);}}// 3.查询所有的Exchange,返回一个哈希表// 将数据库中的Exchange恢复到内存中,组织一个map并返回exchange_map recover(){exchange_map ret;std::stringstream sql;sql << "select * from exchange_table;";if (!sql_helper.exec(sql.str(), exchangeMapCb, &ret)){ELOG("ExchangeMapper recover error!");assert(0);}return ret;}private:static int exchangeMapCb(void *args, int rows, char **ret, char **fields){Exchange::ptr exchange_ptr = std::make_shared<Exchange>();exchange_map *map_ptr = (exchange_map *)args;exchange_ptr->_name = ret[0];exchange_ptr->_type = (msg::ExchangeType)std::stoi(ret[1]);exchange_ptr->_durable = std::stoi(ret[2]);exchange_ptr->_auto_del = std::stoi(ret[3]);// 避免args为空时出错if (ret[4]){// ILOG("有其它参数,setArgs");// std::cout<<"ret[4]"<<ret[4]<<std::endl;exchange_ptr->setArgs(ret[4]);}else{ILOG("没有其它参数");}map_ptr->insert(std::make_pair(exchange_ptr->_name, exchange_ptr));return 0;}private:SqliteHelper sql_helper;};// 对外Exchange管理类class ExchangeManager{private:std::mutex _mutex;ExchangeMapper _mapper;exchange_map _exchanges;public:using ptr = std::shared_ptr<ExchangeManager>;ExchangeManager(const std::string &dbname): _mapper(dbname){_exchanges = _mapper.recover();// std::cout<<"recover后的个数"<<_exchanges.size()<<std::endl;}// 1.声明交换机,没有则创建bool declareExchange(const std::string &name, msg::ExchangeType type,bool durable,bool auto_del,google::protobuf::Map<std::string, std::string> args){std::unique_lock<std::mutex> lock(_mutex);// std::cout<<"声明之前----"<<std::endl;// std::cout<<_exchanges.size()<<std::endl;auto it = _exchanges.find(name);if (it != _exchanges.end()) // 找到了{std::cout << _exchanges[name]->_args.size() << std::endl;ILOG("ExchangeManager declareExchange:%s already exists!", name.c_str());return true;}Exchange::ptr exchange_ptr = std::make_shared<Exchange>(name, type, durable, auto_del, args);// std::cout<<"打印的是构建出的对象"<<std::endl;// const auto &tmp = exchange_ptr->_args;// for(const auto &tmp : exchange_ptr->_args)// {//     std::cout<<"key:"<<tmp.first<<" value:"<<tmp.second<<std::endl;// }_exchanges.insert(std::make_pair(name, exchange_ptr));// 若持久化则插入数据库if (exchange_ptr->_durable == true)_mapper.insert(exchange_ptr);return true;}// 2.删除交换机bool removeExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end()) // 没找到{ILOG("ExchangeManager removeExchange not found!");return true;}// 删除交换机_exchanges.erase(it);// 若持久化则在数据库中也removeif (it->second->_durable)_mapper.remove(name);return true;}// 3.获取指定交换机/判断是否存在Exchange::ptr selectExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end()) // 没找到返回空的ptrreturn Exchange::ptr();elsereturn it->second;}bool exists(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end()) // 没找到return false;elsereturn true;}// 4.清理数据结构void clear(){std::unique_lock<std::mutex> lock(_mutex);_exchanges.clear();_mapper.dropTable();}size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _exchanges.size();}};
};


http://www.ppmy.cn/server/99667.html

相关文章

docker拉取MySQL后数据库连接失败解决方案

如果数据库连接失败&#xff0c;检查以下几个地方&#xff1a; 1&#xff1a;防火墙&#xff0c;查看防火墙是否打开&#xff1a; systemctl status firewalld 关闭状态&#xff1a; 开启状态&#xff1a; 如果是开启状态&#xff0c;则很有可能是防火墙拦截掉了3306端口的访问…

使用Anaconda安装多个版本的Python并与Pycharm进行对接

1、参考链接 Anaconda安装使用教程解决多Python版本问题_anaconda安装多个python版本-CSDN博客 基于上面的一篇博客的提示&#xff0c;我做了尝试。并在Pycharm的对接上做了拓展。 2、首先安装Anaconda 这个比较简单&#xff0c;直接安装即可&#xff1a; 3、设置conda.exe的…

振德医疗选择泛微千里聆RPA,助力电商、人事业务流程自动化

振德医疗用品股份有限公司成立于1994年&#xff0c;中国A股上市公司&#xff0c;是医用敷料和感控防护产品主要的供应商之一。 &#xff08;图片素材来自振德医疗官网&#xff09; 振德医疗的业务在线上线下齐发力。目前拥有5个国内生产基地&#xff0c;3个海外工厂&#xff0…

【AI】简单了解人工智能的三个飞跃

人工智能&#xff1a;推理能力的实质性飞跃 我们说起人工智能&#xff0c;目前大部分都在说大模型&#xff0c;特别是LLM的相关能力。 在科技日新月异的今天&#xff0c;人工智能&#xff08;AI&#xff09;正以前所未有的速度发展&#xff0c;其在众多领域的应用和突破令人瞩…

kotlin协程之runBlocking

前言 上一篇: Callback转挂起函数 文章中,介绍了在Kotlin协程中如何把传统的回调风格的异步操作转换为协程风格的挂起函数,这个在开发过程中是非常常用的,主要用于把 callback 风格的代码转换为协程中的挂起函数,以便于我们在协程环境中调用。 但是,有时候我们也会遇到…

leetcode300. 最长递增子序列,动态规划附状态转移方程

leetcode300. 最长递增子序列 给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2…

MySQL第5讲--DML(数据操作语言)的基本操作

文章目录 前言DML(数据操作语言)基本操作增加数据案例展示修改数据案例展示删除数据 DML总结 前言 在第四讲MySQL第4讲–图像化界面工具DataGrip介绍中我们讲述了数据库的可视化操作界面&#xff1b;今天我们讲一下数据库中DML(数据操作语言)的基本操作&#xff1b; DML(数据操…

pnpm的使用

https://blog.csdn.net/m0_56416743/article/details/136122153 https://juejin.cn/post/7301610680457543743 为什么要使用 pnpm 当我们使用 npm 或 yarn 来安装项目依赖&#xff0c;如果我们的电脑里有很多的项目依赖了同个包&#xff0c;比如 axios&#xff0c;那么 axio…