rabbitMq------信道管理模块

ops/2024/10/10 15:58:05/

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 信道管理的字段
  • 申明/删除交换机
  • 申明/删除队列
  • 绑定/解绑
  • 消息的发布
  • 消息确认
  • 订阅队列
  • 取消订阅
  • 信道内存管理类
    • 打开信道
    • 关闭信道/获取指定信道
  • 总结


前言

信道是在通信连接上更细粒度的一个划分,也就是一个通信连接上可以由多个信道,这些信道都是复用的同一条连接,为了充分利用资源。
在用户眼中没有了网络通信的概念了,相当于信道他屏蔽了底层的网络通信细节。
用户只需要使用信道提供的服务,不需要关心底层的网络通信细节。
在用户眼中信道和信道就是完全独立的。


信道管理的字段

需要有一个信道唯一标识。
信道关联的消费者,当信道关闭的时候,需要从消费者管理中销毁这个消费者。
信道关联的连接,在信道提供的操作中有一个订阅指定队列和给回复响应中。我们需要用到这个连接。
protobufCodec协议处理句柄和连接一样的用处。我们给客户端回复响应就是通过这个句柄中提供的send操作,他会为我们添加协议报头。
消费者管理句柄,信道提供的操作中需要使用到。
虚拟机管理局并,信道提供的操作中需要使用到。
线程池管理句柄,在收到消息后需要给客户端推送,把推送打包成一个任务,放入到线程池 。

class Channel{private:std::string _cid;                   // 信道唯一标识Consumer::ptr _consumer;            // 信道关联的消费者 muduo::net::TcpConnectionPtr _conn; // 信道关联的连接ProtobufCodecPtr _codec;            // protobuf协议处理句柄ConsumerManager::ptr _cmp;          // 消费者管理句柄VirtualHost::ptr _host;             // 虚拟机管理句柄ThreadPool::ptr _pool;              // 线程池管理句柄}

信道提供了10个操作供用户使用。分别是申明/删除交换机,申明/删除队列,
绑定/取消绑定,消息发布,消息确认,订阅队列和取消订阅。

申明/删除交换机

信道这里都是收到的一个一个的请求,我们从请求中提取所需字段,然后通过虚拟机句柄,消费者句柄来进行一个操作。

 // 声明/销毁交换机void declareExchange(const declareExchangeRequestPtr &req){bool ret = _host->declareExchange(req->exchange_name(), req->exchange_type(), req->durable(), req->auto_delete(), req->args());return basicResponse(ret, req->rid(), req->cid());}void deleteExchange(const deleteExchangeRequestPtr &req){_host->deleteExchange(req->exchange_name());return basicResponse(true, req->rid(), req->cid());}

可以看到信道会调用一个basicResponse接口来进行一个响应.
这个响应是通过protobufCodeC来进行的,通过他提供的一个send接口来进行发送,需要传入信道的连接和响应结构对象。

  // 给客户端回复响应
void basicResponse(bool ok, const std::string &rid, const std::string &cid)
{basicCommonResponse resp;resp.set_rid(rid);resp.set_cid(cid);resp.set_ok(ok);_codec->send(_conn, resp);
}

申明/删除队列

申明队列的时候也需要初始化队列消费者管理,队列信息管理已经在虚拟机管理中初始化了。
删除队列的时候也需要删除消费者管理对象。

// 创建/删除队列void declareQueue(const declareQueueRequestPtr &req){bool ret = _host->declareQueue(req->queue_name(), req->durable(), req->exclusive(), req->auto_delete(), req->args());if (ret == false){return basicResponse(false, req->rid(), req->cid());}// 初始化队列的消费者管理句柄_cmp->initQueueConsumer(req->queue_name());return basicResponse(true, req->rid(), req->cid());}void deleteQueue(const deleteQueueRequestPtr &req){//删除队列的同时也要删除队列的消费者_cmp->destroyQueueConsumer(req->queue_name());_host->deleteQueue(req->queue_name());return basicResponse(true, req->rid(), req->cid());}

绑定/解绑

// 绑定队列信息/解除绑定队列信息
void queueBind(const queueBindRequestPtr &req)
{_host->bind(req->exchange_name(), req->queue_name(), req->binding_key());return basicResponse(true, req->rid(), req->cid());
}
void queueUnBind(const queueUnBindRequestPtr &req)
{_host->unbind(req->exchange_name(), req->queue_name());return basicResponse(true, req->rid(), req->cid());
}

消息的发布

在服务端的信道收到了消息发布的请求后,需要获取到请求中的交换机字段,获取到交换机绑定的所有队列信息。然后通过路由匹配模块来进行匹配,匹配成功的队列就会通过虚拟机句柄进行消息发布操作,把消息插入到队列消息中的带推送链表中。

