【RabbitMQ 项目】服务端:信道模块

news/2024/10/5 5:56:17/

文章目录

  • 一.概念辨析
    • 1.什么是信道
    • 2.为什么要有信道
    • 3.怎么实现信道
  • 二.实现思路
    • 1.定义信道
    • 2.定义连接信道管理类
  • 三.代码实践

一.概念辨析

1.什么是信道

信道是在用户层的一个逻辑概念,是比 TCP 连接更加细粒度的通信通道,客户端想要和服务端通信,必须先创建一个信道,通过信道来请求各种各样的服务。服务端在收到客户端创建信道的请求后,也要在本地创建一个相应的信道,来给客户端提供对应的服务。
即客户端通过信道给用户提供服务,服务端通过信道给客户端提供服务,信道就是提供服务的窗口。

2.为什么要有信道

信道只是一个逻辑的概念,只是在用户层管理的一些数据结构,真正网络发送数据的还是 TCP 连接,一个信道必然要和一个 TCP 连接关联,一个 TCP 连接可以被多个信道使用。做这种更加细粒度的划分,是为了复用 TCP 连接,实现 TCP 长连接。
因为用户有时候想要创建多个通信的句柄,如果直接创建 TCP 连接,资源不能得到充分利用,频繁创建和关闭 TCP 连接也影响性能,用一个逻辑上的信道代替,多个信道复用一个 TCP 连接,使资源得到充分利用

3.怎么实现信道

信道是用户层的一个概念,怎么实现?

  1. 客户端和服务端都要有描述信道的数据结构并管理起来。
  2. 网络协议定义的所有请求和响应,都带有信道 id 这个字段,指明我要和你服务端上的哪个信道通信
    例如:
    在客户端打开信道的 Request,含义是我客户端创建一个信道,你服务端也要创建一个相应的信道
    在客户端关闭信道的 Request,含义是我客户端关闭了信道,你服务端也要关闭对应的
    在创建交换机,队列等的 Request,含义是我的请求来自客户端的这个信道,你服务端也要用对应的信道给我提供服务
    这样就实现了信道和信道之间的独立性,互不干扰

二.实现思路

1.定义信道

成员变量:

  1. 虚拟机句柄:用于本地业务处理
  2. 协议处理句柄:用于发送响应
  3. TCP 连接(Muduo 库中的 TCPConnection 的智能指针):给协议处理句柄发送响应
  4. 关联的多个消费者:因为信道关闭时,要把与之关联的消费者移除,防止内存泄漏。那么什么时候会新增关联的消费者?当本信道收到订阅队列的消息时候。说明一下,这个字段信道不一定会用到,因为这个信道也有可能是为生产客户端服务的,生产客户端没有消费者
  5. 消费者管理句柄:因为涉及到新增消费者和删除消费者,所以需要消费者管理句柄
  6. 线程池句柄:消费消息的任务属于支线任务,muduo 库中的工作线程不想自己来做,因为它主要工作是监控 IO 事件以及从接收缓冲区读数据,所以消费消息的任务让线程池来做
    成员方法:
  7. 交换机声明与删除:先使用虚拟机句柄业务处理,再构建响应用协议处理句柄发送
  8. 队列声明与删除:与交换机声明删除类似,但是要声明队列还要初始化队列消费者管理句柄,删除队列后还要移除订阅该队列的消费者
  9. 绑定与解绑:绑定前需要判断 bingKey 是否合法
  10. 发布消息:先用虚拟机句柄处理业务,然后把消费消息的任务丢进线程池。怎么消费?先用消费者管理句柄,选出订阅该队列的消费者,然后调用消费者的回调函数,构建响应并发送
  11. 订阅队列:当信道收到订阅队列的请求,间接说明这个信道是给消费客户端服务的,因为生产客户端不会发送这样的请求。要做的就是用消费者管理句柄新增一个消费者,同时添加信道关联的消费者
  12. 取消订阅:和订阅队列做相反的工作
  13. 确认应答:用虚拟机句柄确认应答,再构建响应并发送
  14. 析构函数:使用消费者管理句柄删除信道关联的所有消费者

2.定义连接信道管理类

前置说明:
信道我们以连接为单位进行管理,并不提供所有信道的管理类,没有意义,因为信道是依托于连接的,连接没了,其中的所有信道也就没了

成员变量:

  1. < 信道 id,信道智能指针 > 的哈希表
    成员方法:
  2. 添加信道
  3. 删除信道
  4. 根据 id 获取信道
    补充说明:
    不实现这个管理类也可以,直接在 Connection 中使用哈希表组织信道更直观。因为信道是只依附于 Connection 的,不像消费者,它既和队列关联,又和信道关联,我们不得不选择一个专门的类来管理消费者并且内部以队列为单元组织,方便推送消息时选择一个消费者

