rabbitMq------客户端模块

news/2024/12/22 1:22:24/

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 消费者模块
  • 信道管理模块
    • 管理的字段
    • 提供的接口
  • 信道内存管理
  • 连接管理类


前言

在RabbitMQ中,提供服务的是信道,因此在客⼾端的实现中,弱化了Client客⼾端的概念。在RabbitMQ中并不会向⽤⼾展⽰⽹络通信的概念出来,⽽是以⼀种提供服务的形式来体现。实现思想类似于普通的功能接⼝封装,⼀个接⼝实现⼀个功能,接⼝内部完成向客⼾端请求的过程,但是对外并不需要体现出客⼾端与服务端通信的概念,⽤⼾需要什么服务就调⽤什么接⼝就⾏。


消费者模块

客户端这块对于消费者模块是不需要管理的,当进行消息订阅的时候,就会创建出一个消费者,而这消费者的作用就是:
• 描述当前信道订阅了哪个队列的消息。
• 描述了收到消息后该如何对这条消息进⾏处理。
• 描述收到消息后是否需要进⾏确认回复。

using ConsumerCallBack = std::function<void(const std::string&,BasicProperties *,const std::string &)>;struct Consumer{using ptr = std::shared_ptr<Consumer>;std::string _qname; //消费者订阅的队列名称std::string _ctag;  //消费者标识bool _auto_ack;  //自动应答标志ConsumerCallBack _cb;   }

信道管理模块

同样的客户端也有信道,其功能与服务端几乎一致,只不过客户端的信道是为用户提供服务的,而服务器的信道是为客户端的请求提供服务的。也可以理解是⽤⼾通过客⼾端channel的接⼝调⽤来向服务端发送对应请求,获取请求的服务。

管理的字段

客户端的信道需要管理的字段中有一个哈希表,是请求id和通用响应结构的映射。
因为在muduo库中发送和接收都是异步的,例如我们声明一个交换机,这个请求可能还在发送缓冲区,并没有发送,我们此时如果去给这个交换机推送消息就会出问题。因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步。

class Channel{private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec; Consumer::ptr _consumer;//由于muduo库的发送和接收都是异步的,例如我们声明一个交换机,这个请求可能还在发送缓冲区,并没有发送,我们此时如果去给这个交换机推送消息就会出问题。因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string,basicCommonResponsePtr> _basic_resp;}

提供的接口

客户端的channel和服务端的接口都是几乎一致的,客户端的接口中是组织请求,向服务端发起请求,服务端的接口是接收请求进行业务处理。

但客户端中的channel提供了打开信道和关闭信道这俩个接口,他们是向服务端发起请求,在服务器上创建信道。

//这两个接口是向服务端发送请求,在服务端创建对应信道bool openChannel(){//组织请求openChannelRequest req;std::string rid = UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);//发送_codec->send(_conn,req);basicCommonResponsePtr resp = waitResponse(rid);return resp->ok();}void closeChannel(){//组织请求closeChannelRequest req;std::string rid = UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);//发送_codec->send(_conn,req);waitResponse(rid);return;}

其中每个接口中都需要调用waitResponse,用来同步等待响应。

basicCommonResponsePtr waitResponse(std::string &rid){std::unique_lock<std::mutex> lock(_mutex);_cv.wait(lock,[&rid,this](){return _basic_resp.find(rid) != _basic_resp.end();});basicCommonResponsePtr basic_resp = _basic_resp[rid];_basic_resp.erase(rid);return basic_resp;}

信道内存管理

对信道的一个总的管理类。
有一个哈希表,是信道ID和信道对象的映射。
他提供了三个接口,创建信道/删除信道/获取指定信道。

class ChannelManager{private:std::mutex _mutex;std::unordered_map<std::string,Channel::ptr> _channels;}

连接管理类

在客⼾端这边,RabbitMQ弱化了客⼾端的概念,因为⽤⼾所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这⼀流程。
这个模块同样是针对muduo库客⼾端连接的⼆次封装,向⽤⼾提供创建channel信道的接⼝,创建信道后,可以通过信道来获取指定服务。

class Connection {public:using ptr = std::shared_ptr<Connection>;
private:muduo::CountDownLatch _latch;//实现同步的muduo::net::TcpConnectionPtr _conn;//客户端对应的连接muduo::net::TcpClient _client;//客户端ProtobufDispatcher _dispatcher;//请求分发器ProtobufCodecPtr _codec;//协议处理器AsyncWorker::ptr _worker;ChannelManager::ptr _channel_manager;}

在客户端这边,会收到两种响应,一种是基础响应,一种是消息推送响应。
我们需要注册对于这两种类型响应的回调函数。

基础响应,在基础响应中有一个cid字段,根据cid获取指定的信道对象,
调用信道对象中的putBasicResponse接口。
也就是往信道的hashMap中添加响应值。

void basicResponse(const muduo::net::TcpConnectionPtr& conn, const basicCommonResponsePtr& message, muduo::Timestamp) {//1. 找到信道Channel::ptr channel = _channel_manager->get(message->cid());if (channel.get() == nullptr) {DLOG("未找到信道信息!");return;}//2. 将得到的响应对象,添加到信道的基础响应hash_map中channel->putBasicResponse(message);
}

消息推送,在收到消息推送的响应后,需要更具响应中的rid,获取指定的信道对象,然后封装一个任务,这个任务就是调用信道中的consume接口,这个接口就是执行消费者中设置的回调函数。我们把这个任务放入到线程池中。

void consumeResponse(const muduo::net::TcpConnectionPtr& conn, const basicConsumerResponsePtr& message, muduo::Timestamp){
//1. 找到信道Channel::ptr channel = _channel_manager->get(message->cid());if (channel.get() == nullptr) {DLOG("未找到信道信息!");return;}//2. 封装异步任务(消息处理任务),抛入线程池_worker->pool.push([channel, message](){channel->consume(message);});
}

连接管理提供了两个接口,打开信道和关闭信道。这是给用户提供的,用来创建一个信道,通过信道进行服务的调用。

 Channel::ptr openChannel() {Channel::ptr channel = _channel_manager->create(_conn, _codec);bool ret = channel->openChannel();if (ret == false) {DLOG("打开信道失败!");return Channel::ptr();}return channel;
}void closeChannel(const Channel::ptr &channel) {channel->closeChannel();_channel_manager->remove(channel->cid());
}

http://www.ppmy.cn/news/1534173.html

相关文章

Stable Diffusion绘画 | 插件-Deforum:动态视频生成

Deforum 与 AnimateDiff 不太一样&#xff0c; AnimateDiff 是生成丝滑变化视频的&#xff0c;而 Deforum 的丝滑程度远远没有 AnimateDiff 好。 它是根据对比前面一帧的画面&#xff0c;然后不断生成新的相似图片&#xff0c;来组合成一个完整的视频。 Deforum 的优点在于可…

Word办公自动化的一些方法

1.Word部分内容介绍 word本身是带有格式的一种文档&#xff0c;有人说它本质是XML&#xff0c;所以一定要充分利用标记了【样式】的特性来迅速调整【格式】&#xff0c;从而专心编辑文档内容本身。 样式&#xff08;集&#xff09; 编号&#xff08;多级关联样式编号&#xff…

游览器输入URL并Enter时都发生了什么 面试完美回答

文章目录 前言URL解析DNS解析**浏览器缓存****操作系统缓存**&#xff1a;**路由器缓存**&#xff1a;ISP&#xff08;Internet service provider&#xff09;缓存DNS递归解析IP地址的获取缓存结果 建立TCP连接发送HTTP请求服务器响应TCP链接断开渲染页面解析一 HTML解析过程解…

LeetCode[中等] 17. 电话号码的字母组合

给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按 任意顺序 返回。 给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何字母。 思路 回溯法 log&#xff1a;当前结果数组&#xff1b;level&#xff1a…

【10】纯血鸿蒙HarmonyOS NEXT星河版开发0基础学习笔记-泛型基础全解(泛型函数、泛型接口、泛型类)及参数、接口补充

序言&#xff1a; 本文详细讲解了关于ArkTs语言中的泛型&#xff0c;其中包含泛型函数、泛型接口、泛型约束、泛型类及其中参数的使用方法&#xff0c;补充了一部分接口相关的知识&#xff0c;包括接口的继承和具体实现&#xff0c;也写到了一些边边角角的小知识&#xff0c;剩…

Spring Boot在甘肃非遗文化网站开发中的应用

2 相关技术 2.1 SSM框架介绍 本课题程序开发使用到的框架技术&#xff0c;英文名称缩写是SSM&#xff0c;在JavaWeb开发中使用的流行框架有SSH、SSM、SpringMVC等&#xff0c;作为一个课题程序采用SSH框架也可以&#xff0c;SSM框架也可以&#xff0c;SpringMVC也可以。SSH框架…

【RabbitMQ 项目】服务端:信道模块

文章目录 一.概念辨析1.什么是信道2.为什么要有信道3.怎么实现信道 二.实现思路1.定义信道2.定义连接信道管理类 三.代码实践 一.概念辨析 1.什么是信道 信道是在用户层的一个逻辑概念&#xff0c;是比 TCP 连接更加细粒度的通信通道&#xff0c;客户端想要和服务端通信&#…

05.useIsomorphicEffect

在 React 应用开发中,特别是涉及到**服务器端渲染(SSR)**时,正确处理副作用是一个常见挑战。useIsomorphicEffect 钩子提供了一种智能的方式来在服务器端和客户端环境中使用适当的副作用钩子。这个自定义钩子可以帮助开发者避免与 SSR 相关的常见陷阱,提高应用的性能和可靠…