 // 消息的发布
void basicPublish(const basicPublishRequestPtr &req)
{// 获取要发布到的交换机Exchange::ptr ep = _host->selectExchange(req->exchange_name());if (ep == nullptr){return basicResponse(false, req->rid(), req->cid());}// 进行路由交换,判断消息可以发布到交换机绑定的哪个队列MsgQueueBindingMap mqbm = _host->getExchangeBindings(req->exchange_name());BasicProperties *properties = nullptr;std::string routing_key;if (req->has_properties()){properties = req->mutable_properties();routing_key = properties->routing_key();}for (auto &binding : mqbm){if (Router::route(ep->type, routing_key, binding.second->binding_key)){// DLOG("%d,routing_key:%s binding_key:%s",ep->type,routing_key.c_str(),binding.second->binding_key.c_str());// 将消息添加到队列消息中_host->basicPublish(binding.first, properties, req->body());// 向线程池中提添加一个消息消费任务(向指定队列订阅者推送消息)auto task = std::bind(&Channel::consume, this, binding.first);_pool->push(task);}}return basicResponse(true, req->rid(), req->cid());
}

然后向线程池中推送一个任务。这个任务就是向该队列的队列消费者进行消息的推送。首先从队列消息类中取出一条消息,然后从队列消费者中取出一个消费者。通过调用这个消费者中回调函数来进行一个时间推送。这个回调函数就是在订阅队列是服务器绑定的。最后如果消费者的确认应答标志位为1的话会进行消息确认。

 // 向指定队列的某个订阅者推送消息
void consume(const std::string &qname)
{// 1. 从队列中取出一条消息mq::MessagePtr mp = _host->basicConsume(qname);if (mp.get() == nullptr){DLOG("执行消费任务失败,%s 队列没有消息!", qname.c_str());return;}// 2. 从队列订阅者中取出一个订阅者mq::Consumer::ptr cp = _cmp->choose(qname);if (cp.get() == nullptr){DLOG("执行消费任务失败,%s 队列没有消费者!", qname.c_str());return;}// 3. 调用订阅者对应的消息处理函数,实现消息的推送cp->_cb(cp->_ctag, mp->mutable_payload()->mutable_properties(), mp->payload().body());// 4. 判断如果订阅者是自动确认---不需要等待确认,直接删除消息,否则需要外部收到消息确认后再删除if (cp->_auto_ack)_host->basicAck(qname, mp->payload().properties().id());
}

消息确认

就是通过虚拟机管理句柄调用队列消息提供的操作,

 // 消息的确认void basicAck(const basicAckRequestPtr &req){_host->basicAck(req->queue_name(), req->message_id());return basicResponse(true, req->rid(), req->cid());}

订阅队列

订阅客户端可以通过信道提供的basicConsume服务来订阅一个队列。

// 订阅队列
void basicConsume(const basicConsumeRequestPtr &req)
{// 判断队列是否存在bool ret = _host->existsQueue(req->queue_name());if (ret == false){return basicResponse(false, req->rid(), req->cid());}// 创建队列的消费者
auto cb = std::bind(&Channel::callback, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3);
_consumer = _cmp->create(req->queue_name(), req->consumer_tag(), req->auto_ack(), cb);
return basicResponse(true, req->rid(), req->cid());
}

在服务端就需要创建一个消费者。而消费者中有一个回调函数,这个回调函数就是服务器绑定的。这个回调函数就是往客户端推送消息。

 // 当有订阅队列请求来时,消费者的回调函数
void callback(const std::string &tag, const BasicProperties *bp, const std::string &body)
{basicConsumerResponse resp;resp.set_cid(_cid);resp.set_consumer_tag(tag);resp.set_body(body);if (bp){resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());resp.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn, resp);
}

取消订阅

取消订阅就是删除队列消费者中的指定消费者。

  // 取消订阅
void basicCancel(const basicCancelRequestPtr &req){_cmp->remove(req->consumer_tag(), req->queue_name());return basicResponse(true, req->rid(), req->cid());}

信道内存管理类

需要一个哈希表,信道唯一标识和信道对象的映射。
他提供打开信道,关闭信道和获取指定信道三个操作。

class ChannelManager
{
private:std::mutex _mutex;std::unordered_map<std::string, Channel::ptr> _channels;
}

打开信道

