第十三章:服务器模块的整合

devtools/2025/3/1 1:08:47/

目录

第一节:基本构架

        1-1.成员变量

        1-2.构造函数

        1-3.start 函数

        1-4.onConnection 函数

        1-5.onUnknowMessage

        1-6.绑定回调函数

第二节:业务处理函数

        2-1.函数声明

        2-2.函数实现 

        2-3.函数绑定

下期预告:


        服务器模块在mqserver目录下实现。

第一节:基本构架

        服务器模块提供的业务函数都来自于3个子模块,它自己也有几个接口需要实现,现将它的基本框架搭建出来,然后再添加这些业务处理函数。 

        创建mq_server.hpp文件,添加以下内容:

#ifndef __M_TCPSERVER_H__
#define __M_TCPSERVER_H__
// 服务器所需文件
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
// 子模块
#include "mq_virtualhost.hpp"
#include "mq_connection.hpp"
#include "mq_consumer.hpp"#include "../mqcommon/mq_msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"namespace zd
{};#endif

        1-1.成员变量

    private:muduo::net::EventLoop _baseloop; // IO监听器muduo::net::TcpServer _server;   // 服务器ProtobufCodecPtr _codec;         // protobuf协议处理器--编码与解码ProtobufDispatcher _dispatcher;  // 请求分发器VirtualHost::ptr _vhp;           // 虚拟机管理句柄QueueConsumerManger::ptr _cmp;   // 消费者管理句柄ConnectionManager::ptr _connp;   // 连接管理句柄threadpool::ptr _pool;           // 线程池管理句柄

        _baseloop:IO监听器,当服务器收到消息、获取新连接时会调用注册的回调函数进行处理

        _server:服务器,提供接收消息、获取新连接的功能

        _codec:协议处理器,自动将发送的消息序列化并加上报头;收到请求时,也会将请求反序列化,然后自动调用绑定的请求分发器。

        _dispatcher:请求分发器,通过请求的类型调用绑定的业务函数

        _vhp、_cmp、_connp、_pool:虚拟机管理句柄、消费者管理句柄、连接管理句柄、线程池句柄

        1-2.构造函数

       成员变量虽然多,但是只需要创建对象即可,不需要太多的参数,服务器只需呀端口号、虚拟机名称、保存数据的文件路径即可:

        public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr;public:MqServer(int port,const std::string hname,const std::string& base_dir):_server(&_baseloop,muduo::net::InetAddress("0.0.0.0",port),"MqServer",muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&MqServer::onUnknowMessage,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3))),_vhp(std::make_shared<VirtualHost>(hname,base_dir,base_dir+DBFILE)),_cmp(std::make_shared<QueueConsumerManger>()),_connp(std::make_shared<ConnectionManager>()),_pool(std::make_shared<threadpool>()){// 根据历史消息中的所有队列,初始化队列的消费者管理结构// 1.获取所有队列信息MsgQueueMap qm = _vhp->getAllMsgQueue();// 2.遍历队列,并将队列名添加到消费者管理结构for(const auto& queue:qm){_cmp->initQueueConsumer(queue.first);}}

        需要注意的是_dispatcher必须在_codec之前初始化,因为_codec要绑定_dispathcher。

        1-3.start 函数

        用于服务器的启动,启动服务器监听和IO事件监听:

        // 启动服务器void start(){_server.start();_baseloop.loop();}

        1-4.onConnection 函数

        当服务器建立连接/断开连接后,自动调用的回调函数,提示连接是否建立成功/断开:

        // 服务器建立连接时的回调函数void onConnection(const muduo::net::TcpConnectionPtr& conn){if(conn->connected()){LOG("连接建立成功");_connp->createConnection(_vhp,_cmp,_codec,conn,_pool);}else{LOG("连接断开");_connp->removeConnection(conn);}}

        1-5.onUnknowMessage

        请求分发器初始化时就注册的函数,如果客户端发来不符合通讯协议的请求格式,就调用它:

        // 服务器收到未知消息时的回调函数void onUnknowMessage(const muduo::net::TcpConnectionPtr& conn,const MessagePtr& message,muduo::Timestamp){std::cout << "未知的消息:" << message->GetTypeName() << std::endl;conn->shutdown(); // 断开连接}

        1-6.绑定回调函数

        以下内容加到构造函数的函数体内,作用是当服务器收到请求时,把消息进行解码,然后使用请求分发器调用绑定的业务处理函数。

        第二个就是在连接连接/断开时调用的回调函数。

            // 绑定服务器收到消息时,获取新连接时自动调用的回调函数// ProtobufCodec::onMessage 会将消息解码后分发给上面注册的不同业务处理函数_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage,_codec.get(),std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 绑定连接成功/断开连接时,自动调用的函数_server.setConnectionCallback(std::bind(&MqServer::onConnection,this,std::placeholders::_1));

        

第二节:业务处理函数

        2-1.函数声明

        先在服务器MqServer中声明各种业务的处理函数,注意它们的第二个参数是业务对应的请求类型的智能指针:

        private:/*-----业务处理函数-----*/// 1.打开信道void onOpenChannel(const muduo::net::TcpConnectionPtr& conn,const ChannelOpenRequestPtr& message,muduo::Timestamp receiveTime);// 2.关闭信道void onCloseChannel(const muduo::net::TcpConnectionPtr& conn,const ChannelCloseRequestPtr& message,muduo::Timestamp receiveTime);// 3.声明交换机void onDecExchange(const muduo::net::TcpConnectionPtr& conn,const ExchangeDeclareRequestPtr& message,muduo::Timestamp receiveTime);// 4.移除交换机void onRemExchange(const muduo::net::TcpConnectionPtr& conn,const ExchangeDeleteRequestPtr& message,muduo::Timestamp receiveTime);// 5.声明队列void onDecMsgQueue(const muduo::net::TcpConnectionPtr& conn,const MsgQueueDeclareRequestPtr& message,muduo::Timestamp receiveTime);// 6.移除队列void onRemMsgQueue(const muduo::net::TcpConnectionPtr& conn,const MsgQueueDeleteRequestPtr& message,muduo::Timestamp receiveTime);// 7.绑定void onQueueBind(const muduo::net::TcpConnectionPtr& conn,const QueueBindRequestPtr& message,muduo::Timestamp receiveTime);// 8.解绑void onQueueUnBind(const muduo::net::TcpConnectionPtr& conn,const QueueUnbindRequestPtr& message,muduo::Timestamp receiveTime);// 9.消息发布void onBasicPublish(const muduo::net::TcpConnectionPtr& conn,const BasicPublishRequestPtr& message,muduo::Timestamp receiveTime);// 10.消息确认void onBasicAck(const muduo::net::TcpConnectionPtr& conn,const BasicAckRequestPtr& message,muduo::Timestamp receiveTime);// 11.订阅队列void onBasicConsume(const muduo::net::TcpConnectionPtr& conn,const BasicConsumerRequestPtr& message,muduo::Timestamp receiveTime);// 12.取消订阅void onCancelBasicConsume(const muduo::net::TcpConnectionPtr& conn,const BasicCancelRequestPtr& message,muduo::Timestamp receiveTime);

        2-2.函数实现 

        创建qm_server.cc文件,业务处理函数的实现都放在里面,除了信道是直接由对应连接的句柄执行之外,其他的都需要找到对应的信道去执行。

        函数的实现只是封装已经写好的管理句柄,十分简单,直接给出代码:

#include "mq_server.hpp"namespace zd
{// 1.打开信道void MqServer::onOpenChannel(const muduo::net::TcpConnectionPtr& conn,const ChannelOpenRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("打开信道时,没有找到连接对应的connection对象!");conn->shutdown();return;}zd_conn->openChannel(message);}// 2.关闭信道void MqServer::onCloseChannel(const muduo::net::TcpConnectionPtr& conn,const ChannelCloseRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("关闭信道时,没有找到连接对应的connection对象!");conn->shutdown();return;}zd_conn->closeChannel(message);}// 3.声明交换机void MqServer::onDecExchange(const muduo::net::TcpConnectionPtr& conn,const ExchangeDeclareRequestPtr& message,muduo::Timestamp receiveTime){    // 找到发送请求的连接Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("声明交换机时,没有找到连接对应的connection对象!");conn->shutdown();return;}// 找到具体的信道Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("声明交换机时,没有找到信道");return;}channel->declareExchange(message);}// 4.移除交换机void MqServer::onRemExchange(const muduo::net::TcpConnectionPtr& conn,const ExchangeDeleteRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("移除交换机时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("移除交换机时,没有找到信道");return;}channel->destoryExchange(message);}// 5.声明队列void MqServer::onDecMsgQueue(const muduo::net::TcpConnectionPtr& conn,const MsgQueueDeclareRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("声明队列时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("声明队列时,没有找到信道");return;}channel->declareMsgQueue(message);}// 6.移除队列void MqServer::onRemMsgQueue(const muduo::net::TcpConnectionPtr& conn,const MsgQueueDeleteRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("移除队列时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("移除队列时,没有找到信道");return;}channel->destoryMsgQueue(message);}// 7.绑定void MqServer::onQueueBind(const muduo::net::TcpConnectionPtr& conn,const QueueBindRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("绑定时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("绑定时,没有找到信道");return;}channel->queueBind(message);}// 8.解绑void MqServer::onQueueUnBind(const muduo::net::TcpConnectionPtr& conn,const QueueUnbindRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("解绑时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("解绑时,没有找到信道");return;}channel->queueUnBind(message);}// 9.消息发布void MqServer::onBasicPublish(const muduo::net::TcpConnectionPtr& conn,const BasicPublishRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("发布消息时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("发布消息时,没有找到信道");return;}channel->basicPublish(message);}// 10.消息确认void MqServer::onBasicAck(const muduo::net::TcpConnectionPtr& conn,const BasicAckRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("确认消息时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("确认消息时,没有找到信道");return;}channel->basicAck(message);}// 11.订阅队列void MqServer::onBasicConsume(const muduo::net::TcpConnectionPtr& conn,const BasicConsumerRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("订阅队列时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("订阅队列时,没有找到信道");return;}channel->basicConsume(message);}// 12.取消订阅void MqServer::onCancelBasicConsume(const muduo::net::TcpConnectionPtr& conn,const BasicCancelRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("取消订阅时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("取消订阅时,没有找到信道");return;}channel->basicCancel(message);}
};

        2-3.函数绑定

        最后将业务处理函数与对应的请求类型在请求分发器中绑定起来即可,这在构造函数的函数体中完成,绑定的函数只允许传入3个参数,但是上述业务处理函数还有一个隐含的this指针,所以使用std::bind使第一个参数固定传入this后再绑定。

        直接给出完整的构造函数:

MqServer(int port,const std::string hname,const std::string& base_dir):_server(&_baseloop,muduo::net::InetAddress("0.0.0.0",port),"MqServer",muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&MqServer::onUnknowMessage,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3))),_vhp(std::make_shared<VirtualHost>(hname,base_dir,base_dir+DBFILE)),_cmp(std::make_shared<QueueConsumerManger>()),_connp(std::make_shared<ConnectionManager>()),_pool(std::make_shared<threadpool>()){// 根据历史消息中的所有队列,初始化队列的消费者管理结构// 1.获取所有队列信息MsgQueueMap qm = _vhp->getAllMsgQueue();// 2.遍历队列,并将队列名添加到消费者管理结构for(const auto& queue:qm){_cmp->initQueueConsumer(queue.first);}// 注册请求与业务处理函数的映射关系// 创建/移除信道_dispatcher.registerMessageCallback<ChannelOpenRequest> // <>里给请求的类型,注意不是请求的智能指针(std::bind(&MqServer::onOpenChannel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<ChannelCloseRequest>(std::bind(&MqServer::onCloseChannel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 创建/移除交换机_dispatcher.registerMessageCallback<ExchangeDeclareRequest>(std::bind(&MqServer::onDecExchange,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<ExchangeDeleteRequest>(std::bind(&MqServer::onRemExchange,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 创建/移除队列_dispatcher.registerMessageCallback<MsgQueueDeclareRequest>(std::bind(&MqServer::onDecMsgQueue,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<MsgQueueDeleteRequest>(std::bind(&MqServer::onRemMsgQueue,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 绑定/解绑_dispatcher.registerMessageCallback<QueueBindRequest>(std::bind(&MqServer::onQueueBind,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<QueueUnbindRequest>(std::bind(&MqServer::onQueueUnBind,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 消息发送/确认_dispatcher.registerMessageCallback<BasicPublishRequest>(std::bind(&MqServer::onBasicPublish,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<BasicAckRequest>(std::bind(&MqServer::onBasicAck,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 订阅/取消订阅_dispatcher.registerMessageCallback<BasicConsumerRequest>(std::bind(&MqServer::onBasicConsume,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<BasicCancelRequest>(std::bind(&MqServer::onCancelBasicConsume,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 绑定服务器收到消息时,获取新连接时自动调用的回调函数// ProtobufCodec::onMessage 会将消息解码后分发给上面注册的不同业务处理函数_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage,_codec.get(),std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 绑定连接成功/断开连接时,自动调用的函数_server.setConnectionCallback(std::bind(&MqServer::onConnection,this,std::placeholders::_1));}

        至此,整个服务器模块都完成了。

下期预告:

        服务器完成后就可以着手客户端了,客户端的实现非常简单,它由消费者模块、信道管理模块组成,其中消费者模块不需要对多个消费者的管理,只要可以创建消费者即可,因为客户端不需要向消费者推送任何东西,而是使用消费者去订阅队列;信道管理模块与服务器的信道管理模块十分相似,因为它们是一一对应的,提供的接口也是一一对应的,因为客户端使用信道要完成的业务,服务器的信道都有业务的接口进行处理。


http://www.ppmy.cn/devtools/163488.html

相关文章

使用Uni-app实现语音视频聊天(Android、iOS)

使用Uni-app开发手机端APP已经变得很普遍&#xff0c;同一套代码就可以打包成Android App 和 iOS App&#xff0c;相比原生开发&#xff0c;可以节省客观的人力成本。那么如何使用Uni-app来开发视频聊天软件或视频会议软件了&#xff1f;本文将详细介绍在Uni-app中&#xff0c;…

【2025深度学习环境搭建-2】pytorch+Docker+VS Code+DevContainer搭建本地深度学习环境

上一篇文章&#xff1a;【2025深度学习环境搭建-1】在Win11上用WSL2和Docker解锁GPU加速 先启动Docker&#xff01;对文件内容有疑问&#xff0c;就去问AI 一、用Docker拉取pytorch镜像&#xff0c;启动容器&#xff0c;测试GPU docker pull pytorch/pytorch:2.5.0-cuda12.4…

游戏引擎学习第123天

仓库:https://gitee.com/mrxiao_com/2d_game_3 黑板&#xff1a;线程同步/通信 目标是从零开始编写一个完整的游戏。我们不使用引擎&#xff0c;也不依赖任何库&#xff0c;完全自己编写游戏所需的所有代码。我们做这个节目不仅是为了教育目的&#xff0c;同时也是因为编程本…

Pytorch使用手册—使用TACOTRON2进行文本到语音转换(专题二十四)

一、概述 本教程展示了如何使用torchaudio中的预训练Tacotron2构建文本到语音的管道。 文本到语音的管道流程如下: 文本预处理 首先,输入的文本被编码为一系列符号。在本教程中,我们将使用英语字符和音标作为符号。 谱图生成 从编码后的文本中生成谱图。我们使用Tacotron2…

鸿蒙 ArkUI 实现 2048 小游戏

2048 是一款经典的益智游戏&#xff0c;玩家通过滑动屏幕合并相同数字的方块&#xff0c;最终目标是合成数字 2048。本文基于鸿蒙 ArkUI 框架&#xff0c;详细解析其实现过程&#xff0c;帮助开发者理解如何利用声明式 UI 和状态管理构建此类游戏。 一、核心数据结构与状态管理…

1. EXCEL基础、界面介绍《AI赋能Excel 》

欢迎来到滔滔讲AI。 Excel表格是一种强大的电子表格软件&#xff0c;它不仅可以用来存储和组织数据&#xff0c;还可以进行复杂的计算、数据分析和可视化。无论是在工作,学习,还是日常生活中&#xff0c;Excel都经常用到&#xff0c;帮助人们管理和分析大量数据&#xff0c;做出…

浅谈Linux中的软件包管理器——基于ubuntu环境

文章目录 1. 为什么要使用软件包管理器1.1 使用源码1.2 使用rpm安装包1.3 使用apt软件包管理器 2. 如何使用apt2.1 软件的安装和卸载2.2 查找和搜素软件包2.3 更新并升级软件包2.4 清理缓存 3. 从apt到系统生态 1. 为什么要使用软件包管理器 在Linux中&#xff0c;有三种软件安…

Python学习第十七天之PyTorch保姆级安装

PyTorch安装与部署 一、准备工作二、pytorch介绍三、CPU版本pytorch安装1. 创建虚拟环境2. 删除虚拟环境1. 通过环境名称删除2. 通过环境路径删除 3. 配置镜像源4. 安装pytorch1. 首先激活环境变量2. 进入pytorch官网&#xff0c;找到安装指令 5. 验证pytorch是否安装成功 四、…