第十章:服务器消费者管理模块

devtools/2025/2/28 23:05:58/

目录

第一节:代码实现

        1-1.Consumer类

        1-2.QueueConsumer类

         1-3.QueueConsumerManger类

第二节:单元测试

下期预告:


        服务器的消费者管理模块在mqserver目录下实现。

第一节:代码实现

        创建一个名为mq_consumer.hpp的文件,打开并做好前置工作:

#ifndef __M_CONSUMER_H__
#define __M_CONSUMER_H__
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
// 以消息队列为单元管理消费者
#include <iostream>
#include <vector>
#include <unordered_map>
#include <mutex>
#include <functional> namespace zd
{};#endif

        1-1.Consumer类

        要管理消费者,首先要有消费者,定义class Consumer类,它的实现如下:

    // 消费者的回调函数类型using ConsumerCallback = std::function<void(const std::string,const BasicProperties*,const std::string)>;class Consumer{public:using ptr = std::shared_ptr<Consumer>;std::string tag;            // 消费者唯一标识std::string qname;          // 消费者订阅的队列名称bool auto_ack;              // 自动确认标志ConsumerCallback callback;  // 订阅队列收到消息后调用,作用是推送消息给消费者Consumer(){}Consumer(const std::string& ctag,const std::string& queue_name,bool ack_flag,const ConsumerCallback& cb):tag(ctag),qname(queue_name),auto_ack(ack_flag),callback(cb){}};

        tag:消费者的唯一标识,由用户设置

        qname:消费者订阅的队列

        auto_ack:自动确认标志,如果设置为true,服务器再推送完消息后会直接删除消息,不等待消费者的确认请求。

        callback:消费者对消息的处理函数,在服务端它的功能是固定的:将消息发送给对应的客户端消费者,因为服务器的消费者并不是真正的消费者,客户端消费者才是真正的消费者。而客户端消费者的消息处理函数才由用户自己定义。

        1-2.QueueConsumer类

        这个类用来管理一个队列的所有订阅者, 而且当一条消息到来时,不是所有队列的订阅者都能获得,只有队列当前轮询的一个消费者可以获得这条信息,这种叫做队列模型。

        还有每个订阅者都能获得消息的订阅/发布模型,现在实现的是队列模型,项目基本完成后也会实现一下订阅/发布模型。

        先实现一下构造函数和成员变量:

    // 一个队列的消费者管理class QueueConsumer{public:using ptr = std::shared_ptr<QueueConsumer>;QueueConsumer(const std::string& qname):_qname(qname),_rr_ser(0){}private:std::mutex _mtx;std::string _qname; // 当前管理的队列名称size_t _rr_ser; // 轮转序号:决定当前把消息推送给哪个消费者std::vector<Consumer::ptr> _consumers; // 该队列的所有消费者};

        消费者管理接口:

            // 新增消费者Consumer::ptr create(const std::string& ctag,bool ack_flag,const ConsumerCallback& cb){std::unique_lock<std::mutex> lock(_mtx);// 1.判断消费者重复添加for(const auto& consumer:_consumers){if(consumer->tag == ctag)return nullptr;}// 2.构造消费者对象并添加Consumer::ptr consumer = std::make_shared<Consumer>(ctag,_qname,ack_flag,cb);_consumers.push_back(consumer);return consumer;}// 移除消费者void remove(const std::string& ctag){std::unique_lock<std::mutex> lock(_mtx);for(auto it = _consumers.begin();it != _consumers.end();it++){if((*it)->tag == ctag){_consumers.erase(it);return;}}}

        获取当前轮询的消费者,决定消息的去向:

            // 获取当前轮询的消费者Consumer::ptr choose(){std::unique_lock<std::mutex> lock(_mtx);if(_consumers.size() == 0)return nullptr;return _consumers[_rr_ser++%_consumers.size()];;}

        取模是为了不会越界访问。

        其他的功能函数:

            // 判空bool empty(){std::unique_lock<std::mutex> lock(_mtx);return _consumers.empty();}// 判断消费者是否存在bool exists(const std::string& ctag){std::unique_lock<std::mutex> lock(_mtx);for(const auto& consumer:_consumers){if(consumer->tag == ctag)return true;}return false;}// 清理所有消费者void clear(){std::unique_lock<std::mutex> lock(_mtx);_consumers.clear();_rr_ser = 0;}

        析构函数调用clear()接口,这样当队列被删除时,也会删除队列的QueueConsumer对象,析构函数就自动清理数据了:

            ~QueueConsumer(){clear();}

 

         1-3.QueueConsumerManger类

        这个类用来管理服务器的所有队列的消费者。

        基本上就是对class QueueConsumer的封装,但是在队列执行自己的函数的时候不要上锁,因为每个队列是独立的。

    // 所有队列的消费者管理class QueueConsumerManger{public:using ptr = std::shared_ptr<QueueConsumerManger>;QueueConsumerManger(){}// 插入一个消费者管理队列void initQueueConsumer(const std::string& qname){std::unique_lock<std::mutex> lock(_mtx);// 1.判断重复auto it = _queue_consumers.find(qname);if(it != _queue_consumers.end()){return;}// 2.构造并插入QueueConsumer::ptr queueConsumer = std::make_shared<QueueConsumer>(qname);_queue_consumers.insert(std::make_pair(qname,queueConsumer));}// 移除一个消费者管理队列void destoryQueueConsumer(const std::string& qname){std::unique_lock<std::mutex> lock(_mtx);// 1.判断存在auto it = _queue_consumers.find(qname);if(it == _queue_consumers.end()){return;}// 2.移除_queue_consumers.erase(it);}// 向指定队列新增一个消费者Consumer::ptr create(const std::string& qname,const std::string& ctag,bool ack_flag,const ConsumerCallback& cb){QueueConsumer::ptr queue;{std::unique_lock<std::mutex> lock(_mtx);// 1.判断队列存在auto qit = _queue_consumers.find(qname);if(qit == _queue_consumers.end()){LOG("没有找到消费者管理队列 %s",qname.c_str());return nullptr;}queue = qit->second;}// 2.调用队列管理的插入return queue->create(ctag,ack_flag,cb);}// 从指定队列中移除一个消费者void remove(const std::string& qname,const std::string& ctag){QueueConsumer::ptr queue;{std::unique_lock<std::mutex> lock(_mtx);// 1.判断队列存在auto qit = _queue_consumers.find(qname);if(qit == _queue_consumers.end()){LOG("没有找到消费者管理队列 %s",qname.c_str());return;}queue = qit->second;}// 调用队列管理的移除queue->remove(ctag);}// 获取一个消费者管理队列的当前轮询消费者Consumer::ptr choose(const std::string& qname){QueueConsumer::ptr queue;{std::unique_lock<std::mutex> lock(_mtx);// 1.判断队列存在auto qit = _queue_consumers.find(qname);if(qit == _queue_consumers.end()){LOG("没有找到消费者管理队列 %s",qname.c_str());return nullptr;}queue = qit->second;}   return queue->choose();}// 判空bool empty(const std::string& qname){QueueConsumer::ptr queue;{std::unique_lock<std::mutex> lock(_mtx);// 1.判断队列存在auto qit = _queue_consumers.find(qname);if(qit == _queue_consumers.end()){LOG("没有找到消费者管理队列 %s",qname.c_str());return false;}queue = qit->second;}return queue->empty();}// 判断某个队列的某个消费者是否存在bool exists(const std::string& qname,const std::string& ctag){QueueConsumer::ptr queue;{std::unique_lock<std::mutex> lock(_mtx);// 1.判断队列存在auto qit = _queue_consumers.find(qname);if(qit == _queue_consumers.end()){LOG("没有找到消费者管理队列 %s",qname.c_str());return false;}queue = qit->second;}return queue->exists(ctag);}// 清理void clear(){std::unique_lock<std::mutex> lock(_mtx);_queue_consumers.clear();}size_t size(){return _queue_consumers.size();}private:std::unordered_map<std::string,QueueConsumer::ptr> _queue_consumers; std::mutex _mtx;};

        