这些参数都是连接管理传递进来的。创建一个信道管理对象。

 bool openChannel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const ThreadPool::ptr &pool) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(id);if (it != _channels.end()) {DLOG("信道:%s 已经存在!", id.c_str());return false;}auto channel = std::make_shared<Channel>(id, host, cmp, codec, conn, pool);_channels.insert(std::make_pair(id, channel));return true;
}

关闭信道/获取指定信道

void closeChannel(const std::string &id){std::unique_lock<std::mutex> lock(_mutex);_channels.erase(id);}Channel::ptr getChannel(const std::string &id) {std::unique_lock<std::mutex> lock(_mutex);auto it = _channels.find(id);if (it == _channels.end()) {return Channel::ptr();}return it->second;}

总结

这个信道管理是服务端的信道管理,而在客户端也会有一个信道,客户端的信道和服务端的信道是一一对应的。但他们的操作却不一样,客户端的信道是为用户提供服务,他屏蔽饿底层的网络细节,用户只需要调用信道提供的操作,不需要关心网络通信。而服务器的信道就是在真正进行业务处理的操作的。


http://www.ppmy.cn/ops/123568.html

相关文章

Vue3常用API总结

因为这个月的月初给自己定了个小目标&#xff0c;学完Vue3的基本使用&#xff0c;并使用Vue3亲手做一个小项目&#xff08;稍微透露一下&#xff0c;我制作的是一个小工具&#xff0c;现在已经完成了90&#xff05;了&#xff0c;这个月月底之前会通过博客的形式向大家展示&…

JMeter中线程组、HTTP请求的常见参数解释

在JMeter中&#xff0c;线程组和HTTP请求是进行性能测试的两个核心组件。以下是它们的一些常见相关参数的解释&#xff1a; 线程组参数 线程数 指定模拟的用户数&#xff0c;即并发执行的线程数。 Ramp-Up时间&#xff08;秒&#xff09; 指定所有线程启动的时间间隔。在这…

RISC-V笔记——基础

1. 前言 RISC-V旨在支持广泛的定制和专业化。RISC-V的ISA是由一个基本整型ISA和其它对基本ISA的可选扩展组成。每个整型ISA可以使用一个或多个可选的ISA扩展进行扩展。 基本整型ISA精选了最小的一组指令&#xff0c;这些指令足以为编译器、汇编器、链接器和操作系统提供足够的…

叉车毫米波雷达防撞技术,保护叉车作业安全

在叉车作业频繁的仓库与物流中心&#xff0c;安全隐患往往隐藏于细微之处&#xff0c;稍有不便可能引发重大事故。我们的叉车毫米波防撞系统方案&#xff0c;正是针对这一痛点而精心设计的创新之作。该系统通过集成的毫米波雷达技术&#xff0c;实现了对叉车周边环境的实时、精…

邦芒干货:职场高手的四大成功习惯

​​职场上竞争激烈&#xff0c;对员工的综合能力等方面要求越来越高。如果你自身不具备良好的竞争力&#xff0c;很难在职场上持续发展。 ​职场上那些厉害的高人们&#xff0c;都是怎么做到保持竞争力的呢&#xff1f;他们基本都有这几个习惯&#xff0c;看看你身上有没有。 …

鸿蒙开发之ArkUI 界面篇 二十八 ForEach

ForEach语法格式如下&#xff1a; ForEach(数组名字,(Item&#xff0c;index) >{item要做的事情}) 实现下图效果&#xff1a; 代码如下&#xff1a; Entry Component struct IndexTest {State titles:string[] [学鸿蒙,赢取白富美,走向人生巅峰,影音娱乐,海外旅游]build…

MySQL 实验1:Windows 环境下 MySQL5.5 安装与配置

MySQL 实验1&#xff1a;Windows 环境下 MySQL5.5 安装与配置 目录 MySQL 实验1&#xff1a;Windows 环境下 MySQL5.5 安装与配置一、MySQL 软件的下载二、安装 MySQL三、配置 MySQL1、配置环境变量2、安装并启动 MySQL 服务3、设置 MySQL 字符集4、为 root 用户设置登录密码 一…

是YOLOv3的网络构建

YOLOv3的网络构建是一个从配置文件&#xff08;cfg文件&#xff09;到创建模型并定义其前向传播过程的复杂过程&#xff0c;以下是详细说明&#xff1a; 1. cfg文件 作用和支持的层类型 在YOLOv3中&#xff0c;cfg文件用于描述网络结构。通过修改cfg文件&#xff0c;可以很容…