高性能服务器模型之Reactor(单线程版本)

devtools/2024/11/25 11:43:13/

一、概念

Reactor模型是一种事件驱动的设计模式,用于处理并发I/O操作。其核心思想是将I/O事件的监听和实际的I/O操作分离开来,由事件循环(Event Loop)负责监听I/O事件,当事件发生时,将事件分发给相应的事件处理器(Event Handler)进行处理。

单线程版本如下图:

其中各个组件的任务如下:

  1. Reactor:这是事件循环的核心,负责监听和分发事件。它等待I/O事件(如连接请求、数据到达等)的发生,并将这些事件分发给相应的处理器。

  2. Acceptor:Acceptor是Reactor模式中的一个特殊处理器,专门用于处理新的连接请求。当Reactor监听到新的连接请求时,它会将这个事件交给Acceptor处理。Acceptor负责接受新的连接,并将新的连接分配给适当的资源或处理器。

  3. Clients:在图中,客户端(client)代表与服务器建立连接的实体。在Reactor模式中,客户端可以是任何发起连接请求的实体。

  4. Dispatch:Dispatch是Reactor中的一个组件,它负责将事件分发给相应的处理器。在图中,Dispatch连接了Acceptor和一系列的处理步骤。

  5. Read:读取数据的步骤。一旦连接建立,服务器需要从客户端读取数据。

  6. Decode:解码数据的步骤。读取的数据可能需要解码,以转换为服务器可以理解的格式。

  7. Compute:计算或处理数据的步骤。服务器对解码后的数据进行业务逻辑处理。

  8. Encode:编码数据的步骤。处理完的数据可能需要编码,以转换为客户端可以理解的格式。

  9. Send:发送数据的步骤。服务器将编码后的数据发送回客户端。

二、代码实现 

代码模块:

Socket:负责管理sockfd的创建和传参。

InetAddress:负责处理ip地址,端口号等的大小端转换处理等。

Acceptor:由Socket和InetAddress组合而成,ready函数负责完成服务器前期的基本操作,达到监听状态,accept负责获取连接的netfd。

TcpConnect:负责和客户端和读写操作,使用Acceptor的accept获得的文件描述符初始化,进行信息传递。

EevenLoop:负责事件循环中的操作,封装了epoll的创建,增加,监听和删除操作,TCP连接三个事件的处理,接受用户输入的回调函数,并处理但由于EvenLoop本身没有发送消息的能力,所以EvenLoop将回调函数传入TcpConnect中注册,再使用TcpConnect的回调函数。

Server:封装Acceptor和EvenLoop的一些操作,简化使用。

1、Socket

Socket.h

#pragma onceclass Socket
{
public:Socket();~Socket();int getSockfd();
private:int _fd;
};

Socket.cc

#include"Socket.h"
#include<kk.h>
Socket::Socket(){_fd=socket(AF_INET,SOCK_STREAM,0);
}Socket::~Socket(){close(_fd);
}int Socket::getSockfd(){return _fd;
}

 2、InetAddress

InetAddress.h

#pragma once
#include<kk.h>
#include<iostream>
using std::string;class InetAddress
{
public:InetAddress(const string &ip,const string &port);InetAddress(struct sockaddr_in clientAddr);~InetAddress();struct sockaddr_in *getInetAddr();string getIp();string getPort();
private:struct sockaddr_in _serverAddr;
};

 InetAddress.cc

#include"InetAddress.h"
InetAddress::InetAddress(const string & ip,const string & port){_serverAddr.sin_family=AF_INET;_serverAddr.sin_port=htons(atoi(port.c_str()));_serverAddr.sin_addr.s_addr=inet_addr(ip.c_str());
}
InetAddress::InetAddress(struct sockaddr_in clientAddr){}
InetAddress::~InetAddress(){}
struct sockaddr_in *InetAddress::getInetAddr(){return &_serverAddr;
}string getIp();string getPort();

3、Acceptor

Acceptor.h 

#pragma once
#include"InetAddress.h"
#include"Socket.h"class Acceptor
{
public:Acceptor(const string &ip,const string &port);~Acceptor();void bind();void listen();int accept();void setReuse();void ready();int getSockfd();
private:Socket _sock;InetAddress _addr;
};

 Acceptor.cc