三.代码实践

#pragma once
#include "VirtualHost.hpp"
#include "Consumer.hpp"
#include "muduo/net/TcpConnection.h"
#include "muduo/protobuf/codec.h"
#include "../common/ThreadPool.hpp"
#include "../common/protocol.pb.h"
#include "Route.hpp"
#include <mutex>namespace ns_channel
{class Channel;using ChannelPtr = std::shared_ptr<Channel>;using VirtualHostPtr = std::shared_ptr<ns_data::VirtualHost>;using ConsumerManagerPtr = std::shared_ptr<ns_consumer::ConsumerManager>;using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;class Channel{private:std::string _id;std::unordered_map<std::string, ns_consumer::ConsumerPtr> _consumers; // 一个信道内可能关联多个消费者,信道关闭时需要把与之关联的消费者都删除muduo::net::TcpConnectionPtr _connPtr;                                // 本地业务处理完后,要构建响应发送ProtobufCodecPtr _codecPtr;                                           // 构建响应后要添加协议数据VirtualHostPtr _vhPtr;                                                // 本地业务处理ConsumerManagerPtr _consumerManagerPtr;                               // 信道关闭时要删除消费者,队列有新消息时要选择消费者ThreadPoolPtr _threadPoolPtr;                                         // 处理推送消息的任务std::mutex _mtx;                                                      // 用来保护_consumerspublic:Channel(const std::string &id, const muduo::net::TcpConnectionPtr &connPtr, const ProtobufCodecPtr &codecPtr,const VirtualHostPtr &vhPtr, const ConsumerManagerPtr &consumerManagerPtr, const ThreadPoolPtr &threadPoolPtr): _id(id),_consumers(),_connPtr(connPtr),_codecPtr(codecPtr),_vhPtr(vhPtr),_consumerManagerPtr(consumerManagerPtr),_threadPoolPtr(threadPoolPtr),_mtx(){}~Channel(){LOG(DEBUG) << "channel析构" << endl;for (auto &kv : _consumers){_consumerManagerPtr->removeConsumer(kv.second->_qname, kv.second->_id);LOG(DEBUG) << "由于信道关闭,关联的消费者也被移除,consumerId: " << kv.second->_id << endl;}}/************* 以下用于处理生产客户端的请求* ***********/void declareExchange(const ns_protocol::DeclareExchangeRequest &req){// 业务处理bool ret = _vhPtr->declareExchange(req.exchange_name(), req.exchange_type(), req.is_durable());// 响应sendCommonResponse(req.request_id(), ret);}void deleteExchange(const ns_protocol::DeleteExchangeRequest &req){_vhPtr->deleteExchange(req.exchange_name());sendCommonResponse(req.request_id(), true);}/************** 声明队列* 记得要初始化队列消费者管理句柄* ***********/void declareMsgQueue(const ns_protocol::DeclareMsgQueueRequest &req){// 业务处理bool ret = _vhPtr->declareMsgQueue(req.queue_name(), req.is_durable());if (ret){_consumerManagerPtr->initQueueConsumerManager(req.queue_name());}// 响应sendCommonResponse(req.request_id(), ret);}/**************** 删除队列* 记得要删除队列关联的消费者* *************/void deleteMsgQueue(const ns_protocol::DeleteMsgQueueRequest &req){_vhPtr->deleteMsgQueue(req.queue_name());_consumerManagerPtr->removeQueueConsumerManager(req.queue_name());sendCommonResponse(req.request_id(), true);}/*********** 绑定与解绑* ************/void bind(const ns_protocol::BindRequest &req){if (!ns_route::Router::isLegalBindingKey(req.binding_key())){LOG(INFO) << "Binding的bindingKey非法, bindingKey: " << req.binding_key() << endl;sendCommonResponse(req.request_id(), false);return;}bool ret = _vhPtr->bind(req.ename(), req.qname(), req.binding_key());sendCommonResponse(req.request_id(), ret);}void unbind(const ns_protocol::UnbindRequest &req){_vhPtr->unbind(req.ename(), req.qname());sendCommonResponse(req.request_id(), true);}void publishMessage(const ns_protocol::PublishMessageRequest &req){const std::string ename = req.exchange_name();// 获取交换机相关的所有绑定std::unordered_map<std::string, ns_data::BindingPtr> bindings;bool isExchangeExists = _vhPtr->getExchangeBindings(ename, &bindings);if (!isExchangeExists){LOG(INFO) << "用户将消息发布到一个不存在的交换机上, exchangeName: " << ename << endl;sendCommonResponse(req.request_id(), false);return;}// 判断routingKey是否合法const std::string &routingKey = req.msg().saved_info().routing_key();if (!ns_route::Router::isLegalRoutingKey(routingKey)){LOG(INFO) << "消息的routingKey不合法, routingKey: " << routingKey << endl;sendCommonResponse(req.request_id(), false);return;}// 交换路由,选出队列并发布auto exchangePtr = _vhPtr->getExchange(ename);assert(exchangePtr);ns_protocol::ExchangeType type = exchangePtr->_type;for (const auto &kv : bindings){auto &bindingPtr = kv.second;if (ns_route::Router::isMatched(routingKey, bindingPtr->_bindingKey, type)){LOG(DEBUG) << "消息路由到" << kv.first << "上, " << routingKey << "<=>" << bindingPtr->_bindingKey << endl;const auto &msg = req.msg();const std::string &qname = kv.first;_vhPtr->publish(qname, msg.saved_info().id(), routingKey,msg.saved_info().body(), msg.saved_info().delivery_mode());// 把消费的任务交给线程池auto task = std::bind(&Channel::consume, this, qname);_threadPoolPtr->push(task);}}sendCommonResponse(req.request_id(), true);}/************ 以下用于处理消费客户端请求* **************/void subscribeQueue(ns_protocol::SubscribeQueueRequest &req){std::unique_lock<std::mutex> lck(_mtx);if (_consumers.count(req.consumer_id())){sendCommonResponse(req.request_id(), true);}auto consumerPtr = _consumerManagerPtr->addConsumer(req.consumer_id(),req.qname(),std::bind(&Channel::consumerCallback, this,std::placeholders::_1, std::placeholders::_2,std::placeholders::_3),req.auto_ack());if (consumerPtr == nullptr){sendCommonResponse(req.request_id(), false);}else{_consumers[consumerPtr->_id] = consumerPtr;LOG(DEBUG) << "信道新增关联消费者, consumerId: " << consumerPtr->_id << endl;sendCommonResponse(req.request_id(), true);}// 把消费的任务交给线程池auto task = std::bind(&Channel::consume, this, req.qname());_threadPoolPtr->push(task);}void cancelSubscribe(ns_protocol::CancelSubscribeRequest &req){_consumerManagerPtr->removeConsumer(req.qname(), req.consumer_id());std::unique_lock<std::mutex> lck(_mtx);_consumers.erase(req.consumer_id());sendCommonResponse(req.request_id(), true);}void ackMessage(ns_protocol::AckRequest &req){_vhPtr->ack(req.qname(), req.msg_id());sendCommonResponse(req.request_id(), true);}private:void sendCommonResponse(const std::string &responseId, bool ok){ns_protocol::CommomResponse resp;resp.set_channel_id(_id);resp.set_response_id(responseId);resp.set_ok(ok);_codecPtr->send(_connPtr, resp);}/************** 这个任务是给线程池做的* 消费队列消息:从队列中取出一条消息,选择一个订阅队列的消费者,然后推动给它* 注意该接口会多次消费,直到队列消息被消费完* *******************/void consume(const std::string &qname){while (true){auto msgPtr = _vhPtr->consume(qname); // 线程安全的if (msgPtr == nullptr){return;}// 选择一个消费者auto consumerPtr = _consumerManagerPtr->chooseConsumer(qname);if (consumerPtr == nullptr){return;}// 让消费者去处理消息并发送consumerPtr->_callback(consumerPtr->_qname, consumerPtr->_id, msgPtr);// 如果该消费者是自动应答则立马ackif (consumerPtr->_autoAck){_vhPtr->ack(qname, msgPtr->saved_info().id());}}}void consumerCallback(const std::string &qname, const std::string &consumerId, ns_data::MessagePtr msgPtr){ns_protocol::PushMessageResponse resp;resp.set_channel_id(_id);resp.set_consumer_id(consumerId);resp.set_qname(qname);resp.mutable_msg()->mutable_saved_info()->set_id(msgPtr->saved_info().id());resp.mutable_msg()->mutable_saved_info()->set_body(msgPtr->saved_info().body());resp.mutable_msg()->mutable_saved_info()->set_delivery_mode(msgPtr->saved_info().delivery_mode());resp.mutable_msg()->mutable_saved_info()->set_routing_key(msgPtr->saved_info().routing_key());_codecPtr->send(_connPtr, resp);}};/******************************* 信道管理句柄,注意以Connection为单元进行管理* ***************************/class ConnectionChannelManager{private:std::mutex _mtx;std::unordered_map<std::string, ChannelPtr> _channels;public:ConnectionChannelManager(){}bool openChannel(const std::string &id, const muduo::net::TcpConnectionPtr &connPtr,const ProtobufCodecPtr &codecPtr, const VirtualHostPtr &vhPtr,const ConsumerManagerPtr &consumerManagerPtr, const ThreadPoolPtr &threadPoolPtr){std::unique_lock<std::mutex> lck(_mtx);if (_channels.count(id)){LOG(WARNING) << "信道已经打开, channelId: " << id << endl;return false;}auto channelPtr = std::make_shared<Channel>(id, connPtr, codecPtr, vhPtr, consumerManagerPtr, threadPoolPtr);_channels[id] = channelPtr;return true;}void closeChannel(const std::string &id){std::unique_lock<std::mutex> lck(_mtx);_channels.erase(id);}ChannelPtr getChannel(const std::string &id){std::unique_lock<std::mutex> lck(_mtx);if (_channels.count(id) == 0){LOG(WARNING) << "信道不存在, channelId: " << id;return nullptr;}return _channels[id];}};
}

http://www.ppmy.cn/news/1534165.html

相关文章

05.useIsomorphicEffect

在 React 应用开发中,特别是涉及到**服务器端渲染(SSR)**时,正确处理副作用是一个常见挑战。useIsomorphicEffect 钩子提供了一种智能的方式来在服务器端和客户端环境中使用适当的副作用钩子。这个自定义钩子可以帮助开发者避免与 SSR 相关的常见陷阱,提高应用的性能和可靠…

vue2与vue3知识点

1.vue2&#xff08;optionsAPI&#xff09;选项式API 2.vue3&#xff08;composition API&#xff09;响应式API vue3 setup 中this是未定义&#xff08;undefined&#xff09;vue3中已经开始弱化this vue2通过this可以拿到vue3setup定义得值和方法 setup语法糖 ref > …

vmvare虚拟机centos 忘记超级管理员密码怎么办?

vmvare虚拟机centos 忘记超级管理员密码怎么办?如何重置密码呢? 一、前置操作 重启vmvare虚拟机的过程中,长按住Shift键 选择第一个的时候,按下按键 e 进入编辑状态。 然后就会进入到类似这个界面中。 在下方界面 添加 init=/bin/sh,然后按下Ctrl+x进行保存退出。 init=/bi…

UI设计师面试整理-面试中的实际设计挑战

在UI设计师的面试中,面试官常常会给出一个实际设计挑战。这类挑战旨在评估你的设计思维、解决问题的能力、创意以及你如何在有限的时间内应对设计任务。应对这些挑战需要冷静、条理清晰和策略性地展示你的技能。以下是如何准备和应对面试中的实际设计挑战的建议: 1. 理解设计…

代码随想录算法训练营DAY10之动态规划(二)背包问题

01背包理论基础 406、分割等和子集 力扣题目链接 题目描述 给定一个只包含正整数的非空数组。是否可以将这个数组分割成两个子集&#xff0c;使得两个子集的元素和相等。 注意: 每个数组中的元素不会超过 100 数组的大小不会超过 200 示例 1: 输入: [1, 5, 11, 5]输出: true …

UniVue大版本更新:UniVue2.0.0-preview

大版本发布说明 距离上次更新好像已经过去很久了&#xff0c;最近太忙了没时间维护新版本&#xff0c;也是自己在使用的过程中发现了很多问题也有了更多的灵感&#xff0c;由于和之前的版本区别太大&#xff0c;决定重新开一个大版本。这个UniVue2之后的版本追求是性能&#xf…

鸿蒙 HarmonyNext 与 Flutter 的异同之处

HarmonyNext 是华为推出的面向未来的应用开发框架&#xff0c;依托于鸿蒙&#xff08;HarmonyOS&#xff09;生态系统&#xff0c;特别适用于多设备协同、物联网&#xff08;IoT&#xff09;等场景。Flutter 是 Google 开发的跨平台 UI 框架&#xff0c;旨在通过单套代码运行在…

React -AppVarContext.Provider 提供者组件

AppVarContext.Provider 是一个 React 上下文提供者&#xff0c;通常用于在组件树中提供共享的状态或数据。下面将详细解释 AppVarContext.Provider 的作用和如何使用它。展示如何使用 AppVarContext.Provider 来管理全局状态 1. 什么是上下文&#xff08;Context&#xff09;…