rabbitMq-----消费者管理模块

news/2024/12/22 1:22:28/

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 消费者字段
  • 消费者内存管理类
    • 新增/删除消费者
  • 总的消费者管理类


前言

只有订阅客户端订阅了一个队列消息后,他才是一个消费者,而消费者存在的意义是,当这个队列上有消息的时候我们呢需要给订阅了这个队列的客户端推送消息,也就需要获取客户端的连接,所以我们呢可以通过消费者来找到对应的连接来进行消息的推送。


消费者字段

消费者订阅的队列名称,消费者也是按照队列问单元进行管理的。
消费者标志,标志消费者唯一性
自动应答标志,客户端在订阅队列消息时可以进行设置,如果为true,则将消息推送给消费客户端后不需要等待应带,直接将消息进行删除。
还有一个回调函数。当队列有消息到来时,就需要选择一个队列消费者进行消费,那么如何进行消费呢?就是调用这个回调函数。这个函数的逻辑固定,就是通过连接发送消息。是由我们broker服务器设置的。

//当队列收到一条消息后,需要选择一个消费者进行消费,那么如何消费呢?//就是调用这个回调函数,其内部逻辑是:找到消费者对应的连接,将消息发送给客户端。using ConsumerCallBack = std::function<void(const std::string&,const BasicProperties *,const std::string &)>;struct Consumer{using ptr = std::shared_ptr<Consumer>;std::string _qname; //消费者订阅的队列名称std::string _ctag;  //消费者标识bool _auto_ack;  //自动应答标志ConsumerCallBack _cb;
}

消费者内存管理类

消费者是以队列为单位进行管理的,我们使用一个vector管理订阅该队列的消费者。
目的是方便进行rr轮转。

class QueueConsumer{private:std::string _qname; //队列名称std::mutex _mutex;  uint64_t _rr_sep;   //轮转序号std::vector<Consumer::ptr> _consumers;}

新增/删除消费者

构造一个消费者对象进行添加。删除就是遍历删除。

 //新增一个消费者 Consumer::ptr create(const std::string &qname,const std::string &ctag,bool auto_ack,const ConsumerCallBack &cb){//1.加锁std::unique_lock<std::mutex> lock(_mutex);//2.判断消费者是否存在for(auto &cons : _consumers){if(cons->_ctag == ctag){return cons;}}//3.创建消费者并添加管理Consumer::ptr csp = std::make_shared<Consumer>(qname,ctag,auto_ack,cb);_consumers.push_back(csp);return csp;
}
//删除一个消费者:取消订阅、信道关闭、连接关闭的时候删除
void remove(const std::string &ctag){//1.加锁std::unique_lock<std::mutex> lock(_mutex);//2.遍历删除for(auto it = _consumers.begin(); it != _consumers.end(); ++it){if((*it)->_ctag == ctag){_consumers.erase(it);return;}}return;
}

选择一个消费者,rr轮转的思想。
在收到一条消息后,会进行路由匹配,获取到匹配成功的队列。然后把消息推送到队列上,同时取出一个消费者进行消费。

 //选择一个消费者,rr轮转的思想Consumer::ptr choose(){//1.加锁std::unique_lock<std::mutex> lock(_mutex);//如果没有消费者,则返回空对象if(_consumers.size() == 0){return Consumer::ptr();}//2.获取当前选择的消费者下标uint64_t idx = _rr_sep % _consumers.size();_rr_sep++;//3.获取对象并返回return _consumers[idx];}

总的消费者管理类

这个类才是对外提供的类。
就是用一个哈希表把队列名称和消费者管理对象关联起来进行管理。
有一initQueueConsumer函数就是用力啊初始化队列消费者管理对象的,需要判断当前以存在的队列,来进行初始化对应的消费者管理类。

 class ConsumerManager{private:    std::mutex _mutex;std::unordered_map<std::string,QueueConsumer::ptr> _qconsumers;public:using ptr = std::shared_ptr<ConsumerManager>;ConsumerManager(){}//初始化队列消费者管理void initQueueConsumer(const std::string &qname){//1. 加锁std::unique_lock<std::mutex> lock(_mutex);//2. 重复判断auto it = _qconsumers.find(qname);if (it != _qconsumers.end()) {return ;}//3. 新增auto qconsumers = std::make_shared<QueueConsumer>(qname);_qconsumers.insert(std::make_pair(qname, qconsumers));}
}

删除队列消费者管理类对象,当一个队列删除时,就选哦删除这个队列的消费者管理类对象,

 //删除队列消费者管理void destroyQueueConsumer(const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(qname);if (it == _qconsumers.end()) {return ;}_qconsumers.erase(it);}

