目录
第一节:基本构架
1-1.成员变量
1-2.构造函数
1-3.start 函数
1-4.onConnection 函数
1-5.onUnknowMessage
1-6.绑定回调函数
第二节:业务处理函数
2-1.函数声明
2-2.函数实现
2-3.函数绑定
下期预告:
服务器模块在mqserver目录下实现。
第一节:基本构架
服务器模块提供的业务函数都来自于3个子模块,它自己也有几个接口需要实现,现将它的基本框架搭建出来,然后再添加这些业务处理函数。
创建mq_server.hpp文件,添加以下内容:
#ifndef __M_TCPSERVER_H__ #define __M_TCPSERVER_H__ // 服务器所需文件 #include "muduo/proto/codec.h" #include "muduo/proto/dispatcher.h" #include "muduo/base/Logging.h" #include "muduo/base/Mutex.h" #include "muduo/net/EventLoop.h" #include "muduo/net/TcpServer.h" // 子模块 #include "mq_virtualhost.hpp" #include "mq_connection.hpp" #include "mq_consumer.hpp"#include "../mqcommon/mq_msg.pb.h" #include "../mqcommon/mq_proto.pb.h"namespace zd {};#endif
1-1.成员变量
private:muduo::net::EventLoop _baseloop; // IO监听器muduo::net::TcpServer _server; // 服务器ProtobufCodecPtr _codec; // protobuf协议处理器--编码与解码ProtobufDispatcher _dispatcher; // 请求分发器VirtualHost::ptr _vhp; // 虚拟机管理句柄QueueConsumerManger::ptr _cmp; // 消费者管理句柄ConnectionManager::ptr _connp; // 连接管理句柄threadpool::ptr _pool; // 线程池管理句柄
_baseloop:IO监听器,当服务器收到消息、获取新连接时会调用注册的回调函数进行处理
_server:服务器,提供接收消息、获取新连接的功能
_codec:协议处理器,自动将发送的消息序列化并加上报头;收到请求时,也会将请求反序列化,然后自动调用绑定的请求分发器。
_dispatcher:请求分发器,通过请求的类型调用绑定的业务函数
_vhp、_cmp、_connp、_pool:虚拟机管理句柄、消费者管理句柄、连接管理句柄、线程池句柄
1-2.构造函数
成员变量虽然多,但是只需要创建对象即可,不需要太多的参数,服务器只需呀端口号、虚拟机名称、保存数据的文件路径即可:
public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr;public:MqServer(int port,const std::string hname,const std::string& base_dir):_server(&_baseloop,muduo::net::InetAddress("0.0.0.0",port),"MqServer",muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&MqServer::onUnknowMessage,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3))),_vhp(std::make_shared<VirtualHost>(hname,base_dir,base_dir+DBFILE)),_cmp(std::make_shared<QueueConsumerManger>()),_connp(std::make_shared<ConnectionManager>()),_pool(std::make_shared<threadpool>()){// 根据历史消息中的所有队列,初始化队列的消费者管理结构// 1.获取所有队列信息MsgQueueMap qm = _vhp->getAllMsgQueue();// 2.遍历队列,并将队列名添加到消费者管理结构for(const auto& queue:qm){_cmp->initQueueConsumer(queue.first);}}
需要注意的是_dispatcher必须在_codec之前初始化,因为_codec要绑定_dispathcher。
1-3.start 函数
用于服务器的启动,启动服务器监听和IO事件监听:
// 启动服务器void start(){_server.start();_baseloop.loop();}
1-4.onConnection 函数
当服务器建立连接/断开连接后,自动调用的回调函数,提示连接是否建立成功/断开:
// 服务器建立连接时的回调函数void onConnection(const muduo::net::TcpConnectionPtr& conn){if(conn->connected()){LOG("连接建立成功");_connp->createConnection(_vhp,_cmp,_codec,conn,_pool);}else{LOG("连接断开");_connp->removeConnection(conn);}}
1-5.onUnknowMessage
请求分发器初始化时就注册的函数,如果客户端发来不符合通讯协议的请求格式,就调用它:
// 服务器收到未知消息时的回调函数void onUnknowMessage(const muduo::net::TcpConnectionPtr& conn,const MessagePtr& message,muduo::Timestamp){std::cout << "未知的消息:" << message->GetTypeName() << std::endl;conn->shutdown(); // 断开连接}
1-6.绑定回调函数
以下内容加到构造函数的函数体内,作用是当服务器收到请求时,把消息进行解码,然后使用请求分发器调用绑定的业务处理函数。
第二个就是在连接连接/断开时调用的回调函数。
// 绑定服务器收到消息时,获取新连接时自动调用的回调函数// ProtobufCodec::onMessage 会将消息解码后分发给上面注册的不同业务处理函数_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage,_codec.get(),std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 绑定连接成功/断开连接时,自动调用的函数_server.setConnectionCallback(std::bind(&MqServer::onConnection,this,std::placeholders::_1));
第二节:业务处理函数
2-1.函数声明
先在服务器MqServer中声明各种业务的处理函数,注意它们的第二个参数是业务对应的请求类型的智能指针:
private:/*-----业务处理函数-----*/// 1.打开信道void onOpenChannel(const muduo::net::TcpConnectionPtr& conn,const ChannelOpenRequestPtr& message,muduo::Timestamp receiveTime);// 2.关闭信道void onCloseChannel(const muduo::net::TcpConnectionPtr& conn,const ChannelCloseRequestPtr& message,muduo::Timestamp receiveTime);// 3.声明交换机void onDecExchange(const muduo::net::TcpConnectionPtr& conn,const ExchangeDeclareRequestPtr& message,muduo::Timestamp receiveTime);// 4.移除交换机void onRemExchange(const muduo::net::TcpConnectionPtr& conn,const ExchangeDeleteRequestPtr& message,muduo::Timestamp receiveTime);// 5.声明队列void onDecMsgQueue(const muduo::net::TcpConnectionPtr& conn,const MsgQueueDeclareRequestPtr& message,muduo::Timestamp receiveTime);// 6.移除队列void onRemMsgQueue(const muduo::net::TcpConnectionPtr& conn,const MsgQueueDeleteRequestPtr& message,muduo::Timestamp receiveTime);// 7.绑定void onQueueBind(const muduo::net::TcpConnectionPtr& conn,const QueueBindRequestPtr& message,muduo::Timestamp receiveTime);// 8.解绑void onQueueUnBind(const muduo::net::TcpConnectionPtr& conn,const QueueUnbindRequestPtr& message,muduo::Timestamp receiveTime);// 9.消息发布void onBasicPublish(const muduo::net::TcpConnectionPtr& conn,const BasicPublishRequestPtr& message,muduo::Timestamp receiveTime);// 10.消息确认void onBasicAck(const muduo::net::TcpConnectionPtr& conn,const BasicAckRequestPtr& message,muduo::Timestamp receiveTime);// 11.订阅队列void onBasicConsume(const muduo::net::TcpConnectionPtr& conn,const BasicConsumerRequestPtr& message,muduo::Timestamp receiveTime);// 12.取消订阅void onCancelBasicConsume(const muduo::net::TcpConnectionPtr& conn,const BasicCancelRequestPtr& message,muduo::Timestamp receiveTime);
2-2.函数实现
创建qm_server.cc文件,业务处理函数的实现都放在里面,除了信道是直接由对应连接的句柄执行之外,其他的都需要找到对应的信道去执行。
函数的实现只是封装已经写好的管理句柄,十分简单,直接给出代码:
#include "mq_server.hpp"namespace zd {// 1.打开信道void MqServer::onOpenChannel(const muduo::net::TcpConnectionPtr& conn,const ChannelOpenRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("打开信道时,没有找到连接对应的connection对象!");conn->shutdown();return;}zd_conn->openChannel(message);}// 2.关闭信道void MqServer::onCloseChannel(const muduo::net::TcpConnectionPtr& conn,const ChannelCloseRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("关闭信道时,没有找到连接对应的connection对象!");conn->shutdown();return;}zd_conn->closeChannel(message);}// 3.声明交换机void MqServer::onDecExchange(const muduo::net::TcpConnectionPtr& conn,const ExchangeDeclareRequestPtr& message,muduo::Timestamp receiveTime){ // 找到发送请求的连接Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("声明交换机时,没有找到连接对应的connection对象!");conn->shutdown();return;}// 找到具体的信道Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("声明交换机时,没有找到信道");return;}channel->declareExchange(message);}// 4.移除交换机void MqServer::onRemExchange(const muduo::net::TcpConnectionPtr& conn,const ExchangeDeleteRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("移除交换机时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("移除交换机时,没有找到信道");return;}channel->destoryExchange(message);}// 5.声明队列void MqServer::onDecMsgQueue(const muduo::net::TcpConnectionPtr& conn,const MsgQueueDeclareRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("声明队列时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("声明队列时,没有找到信道");return;}channel->declareMsgQueue(message);}// 6.移除队列void MqServer::onRemMsgQueue(const muduo::net::TcpConnectionPtr& conn,const MsgQueueDeleteRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("移除队列时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("移除队列时,没有找到信道");return;}channel->destoryMsgQueue(message);}// 7.绑定void MqServer::onQueueBind(const muduo::net::TcpConnectionPtr& conn,const QueueBindRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("绑定时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("绑定时,没有找到信道");return;}channel->queueBind(message);}// 8.解绑void MqServer::onQueueUnBind(const muduo::net::TcpConnectionPtr& conn,const QueueUnbindRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("解绑时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("解绑时,没有找到信道");return;}channel->queueUnBind(message);}// 9.消息发布void MqServer::onBasicPublish(const muduo::net::TcpConnectionPtr& conn,const BasicPublishRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("发布消息时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("发布消息时,没有找到信道");return;}channel->basicPublish(message);}// 10.消息确认void MqServer::onBasicAck(const muduo::net::TcpConnectionPtr& conn,const BasicAckRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("确认消息时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("确认消息时,没有找到信道");return;}channel->basicAck(message);}// 11.订阅队列void MqServer::onBasicConsume(const muduo::net::TcpConnectionPtr& conn,const BasicConsumerRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("订阅队列时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("订阅队列时,没有找到信道");return;}channel->basicConsume(message);}// 12.取消订阅void MqServer::onCancelBasicConsume(const muduo::net::TcpConnectionPtr& conn,const BasicCancelRequestPtr& message,muduo::Timestamp receiveTime){Connection::ptr zd_conn = _connp->getOneConnection(conn);if(zd_conn.get() == nullptr){LOG("取消订阅时,没有找到连接对应的connection对象!");conn->shutdown();return;}Channel::ptr channel = zd_conn->getOneChannel(message->channel_id());if(channel.get() == nullptr){LOG("取消订阅时,没有找到信道");return;}channel->basicCancel(message);} };
2-3.函数绑定
最后将业务处理函数与对应的请求类型在请求分发器中绑定起来即可,这在构造函数的函数体中完成,绑定的函数只允许传入3个参数,但是上述业务处理函数还有一个隐含的this指针,所以使用std::bind使第一个参数固定传入this后再绑定。
直接给出完整的构造函数:
MqServer(int port,const std::string hname,const std::string& base_dir):_server(&_baseloop,muduo::net::InetAddress("0.0.0.0",port),"MqServer",muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&MqServer::onUnknowMessage,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3))),_vhp(std::make_shared<VirtualHost>(hname,base_dir,base_dir+DBFILE)),_cmp(std::make_shared<QueueConsumerManger>()),_connp(std::make_shared<ConnectionManager>()),_pool(std::make_shared<threadpool>()){// 根据历史消息中的所有队列,初始化队列的消费者管理结构// 1.获取所有队列信息MsgQueueMap qm = _vhp->getAllMsgQueue();// 2.遍历队列,并将队列名添加到消费者管理结构for(const auto& queue:qm){_cmp->initQueueConsumer(queue.first);}// 注册请求与业务处理函数的映射关系// 创建/移除信道_dispatcher.registerMessageCallback<ChannelOpenRequest> // <>里给请求的类型,注意不是请求的智能指针(std::bind(&MqServer::onOpenChannel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<ChannelCloseRequest>(std::bind(&MqServer::onCloseChannel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 创建/移除交换机_dispatcher.registerMessageCallback<ExchangeDeclareRequest>(std::bind(&MqServer::onDecExchange,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<ExchangeDeleteRequest>(std::bind(&MqServer::onRemExchange,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 创建/移除队列_dispatcher.registerMessageCallback<MsgQueueDeclareRequest>(std::bind(&MqServer::onDecMsgQueue,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<MsgQueueDeleteRequest>(std::bind(&MqServer::onRemMsgQueue,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 绑定/解绑_dispatcher.registerMessageCallback<QueueBindRequest>(std::bind(&MqServer::onQueueBind,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<QueueUnbindRequest>(std::bind(&MqServer::onQueueUnBind,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 消息发送/确认_dispatcher.registerMessageCallback<BasicPublishRequest>(std::bind(&MqServer::onBasicPublish,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<BasicAckRequest>(std::bind(&MqServer::onBasicAck,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 订阅/取消订阅_dispatcher.registerMessageCallback<BasicConsumerRequest>(std::bind(&MqServer::onBasicConsume,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));_dispatcher.registerMessageCallback<BasicCancelRequest>(std::bind(&MqServer::onCancelBasicConsume,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 绑定服务器收到消息时,获取新连接时自动调用的回调函数// ProtobufCodec::onMessage 会将消息解码后分发给上面注册的不同业务处理函数_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage,_codec.get(),std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));// 绑定连接成功/断开连接时,自动调用的函数_server.setConnectionCallback(std::bind(&MqServer::onConnection,this,std::placeholders::_1));}
至此,整个服务器模块都完成了。
下期预告:
服务器完成后就可以着手客户端了,客户端的实现非常简单,它由消费者模块、信道管理模块组成,其中消费者模块不需要对多个消费者的管理,只要可以创建消费者即可,因为客户端不需要向消费者推送任何东西,而是使用消费者去订阅队列;信道管理模块与服务器的信道管理模块十分相似,因为它们是一一对应的,提供的接口也是一一对应的,因为客户端使用信道要完成的业务,服务器的信道都有业务的接口进行处理。