【RabbitMQ 项目】服务端:服务器模块

news/2024/10/17 23:35:46/

文章目录

  • 一.编写思路
  • 二.代码实践
  • 三.服务端模块关系总结

一.编写思路

成员变量:

  1. muduo 库中的 TCP 服务器
  2. EventLoop 对象:用于主线程循环监控连接事件
  3. 协议处理句柄
  4. 分发器:用于初始化协议处理器,便于把不同请求派发给不同的业务处理函数
  5. 连接管理句柄
  6. 虚拟机句柄
  7. 消费者管理句柄
  8. 线程池管理句柄
    成员方法:
    向外提供的只有 2 个方法:
  9. start:启动服务
  10. 构造函数:
  • 完成各项成员的初始化,
  • 注册 TCP 服务器的两个回调函数:
    OnMessage:从接收缓冲区把数据读到用户缓冲区后的回调函数
    OnConnection:Tcp 连接建立或断开时的回调函数
  • 给分发器注册业务处理函数(私有成员方法,共 12 个)
    信道打开与与关闭;交换机,队列,绑定添加与删除,订阅与取消订阅,发布与确认消息
    私有成员(业务处理函数):
    如果是创建或关闭信道,直接用连接管理句柄新增或删除信道,然后构建响应返回
    如果是其他请求,先用连接管理句柄找到信道(请求中携带了信道 id),再使用信道提供的服务

二.代码实践

BrokerServer.hpp:

#pragma once
#include "muduo/protobuf/codec.h"
#include "muduo/protobuf/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"#include "VirtualHost.hpp"
#include "Connection.hpp"
#include "Consumer.hpp"
#include <functional>
#include <stdio.h>
#include <unistd.h>namespace ns_server
{using ConnectionManagerPtr = std::shared_ptr<ns_connection::ConnectionManager>;using VirtualHostPtr = std::shared_ptr<ns_data::VirtualHost>;using ConsumerManagerPtr = std::shared_ptr<ns_consumer::ConsumerManager>;using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;/************* 定义协议的结构化数据的智能指针(在分发器中注册时需要的格式)* *************/using OpenChannelRequestPtr = std::shared_ptr<ns_protocol::OpenChannelRequest>;using CloseChannelRequestPtr = std::shared_ptr<ns_protocol::CloseChannelRequest>;using DeclareExchangeRequestPtr = std::shared_ptr<ns_protocol::DeclareExchangeRequest>;using DeleteExchangeRequestPtr = std::shared_ptr<ns_protocol::DeleteExchangeRequest>;using DeclareMsgQueueRequestPtr = std::shared_ptr<ns_protocol::DeclareMsgQueueRequest>;using DeleteMsgQueueRequestPtr = std::shared_ptr<ns_protocol::DeleteMsgQueueRequest>;using BindRequestPtr = std::shared_ptr<ns_protocol::BindRequest>;using UnbindRequestPtr = std::shared_ptr<ns_protocol::UnbindRequest>;using PublishMessageRequestPtr = std::shared_ptr<ns_protocol::PublishMessageRequest>;using SubscribeQueueRequestPtr = std::shared_ptr<ns_protocol::SubscribeQueueRequest>;using CancelSubscribeRequestPtr = std::shared_ptr<ns_protocol::CancelSubscribeRequest>;using AckRequestPtr = std::shared_ptr<ns_protocol::AckRequest>;class BrokerServer{public:private:muduo::net::EventLoop _baseLoop;muduo::net::TcpServer _server;ProtobufDispatcher _dispatcher;ProtobufCodecPtr _codecPtr;VirtualHostPtr _vhPtr;ConsumerManagerPtr _consumerManagerPtr;ConnectionManagerPtr _connManagerPtr;ThreadPoolPtr _threadPoolPtr;public:BrokerServer(int serverPort, const std::string &dbName, const std::string &msgDir): _baseLoop(),_server(&_baseLoop, muduo::net::InetAddress("0.0.0.0", serverPort), "TcpServer", muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&BrokerServer::onUnknownMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3)){// 初始化成员_codecPtr = std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));_vhPtr = std::make_shared<ns_data::VirtualHost>(dbName, msgDir);_threadPoolPtr = std::make_shared<ns_tp::ThreadPool>();_threadPoolPtr->start();std::vector<std::string> qnames;_vhPtr->getAllQueueName(&qnames);_consumerManagerPtr = std::make_shared<ns_consumer::ConsumerManager>(qnames);_connManagerPtr = std::make_shared<ns_connection::ConnectionManager>();// 给_server注册两个回调函数_server.setConnectionCallback(std::bind(&BrokerServer::onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codecPtr.get(), std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));// 给分发器注册业务处理函数_dispatcher.registerMessageCallback<ns_protocol::OpenChannelRequest>(std::bind(&BrokerServer::onOpenChannel,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::CloseChannelRequest>(std::bind(&BrokerServer::onCloseChannel,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeclareExchangeRequest>(std::bind(&BrokerServer::onDeclareExchange,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeleteExchangeRequest>(std::bind(&BrokerServer::onDeleteExchange,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeclareMsgQueueRequest>(std::bind(&BrokerServer::onDeclareMsgQueue,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::DeleteMsgQueueRequest>(std::bind(&BrokerServer::onDeleteMsgQueue,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::BindRequest>(std::bind(&BrokerServer::onBind,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::UnbindRequest>(std::bind(&BrokerServer::onUnbind,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::SubscribeQueueRequest>(std::bind(&BrokerServer::onSubScribe,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::CancelSubscribeRequest>(std::bind(&BrokerServer::onCancelSubScribe,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::PublishMessageRequest>(std::bind(&BrokerServer::onPublishMessage,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ns_protocol::AckRequest>(std::bind(&BrokerServer::onAck,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}void start(){// 开启监听状态_server.start();// 开始循环监控事件_baseLoop.loop();}private:// 给TcpServer设置的回调函数void onConnection(const muduo::net::TcpConnectionPtr &connPtr){if (connPtr->connected()){_connManagerPtr->newConnection(connPtr, _codecPtr, _vhPtr, _consumerManagerPtr, _threadPoolPtr);}else{_connManagerPtr->deleteConnection(connPtr);}}// 业务处理函数void onUnknownMessage(const muduo::net::TcpConnectionPtr &connPtr, MessagePtr msgPtr, muduo::Timestamp time){cout << "未知消息" << endl;connPtr->shutdown();}/************* 信道创建与删除* ***************/void onOpenChannel(const muduo::net::TcpConnectionPtr &connPtr, const OpenChannelRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "打开信道时, 未找到Connection" << endl;return;}myConnPtr->openChannel(*reqPtr);LOG(DEBUG) << "create new channel, channelId: " << reqPtr->channel_id() << endl;}void onCloseChannel(const muduo::net::TcpConnectionPtr &connPtr, const CloseChannelRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "关闭信道时, 未找到Connection" << endl;return;}myConnPtr->closeChannel(*reqPtr);LOG(DEBUG) << "close channel, channelId: " << reqPtr->channel_id() << endl;}/********** 交换机声明与删除* ********/void onDeclareExchange(const muduo::net::TcpConnectionPtr &connPtr, const DeclareExchangeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "声明交换机时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->declareExchange(*reqPtr);LOG(DEBUG) << "声明交换机, exchangeName: " << reqPtr->exchange_name() << endl;}void onDeleteExchange(const muduo::net::TcpConnectionPtr &connPtr, const DeleteExchangeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "删除交换机时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->deleteExchange(*reqPtr);LOG(DEBUG) << "删除信道, exchangeName: " << reqPtr->exchange_name() << endl;}/************* 队列声明与删除* ***************/void onDeclareMsgQueue(const muduo::net::TcpConnectionPtr &connPtr, const DeclareMsgQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "声明队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->declareMsgQueue(*reqPtr);LOG(DEBUG) << "声明队列, queueName: " << reqPtr->queue_name() << endl;}void onDeleteMsgQueue(const muduo::net::TcpConnectionPtr &connPtr, const DeleteMsgQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "删除队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->deleteMsgQueue(*reqPtr);LOG(DEBUG) << "删除队列, queueName: " << reqPtr->queue_name() << endl;}/*********** 绑定与解绑* ***********/void onBind(const muduo::net::TcpConnectionPtr &connPtr, const BindRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "添加绑定时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->bind(*reqPtr);LOG(DEBUG) << "绑定: " << reqPtr->ename() << "->" << reqPtr->qname() << ": " << reqPtr->binding_key() << endl;}void onUnbind(const muduo::net::TcpConnectionPtr &connPtr, const UnbindRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "删除绑定时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->unbind(*reqPtr);LOG(DEBUG) << "解绑: " << reqPtr->ename() << "->" << reqPtr->qname() << endl;}/************** 订阅与取消订阅* ************/void onSubScribe(const muduo::net::TcpConnectionPtr &connPtr, const SubscribeQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "订阅队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->subscribeQueue(*reqPtr);LOG(DEBUG) << "订阅队列" << ", qname: " << reqPtr->qname() << endl;}void onCancelSubScribe(const muduo::net::TcpConnectionPtr &connPtr, const CancelSubscribeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "取消订阅队列时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->cancelSubscribe(*reqPtr);LOG(DEBUG) << "取消订阅队列" << ", qname: " << reqPtr->qname() << endl;}/********* 发布与应答* **************/void onPublishMessage(const muduo::net::TcpConnectionPtr &connPtr, const PublishMessageRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "发布消息时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->publishMessage(*reqPtr);LOG(DEBUG) << "publish message: " << reqPtr->msg().saved_info().body() << endl;}void onAck(const muduo::net::TcpConnectionPtr &connPtr, const AckRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if (myConnPtr == nullptr){LOG(WARNING) << "确认消息时, 未找到Connection" << endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if (channelPtr == nullptr){LOG(WARNING) << "没有找到信道" << endl;return;}channelPtr->ackMessage(*reqPtr);LOG(DEBUG) << "应答消息, msgId: " << reqPtr->msg_id() << endl;}void sendCommonResponse(const muduo::net::TcpConnectionPtr &connPtr, const std::string &channelId,const std::string &responseId, bool ok){ns_protocol::CommomResponse resp;resp.set_channel_id(channelId);resp.set_response_id(responseId);resp.set_ok(ok);_codecPtr->send(connPtr, resp);}};
}

三.服务端模块关系总结

在这里插入图片描述


http://www.ppmy.cn/news/1532666.html

相关文章

数据交易平台中,怎样用大模型做数据集质量的评估

目录 大模型做数据集质量评估 场景设定 步骤一:数据预处理 步骤二:利用大模型进行质量评估 示例说明 数据交易平台中,怎样用大模型做数据集质量的评估 场景设定 步骤一:数据预处理 步骤二:选择大模型 步骤三:定义评估指标 步骤四:利用大模型进行评估 示例说明…

柯桥小语种学习英语口语培训|被点名时,中文喊“到”,那英文喊什么?

"今日体育课&#xff0c;张老师准时点名。阳光下&#xff0c;同学们精神抖擞&#xff0c;一一应答到。课堂氛围活跃&#xff0c;准备充分&#xff0c;期待精彩训练。"被点名时&#xff0c;中文喊“到”&#xff0c;那英文喊什么&#xff1f; “到”用英语怎么说&…

计算机网络(第二章 物理层)

文章目录 1.物理层的基本概念2.数据通信的基础知识2.1数据通信系统模型2.2有关信道的基本概念2.3信道极限容量 3.物理层3.2引导性传输媒体3.3非引导性传输媒体 4.信道复用技术4.1频分复用、时分复用和统计时分复用4.2波分复用 5.宽带接入技术 本文首先讨论物理层的基本概念。然…

(c++)局部(全局)、常量(变量)、静态变量在内存中的存放位置

//内存四区&#xff1a;1.代码区 2.全局区 3.栈区 4.堆区 1.放在代码区的有&#xff1a;1.写的代码 2.放在全局区的有&#xff1a;1.全局的&#xff08;变量或常量&#xff09; 2.静态的&#xff08;变量或常量&#xff09; 3.字符串常量 3.在栈区的有&#xff1a;1.局部…

TI DSP TMS320F280025 Note14:模数转换器ADC原理分析与应用

TMS320F280025 模数转换器ADC原理分析与应用 ` 文章目录 TMS320F280025 模数转换器ADC原理分析与应用逐次比较型ADC和双积分型ADC工作原理逐次比较型 ADC双积分型 ADC280025ADCADC原理分析ADC时钟SOCSOC内部原理ADC触发方式ADC采集(采样和保持)窗口通道寄生电容基准电压发生器模…

ADRC线性跟踪微分器TD详细测试(Simulink 算法框图+CODESYS ST+博途SCL完整源代码)

1、ADRC线性跟踪微分器 ADRC线性跟踪微分器(ST+SCL语言)_adrc算法在博途编程中scl语言-CSDN博客文章浏览阅读784次。本文介绍了ADRC线性跟踪微分器的算法和源代码,包括在SMART PLC和H5U平台上的实现。文章提供了ST和SCL语言的详细代码,并讨论了跟踪微分器在自动控制中的作用…

自动化学习1:pytest自动化框架的基本用法:注意事项/断言assert/测试结果分析

一.注意事项&#xff1a; ①创建test开头的文件&#xff08;test_&#xff09;/类/函数或方法 ②pytest中以每一个函数或方法&#xff0c;作为用例 ③pytest启动方式&#xff1a;pytest def test01(): # 函数&#xff08;写在类外边是函数&#xff09;passclass Test:def t…

CSV数据行(取值)的列数多于表头字段数-Pandas无法正常读取

CSV数据行(取值)的列数多于表头字段数-Pandas无法正常读取 问题描述&#xff1a;在使用Pandas正常读取csv文件时&#xff0c;报错提示“ ParserError: Error tokenizing data. C error: Expected 460 fields in line 3363, saw 472”。也就是数据行的值个数多于表头字段个数。…