仿RabbitMq实现简易消息队列正式篇(消费者篇)

ops/2024/10/18 14:28:06/
@TOC

消费者管理模块

客户端由两种:发布消息,订阅消息

因此订阅了指定队列消息的客户端才是一个消费者。

消费者数据存在的意义:当指定队列有了消息以后,就需要将消息推送给这个消费者客户端(推送的时候就需要找到这个客户端相关的信息--连接)

消费者信息:

消费者标识 --tag

订阅队列名称:当当前队列有消息就会推送给这个客户端,以及当客户端收到消息,需要对指定队列的消息进行确认

消费处理回调函数:队列有一个消息后,通过哪个函数进行处理

客户端消费者设计

#ifndef __M_CONSUMER_H__
#define __M_CONSUMER_H__
#include "../mqcommon/logger.hpp"
#include "../mqcommon/helper.hpp"
#include "../mqcommon/msg.pb.h"
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <vector>
#include <functional>namespace mymq
{using ConsumerCallback = std::function<void(const std::string, const BasicProperties* bp, const std::string)>;struct Consumer{using ptr = std::shared_ptr<Consumer>;std::string tag;    //消费者标识std::string qname;  //消费者订阅的队列名称bool auto_ack;      //自动确认标志ConsumerCallback callback;Consumer(){DLOG("new Consumer: %p", this);}Consumer(const std::string& ctag, const std::string& queue_name, bool auto_flag, const ConsumerCallback& cb):tag(ctag), qname(queue_name),auto_ack(auto_flag),callback(std::move(cb)){DLOG("new Consumer: %p", this);}~Consumer(){DLOG("del Consumer: %p", this);}};
} #endif

服务端消费者管理

管理思想

以队列为单位进行管理

每个消费者订阅的都是指定队列的消息,消费者对消息进行确认也是以队列进行确认

最关键的是:当指定队列中有消息了,必然是获取订阅了这个队列的消费者信息进行消息推送

队列消费者管理结构

数据信息
  • 消费者数组
  • RR轮转盘
  • 存储消费者的队列名
管理操作
  • 新增消费者
  • RR轮转获取一个消费者
  • 删除消费者
  • 队列消费者数量
  • 是否为空
代码展示
    // 以队列为单元的消费者管理结构class QueueConsumer{public:using ptr = std::shared_ptr<QueueConsumer>;QueueConsumer(const std::string &qname): _qname(qname),_rr_seq(0){}// 队列新增消费者Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb){// 1. 加锁std::unique_lock<std::mutex> lock(_mutex);// 2. 判断消费者是否重复for (auto e : _consumers){if (e->tag == ctag){return Consumer::ptr();}}// 3. 没有重复则新增 -- 构造对象auto consumer = std::make_shared<Consumer>(ctag, _qname, ack_flag, cb);// 4. 添加管理后返回对象_consumers.push_back(consumer);return consumer;}// 队列移除消费者void remove(const std::string &ctag){// 1. 加锁std::unique_lock<std::mutex> lock(_mutex);// 2. 遍历 - 删除for (auto it = _consumers.begin(); it != _consumers.end(); it++){if ((*it)->tag == ctag){_consumers.erase(it);return;}}return;}// 队列获取消费者:RR轮转获取Consumer::ptr choose(){// 1. 加锁std::unique_lock<std::mutex> lock(_mutex);// 2. 获取当前轮转的下标if (_consumers.size() == 0){return Consumer::ptr();}int index = _rr_seq % _consumers.size();// 3. 获取对象, 返回_rr_seq++;return _consumers[index];}// 是否为空bool empty(){std::unique_lock<std::mutex> lock(_mutex);return _consumers.size() == 0;}// 判断指定消费者是否存在bool exists(const std::string &ctag){std::unique_lock<std::mutex> lock(_mutex);for (auto it = _consumers.begin(); it != _consumers.end(); it++){if ((*it)->tag == ctag){return true;}}return false;}void clear(){std::unique_lock<std::mutex> lock(_mutex);_consumers.clear();_rr_seq = 0;}private:std::string _qname;std::mutex _mutex;uint64_t _rr_seq;std::vector<Consumer::ptr> _consumers;};

管理操作

  • 初始化队列消费者结构
  • 删除队列消费者结构
  • 向指定队列添加消费者
  • 获取指定队列消费者
  • 删除指定队列消费者
class ConsumerManager{public:using ptr = std::shared_ptr<ConsumerManager>;ConsumerManager(){}void initQueueConsumer(const std::string &qname){// 1. 加锁std::unique_lock<std::mutex> lock(_mutex);// 2. 重复判断auto it = _qconsumers.find(qname);if (it != _qconsumers.end()){return;}// 3. 新增QueueConsumer::ptr qcp = std::make_shared<QueueConsumer>(qname);_qconsumers.insert(std::make_pair(qname, qcp));}void destroyQueueConsumer(const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);_qconsumers.erase(qname);}// 这里不理解,为啥找到了还要进行创建Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb){// 获取队列的消费者管理单元句柄,通过句柄完成新建QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()){DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->create(ctag, queue_name, ack_flag, cb);}void remove(const std::string &ctag, const std::string &queue_name){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()){DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return;}qcp = it->second;}return qcp->remove(ctag);}Consumer::ptr choose(const std::string &queue_name){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()){DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return Consumer::ptr();}qcp = it->second;}return qcp->choose();}bool empty(const std::string &queue_name){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()){DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return false;}qcp = it->second;}return qcp->empty();}bool exists(const std::string &ctag, const std::string &queue_name){QueueConsumer::ptr qcp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()){DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return false;}qcp = it->second;}return qcp->exists(ctag);}void clear(){std::unique_lock<std::mutex> lock(_mutex);_qconsumers.clear();}private:std::mutex _mutex;std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;};}


http://www.ppmy.cn/ops/94262.html

相关文章

【数据结构】顺序表实现

0. 前言 小伙伴们大家好&#xff0c;从今天开始&#xff0c;我们就开始学习《数据结构》这门课程~ 首先想给大家讲讲什么是数据结构&#xff1f; 0.1 数据结构是什么&#xff1f; 数据结构是由“数据”和“结构”两词组合⽽来。 什么是数据&#xff1f; 比如常⻅的数值1、…

动手学深度学习(pytorch)学习记录7-线性回归的从零开始实现[学习记录]

注&#xff1a;本代码在jupyter notebook上运行 封面图片来源 1、生成数据集 %matplotlib inline import random import torch from d2l import torch as d2l构造数据集&#xff1a;生成一个包含1000个样本的数据集&#xff0c; 每个样本包含从标准正态分布中采样的2个特征。…

八股面试大总结⑤ —— Redis篇

5.1 什么是Redis 内存型数据库&#xff0c;读写速度快有多种数据类型用于不同业务执行命令由单线程完成&#xff0c;不存在并发竞争 5.2 为什么用redis做mysql的缓存 redis高性能&#xff1a;用户第一次访问mysql的数据会从硬盘读取比较慢&#xff0c;之后会将数据缓存在red…

工业互联网边缘计算实训室解决方案

一、引言 随着物联网&#xff08;IoT&#xff09;、5G通信技术的快速发展&#xff0c;工业互联网已成为推动制造业转型升级的重要力量。边缘计算作为云计算的延伸和补充&#xff0c;在实时数据分析、降低数据传输延迟、提升处理效率及增强数据安全性方面展现出巨大潜力。在此背…

【网络】局域网LAN、广域网WAN、TCP/IP协议、封装和分用

文章目录 局域网 LAN广域网 WAN网络中的重要概念IP 地址端口号 认识协议协议分层是什么OSI 七层网络模型TCP/IP 五层网络模型&#xff08;或四层&#xff09;物理层传输层网络层数据链表层应用层网络设备所在分层 封装和分用[站在发送方视角]&#xff08;封装&#xff09;[站在…

代码随想录算法训练营第五十八天|拓扑排序精讲 、dijkstra(朴素版)精讲

拓扑排序 117. 软件构建 from collections import deque, defaultdictdef topological_sort(n, edges):inDegree [0] * n # inDegree 记录每个文件的入度umap defaultdict(list) # 记录文件依赖关系# 构建图和入度表for s, t in edges:inDegree[t] 1umap[s].append(t)# 初…

C语言中10个字符串函数详解

目录 1.strlen 2.strcpy 3.strcat 4.strcmp 5.strncpy 6.strncat 7.strncmp 8.strstr 9.strtok 10.strerror 1.strlen 基本结构&#xff1a;size_t strlen(const char *str)&#xff1b;功能&#xff1a;用于计算字符串的长度&#xff1b;字符串已经 0作为结束标志…

合合信息的OCR技术在智能文档处理方面有哪些具体的应用案例?

智能文档处理(IDP)是利用人工智能技术,自动从复杂的非结构化和半结构化文档中抽取关键数据,并将其转换成结构化数据的技术。能够自动识别、提取并结构化处理文档中的关键信息。这种技术通常基于自然语言处理&#xff08;NLP&#xff09;和计算机视觉等先进技术&#xff0c;可以…