Reactor_0">Reactor模型
引言
概念:
Reactor模型
:又称为反应堆模型,它是一种基于事件驱动和I/O多路复用的设计模式,常用于处理大量并发I/O事件,是一个高性能模型,它通过事件驱动的方式,高效地管理大量并发连接,实现了对事件的同步解耦和高效处理,广泛应用于网络服务器的开发中事件
:事件就是类似欲I/O操作中的accept、connect、read、write等等不同的事件事件驱动
:不再是检测发生的事件类型调用对应的函数,而是通过提前注册好事件以及对应的回调函数,当事件发生时调用其回调函数,由事件来主导处理- Reactor模型也分
单Reactor单线程模型
、单Reactor多线程模型
、多Reactor多线程模型(主从Reactor多线程模型)
、单Reactor多进程模型(主从reactor多进程模型)
等等 - 本文只讲解单Reactor多线程模型和主从Reactor多线程模型,单Reactor单线程可参考Moon’s Blog
区别:
模型 | 特点 | 优点 | 缺点 |
---|---|---|---|
单Reactor单线程 | Reactor、事件处理器都在同一线程中运行,适用于连接数较少、业务处理简单的场景 | 实现简单,数据无需在线程间传递,线程安全性高 | 无法利用多核CPU,性能有限;当事件处理器执行耗时操作时,会阻塞整个事件循环 |
单Reactor多线程 | Reactor在主线程中运行,负责事件检测和分发,事件处理器在工作线程中运行,通常使用线程池 | 主线程专注于事件检测和分发,提高响应效率;工作线程并发处理事件,充分利用多核CPU | 实现相对复杂;当事件量较大时,单Reactor线程可能会成为性能瓶颈 |
主从reactor多线程 | 主Reactor线程负责接受新连接,将连接分配给从Reactor线程;从Reactor线程各自运行事件循环,处理各自负责的连接的I/O事件 | 主线程和工作线程职责分离,性能更高;更适合高并发、大流量的场景 | 实现复杂度更高;多个Reactor线程会消耗更多的系统资源,可能会导致资源利用率下降 |
Reactor_64">单Reactor多线程模型
概念
概念:
具体流程:
- 启动Reactor线程,将acceptor事件注册在I/O多路复用模型(select、epoll等)中
- Reactor线程监听客户端请求,处理连接请求,并将客户端事件(也就是handler)注册到I/O多路复用模型
- 当客户端事件发生时,将数据收发或者其他事件,放入任务队列(也就是交给工作线程)
- 最终将业务事件交给线程池中的工作线程处理
注意:Acceptor和handler组件可以抽象成一个事件类,因为他们其实都是事件的回调函数
Reactor多线程模型" />
实现细节:
- 实现一个基于非阻塞ET模式EPOLL的事件管理类(Reactor类)
- 将Acceptor和handler组件抽象成一个监听事件对象和业务处理事件对象,因此只需要实现一个事件类即可
- 将事件的属性信息作为成员封装成事件类
- 实现线程池并封装成类,将一个线程池作为对象放入Reactor类(用于将分发事件)
- 初始化监听事件对象(也就是Acceptor,可以像libevent库一样封装成类),将其注册在事件管理类(Reactor)中
- 启动Reactor事件循环,Acceptor将发起连接请求的客户端等信息初始化一个事件对象,然后注册到Reactor中,如果是断开连接请求,就将其从Reactor中删除
- 当客户端发送数据来时,事件被触发,Reactor将其事件对象中的回调函数,加入到任务队列中,分发给工作线程处理
组件实现
组件实现:
-
Reactor类
://提前定义的事件类(将acceptor和handler可抽象成一个事件类) #define MAX_EVENTS 65535 #define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000)class Event;class EventBase { public:EventBase();~EventBase();void add_event(Event* ev); //注册事件到epoll中void del_event(Event* ev); //删除事件到epoll中void dispatch(struct timeval *tv); // 等同于libevent的event_base_dispatchvoid loopbreak(); // 等同于libevent的event_base_loopbreakEvent* timer_event_new(Callback cb,void *ctx,int flag,int timeout_ms); //创建并初始化新定时事件private:int epfd_; //epoll文件描述符Threadpool *tpool; //线程池bool shutdown_; //终止变量struct epoll_event events_[MAX_EVENTS]; //存储事件发生数组 };
-
事件类
:class Event { public:using Callback=std::function<void(Event*,void*)>; //定义回调函数类型Event(EventBase* base, int fd, uint32_t events, Callback cb,void *ctx); //事件初始化~Event();EventBase* getbase(); //获取其处于的Reactor类对象指针int getfd() const; //获取事件文件描述符uint32_t getevents() const; //获取事件类型void handle_cb(); //调用回调函数//判断事件是否是定时事件bool is_timer() const;private:EventBase* base_; //所处于的Reactor类对象指针int fd_; //事件文件描述符uint32_t events_; //事件类型Callback cb_; //事件发生要调用的回调函数void *ctx_; //回调函数参数bool is_timer_; //定时事件判断位 };
-
线程池类
:class Threadpool{ public:Threadpool(int num):threads(std::vector<std::thread>(num)),min_thr_num(num),live_num(num){init();}~Threadpool(){t_shutdown();}//初始化线程池void init();//销毁线程池并释放资源void t_shutdown();//各任务线程入口函数void t_task();//管理线程入口函数void adjust_task();//添加任务template<typename _Fn,typename... _Args>void add_task(_Fn&& fn,_Args&&... args){{auto f=std::bind(std::forward<_Fn>(fn),std::forward<_Args>(args)...);{std::unique_lock<std::mutex> lock(mx);if(shutdown) return;tasks.emplace(std::move(f));}task_cv.notify_one();}} private:std::thread adjust_thr; //管理线程std::vector<std::thread> threads; //线程数组std::queue<std::function<void()>> tasks; //任务队列std::mutex mx; //线程池锁std::condition_variable task_cv; //任务通知条件变量int min_thr_num=0; //线程池最小线程数int max_thr_num=0; //cpu核数的2n+1std::atomic<int> run_num=0; //线程池中正在执行任务的线程数std::atomic<int> live_num=0; //线程池空闲线程数std::atomic<int> exit_num=0; //线程池要销毁线程数bool shutdown=false; //线程池状态,true为运行,false为关闭 };
-
接口定义
:类似于libevent库中的接口提供,可以将以上类接口放入到私有成员,然后让用户使用以下列出的接口即可//创建Reactor对象指针 EventBase* event_base_new();// 开始事件循环 void event_base_dispatch(EventBase* base,struct timeval *tv);// 停止事件循环 void event_base_loopbreak(EventBase* base);// 创建新的事件 Event* event_new(EventBase* base, int fd, uint32_t events, Event::Callback cb,void *ctx);// 添加事件 void event_add(EventBase *base,Event* ev);// 删除事件 void event_del(EventBase *base,Event* ev);// 释放事件 void event_free(Event* ev);// 释放事件循环 void event_base_free(EventBase* base);//提供创建定时事件 Event *event_timer_new(EventBase *base,EventBase::Callback cb,void *ctx,int flag,int timeout_ms);//判断事件是否是定时事件 bool is_event_timer(Event *ev);
具体实现
具体实现:
-
Reactor类EventBase
:EventBase::EventBase() : epfd_(epoll_create1(0)), shutdown_(false) {//获取当前系统cpu数unsigned int cpu_cores = std::thread::hardware_concurrency();if (cpu_cores == 0) {cpu_cores = 4; // 默认值}tpool=new Threadpool(cpu_cores+1);if (epfd_ == -1) {perror("epoll_create1");exit(EXIT_FAILURE);} }//需要清理线程池和关闭epoll EventBase::~EventBase() {delete tpool;close(epfd_); }//注册事件(向epoll中添加) void EventBase::add_event(Event* event) {int fd = event->getfd();epoll_event ev;ev.data.ptr=event;ev.events = event->getevents();if (epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev) == -1) {perror("epoll_ctl: add");exit(EXIT_FAILURE);} }//删除事件(从epoll中删除) void EventBase::del_event(Event* event) {int fd = event->getfd();if (epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr) == -1) {perror("epoll_ctl: del");}}//事件循环,分发事件(将事件放入线程池中的任务队列) void EventBase::dispatch(struct timeval *tv) {//epoll_event epoll_events[MAX_EVENTS];int timeout=-1;if(tv!=nullptr){timeout = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000;}if(timeout>MAX_EPOLL_TIMEOUT_MSEC){timeout=MAX_EPOLL_TIMEOUT_MSEC;}while (!shutdown_) {//根据timeout设置检测间隔int n = epoll_wait(epfd_, events_, MAX_EVENTS, timeout);if (n == -1) {perror("epoll_wait");break;}for (int i = 0; i < n; ++i) {auto ev=static_cast<Event*>(events_[i].data.ptr);if(ev){tpool->add_task([&]{ev->handle_cb();});}}} }//创建并初始化新定时事件 Event* EventBase::timer_event_new(Callback cb,void *ctx,int flag,int timeout_ms){//使用Linux提供的定时器接口,可以结合epoll使用int fd=timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK);if(-1==fd){perror("timerfd_create error");return nullptr;}itimerspec tvalue;//it_value设置定时器第一次到期时间tvalue.it_value.tv_sec=timeout_ms/1000; //初始化到期秒数tvalue.it_value.tv_nsec=(timeout_ms%1000)*1000000; //初始化到期纳秒数tvalue.it_interval={0,0};//it_interval设置定时器第一次之后每次到期的间隔时间,设置为0,定时器只会触发一次,非0为周期性触发if(flag){tvalue.it_interval.tv_sec=timeout_ms/1000; //间隔时间秒数tvalue.it_interval.tv_nsec=(timeout_ms%1000)*1000000; //间隔时间的纳秒数}if(timerfd_settime(fd,0,&tvalue,NULL)==-1){perror("timerfd_settime error");close(fd);return nullptr;}return new Event(this,fd,EPOLLIN,cb,ctx,true); }//终止事件循环 void EventBase::loopbreak() {shutdown_ = false; }
-
事件类Event
:Event::Event(EventBase* base, int fd, uint32_t events, Callback cb,void *ctx,bool istimer): base_(base), fd_(fd), events_(events), cb_(cb),ctx_(ctx),is_timer_(istimer){//初始化就将事件放入对应的EventBase类对象中base_->add_event(this); }//析构自动调用删除事件操作 Event::~Event() {base_->del_event(this); }//获取其EventBase类对象 EventBase* Event::getbase(){return base_; }//获取事件文件描述符 int Event::getfd() const {return fd_; }//获取事件的事件类型 uint32_t Event::getevents() const {return events_; }//调用事件回调函数 void Event::handle_cb() {if (cb_) {cb_(this,ctx_);} }//获取定时事件判断位 bool Event::is_timer() const{return is_timer_; }
-
线程池类Threadpool
:#include "Threadpool.h" #include <chrono> #include <mutex> #include <iostream> #include <unistd.h>void Threadpool::init(){unsigned int cpu_cores = std::thread::hardware_concurrency()/2;if(cpu_cores==0) cpu_cores=4;max_thr_num=2*cpu_cores+1;adjust_thr=std::thread([this]{this->adjust_task();});for(int i=0;i<min_thr_num;++i){threads.emplace_back([this]{this->t_task();});} }void Threadpool::t_task(){while (1) {std::unique_lock<std::mutex> lock(mx);task_cv.wait(lock,[this]{return !tasks.empty()||shutdown||exit_num>0;});if(exit_num>0){exit_num--;return;}if(shutdown&&tasks.empty()){return;}auto task=tasks.front();tasks.pop();lock.unlock();++run_num;--live_num;task();++live_num;--run_num;std::this_thread::sleep_for(std::chrono::seconds(1));} }void Threadpool::t_shutdown(){{std::unique_lock<std::mutex> lock(mx);shutdown=true;}adjust_thr.detach();task_cv.notify_all();for(auto& t:threads){if(t.joinable()) t.join();} }void Threadpool::adjust_task(){while (!shutdown) {std::this_thread::sleep_for(std::chrono::seconds(DEFAULT_TIME));{int size=threads.size();if (tasks.size() > live_num && live_num < max_thr_num&&size<max_thr_num) {int add = 0;std::unique_lock<std::mutex> lock(mx);for (int i = size; i < max_thr_num && add < 10; ++i) {threads.emplace_back([this] {this->t_task();});add++;live_num++;}lock.unlock();std::cout << "线程池扩容成功!" << std::endl;}if (run_num * 2 < live_num && live_num > min_thr_num) {exit_num=live_num-min_thr_num>=10?10:live_num-min_thr_num;int x=exit_num;std::unique_lock<std::mutex> lock(mx);for (int i = 0; i < x; ++i) {task_cv.notify_one();live_num--;}lock.unlock();std::cout << "线程池瘦身成功!" << std::endl;}}} }
-
接口实现
:// 创建新的事件循环 EventBase* event_base_new() {return new EventBase(); }// 开始事件循环 void event_base_dispatch(EventBase* base,struct timeval *tv) {base->dispatch(tv); }// 停止事件循环 void event_base_loopbreak(EventBase* base) {base->loopbreak(); }// 创建新的事件 Event* event_new(EventBase* base, int fd, uint32_t events, Event::Callback cb,void *ctx) {return new Event(base, fd, events, cb,ctx); }// 添加事件 void event_add(EventBase *base,Event* ev) {// 在Event的构造函数中已经添加base->add_event(ev); }// 删除事件 void event_del(EventBase *base,Event* ev) {// 在Event的析构函数中已经删除base->del_event(ev); }// 释放事件 void event_free(Event* ev) {delete ev; }// 释放事件循环 void event_base_free(EventBase* base) {delete base; }//提供创建定时事件接口 Event *event_timer_new(EventBase *base,EventBase::Callback cb,void *ctx,int flag,int timeout_ms){return base->timer_event_new(cb,ctx,flag,timeout_ms); }//判断事件是否是定时事件 bool is_event_timer(Event *ev){return ev->is_timer(); }
使用示例
使用示例:
使用以上实现的接口写一个回声服务器(我将其封装到了event.h文件中)
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include "event.h"void setnonblock(int fd){fcntl(fd,F_SETFL, fcntl(fd,F_GETFL)|O_NONBLOCK);
}void readCallback(Event *ev,void *args){char buffer[1024];int fd=ev->getfd();while (true) {ssize_t n = read(fd, buffer, sizeof(buffer));if (n > 0) {write(fd, buffer, n); // 回显} else if (n == 0) {std::cout << "Client disconnected: " << fd << std::endl;event_free(ev);close(fd);break;} else {if (errno == EAGAIN || errno == EWOULDBLOCK) {break; // 数据读取完毕} else {perror("read");event_free(ev);close(fd);break;}}}
}void acceptCallback(Event *ev,void *args) {EventBase* base=ev->getbase();int fd=ev->getfd();while (true) {sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);int client_fd = accept(fd, (sockaddr*)&client_addr, &client_len);if (client_fd == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {break; // 没有更多的连接} else {perror("accept");break;}}std::cout << "New connection: " << client_fd << std::endl;setnonblock(client_fd);Event* client_event = event_new(base, client_fd, EPOLLIN|EPOLLET, readCallback,nullptr);}
}void time_hello_cb(Event *ev,void *args){//用来取出定时器到期次数uint64_t expirations;ssize_t s = read(ev->getfd(), &expirations, sizeof(uint64_t));if (s==-1) {//对于非阻塞处理if (errno == EAGAIN || errno == EWOULDBLOCK) {return;} else {perror("read");exit(EXIT_FAILURE);}}std::cout<<expirations<<":hello timer_event"<<std::endl;
}void time_world_cb(Event *ev,void *args){//用来取出定时器到期次数uint64_t expirations;ssize_t s = read(ev->getfd(), &expirations, sizeof(uint64_t));if (s==-1) {//对于非阻塞处理if (errno == EAGAIN || errno == EWOULDBLOCK) {return;} else {perror("read");exit(EXIT_FAILURE);}}std::cout<<expirations<<":world timer_event"<<std::endl;
}int main() {int listen_fd = socket(AF_INET, SOCK_STREAM, 0);setnonblock(listen_fd);setreuse(listen_fd);sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(5005);addr.sin_addr.s_addr = INADDR_ANY;Bind(listen_fd, (sockaddr*)&addr, sizeof(addr));listen(listen_fd, SOMAXCONN);EventBase* base = event_base_new();Event* listen_event = event_new(base, listen_fd, EPOLLIN|EPOLLET, acceptCallback,nullptr);//5秒触发一次Event* timer_event=event_timer_new(base,time_hello_cb,nullptr,1,5000);//5s后只触发一次Event* timer_ev=event_timer_new(base,time_world_cb,nullptr,0,5000);//开启循环struct timeval tv;tv.tv_sec=1; //设置秒数tv.tv_usec=0; //设置微妙event_base_dispatch(base,&tv);//释放event_free(timer_event);event_free(timer_ev);event_free(listen_event);event_base_free(base);return 0;
}
Reactor_746">主从Reactor多线程模型
概念
概念:
主从Reactor多线程模型
:该模型是将职能划分给主Reactor和从Reactor(工作线程),主Reactor
就负责监听连接事件(不进行任何I/O处理和其他实质性事件),并将新连接对象和事件分发给从Reactor,从Reactor
就负责处理和监听主Reactor分发的事件,这样能够最大限度提高接收连接的响应速度,避免被一些I/O或者其他事件操作所阻塞。one loop one thread
:这是一个常见的高性能网络编程的思想与架构(可见陈硕大佬的muduo库),意思就是一个事件循环(Reactor)对应着一个线程,也就是每一个线程都有一个属于自己的事件循环(Reactor),这个架构大大避免锁竞争、频繁上下文切换开销等问题,并充分利用多核 CPU 的性能- 本文也将借鉴muduo库,来实现主从Reactor多线程模型
one loop one thread:+-----------------+| 主线程(Acceptor)|+-----------------+|+-------------+-------------+| | |
+---------------+ +---------------+ +---------------+
| 工作线程1 | | 工作线程2 | | 工作线程3 |
| (Event Loop) | | (Event Loop) | | (Event Loop) |
+---------------+ +---------------+ +---------------+
具体架构:
- 首先不管是主Reactor和从Reactor都是一个由事件循环类实现的
- 但是主Reactor不止有事件循环的职能,还需要有监听连接事件的类,也就是Acceptor类
- 因为one loop one thread,我们需要将从reactor与线程进行做一个封装,让主Reactor使用线程池更好管理,所以我们需要一个事件循环线程类
- 主Reactor还有事件分发的职能,因此应该需要主Reactor来管理从Reactor并将事件分发,因此我们需要一个事件循环线程池
- 最后还有个最基础的事件类(用于整合文件描述符、注册事件以及对应回调函数)
注:
具体流程:
- 将acceptor事件注册到主Reactor中
- 初始化事件循环线程池,各事件循环线程启动事件循环
- 启动主Reactor事件循环,循环监听acceptor事件的连接请求
- 触发acceptor事件,调用创建连接回调函数,将新连接分发给从Reactor
- 从Reactor将分发下来的新连接事件注册到事件循环中
- 当事件触发,从Reactor进行处理
组件实现
组件实现:
-
eveneloop(Reactor类)/事件循环类
:#ifndef _EVENTLOOP_H_ #define _EVENTLOOP_H_#include "wrap.h" #include <atomic> #include <cstdlib> #include <iostream> #include <functional> #include <vector> #include <list> #include <unistd.h> #include <strings.h> #include <sys/epoll.h> #include <sys/eventfd.h> #include <sys/timerfd.h>#define MAX_EVENTS 65536namespace moon {class base_event; class event; class loopthread;//reactor类-->事件循环类 class eventloop{ public:using Callback=std::function<void()>;eventloop(loopthread* base=nullptr,int timeout=-1);~eventloop();loopthread* getbaseloop();int getefd() const;int getevfd() const;//事件控制函数void add_event(event* event);void del_event(event* event);void mod_event(event* event);void loop();void loopbreak();void create_eventfd(); //创建通知文件描述符void read_eventfd(); //通知文件描述符回调函数void write_eventfd(); //唤醒通知事件,用于终止服务通知void add_pending_del(base_event* ev);private:int epfd_; //epoll文件描述符int eventfd_; //通知文件描述符int timeout_=-1;std::atomic<bool> shutdown_;std::list<event*> evlist_;std::vector<epoll_event> events_;std::vector<base_event*> delque_; //待删除事件队列loopthread *baseloop_; }; }#endif
-
event类/事件类
:#ifndef _EVENT_H_ #define _EVENT_H_#include "wrap.h" #include <atomic> #include <cstdlib> #include <iostream> #include <functional> #include <list> #include <unistd.h> #include <strings.h> #include <sys/epoll.h> #include <sys/eventfd.h> #include <sys/timerfd.h>namespace moon{class eventloop;//事件类 class event{ public:// using Callback=std::function<void(event*)>;using Callback=std::function<void()>;event(eventloop *base,int fd,uint32_t events);~event();int getfd() const; //获取事件文件描述符uint32_t getevents() const; //获取获取监听事件类型eventloop* getloop() const;//设置回调函数,不需要传null即可void setcb(const Callback &rcb,const Callback &wcb,const Callback &ecb);void setrcb(const Callback &rcb);void setwcb(const Callback &wcb);void setecb(const Callback &ecb);Callback getrcb();Callback getwcb();Callback getecb();void setrevents(const uint32_t revents); //设置触发事件类型void enable_events(uint32_t op); //添加监听事件类型void disable_events(uint32_t op); //取消监听事件类型void update_ep(); //更新监听事件void handle_cb(); //处理事件回调函数bool readable();bool writeable();void enable_read();void disable_read();void enable_write();void disable_write();void enable_ET();void disable_ET();void reset_events();void del_listen(); //取消监听void enable_listen(); //开启监听void disable_cb();void close(){del_listen();} private:eventloop *loop_;int fd_;uint32_t events_; //监听事件uint32_t revents_; //触发事件Callback readcb_; //读事件回调函数Callback writecb_; //写事件回调函数Callback eventcb_; //出了读写事件的其他事件触发回调函数,用作错误事件回调 };}#endif
-
loopthread类/事件循环线程类
:#ifndef _LOOPTHREAD_H_ #define _LOOPTHREAD_H_#include <condition_variable> #include <mutex> #include <thread>#define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000)namespace moon {class eventloop;class loopthread{ public:loopthread(int timeout=-1):loop_(nullptr),t_(std::thread(&loopthread::_init_,this)),timeout_(timeout){if(timeout_>MAX_EPOLL_TIMEOUT_MSEC){timeout_=MAX_EPOLL_TIMEOUT_MSEC;}}~loopthread();void _init_(); //初始化eventloop* getloop(); //获取loop_void join(){if(t_.joinable()) t_.join();} private:eventloop *loop_;std::mutex mx_;std::condition_variable cv_;std::thread t_;int timeout_=-1; //设置epoll间隔检测时间ms };}#endif // !_LOOPTHREAD_H_
-
looptpool类/事件循环线程池类
:#ifndef _LOOPTPOOL_H_ #define _LOOPTPOOL_H_#include <thread> #include <vector> #include <set>namespace moon {class eventloop; class loopthread;class looptpool{ public:looptpool(eventloop *base);~looptpool();void create_pool(int n,int timeout=-1); //初始化线程池eventloop* ev_dispatch(); //分发事件looptpool(const looptpool&)=delete;looptpool& operator=(const looptpool&)=delete;void addloop(); //添加事件循环void stop(); //终止运行 private:void init_pool(int timeout=-1);//设置线程池线程数void settnum(int n){t_num=n;}private:eventloop *baseloop_;std::vector<eventloop*> loadvec_;int next_=0;int t_num=0;int timeout_=-1; };}#endif
-
acceptor类/监听者类
:#ifndef _ACCEPTOR_H_ #define _ACCEPTOR_H_#include <functional>namespace moon {class eventloop; class event;//监听连接类 class acceptor{ public:using Callback=std::function<void(int)>;acceptor(int port,eventloop *base);~acceptor();void listen(); //开始监听void stop(); //停止监听void init_sock(int port); //建立监听套接字void setcb(const Callback &accept_cb); //设置回调函数void handle_accept(); //acceptor事件回调函数,用来接收连接 private:int lfd_;eventloop *loop_;event *ev_;Callback cb_; //连接后回调函数bool shutdown_; };}#endif
-
工具头文件/wrap.h实现
:#ifndef _WRAP_H_ #define _WRAP_H_#include <netinet/in.h> #include <cstddef> #include <cstdlib> #include <netinet/tcp.h> #include <fcntl.h> #include <cstdio>void perr_exit(const char *s); void settcpnodelay(int fd); void setreuse(int fd); void setnonblock(int fd); int Bind(int fd, const struct sockaddr *sa, socklen_t salen); int Listen(int fd, int backlog); int Socket(int family, int type, int protocol);#endif
具体实现
具体实现:
-
eventloop类
:#include "eventloop.h" #include "event.h" #include <algorithm>using namespace moon;eventloop::eventloop(loopthread* base,int timeout) :baseloop_(base),timeout_(timeout),shutdown_(false),events_(MAX_EVENTS){epfd_=epoll_create1(0);if(-1==epfd_){perror("epoll_create1");exit(EXIT_FAILURE);}create_eventfd(); }eventloop::~eventloop(){loopbreak();for(auto &ev : evlist_){delete ev;}for(auto &ev:delque_){delete ev;}evlist_.clear();delque_.clear();close(epfd_);close(eventfd_); }loopthread* eventloop::getbaseloop(){if(!baseloop_) return nullptr;return baseloop_; }int eventloop::getefd() const{return epfd_; }int eventloop::getevfd() const{return eventfd_; }void eventloop::add_event(event* event){if(std::find(evlist_.begin(), evlist_.end(),event)!=evlist_.end()){mod_event(event);return;}int fd=event->getfd();struct epoll_event ev;ev.data.ptr=event;//ev.data.fd=fd;ev.events=event->getevents();if(epoll_ctl(epfd_,EPOLL_CTL_ADD,fd,&ev)==-1){perror("epoll_ctl add error");}evlist_.emplace_back(event); }void eventloop::del_event(event* event){if(std::find(evlist_.begin(), evlist_.end(),event)==evlist_.end()) return;int fd=event->getfd();if(epoll_ctl(epfd_,EPOLL_CTL_DEL,fd,nullptr)==-1){perror("epoll_ctl del error");}evlist_.remove(event); }void eventloop::mod_event(event* event){if(std::find(evlist_.begin(), evlist_.end(),event)==evlist_.end()) return;int fd=event->getfd();struct epoll_event ev;ev.data.ptr=event;ev.events=event->getevents();if(epoll_ctl(epfd_,EPOLL_CTL_MOD,fd,&ev)==-1){perror("epoll_ctl modify error");} }void eventloop::loop(){while (!shutdown_) {int n=epoll_wait(epfd_,events_.data(),MAX_EVENTS,timeout_);if(-1==n){if(errno==EINTR) continue;perror("epoll_wait");break;}for(int i=0;i<n;++i){auto ev=static_cast<event*>(events_[i].data.ptr);ev->setrevents(events_[i].events);ev->handle_cb();}if(n==events_.size()){events_.resize(events_.size() * 2);}if(!delque_.empty()){for(auto ev : delque_){delete ev;}delque_.clear();}} }//终止事件循环 void eventloop::loopbreak(){if(shutdown_) return;write_eventfd(); //唤醒 }void eventloop::create_eventfd(){eventfd_=eventfd(0,EFD_CLOEXEC | EFD_NONBLOCK);if(eventfd_<0){perror("eventfd create error");exit(EXIT_FAILURE);}event *ev=new event(this,eventfd_,EPOLLIN);ev->setcb(std::bind(&eventloop::read_eventfd,this),NULL,NULL);add_event(ev); }void eventloop::read_eventfd(){uint64_t opt=1;ssize_t n=read(eventfd_,&opt,sizeof(opt));shutdown_=true;if(n<0){if(errno==EINTR || errno == EAGAIN){return;}perror("eventfd read error");exit(EXIT_FAILURE);} }void eventloop::write_eventfd(){uint64_t opt=1;ssize_t n=write(eventfd_,&opt,sizeof(opt));if(n<0){if(errno==EINTR){return;}perror("eventfd write error");exit(EXIT_FAILURE);} }void eventloop::add_pending_del(base_event *ev) {delque_.emplace_back(ev); }
-
event类
:#include "event.h" #include "eventloop.h"using namespace moon;event::event(eventloop *base,int fd,uint32_t events):loop_(base),fd_(fd),events_(events){ }event::~event(){ }int event::getfd() const{return fd_; }uint32_t event::getevents() const{return events_; }eventloop* event::getloop() const{return loop_; }void event::setcb(const Callback &rcb,const Callback &wcb,const Callback &ecb){readcb_=rcb;writecb_=wcb;eventcb_=ecb; }void event::setrcb(const Callback &rcb){readcb_=rcb; }void event::setwcb(const Callback &wcb){writecb_=wcb; }void event::setecb(const Callback &ecb){eventcb_=ecb; }void event::setrevents(const uint32_t revents){revents_=revents; }void event::update_ep(){loop_->mod_event(this); }void event::enable_events(uint32_t op){events_|=op;update_ep(); }void event::disable_events(uint32_t op){events_&=~op;update_ep(); }bool event::readable(){return (events_&EPOLLIN); }bool event::writeable(){return (events_&EPOLLOUT); }void event::enable_read(){enable_events(EPOLLIN); }void event::disable_read(){disable_events(EPOLLIN); }void event::enable_write(){enable_events(EPOLLOUT); }void event::disable_write(){disable_events(EPOLLOUT); }void event::enable_ET(){enable_events(EPOLLET); }void event::disable_ET(){disable_events(EPOLLET); }void event::reset_events(){events_=0;update_ep(); }void event::del_listen(){loop_->del_event(this); }void event::enable_listen(){loop_->add_event(this); }void event::disable_cb() {readcb_= nullptr;writecb_= nullptr;eventcb_= nullptr; }void event::handle_cb(){if((revents_ & EPOLLIN) || (revents_ & EPOLLRDHUP) || (revents_ & EPOLLPRI)){if(readcb_) readcb_();}if(revents_&EPOLLOUT){if(writecb_) writecb_();}if(revents_ & (EPOLLERR|EPOLLHUP)){if(eventcb_) eventcb_();} }event::Callback event::getrcb() {return readcb_; }event::Callback event::getwcb() {return writecb_; }event::Callback event::getecb() {return eventcb_; }
-
loopthread类
:#include "loopthread.h" #include "eventloop.h"using namespace moon;loopthread::~loopthread(){if(loop_){loop_->loopbreak();if(t_.joinable()) t_.join();delete loop_;}else{if(t_.joinable()) t_.join();} }void loopthread::_init_(){eventloop *loop=new eventloop(this,timeout_);{std::unique_lock<std::mutex> lock(mx_);loop_=loop;cv_.notify_all();}loop_->loop(); }eventloop* loopthread::getloop(){eventloop *ep=nullptr;{std::unique_lock<std::mutex> lock(mx_);cv_.wait(lock,[&](){return loop_!=nullptr;});ep=loop_;}return ep; }
-
looptpool类
:#include "looptpool.h" #include "eventloop.h" #include "loopthread.h"using namespace moon;looptpool::looptpool(eventloop *base) :baseloop_(base){ };looptpool::~looptpool(){stop();for(auto &t:loadvec_){delete t->getbaseloop();}loadvec_.clear();t_num=0; }void looptpool::init_pool(int timeout){timeout_=timeout;for(int i=0;i<t_num;++i){loopthread* lt=new loopthread(timeout);loadvec_.emplace_back(lt->getloop());} }void looptpool::create_pool(int n,int timeout){settnum(n);init_pool(timeout); }eventloop* looptpool::ev_dispatch(){if(t_num==0) return baseloop_;next_=(next_+1)%t_num;return loadvec_[next_]; }void looptpool::addloop(){loopthread* lt=new loopthread(timeout_);loadvec_.emplace_back(lt->getloop());++t_num; }void looptpool::stop() {for(auto& ep : loadvec_) {ep->loopbreak();ep->getbaseloop()->join();} }
-
acceptor类
:#include "acceptor.h" #include "event.h" #include "eventloop.h" #include "wrap.h"using namespace moon;acceptor::acceptor(int port,eventloop *base) :loop_(base),shutdown_(true){if(port!=-1)init_sock(port); }acceptor::~acceptor(){stop();delete ev_;close(lfd_); }void acceptor::init_sock(int port){int fd=Socket(AF_INET,SOCK_STREAM,0);setnonblock(fd);setreuse(fd);struct sockaddr_in ser_addr;bzero(&ser_addr,sizeof(ser_addr));ser_addr.sin_family=AF_INET;ser_addr.sin_addr.s_addr=INADDR_ANY;ser_addr.sin_port=htons(port);Bind(fd,(struct sockaddr*)&ser_addr,sizeof(ser_addr));Listen(fd,SOMAXCONN);lfd_=fd; }void acceptor::setcb(const Callback &accept_cb){cb_=accept_cb; }void acceptor::handle_accept(){struct sockaddr_in cli_addr;socklen_t cli_len = sizeof(cli_addr);bzero(&cli_addr,cli_len);while(true){int cfd=accept(lfd_,(struct sockaddr*)&cli_addr,&cli_len);if (cfd == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {break; // 没有更多的连接} else {perror("accept");break;}}setnonblock(cfd);settcpnodelay(cfd);if(cb_) cb_(cfd);}}void acceptor::listen(){if(!shutdown_) return;ev_=new event(loop_,lfd_,EPOLLIN|EPOLLET);ev_->setcb(std::bind(&acceptor::handle_accept,this),NULL,NULL);loop_->add_event(ev_);shutdown_=false; }void acceptor::stop(){if(shutdown_) return;ev_->del_listen();shutdown_=true; }
-
wrap.cpp
:#include "wrap.h"void perr_exit(const char *s){perror(s);exit(1); }void settcpnodelay(int fd){int opt=1;int ret=setsockopt(fd,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));if(-1==ret){perr_exit("setsockopt tcp nodelay error");} }void setreuse(int fd){int opt=1;int ret=setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));if(-1==ret){perr_exit("setsockopt ipreuse error");}ret=setsockopt(fd,SOL_SOCKET,SO_REUSEPORT,&opt,sizeof(opt));if(-1==ret){perr_exit("setsockopt portreuse error");} }void setnonblock(int fd){fcntl(fd,F_SETFL, fcntl(fd,F_GETFL)|O_NONBLOCK); }int Bind(int fd, const struct sockaddr *sa, socklen_t salen){int n;if((n=bind(fd,sa,salen))<0){perr_exit("bind error");}return n; }int Listen(int fd, int backlog){int n;if((n=listen(fd,backlog))<0){perr_exit("listen error");return -1;}return n; }int Socket(int family, int type, int protocol){int n;if((n=socket(family,type,protocol))<0){perr_exit("socket error");return -1;}return n; }
使用示例
使用示例:使用这些组件实现一个简单的主从reactor多线程模型的tcp回声服务器
#include "eventloop.h"
#include "event.h"
#include "acceptor.h"
#include "looptpool.h"
#include "wrap.h"
#include <iostream>
#include <unistd.h>
#include <cstring>using namespace moon;void handle_read(event* conn_event){char buf[1024];int fd = conn_event->getfd();while (true) {ssize_t n = read(fd, buf, sizeof(buf));if (n > 0) {// 回显数据给客户端ssize_t sent = 0;while (sent < n) {ssize_t m = write(fd, buf + sent, n - sent);if (m > 0) {sent += m;} else if (m == -1 && errno == EAGAIN) {// 写缓冲区已满,稍后再试break;} else {// 发生错误,关闭连接perror("write error");conn_event->del_listen();delete conn_event;close(fd);return;}}} else if (n == 0) {// 客户端关闭连接std::cout << "Client disconnected, fd: " << fd << std::endl;conn_event->del_listen();delete conn_event;close(fd);return;} else {if (errno == EAGAIN || errno == EWOULDBLOCK) {// 数据读取完毕break;} else {// 发生错误,关闭连接perror("read error");conn_event->del_listen();delete conn_event;close(fd);return;}}}
}int main() {// 定义监听端口int port = 5005; // 可以根据需要更改端口号// 定义工作线程数量int thread_num = 4; // 根据需要调整线程数量// 创建主事件循环(主 Reactor)eventloop *main_loop = new moon::eventloop();// 创建监听器(acceptor),设置监听端口和主事件循环acceptor *acceptor = new moon::acceptor(port, main_loop);// 创建事件循环线程池(从 Reactor)looptpool *loop_pool = new moon::looptpool(main_loop);loop_pool->create_pool(thread_num);// 设置 acceptor 的回调函数,当有新连接时调用acceptor->setcb([loop_pool](int fd) {// 从线程池中获取一个工作事件循环eventloop *worker_loop = loop_pool->ev_dispatch();// 在工作事件循环中为客户端连接创建一个事件event *conn_event = new event(worker_loop, fd, EPOLLIN | EPOLLET);// 设置读事件的回调函数,实现回声功能conn_event->setrcb(std::bind(handle_read,conn_event));// 将事件添加到工作事件循环中worker_loop->add_event(conn_event);});// 开始监听acceptor->listen();// 运行主事件循环main_loop->loop();// 清理资源delete acceptor;delete loop_pool;delete main_loop;return 0;
}
MoonNet网络库
MoonNet
:MoonNet 是一个专注于基于 Reactor 的高性能服务端网络库,提供基于主从 Reactor 多线程模型的服务端模块。它利用 Linux 的 epoll
机制,结合多线程和事件驱动设计,提供高效、可扩展的网络通信能力。MoonNet 支持 TCP 和 UDP 协议,并内置信号处理和定时器功能,适用于构建高并发、低延迟的服务器应用。
关于:MoonNet是我实现的高性能专注于linux服务端的网络库,该库将扩展以上讲的所有内容和代码,可供大家学习,如果感兴趣和喜欢的可以给我点个star
地址
:https://github.com/MoonforDream/MoonNet