#include"Acceptor.h"
Acceptor::Acceptor(const string &ip,const string &port):_sock(),_addr(ip,port){}Acceptor::~Acceptor(){}void Acceptor::bind(){int ret=::bind(_sock.getSockfd(),(struct sockaddr *)_addr.getInetAddr(),sizeof(struct sockaddr_in));if(ret==-1){perror("bind");return;}
}void Acceptor::listen(){int ret=::listen(_sock.getSockfd(),50);if(ret==-1){perror("listen");return;}
}int Acceptor::accept(){int netfd=::accept(_sock.getSockfd(),NULL,NULL);if(netfd==-1){perror("accept");return -1;}return netfd;
}void Acceptor::setReuse(){int reuse=1;setsockopt(_sock.getSockfd(),SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));int reuse2=1;setsockopt(_sock.getSockfd(),SOL_SOCKET,SO_REUSEPORT,&reuse2,sizeof(reuse2));
}void Acceptor::ready(){setReuse();bind();listen();
}
int Acceptor::getSockfd(){return _sock.getSockfd();
}

4、TcpConnect

TcpConnect.h

#pragma once
#include<iostream>
#include<functional>
#include<memory>
#include<kk.h>
using std::string;
class TcpConnect;
using ConnectPtr=std::shared_ptr<TcpConnect>;
using TcpConnectCallback=std::function<void(const ConnectPtr &)>;
class TcpConnect
:public std::enable_shared_from_this<TcpConnect>
{
public:TcpConnect(int netfd);~TcpConnect();string recv();int send(const string &msg);void setNewConnectCb(const TcpConnectCallback &cb);void setCloseCb(const TcpConnectCallback &cb);void setMessageCb(const TcpConnectCallback &cb);void handleNewConnectCb();void handleMessageCb();void handCloseCb();bool isClose();
private:int readn(char *buf,int len);int writen(char *buf,int len);int readline(char *buf,int len);TcpConnectCallback _onNewConnectCb;TcpConnectCallback _onCloseCb;TcpConnectCallback _onMessageCb;int _netfd;
};

TcpConnect.cc

#include"TcpConnect.h"TcpConnect::TcpConnect(int netfd):_netfd(netfd)
{}TcpConnect::~TcpConnect(){close(_netfd);
}string TcpConnect::recv(){char buf[65535]={0}; readline(buf,sizeof(buf)); return string(buf); 
}int TcpConnect::send(const string &msg){::send(_netfd,msg.c_str(),msg.length(),MSG_NOSIGNAL);
}int TcpConnect::readn(char *buf,int len){int left=len;char *pstr=buf;int ret=0;while(left>0){ret=read(_netfd,pstr,left);if(ret==-1&&errno==EINTR){continue;}else if(ret==-1){perror("read error -1");return -1;}else if(ret==0){break;}else{pstr+=ret;left-=ret;}}return len-left;
}int TcpConnect::readline(char *buf,int len){int left=len-1;char *pstr=buf;int ret=0,total=0;while(left>0){ret=::recv(_netfd,pstr,left,MSG_PEEK);if(-1==ret&&errno==EINTR){continue;}else if(-1==ret){perror("readLine error -1");return -1;}else if(0==ret){break;}else{for(int i=0;i<ret;i++){if(pstr[i]=='\n'){int sz=i+1;readn(pstr,sz);pstr+=sz;*pstr='\0';return total+sz;}}readn(pstr,ret);total+=ret;pstr+=ret;left-=ret;}}return total;
}
void TcpConnect::setNewConnectCb(const TcpConnectCallback &cb){_onNewConnectCb=std::move(cb);
}
void TcpConnect::setCloseCb(const TcpConnectCallback &cb){_onCloseCb=std::move(cb);
}
void TcpConnect::setMessageCb(const TcpConnectCallback &cb){_onMessageCb=std::move(cb);
}
void TcpConnect::handleNewConnectCb(){if(_onNewConnectCb){_onNewConnectCb(shared_from_this());}else{std::cout<<"_onNewConnectCb==nullptr"<<std::endl;}
}
void TcpConnect::handleMessageCb(){if(_onMessageCb){_onMessageCb(shared_from_this());}else{std::cout<<"_onMessageCb==nullptr"<<std::endl;}
}
void TcpConnect::handCloseCb(){if(_onMessageCb){_onCloseCb(shared_from_this());}else{std::cout<<"_onCloseCb"<<std::endl;}
}bool TcpConnect::isClose(){char buf[10]={0};int ret=::recv(_netfd,buf,sizeof(buf),MSG_PEEK);return (0==ret);
}

 5、EvenLoop

EvenLoop.h

