一、连接管理模块的设计
我们之前说:RabbitMQ是将连接细分为信道,从而复用TCP连接
这个连接就是我们今天要实现的连接模块
muduo库当中有一个TcpConnection连接类,它是对底层TCP套接字的封装,不是我们要实现的业务上的连接模块
我们要实现的业务上的连接模块要怎么设计呢?
1.连接模块的设计
信道是连接的细分,因此连接的任务就是:
- 打开/关闭信道
- 获取信道
而且声明/删除信道都是接收Request,响应Response
成员:
- 信道管理模块句柄
- muduo网络通信连接TcpConnectionPtr
- ProtobufCodecPtr
- 信道模块构造所需资源:
- 虚拟机管理模块句柄
- 消费者管理模块句柄
- 异步工作线程池句柄
依旧是无需互斥锁
2. 连接管理模块的设计
增、删、查
1.连接ID? TcpConnectionPtr!!
我们肯定是需要将连接放到哈希表当中组织起来的
问题是应该让谁当key呢?
跟信道一样,搞一个“连接ID”??
是可以的,只不过要给所有的请求都在加上一个连接ID,而我们说信道是用户视角中的通信通道,现在如果要给所有的请求都加上一个连接ID,未免有点“违背”了这个信道的概念,而且很不优雅
那怎么办呢?
有没有什么优雅一些的方法?
接口:
- 创建连接
- 销毁连接
- 获取连接
成员:
4. 互斥锁
5. unordered_map<TcpConnectionPtr,Connection::ptr> _connection_map;
注意:
unordered_map 直接对 shared_ptr 的底层原始指针进行哈希,而不是对 shared_ptr 对象本身进行哈希
二、连接管理模块的实现
1.连接模块的实现
因为信道是连接的细分,所以对应的信道管理模块也由连接所创建,所独占,所管理
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using OpenChannelRequestPtr = std::shared_ptr<OpenChannelRequest>;
using CloseChannelRequestPtr = std::shared_ptr<CloseChannelRequest>;class Connection
{
public:using ptr = std::shared_ptr<Connection>;//因为信道是连接的细分,所以对应的信道管理模块也由连接所创建,所独占,所管理Connection(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec, const ConsumerManager::ptr &consumer_manager_ptr,const VirtualHostManager::ptr &vhost_manager_ptr, const threadpool::ptr &pool_ptr): _conn(conn), _codec(codec), _consumer_manager_ptr(consumer_manager_ptr), _vhost_manager_ptr(vhost_manager_ptr), _pool_ptr(pool_ptr), _channel_manager_ptr(std::make_shared<ChannelManager>()) {}void openChannel(const OpenChannelRequestPtr &req){// 1. 处理请求_channel_manager_ptr->OpenChannel(req->channel_id(), _conn, _codec, _consumer_manager_ptr, _vhost_manager_ptr, _pool_ptr);// 2. 返回响应basicResponse(req->channel_id(), req->req_id(), true);}void closeChannel(const CloseChannelRequestPtr &req){// 1. 处理请求_channel_manager_ptr->CloseChannel(req->channel_id());// 2. 返回响应basicResponse(req->channel_id(), req->req_id(), true);}Channel::ptr getChannel(const std::string &channel_id){return _channel_manager_ptr->getChannel(channel_id);}private:void basicResponse(const std::string &channel_id, const std::string &req_id, bool ret){BasicCommonResponse resp;resp.set_channel_id(channel_id);resp.set_req_id(req_id);resp.set_ok(ret);_codec->send(_conn, resp);}muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;ConsumerManager::ptr _consumer_manager_ptr;VirtualHostManager::ptr _vhost_manager_ptr;threadpool::ptr _pool_ptr;ChannelManager::ptr _channel_manager_ptr;
};
2.连接管理模块的实现
就是增、删、查而已
class ConnectionManager
{
public:using ptr = std::shared_ptr<ConnectionManager>;void createConnection(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec,const ConsumerManager::ptr &consumer_manager_ptr, const VirtualHostManager::ptr &vhost_manager_ptr,const threadpool::ptr &pool_ptr){std::unique_lock<std::mutex> ulock(_mutex);if (_connection_map.count(conn))return;_connection_map.insert(std::make_pair(conn, std::make_shared<Connection>(conn, codec, consumer_manager_ptr, vhost_manager_ptr, pool_ptr)));}void destroyConnection(const muduo::net::TcpConnectionPtr &conn){std::unique_lock<std::mutex> ulock(_mutex);_connection_map.erase(conn);}Connection::ptr getConnecion(const muduo::net::TcpConnectionPtr &conn){std::unique_lock<std::mutex> ulock(_mutex);auto iter = _connection_map.find(conn);if (iter == _connection_map.end()){default_warning("未找到该TcpConnectionPtr对象所关联的连接对象");return Connection::ptr();}return iter->second;}private:std::mutex _mutex;std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _connection_map;
};
三、服务器模块的设计
1.前言
在完成了信道管理模块和连接管理模块之后,即具体的TCP连接所对应的网络服务模块已经完成了之后
下面我们就可以直接搭建服务器了
我们之前使用过muduo库来搭建基于protobuf的网络服务器和客户端,知道muduo库是基于事件注册与响应机制的
都已经写过了,所以怎么设计,为何要这么写我就不赘述了,用法都是一样的
2.设计
注册的事件响应函数:
- 打开/关闭信道
- 声明/删除虚拟机
- 声明/删除交换机
- 声明/删除队列
- 绑定/解绑队列
- 订阅/取消订阅队列
- 发布/确认消息
注册的连接响应函数:
OnConnectionCallback
对外接口:
构造
start开启服务函数(死循环进行循环监听)
成员:
- muduo库网络通信模块:
EventLoop
TcpServer - muduo库基于protobuf的协议处理模块
ProtobufDispatcher
ProtobufCodec - 连接所需资源模块句柄
VirtualHostManager::ptr
ConsumerManager::ptr
threadpool::ptr - 连接管理模块句柄
ConnectionManager::ptr
四、服务器模块的实现
其实那些注册的函数就是先获取连接,后获取信道
然后复用信道的接口即可
注意:虚拟机在创建的时候要恢复历史消息,恢复历史消息时顺便初始化对应队列的消费者管理结构
using namespace ns_helper;namespace ns_google
{using MessagePtr = std::shared_ptr<google::protobuf::Message>;
}
const std::string vhost_dbfile = "main.db";
const int default_thread_num = 5;class Server
{
public:Server(uint16_t port, const std::string &dbfile = vhost_dbfile, int thread_num = default_thread_num): _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort), _dispatcher(std::bind(&Server::OnUnknownCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))), _consumer_manager_ptr(std::make_shared<ConsumerManager>()), _vhost_manager_ptr(std::make_shared<VirtualHostManager>(dbfile)), _pool_ptr(std::make_shared<threadpool>(thread_num)), _connection_manager_ptr(std::make_shared<ConnectionManager>()){// 0.针对历史消息中的所有队列,初始化队列的消费者管理结构VirtualHostMap vhmap = _vhost_manager_ptr->getAllVirtualHost();for (auto &vhost_kv : vhmap){MsgQueueMap mqmap = _vhost_manager_ptr->getAllMsgQueue(vhost_kv.first);for (auto &q_kv : mqmap){_consumer_manager_ptr->initQueueConsumerManager(vhost_kv.first,q_kv.first);}}// 1. 注册_dispatcher的事件响应回调函数// 1. 打开/关闭信道_dispatcher.registerMessageCallback<OpenChannelRequest>(std::bind(&Server::OnOpenChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<CloseChannelRequest>(std::bind(&Server::OnCloseChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 2. 声明/删除虚拟机_dispatcher.registerMessageCallback<DeclareVirtualHostRequest>(std::bind(&Server::OnDeclareVirtualHost, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<EraseVirtualHostRequest>(std::bind(&Server::OnEraseVirtualHost, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 3. 声明/删除交换机_dispatcher.registerMessageCallback<DeclareExchangeRequest>(std::bind(&Server::OnDeclareExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<EraseExchangeRequest>(std::bind(&Server::OnEraseExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 4. 声明/删除队列_dispatcher.registerMessageCallback<DeclareMsgQueueRequest>(std::bind(&Server::OnDeclareMsgQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<EraseMsgQueueRequest>(std::bind(&Server::OnEraseMsgQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 5. 绑定/解绑队列_dispatcher.registerMessageCallback<BindRequest>(std::bind(&Server::OnBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<UnbindRequest>(std::bind(&Server::OnUnbind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 6. 订阅/取消订阅队列_dispatcher.registerMessageCallback<BasicConsumeRequest>(std::bind(&Server::OnBasicConsume, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<BasicCancelRequest>(std::bind(&Server::OnBasicCancel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 7. 发布/确认消息_dispatcher.registerMessageCallback<BasicPublishRequest>(std::bind(&Server::OnBasicPublish, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<BasicAckRequest>(std::bind(&Server::OnBasicAck, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 2. 注册_server的事件回调函数和连接回调函数_server.setConnectionCallback(std::bind(&Server::OnConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}void start(){// 开始监听_server.start();// 开始事件死循环监控_baseloop.loop();}// 回调接口
private:void OnConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){// 创建连接_connection_manager_ptr->createConnection(conn, _codec, _consumer_manager_ptr, _vhost_manager_ptr, _pool_ptr);default_info("连接建立成功");}else{// 删除连接_connection_manager_ptr->destroyConnection(conn);default_info("连接断开成功");}}void OnUnknownCallback(const muduo::net::TcpConnectionPtr &conn, const ns_google::MessagePtr &message, muduo::Timestamp){default_warning("非法请求,即将断开连接");if (conn->connected()){conn->shutdown();}}// 1. 打开/关闭信道void OnOpenChannel(const muduo::net::TcpConnectionPtr &conn, const OpenChannelRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("打开信道时,没有找到连接对应的Connection对象");return;}myconn->openChannel(req);}void OnCloseChannel(const muduo::net::TcpConnectionPtr &conn, const CloseChannelRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("关闭信道时,没有找到连接对应的Connection对象");return;}myconn->closeChannel(req);}// 2. 声明/删除虚拟机void OnDeclareVirtualHost(const muduo::net::TcpConnectionPtr &conn, const DeclareVirtualHostRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("声明虚拟机时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("声明虚拟机失败,因为获取信道失败");return;}mychannel->declareVirtualHost(req);}void OnEraseVirtualHost(const muduo::net::TcpConnectionPtr &conn, const EraseVirtualHostRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("删除虚拟机时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("删除虚拟机失败,因为获取信道失败");return;}mychannel->eraseVirtualHost(req);}// 3. 声明/删除交换机void OnDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const DeclareExchangeRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("声明交换机时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("声明交换机失败,因为获取信道失败");return;}mychannel->declareExchange(req);}void OnEraseExchange(const muduo::net::TcpConnectionPtr &conn, const EraseExchangeRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("删除交换机时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("删除交换机失败,因为获取信道失败");return;}mychannel->eraseExchange(req);}// 4. 声明/删除队列void OnDeclareMsgQueue(const muduo::net::TcpConnectionPtr &conn, const DeclareMsgQueueRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("声明队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("声明队列失败,因为获取信道失败");return;}mychannel->declareMsgQueue(req);}void OnEraseMsgQueue(const muduo::net::TcpConnectionPtr &conn, const EraseMsgQueueRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("删除队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("删除队列失败,因为获取信道失败");return;}mychannel->eraseMsgQueue(req);}// 5. 绑定/解绑队列void OnBind(const muduo::net::TcpConnectionPtr &conn, const BindRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("绑定队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("绑定队列失败,因为获取信道失败");return;}mychannel->bind(req);}void OnUnbind(const muduo::net::TcpConnectionPtr &conn, const UnbindRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("解除绑定队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("解除绑定队列失败,因为获取信道失败");return;}mychannel->unBind(req);}// 6. 订阅/取消订阅队列void OnBasicConsume(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("订阅队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("订阅队列失败,因为获取信道失败");return;}mychannel->basicConsume(req);}void OnBasicCancel(const muduo::net::TcpConnectionPtr &conn, const BasicCancelRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("取消订阅队列时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("取消订阅队列失败,因为获取信道失败");return;}mychannel->basicCancel(req);}// 7. 发布/确认消息void OnBasicPublish(const muduo::net::TcpConnectionPtr &conn, const BasicPublishRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("发布消息时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("发布消息失败,因为获取信道失败");return;}mychannel->basicPublish(req);}void OnBasicAck(const muduo::net::TcpConnectionPtr &conn, const BasicAckRequestPtr &req, muduo::Timestamp){// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("确认消息时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("确认消息失败,因为获取信道失败");return;}mychannel->basicAck(req);}muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;ProtobufDispatcher _dispatcher;ProtobufCodecPtr _codec;ConsumerManager::ptr _consumer_manager_ptr;VirtualHostManager::ptr _vhost_manager_ptr;threadpool::ptr _pool_ptr;ConnectionManager::ptr _connection_manager_ptr;
};
五、服务器源文件编写,编译与运行
#include "broker.hpp"int main()
{ns_mq::Server server(8888);server.start();return 0;
}
注意:因为muduo_base库要依赖muduo_net库,所以要先连接muduo_net,后连接muduo_base
server:server.cc ../mqthird/include/proto/codec.cc ../mqcommon/mq_msg.pb.cc ../mqcommon/mq_proto.pb.ccg++ -o $@ $^ -std=c++11 -L ../mqthird/lib -I ../mqthird/include -lprotobuf -pthread -lmuduo_net -lmuduo_base -lsqlite3 -lz
.PHONY:clean
clean:rm -f server
编译并运行成功
套接字的确处于监听状态
等到我们写完客户端之后在进行一次功能联合大测试,到时候统一排查BUG
至此,服务器大功告成,总共3347行代码