第二节:单元测试

        打开mqtest目录,创建mq_consumer_test.cc,添加以下代码进行测试:

#include "../mqserver/mq_consumer.hpp"
#include <gtest/gtest.h>
#include <iostream>
#include <unordered_map>zd::QueueConsumerManger::ptr qcmp;
void cb(const std::string,const zd::BasicProperties*,const std::string){};
// 全局测试套件------------------------------------------------
// 自己初始化自己的环境,使不同单元测试之间解耦
class ConsumerTest :public testing::Environment
{
public:// 全部单元测试之前调用一次virtual void SetUp() override{// std::cout << "单元测试执行前的环境初始化" << std::endl;qcmp = std::make_shared<zd::QueueConsumerManger>();}   // 全部单元测试之后调用一次virtual void TearDown() override{// std::cout << "单元测试执行后的环境清理" << std::endl;// emp->clear();}
};// 单元测试
// 测试名称与类名称相同,则会先调用SetUp
// 测试队列的新增和移除
TEST(ConsumerTest,ConsumerTest_test1_Test)
{  std::cout << "单元测试-1" << std::endl;// 新增队列qcmp->initQueueConsumer("q1");ASSERT_EQ(qcmp->size(),1);qcmp->initQueueConsumer("q2");ASSERT_EQ(qcmp->size(),2);qcmp->initQueueConsumer("q3");ASSERT_EQ(qcmp->size(),3);qcmp->initQueueConsumer("q4");ASSERT_EQ(qcmp->size(),4);qcmp->initQueueConsumer("q5");ASSERT_EQ(qcmp->size(),5);qcmp->initQueueConsumer("q1");ASSERT_EQ(qcmp->size(),5);// 移除队列qcmp->destoryQueueConsumer("q2");ASSERT_EQ(qcmp->size(),4);qcmp->destoryQueueConsumer("q6");ASSERT_EQ(qcmp->size(),4); // q1 q3 q4 q5
}
// 测试消费者的新增和移除
TEST(ConsumerTest,ConsumerTest_test2_Test)
{std::cout << "单元测试-2" << std::endl;// 向队列新增消费者qcmp->create("q1","consumer-1",false,cb);qcmp->create("q1","consumer-2",false,cb);qcmp->create("q1","consumer-3",false,cb);qcmp->create("q1","consumer-4",false,cb);qcmp->create("q1","consumer-5",false,cb);ASSERT_EQ(qcmp->exists("q1","consumer-1"),true);ASSERT_EQ(qcmp->exists("q1","consumer-6"),false);// 从队列移除消费者qcmp->remove("q1","consumer-2");ASSERT_EQ(qcmp->exists("q1","consumer-2"),false);ASSERT_EQ(qcmp->exists("q2","consumer-2"),false);  // q2之前已经被移除了// q1:c1 c3 c4 c5
}
// 测试当前轮询接口
TEST(ConsumerTest,ConsumerTest_test3_Test)
{std::cout << "单元测试-3" << std::endl;zd::Consumer::ptr cp1 = qcmp->choose("q1");zd::Consumer::ptr cp2 = qcmp->choose("q1");zd::Consumer::ptr cp3 = qcmp->choose("q1");zd::Consumer::ptr cp4 = qcmp->choose("q1");std::cout << std::endl;std::cout << cp1->tag << " " << cp2->tag << " " << cp3->tag << " " << cp4->tag << std::endl;std::cout << std::endl;zd::Consumer::ptr cp5 = qcmp->choose("q3");ASSERT_EQ(cp5.get(),nullptr);
}
// 单元测试全部结束后调用TearDown// ----------------------------------------------------------
int main(int argc,char** argv)
{testing::InitGoogleTest(&argc,argv);testing::AddGlobalTestEnvironment(new ConsumerTest); // 注册Test的所有单元测试if(RUN_ALL_TESTS() != 0) // 运行所有单元测试{printf("单元测试失败!\n");}return 0;
}

        编译:

mq_consumer_test:mq_consumer_test.cc g++ -std=c++14 $^ -o $@ -lgtest -lprotobuf

        执行结果:

                ​​​​​​​        ​​​​​​​        

        没有错误,打印也符合预期。

        服务器消费者管理模块就完成了。

下期预告:

        完成消费者管理模块之后,下一个要完成的是信道管理模块。

        对于服务器来说,一个信道就可以是一个消费者。如果客户端创建了一个信道(c-1),客户端就会发送创建信道的请求给服务器,服务器收到请求后也会创建一个对应的信道(c-1')。当客户端使用信道进行订阅的时候,服务器的对应信道就会承担消费者的角色,服务器会把消息推送给这个信道,这个信道再把消息推送给客户端的信道。

        以上过程中,消息都被推送给了服务器的c-1',所以对于服务器来说,c-1'才是消费者,而不是真正使用消息的客户端信道c-1。


http://www.ppmy.cn/devtools/163466.html

相关文章

若依Vue,tab切换时实现缓存之前的查询条件、不刷新页面、不请求

1、菜单里面的路由地址和缓存 2、页面中对应路由地址&#xff0c;首字母大写驼峰命名

【Python爬虫(80)】当Python爬虫邂逅边缘计算:探索数据采集新境界

【Python爬虫】专栏简介:本专栏是 Python 爬虫领域的集大成之作,共 100 章节。从 Python 基础语法、爬虫入门知识讲起,深入探讨反爬虫、多线程、分布式等进阶技术。以大量实例为支撑,覆盖网页、图片、音频等各类数据爬取,还涉及数据处理与分析。无论是新手小白还是进阶开发…

SocketTool、串口调试助手、MQTT中间件基础

目录 一、SocketTool 二、串口通信 三、MQTT中间件 一、SocketTool 1、TCP 通信测试&#xff1a; 1&#xff09;创建 TCP Server 2&#xff09;创建 TCP Client 连接 Socket 4&#xff09;数据收发 在TCP Server发送数据12345 在 TCP Client 端的 Socket 即可收到数据12…

react使用拖拽,缩放组件,采用react-rnd解决

项目中需求&#xff0c;要求给商品图片添加促销标签&#xff0c;并且可拉伸大小&#xff0c;和拖拽位置 最后选择用react-rnd来实现 话不多说&#xff0c;直接上代码&#xff01;&#xff01;&#xff01; 1.在项目根目录下执行以下代码&#xff0c;引入react-rnd yarn add r…

【Arxiv 大模型最新进展】北大 Parenting 方法登场:参数魔法解锁检索增强语言模型新高度!

【Arxiv 大模型最新进展】北大 Parenting 方法登场&#xff1a;参数魔法解锁检索增强语言模型新高度&#xff01; &#x1f31f; 嗨&#xff0c;你好&#xff0c;我是 青松 &#xff01; &#x1f308; 自小刺头深草里&#xff0c;而今渐觉出蓬蒿。 NLP Github 项目推荐&#…

利用python和gpt写一个conda环境可视化管理工具

最近在学习python&#xff0c;由于不同的版本之间的差距较大&#xff0c;如果是用环境变量来配置python的话&#xff0c;会需要来回改&#xff0c;于是请教得知可以用conda来管理&#xff0c;但是conda在管理的时候老是要输入命令&#xff0c;感觉也很烦&#xff0c;于是让gpt帮…

windows下适用msvc编译ffmpeg 适用于ffmpeg-7.1

需要的工具: visual studio 2019 (可以是其他版本&#xff0c;只是本人电脑上装的为2019) msys2 ffmpeg-7.1源码 1. 修改msys2_shell.cmd 在msys2目录修改msys2_shell.cmd 打开后找到行set MSYS2_PATH_TYPEinherit 删除开头的rem 2. 运行msys2 运行x64 Native Tools Command …

电脑键盘知识

1、键盘四大功能区 1. 功能区 2. 主要信息输入区 3. 编辑区 4. 数字键盘区 笔记本电脑键盘的功能区&#xff0c;使用前需先按Fn键 1.1、功能区 ESC&#xff1a;退出 F1&#xff1a;显示帮助信息 F2&#xff1a;重命名 F4&#xff1a;重复上一步操作 F5&#xff1a;刷新网页 …