#pragma once
#include<vector>
#include<memory>
#include<map>
#include<utility>
#include<kk.h>
#include<functional>
#include"Acceptor.h"
#include"TcpConnect.h"
using std::vector;
using std::shared_ptr;
using std::map;using ConnectPtr=shared_ptr<TcpConnect>;
using TcpConnectCallback=std::function<void(const ConnectPtr &)>;class EvenLoop
{
public:EvenLoop(Acceptor &acceptor);~EvenLoop();int createEpoll();void addEpoll(int fd);void delEpoll(int fd);void loop();void unloop();void waitEpoll();void handleNewConnect();void handleMessage(int netfd);void setNewConnectCb(const TcpConnectCallback &cb);void setCloseCb(const TcpConnectCallback &cb);void setMessageCb(const TcpConnectCallback &cb);private:int _epfd;vector<struct epoll_event> _readyArr;bool _isLooping;Acceptor &_acceptor;map<int,ConnectPtr> _conns;TcpConnectCallback _onNewConnectCb;TcpConnectCallback _onCloseCb;TcpConnectCallback _onMessageCb;
};

EvenLoop.cc

#include"EvenLoop.h"
#include<iostream>
using std::cerr;
using std::endl;
using std::cout;
EvenLoop::EvenLoop(Acceptor &acceptor):_epfd(createEpoll()),_readyArr(1024),_isLooping(false),_acceptor(acceptor),_conns()
{addEpoll(_acceptor.getSockfd());
}EvenLoop::~EvenLoop(){close(_epfd);
}int EvenLoop::createEpoll(){int ret=epoll_create(1);if(ret<0){perror("epoll_create");return ret;}return ret;
}void EvenLoop::addEpoll(int fd){struct epoll_event event;event.events=EPOLLIN;event.data.fd=fd;int ret=epoll_ctl(_epfd,EPOLL_CTL_ADD,fd,&event);if(ret<0){perror("epoll_ctr_add");return;}
}void EvenLoop::delEpoll(int fd){epoll_ctl(_epfd,EPOLL_CTL_DEL,fd,nullptr);
}void EvenLoop::loop(){_isLooping=true;while(_isLooping==true){waitEpoll();}
}void EvenLoop::unloop(){_isLooping=false;
}void EvenLoop::waitEpoll(){int readyNum=0;do{readyNum=epoll_wait(_epfd,&*_readyArr.begin(),_readyArr.size(),3000);}while(readyNum==-1&&errno==EINTR);if(readyNum==-1){cerr<<"readyNum==-1"<<endl;}else if(readyNum==0){cout<<"----timeout!!!----"<<endl;}else{if(readyNum==(int)_readyArr.size()){_readyArr.reserve(2*readyNum);}for(int i=0;i<readyNum;i++){int fd=_readyArr[i].data.fd;if(fd==_acceptor.getSockfd()){handleNewConnect();}else{handleMessage(fd);}}}
}
void EvenLoop::handleNewConnect(){int netfd=_acceptor.accept();if(netfd<0){perror("handleNewConnect");return ;}addEpoll(netfd);ConnectPtr conn(new TcpConnect(netfd));conn->setNewConnectCb(_onNewConnectCb);conn->setMessageCb(_onMessageCb);conn->setCloseCb(_onCloseCb);_conns.insert(std::make_pair(netfd,conn));conn->handleNewConnectCb();
}void EvenLoop::handleMessage(int netfd){auto it=_conns.find(netfd);//存在该此连接if(it!=_conns.end()){bool flag=it->second->isClose(); if(flag==true){//已断开为trueit->second->handCloseCb();_conns.erase(it);delEpoll(netfd);return ;}else{it->second->handleMessageCb();}}else{std::cout<<"连接不存在"<<std::endl;return;}
}void EvenLoop::setNewConnectCb(const TcpConnectCallback &cb){_onNewConnectCb=cb;
}
void EvenLoop::setCloseCb(const TcpConnectCallback &cb){_onCloseCb=cb;
}
void EvenLoop::setMessageCb(const TcpConnectCallback &cb){_onMessageCb=cb;
}

 6、TcpServer

 TcpServer.h

#pragma once
#include"EvenLoop.h"
class TcpServer
{
public:TcpServer(const string &ip,const string &port);~TcpServer();void start();void stop();void setAllFunction(const TcpConnectCallback &newCallBack,const TcpConnectCallback &msgCallBack,const TcpConnectCallback &closeCallBack);private:Acceptor _acceptor;EvenLoop _loop;
};

 TcpServer.cc