创建/删除指定队列的消费者管理类对象中的消费者
根据队列的名称,选择指定的消费者管理类对象进行一个操作即可,

Consumer::ptr create(const std::string &qname,const std::string &ctag,bool auto_ack,const ConsumerCallBack &cb){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(qname);if(it == _qconsumers.end()){ELOG("没有找到队列 %s 的消费者管理句柄!",qname.c_str());return Consumer::ptr();}qcp = it->second;}//这个create不需要我们这里的mutex保护,他有自己的mutexreturn qcp->create(qname,ctag,auto_ack,cb);
}
void remove(const std::string &ctag,const std::string &qname){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(qname);if(it == _qconsumers.end()){ELOG("没有找到队列 %s 的消费者管理句柄!",qname.c_str());return ;}qcp = it->second;}qcp->remove(ctag);
}

选择指定队列的消费者管理类对象中的消费者


Consumer::ptr choose(const std::string &qname){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(qname);if(it == _qconsumers.end()){ELOG("没有找到队列 %s 的消费者管理句柄!",qname.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->choose();
}

http://www.ppmy.cn/news/1537171.html

相关文章

文本区域分割系统源码&数据集分享

文本区域分割系统源码&#xff06;数据集分享 [yolov8-seg-EfficientHead&#xff06;yolov8-seg-EfficientRepBiPAN等50全套改进创新点发刊_一键训练教程_Web前端展示] 1.研究背景与意义 项目参考ILSVRC ImageNet Large Scale Visual Recognition Challenge 项目来源AAAI …

Python+requests—get请求模板

目录 专栏导读库的介绍库的安装代码总结 专栏导读 &#x1f338; 欢迎来到Python办公自动化专栏—Python处理办公问题&#xff0c;解放您的双手 &#x1f3f3;️‍&#x1f308; 博客主页&#xff1a;请点击——> 一晌小贪欢的博客主页求关注 &#x1f44d; 该系列文章专…

JMeter中线程组、HTTP请求的常见参数解释

在JMeter中&#xff0c;线程组和HTTP请求是进行性能测试的两个核心组件。以下是它们的一些常见相关参数的解释&#xff1a; 线程组参数 线程数 指定模拟的用户数&#xff0c;即并发执行的线程数。 Ramp-Up时间&#xff08;秒&#xff09; 指定所有线程启动的时间间隔。在这…

try、catch、finally、return执行顺序超详解析与throw与throws区别

try、catch、finally、return执行顺序超详解析&#xff08;针对面试题&#xff09; 有关try、catch、finally和return执行顺序的题目在面试题中可谓是频频出现。总结一下此类问题几种情况。 写在前面 不管try中是否出现异常&#xff0c;finally块中的代码都会执行&#xff1b…

NVP的含义?如何理解其在AEM|FLUKE线缆认证测试中的意义?不同的NVP会出现怎样的结果?

在AEM|FLUKE铜缆认证测试中&#xff0c;有很多朋友对NVP设置有疑问&#xff0c;不知道应该怎么去设置它&#xff0c;并很好的应用它&#xff0c;那我们基于此&#xff0c;做一个简单的分析。 什么是NVP? NVP是Nominal Velocity of Propagation的缩写&#xff1f;简单直接译过…

论文阅读——联邦忘却学习研究综述

文章基本信息 作者&#xff1a; 王鹏飞魏宗正周东生宋威肖蕴明孙庚于硕张强 机构&#xff1a; 大连理工大学计算机科学与技术学院大连理工大学社会计算与认知智能教育部重点实验室大连大学先进设计与智能计算教育部重点实验室美国西北大学计算机科学系吉林大学计算机科学与…

AI在医学领域:Arges框架在溃疡性结肠炎上的应用

溃疡性结肠炎&#xff08;UC&#xff09;是一种慢性炎症性肠病&#xff08;IBD&#xff09;&#xff0c;在全球大约影响着500万人&#xff0c;导致肠道炎症和溃疡。在UC的临床试验中&#xff0c;通常通过内窥镜视频来评估结肠疾病的严重程度&#xff0c;并使用如Mayo内窥镜下分…

React常见优化问题

在React开发中&#xff0c;性能优化是一个重要且持续的过程&#xff0c;旨在提升应用的响应速度和用户体验。以下是一些常见的React优化问题详解&#xff0c;并附上相应的代码示例。 1. 避免不必要的组件渲染 React组件的渲染是由其props或state的变化触发的。但是&#xff0c;…