#include"TcpServer.h"TcpServer::TcpServer(const string &ip,const string &port):_acceptor(ip,port),_loop(_acceptor){}
TcpServer::~TcpServer(){}void TcpServer::start(){_acceptor.ready();_loop.loop();
}
void TcpServer::stop(){_loop.unloop();
}
void TcpServer::setAllFunction(const TcpConnectCallback &newCallBack,const TcpConnectCallback &msgCallBack,const TcpConnectCallback &closeCallBack){_loop.setNewConnectCb(newCallBack);_loop.setMessageCb(msgCallBack);_loop.setCloseCb(closeCallBack);
}

三、测试

test.c 

#include <iostream>
#include"Socket.h"
#include"Acceptor.h"
#include"InetAddress.h"
#include"TcpConnect.h"
#include"EvenLoop.h"
#include"TcpServer.h"
#include"ThreadPool.h"using std::cout;
using std::endl;void onNew(const ConnectPtr& con){cout<<"---a client has connected!!---"<<endl;
}
void onMes(const ConnectPtr& con){string ret=con->recv();cout<<"msg>>"<<ret<<endl;con->send(ret);
}
void onClose(const ConnectPtr&con){cout<<"---a client has close---"<<endl;}int main()
{TcpServer server("127.0.0.1","1234");   server.setAllFunction(onNew,onMes,onClose);server.start();return 0;
}

server

 

client

实现了一个执行回声业务的reactor单线程服务器模型。 


http://www.ppmy.cn/devtools/136828.html

相关文章

第21周:机器学习

目录 摘要 Abstract 一、ARIMA模型 1、时间序列模型 &#xff08;1&#xff09;时间序列的分析方法 &#xff08;2&#xff09;时间序列的预处理 &#xff08;3&#xff09;ARIMA模型的引入 2、AR模型 3、MA模型 4、小结 二、K-means聚类算法 三、实验 1、数据处…

一加ACE 3 Pro手机无法连接电脑传输文件问题

先说结论&#xff1a;OnePlus手机无法连接电脑传输数据的原因&#xff0c;大概率是一加数据线的问题。尝试其他手机品牌的数据线&#xff08;比如华为&#xff09;&#xff0c;再次尝试。 连接电脑方法&#xff1a; 1 打开开发者模式&#xff08;非必要操作&#xff09; 进入…

Axios案例练习

使用原生的Ajax请求还是比较繁琐&#xff0c;所以说一般使用Axios&#xff0c;Axios是对于Ajax的封装&#xff0c;主要是为了简化书写。 Axios使用比较简单&#xff0c;主要分为两步&#xff1a; 1.在script标签的src中引入Axios文件 特别注意&#xff0c;这里是需要一对单独的…

详细描述一下Elasticsearch搜索的过程?

大家好&#xff0c;我是锋哥。今天分享关于【详细描述一下Elasticsearch搜索的过程?】面试题。希望对大家有帮助&#xff1b; 详细描述一下Elasticsearch搜索的过程? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Elasticsearch 是一个基于 Apache Lucene 构建的…

UE5材质篇5 简易水面

不得不说&#xff0c;UE5里搞一个水面实在是相比要自己写各种反射来说太友好了&#xff0c;就主要是开启一堆开关&#xff0c;lumen相关的&#xff0c;然后稍微连一些蓝图就几乎有了 这里要改一个shading model&#xff0c;要这个 然后要增加一个这个node 并且不需要连接base …

深入分析:固定参考框架在RViz中的作用与对数据可视化的影响 ros ubuntu20.04

深入分析&#xff1a;固定参考框架在RViz中的作用与对数据可视化的影响 RViz (Robot Visualization) 是 ROS (Robot Operating System) 中一种重要的三维可视化工具&#xff0c;主要用于实时观察和分析传感器数据、机器人状态信息以及环境模型。RViz的核心功能之一是固定参考框…

【探寻密码的奥秘】-000:密码相关概念定义及介绍(持续更新~~)

密码相关概念 1、密码学 1、密码学 密码学是研究密码与密码活动本质和规律&#xff0c;以及指导密码实践的科学&#xff0c;主要探索密码编码和密码分析的一般规律&#xff0c;它是一门结合数学、计算机科学、信息通信系统等多门学科为一体的综合性学科。 密码学的常见应用场景…

如何利用Python爬虫精准获得1688店铺详情

在数字化时代&#xff0c;数据的价值日益凸显&#xff0c;尤其是对于电商平台而言&#xff0c;精准获取店铺详情信息对于市场分析、竞争对手研究等方面至关重要。本文将详细介绍如何利用Python爬虫技术精准获取1688店铺的详细信息&#xff0c;包括店铺名称、地址、商品信息等&a…