Reactor模型

ops/2024/12/25 2:02:59/

Reactor_0">Reactor模型

引言

概念

  • Reactor模型:又称为反应堆模型,它是一种基于事件驱动I/O多路复用的设计模式,常用于处理大量并发I/O事件,是一个高性能模型,它通过事件驱动的方式,高效地管理大量并发连接,实现了对事件的同步解耦和高效处理,广泛应用于网络服务器的开发中
  • 事件:事件就是类似欲I/O操作中的accept、connect、read、write等等不同的事件
  • 事件驱动:不再是检测发生的事件类型调用对应的函数,而是通过提前注册好事件以及对应的回调函数,当事件发生时调用其回调函数,由事件来主导处理
  • Reactor模型也分Reactor单线程模型Reactor多线程模型Reactor多线程模型(主从Reactor多线程模型)Reactor多进程模型(主从reactor多进程模型)等等
  • 本文只讲解单Reactor多线程模型和主从Reactor多线程模型,单Reactor单线程可参考Moon’s Blog

区别

模型特点优点缺点
Reactor单线程Reactor、事件处理器都在同一线程中运行,适用于连接数较少、业务处理简单的场景实现简单,数据无需在线程间传递,线程安全性高无法利用多核CPU,性能有限;当事件处理器执行耗时操作时,会阻塞整个事件循环
Reactor多线程Reactor在主线程中运行,负责事件检测和分发,事件处理器在工作线程中运行,通常使用线程池主线程专注于事件检测和分发,提高响应效率;工作线程并发处理事件,充分利用多核CPU实现相对复杂;当事件量较大时,Reactor线程可能会成为性能瓶颈
主从reactor多线程Reactor线程负责接受新连接,将连接分配给从Reactor线程;Reactor线程各自运行事件循环,处理各自负责的连接的I/O事件主线程和工作线程职责分离,性能更高;更适合高并发、大流量的场景实现复杂度更高;多个Reactor线程会消耗更多的系统资源,可能会导致资源利用率下降

Reactor_64">单Reactor多线程模型

概念

概念

  • Reactor多线程模型:在单Reactor单线程的模型增加了一个线程池,Reactor线程只接受网络I/O和连接请求,其他事件交给线程池的工作线程处理

具体流程

  1. 启动Reactor线程,将acceptor事件注册在I/O多路复用模型(select、epoll等)中
  2. Reactor线程监听客户端请求,处理连接请求,并将客户端事件(也就是handler)注册到I/O多路复用模型
  3. 当客户端事件发生时,将数据收发或者其他事件,放入任务队列(也就是交给工作线程)
  4. 最终将业务事件交给线程池中的工作线程处理

注意:Acceptor和handler组件可以抽象成一个事件类,因为他们其实都是事件的回调函数

单<a class=Reactor多线程模型" />

实现细节

  1. 实现一个基于非阻塞ET模式EPOLL的事件管理类(Reactor类)
  2. 将Acceptor和handler组件抽象成一个监听事件对象和业务处理事件对象,因此只需要实现一个事件类即可
  3. 将事件的属性信息作为成员封装成事件类
  4. 实现线程池并封装成类,将一个线程池作为对象放入Reactor类(用于将分发事件)
  5. 初始化监听事件对象(也就是Acceptor,可以像libevent库一样封装成类),将其注册在事件管理类(Reactor)中
  6. 启动Reactor事件循环,Acceptor将发起连接请求的客户端等信息初始化一个事件对象,然后注册到Reactor中,如果是断开连接请求,就将其从Reactor中删除
  7. 当客户端发送数据来时,事件被触发,Reactor将其事件对象中的回调函数,加入到任务队列中,分发给工作线程处理

组件实现

组件实现

  1. Reactor

    //提前定义的事件类(将acceptor和handler可抽象成一个事件类)
    #define MAX_EVENTS 65535
    #define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000)class Event;class EventBase {
    public:EventBase();~EventBase();void add_event(Event* ev);	//注册事件到epoll中void del_event(Event* ev);	//删除事件到epoll中void dispatch(struct timeval *tv); // 等同于libevent的event_base_dispatchvoid loopbreak(); // 等同于libevent的event_base_loopbreakEvent* timer_event_new(Callback cb,void *ctx,int flag,int timeout_ms);   //创建并初始化新定时事件private:int epfd_;	//epoll文件描述符Threadpool *tpool;	//线程池bool shutdown_;		//终止变量struct epoll_event events_[MAX_EVENTS];	//存储事件发生数组
    };
    
  2. 事件类

    class Event {
    public:using Callback=std::function<void(Event*,void*)>;	//定义回调函数类型Event(EventBase* base, int fd, uint32_t events, Callback cb,void *ctx);	//事件初始化~Event();EventBase* getbase();	//获取其处于的Reactor类对象指针int getfd() const;		//获取事件文件描述符uint32_t getevents() const;		//获取事件类型void handle_cb();		//调用回调函数//判断事件是否是定时事件bool is_timer() const;private:EventBase* base_;	//所处于的Reactor类对象指针int fd_;	//事件文件描述符uint32_t events_;	//事件类型Callback cb_;	//事件发生要调用的回调函数void *ctx_;		//回调函数参数bool is_timer_; //定时事件判断位
    };
    
  3. 线程池类:

    class Threadpool{
    public:Threadpool(int num):threads(std::vector<std::thread>(num)),min_thr_num(num),live_num(num){init();}~Threadpool(){t_shutdown();}//初始化线程池void init();//销毁线程池并释放资源void t_shutdown();//各任务线程入口函数void t_task();//管理线程入口函数void adjust_task();//添加任务template<typename _Fn,typename... _Args>void add_task(_Fn&& fn,_Args&&... args){{auto f=std::bind(std::forward<_Fn>(fn),std::forward<_Args>(args)...);{std::unique_lock<std::mutex> lock(mx);if(shutdown) return;tasks.emplace(std::move(f));}task_cv.notify_one();}}
    private:std::thread adjust_thr;     //管理线程std::vector<std::thread> threads;   //线程数组std::queue<std::function<void()>> tasks;    //任务队列std::mutex mx;  //线程池锁std::condition_variable task_cv;    //任务通知条件变量int min_thr_num=0;    //线程池最小线程数int max_thr_num=0;     //cpu核数的2n+1std::atomic<int> run_num=0; //线程池中正在执行任务的线程数std::atomic<int> live_num=0; //线程池空闲线程数std::atomic<int> exit_num=0; //线程池要销毁线程数bool shutdown=false;  //线程池状态,true为运行,false为关闭
    };
    
  4. 接口定义:类似于libevent库中的接口提供,可以将以上类接口放入到私有成员,然后让用户使用以下列出的接口即可

    //创建Reactor对象指针
    EventBase* event_base_new();// 开始事件循环
    void event_base_dispatch(EventBase* base,struct timeval *tv);// 停止事件循环
    void event_base_loopbreak(EventBase* base);// 创建新的事件
    Event* event_new(EventBase* base, int fd, uint32_t events, Event::Callback cb,void *ctx);// 添加事件
    void event_add(EventBase *base,Event* ev);// 删除事件
    void event_del(EventBase *base,Event* ev);// 释放事件
    void event_free(Event* ev);// 释放事件循环
    void event_base_free(EventBase* base);//提供创建定时事件
    Event *event_timer_new(EventBase *base,EventBase::Callback cb,void *ctx,int flag,int timeout_ms);//判断事件是否是定时事件
    bool is_event_timer(Event *ev);
    

注:线程池实现可参考本博客Moon’s Linux网络编程或Moon’s C++多线程编程

具体实现

具体实现

  1. Reactor类EventBase

    EventBase::EventBase() : epfd_(epoll_create1(0)), shutdown_(false) {//获取当前系统cpu数unsigned int cpu_cores = std::thread::hardware_concurrency();if (cpu_cores == 0) {cpu_cores = 4; // 默认值}tpool=new Threadpool(cpu_cores+1);if (epfd_ == -1) {perror("epoll_create1");exit(EXIT_FAILURE);}
    }//需要清理线程池和关闭epoll
    EventBase::~EventBase() {delete tpool;close(epfd_);
    }//注册事件(向epoll中添加)
    void EventBase::add_event(Event* event) {int fd = event->getfd();epoll_event ev;ev.data.ptr=event;ev.events = event->getevents();if (epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev) == -1) {perror("epoll_ctl: add");exit(EXIT_FAILURE);}
    }//删除事件(从epoll中删除)
    void EventBase::del_event(Event* event) {int fd = event->getfd();if (epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr) == -1) {perror("epoll_ctl: del");}}//事件循环,分发事件(将事件放入线程池中的任务队列)
    void EventBase::dispatch(struct timeval *tv) {//epoll_event epoll_events[MAX_EVENTS];int timeout=-1;if(tv!=nullptr){timeout = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000;}if(timeout>MAX_EPOLL_TIMEOUT_MSEC){timeout=MAX_EPOLL_TIMEOUT_MSEC;}while (!shutdown_) {//根据timeout设置检测间隔int n = epoll_wait(epfd_, events_, MAX_EVENTS, timeout);if (n == -1) {perror("epoll_wait");break;}for (int i = 0; i < n; ++i) {auto ev=static_cast<Event*>(events_[i].data.ptr);if(ev){tpool->add_task([&]{ev->handle_cb();});}}}
    }//创建并初始化新定时事件
    Event* EventBase::timer_event_new(Callback cb,void *ctx,int flag,int timeout_ms){//使用Linux提供的定时器接口,可以结合epoll使用int fd=timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK);if(-1==fd){perror("timerfd_create error");return nullptr;}itimerspec tvalue;//it_value设置定时器第一次到期时间tvalue.it_value.tv_sec=timeout_ms/1000; //初始化到期秒数tvalue.it_value.tv_nsec=(timeout_ms%1000)*1000000;  //初始化到期纳秒数tvalue.it_interval={0,0};//it_interval设置定时器第一次之后每次到期的间隔时间,设置为0,定时器只会触发一次,非0为周期性触发if(flag){tvalue.it_interval.tv_sec=timeout_ms/1000;  //间隔时间秒数tvalue.it_interval.tv_nsec=(timeout_ms%1000)*1000000;   //间隔时间的纳秒数}if(timerfd_settime(fd,0,&tvalue,NULL)==-1){perror("timerfd_settime error");close(fd);return nullptr;}return new Event(this,fd,EPOLLIN,cb,ctx,true);
    }//终止事件循环
    void EventBase::loopbreak() {shutdown_ = false;
    }
    
  2. 事件类Event

    Event::Event(EventBase* base, int fd, uint32_t events, Callback cb,void *ctx,bool istimer): base_(base), fd_(fd), events_(events), cb_(cb),ctx_(ctx),is_timer_(istimer){//初始化就将事件放入对应的EventBase类对象中base_->add_event(this);
    }//析构自动调用删除事件操作
    Event::~Event() {base_->del_event(this);
    }//获取其EventBase类对象
    EventBase* Event::getbase(){return base_;
    }//获取事件文件描述符
    int Event::getfd() const {return fd_;
    }//获取事件的事件类型
    uint32_t Event::getevents() const {return events_;
    }//调用事件回调函数
    void Event::handle_cb() {if (cb_) {cb_(this,ctx_);}
    }//获取定时事件判断位
    bool Event::is_timer() const{return is_timer_;
    }
    
  3. 线程池类Threadpool

    #include "Threadpool.h"
    #include <chrono>
    #include <mutex>
    #include <iostream>
    #include <unistd.h>void Threadpool::init(){unsigned int cpu_cores = std::thread::hardware_concurrency()/2;if(cpu_cores==0) cpu_cores=4;max_thr_num=2*cpu_cores+1;adjust_thr=std::thread([this]{this->adjust_task();});for(int i=0;i<min_thr_num;++i){threads.emplace_back([this]{this->t_task();});}
    }void Threadpool::t_task(){while (1) {std::unique_lock<std::mutex> lock(mx);task_cv.wait(lock,[this]{return !tasks.empty()||shutdown||exit_num>0;});if(exit_num>0){exit_num--;return;}if(shutdown&&tasks.empty()){return;}auto task=tasks.front();tasks.pop();lock.unlock();++run_num;--live_num;task();++live_num;--run_num;std::this_thread::sleep_for(std::chrono::seconds(1));}
    }void Threadpool::t_shutdown(){{std::unique_lock<std::mutex> lock(mx);shutdown=true;}adjust_thr.detach();task_cv.notify_all();for(auto& t:threads){if(t.joinable()) t.join();}
    }void Threadpool::adjust_task(){while (!shutdown) {std::this_thread::sleep_for(std::chrono::seconds(DEFAULT_TIME));{int size=threads.size();if (tasks.size() > live_num && live_num < max_thr_num&&size<max_thr_num) {int add = 0;std::unique_lock<std::mutex> lock(mx);for (int i = size; i < max_thr_num && add < 10; ++i) {threads.emplace_back([this] {this->t_task();});add++;live_num++;}lock.unlock();std::cout << "线程池扩容成功!" << std::endl;}if (run_num * 2 < live_num && live_num > min_thr_num) {exit_num=live_num-min_thr_num>=10?10:live_num-min_thr_num;int x=exit_num;std::unique_lock<std::mutex> lock(mx);for (int i = 0; i < x; ++i) {task_cv.notify_one();live_num--;}lock.unlock();std::cout << "线程池瘦身成功!" << std::endl;}}}
    }
    
  4. 接口实现

    // 创建新的事件循环
    EventBase* event_base_new() {return new EventBase();
    }// 开始事件循环
    void event_base_dispatch(EventBase* base,struct timeval *tv) {base->dispatch(tv);
    }// 停止事件循环
    void event_base_loopbreak(EventBase* base) {base->loopbreak();
    }// 创建新的事件
    Event* event_new(EventBase* base, int fd, uint32_t events, Event::Callback cb,void *ctx) {return new Event(base, fd, events, cb,ctx);
    }// 添加事件
    void event_add(EventBase *base,Event* ev) {// 在Event的构造函数中已经添加base->add_event(ev);
    }// 删除事件
    void event_del(EventBase *base,Event* ev) {// 在Event的析构函数中已经删除base->del_event(ev);
    }// 释放事件
    void event_free(Event* ev) {delete ev;
    }// 释放事件循环
    void event_base_free(EventBase* base) {delete base;
    }//提供创建定时事件接口
    Event *event_timer_new(EventBase *base,EventBase::Callback cb,void *ctx,int flag,int timeout_ms){return base->timer_event_new(cb,ctx,flag,timeout_ms);
    }//判断事件是否是定时事件
    bool is_event_timer(Event *ev){return ev->is_timer();
    }
    

使用示例

使用示例

使用以上实现的接口写一个回声服务器(我将其封装到了event.h文件中)

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include "event.h"void setnonblock(int fd){fcntl(fd,F_SETFL, fcntl(fd,F_GETFL)|O_NONBLOCK);
}void readCallback(Event *ev,void *args){char buffer[1024];int fd=ev->getfd();while (true) {ssize_t n = read(fd, buffer, sizeof(buffer));if (n > 0) {write(fd, buffer, n); // 回显} else if (n == 0) {std::cout << "Client disconnected: " << fd << std::endl;event_free(ev);close(fd);break;} else {if (errno == EAGAIN || errno == EWOULDBLOCK) {break; // 数据读取完毕} else {perror("read");event_free(ev);close(fd);break;}}}
}void acceptCallback(Event *ev,void *args) {EventBase* base=ev->getbase();int fd=ev->getfd();while (true) {sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);int client_fd = accept(fd, (sockaddr*)&client_addr, &client_len);if (client_fd == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {break; // 没有更多的连接} else {perror("accept");break;}}std::cout << "New connection: " << client_fd << std::endl;setnonblock(client_fd);Event* client_event = event_new(base, client_fd, EPOLLIN|EPOLLET, readCallback,nullptr);}
}void time_hello_cb(Event *ev,void *args){//用来取出定时器到期次数uint64_t expirations;ssize_t s = read(ev->getfd(), &expirations, sizeof(uint64_t));if (s==-1) {//对于非阻塞处理if (errno == EAGAIN || errno == EWOULDBLOCK) {return;} else {perror("read");exit(EXIT_FAILURE);}}std::cout<<expirations<<":hello timer_event"<<std::endl;
}void time_world_cb(Event *ev,void *args){//用来取出定时器到期次数uint64_t expirations;ssize_t s = read(ev->getfd(), &expirations, sizeof(uint64_t));if (s==-1) {//对于非阻塞处理if (errno == EAGAIN || errno == EWOULDBLOCK) {return;} else {perror("read");exit(EXIT_FAILURE);}}std::cout<<expirations<<":world timer_event"<<std::endl;
}int main() {int listen_fd = socket(AF_INET, SOCK_STREAM, 0);setnonblock(listen_fd);setreuse(listen_fd);sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(5005);addr.sin_addr.s_addr = INADDR_ANY;Bind(listen_fd, (sockaddr*)&addr, sizeof(addr));listen(listen_fd, SOMAXCONN);EventBase* base = event_base_new();Event* listen_event = event_new(base, listen_fd, EPOLLIN|EPOLLET, acceptCallback,nullptr);//5秒触发一次Event* timer_event=event_timer_new(base,time_hello_cb,nullptr,1,5000);//5s后只触发一次Event* timer_ev=event_timer_new(base,time_world_cb,nullptr,0,5000);//开启循环struct timeval tv;tv.tv_sec=1;   //设置秒数tv.tv_usec=0;  //设置微妙event_base_dispatch(base,&tv);//释放event_free(timer_event);event_free(timer_ev);event_free(listen_event);event_base_free(base);return 0;
}

Reactor_746">主从Reactor多线程模型

概念

概念

  • 主从Reactor多线程模型:该模型是将职能划分给主Reactor和从Reactor(工作线程),Reactor负责监听连接事件(不进行任何I/O处理和其他实质性事件),并将新连接对象和事件分发给从ReactorReactor负责处理和监听主Reactor分发的事件,这样能够最大限度提高接收连接的响应速度,避免被一些I/O或者其他事件操作所阻塞。
  • one loop one thread:这是一个常见的高性能网络编程的思想与架构(可见陈硕大佬的muduo库),意思就是一个事件循环(Reactor)对应着一个线程,也就是每一个线程都有一个属于自己的事件循环(Reactor),这个架构大大避免锁竞争、频繁上下文切换开销等问题,并充分利用多核 CPU 的性能
  • 本文也将借鉴muduo库,来实现主从Reactor多线程模型
one loop one thread:+-----------------+|   主线程(Acceptor)|+-----------------+|+-------------+-------------+|             |             |
+---------------+ +---------------+ +---------------+
| 工作线程1     | | 工作线程2     | | 工作线程3     |
| (Event Loop) | | (Event Loop) | | (Event Loop) |
+---------------+ +---------------+ +---------------+

具体架构

  1. 首先不管是主Reactor和从Reactor都是一个由事件循环类实现的
  2. 但是主Reactor不止有事件循环的职能,还需要有监听连接事件的类,也就是Acceptor类
  3. 因为one loop one thread,我们需要将从reactor与线程进行做一个封装,让主Reactor使用线程池更好管理,所以我们需要一个事件循环线程类
  4. Reactor还有事件分发的职能,因此应该需要主Reactor来管理从Reactor并将事件分发,因此我们需要一个事件循环线程池
  5. 最后还有个最基础的事件类(用于整合文件描述符、注册事件以及对应回调函数)

注:

  • 事件循环eventloop:也就是封装了select、poll或者epoll作为事件循环监听的基础,然后管理所监听的事件,监听事件触发,调用其对应的回调函数处理
  • Acceptor:用于封装监听套接字和其处理连接请求的回调函数的类
  • 事件循环线程loopthread:用于封装thread和事件循环的类
  • 事件循环线程池loopthreadpool:用于管理从Reactor(事件循环)的线程池,一个线程对应一个从Reactor

具体流程

  1. 将acceptor事件注册到主Reactor
  2. 初始化事件循环线程池,各事件循环线程启动事件循环
  3. 启动主Reactor事件循环,循环监听acceptor事件的连接请求
  4. 触发acceptor事件,调用创建连接回调函数,将新连接分发给从Reactor
  5. Reactor将分发下来的新连接事件注册到事件循环中
  6. 当事件触发,从Reactor进行处理

组件实现

组件实现

  1. eveneloop(Reactor类)/事件循环类

    #ifndef _EVENTLOOP_H_
    #define _EVENTLOOP_H_#include "wrap.h"
    #include <atomic>
    #include <cstdlib>
    #include <iostream>
    #include <functional>
    #include <vector>
    #include <list>
    #include <unistd.h>
    #include <strings.h>
    #include <sys/epoll.h>
    #include <sys/eventfd.h>
    #include <sys/timerfd.h>#define MAX_EVENTS 65536namespace moon {class base_event;
    class event;
    class loopthread;//reactor类-->事件循环类
    class eventloop{
    public:using Callback=std::function<void()>;eventloop(loopthread* base=nullptr,int timeout=-1);~eventloop();loopthread* getbaseloop();int getefd() const;int getevfd() const;//事件控制函数void add_event(event* event);void del_event(event* event);void mod_event(event* event);void loop();void loopbreak();void create_eventfd();     //创建通知文件描述符void read_eventfd();        //通知文件描述符回调函数void write_eventfd();       //唤醒通知事件,用于终止服务通知void add_pending_del(base_event* ev);private:int epfd_;      //epoll文件描述符int eventfd_;   //通知文件描述符int timeout_=-1;std::atomic<bool> shutdown_;std::list<event*> evlist_;std::vector<epoll_event> events_;std::vector<base_event*> delque_;   //待删除事件队列loopthread *baseloop_;
    };
    }#endif 
    
  2. event类/事件类

    #ifndef _EVENT_H_
    #define _EVENT_H_#include "wrap.h"
    #include <atomic>
    #include <cstdlib>
    #include <iostream>
    #include <functional>
    #include <list>
    #include <unistd.h>
    #include <strings.h>
    #include <sys/epoll.h>
    #include <sys/eventfd.h>
    #include <sys/timerfd.h>namespace moon{class eventloop;//事件类
    class event{
    public:// using Callback=std::function<void(event*)>;using Callback=std::function<void()>;event(eventloop *base,int fd,uint32_t events);~event();int getfd() const;  //获取事件文件描述符uint32_t getevents() const;     //获取获取监听事件类型eventloop* getloop() const;//设置回调函数,不需要传null即可void setcb(const Callback &rcb,const Callback &wcb,const Callback &ecb);void setrcb(const Callback &rcb);void setwcb(const Callback &wcb);void setecb(const Callback &ecb);Callback getrcb();Callback getwcb();Callback getecb();void setrevents(const uint32_t revents);    //设置触发事件类型void enable_events(uint32_t op);  //添加监听事件类型void disable_events(uint32_t op); //取消监听事件类型void update_ep();   //更新监听事件void handle_cb();   //处理事件回调函数bool readable();bool writeable();void enable_read();void disable_read();void enable_write();void disable_write();void enable_ET();void disable_ET();void reset_events();void del_listen();  //取消监听void enable_listen();   //开启监听void disable_cb();void close(){del_listen();}
    private:eventloop *loop_;int fd_;uint32_t events_;   //监听事件uint32_t revents_;  //触发事件Callback readcb_;   //读事件回调函数Callback writecb_;  //写事件回调函数Callback eventcb_;  //出了读写事件的其他事件触发回调函数,用作错误事件回调
    };}#endif
    
  3. loopthread类/事件循环线程类

    #ifndef _LOOPTHREAD_H_
    #define _LOOPTHREAD_H_#include <condition_variable>
    #include <mutex>
    #include <thread>#define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000)namespace moon {class eventloop;class loopthread{
    public:loopthread(int timeout=-1):loop_(nullptr),t_(std::thread(&loopthread::_init_,this)),timeout_(timeout){if(timeout_>MAX_EPOLL_TIMEOUT_MSEC){timeout_=MAX_EPOLL_TIMEOUT_MSEC;}}~loopthread();void _init_();  //初始化eventloop* getloop();   //获取loop_void join(){if(t_.joinable()) t_.join();}
    private:eventloop *loop_;std::mutex mx_;std::condition_variable cv_;std::thread t_;int timeout_=-1;     //设置epoll间隔检测时间ms
    };}#endif // !_LOOPTHREAD_H_
    
  4. looptpool类/事件循环线程池类

    #ifndef _LOOPTPOOL_H_
    #define _LOOPTPOOL_H_#include <thread>
    #include <vector>
    #include <set>namespace moon {class eventloop;
    class loopthread;class looptpool{
    public:looptpool(eventloop *base);~looptpool();void create_pool(int n,int timeout=-1);   //初始化线程池eventloop* ev_dispatch();   //分发事件looptpool(const looptpool&)=delete;looptpool& operator=(const looptpool&)=delete;void addloop(); //添加事件循环void stop();    //终止运行
    private:void init_pool(int timeout=-1);//设置线程池线程数void settnum(int n){t_num=n;}private:eventloop *baseloop_;std::vector<eventloop*> loadvec_;int next_=0;int t_num=0;int timeout_=-1;
    };}#endif
    
  5. acceptor类/监听者类

    #ifndef _ACCEPTOR_H_
    #define _ACCEPTOR_H_#include <functional>namespace moon {class eventloop;
    class event;//监听连接类
    class acceptor{
    public:using Callback=std::function<void(int)>;acceptor(int port,eventloop *base);~acceptor();void listen();  //开始监听void stop();    //停止监听void init_sock(int port);    //建立监听套接字void setcb(const Callback &accept_cb); //设置回调函数void handle_accept();       //acceptor事件回调函数,用来接收连接
    private:int lfd_;eventloop *loop_;event *ev_;Callback cb_;   //连接后回调函数bool shutdown_;
    };}#endif
    
  6. 工具头文件/wrap.h实现

    #ifndef _WRAP_H_
    #define _WRAP_H_#include <netinet/in.h>
    #include <cstddef>
    #include <cstdlib>
    #include <netinet/tcp.h>
    #include <fcntl.h>
    #include <cstdio>void perr_exit(const char *s);
    void settcpnodelay(int fd);
    void setreuse(int fd);
    void setnonblock(int fd);
    int Bind(int fd, const struct sockaddr *sa, socklen_t salen);
    int Listen(int fd, int backlog);
    int Socket(int family, int type, int protocol);#endif
    

具体实现

具体实现

  1. eventloop类

    #include "eventloop.h"
    #include "event.h"
    #include <algorithm>using namespace moon;eventloop::eventloop(loopthread* base,int timeout)
    :baseloop_(base),timeout_(timeout),shutdown_(false),events_(MAX_EVENTS){epfd_=epoll_create1(0);if(-1==epfd_){perror("epoll_create1");exit(EXIT_FAILURE);}create_eventfd();
    }eventloop::~eventloop(){loopbreak();for(auto &ev : evlist_){delete ev;}for(auto &ev:delque_){delete ev;}evlist_.clear();delque_.clear();close(epfd_);close(eventfd_);
    }loopthread* eventloop::getbaseloop(){if(!baseloop_) return nullptr;return baseloop_;
    }int eventloop::getefd() const{return epfd_;
    }int eventloop::getevfd() const{return eventfd_;
    }void eventloop::add_event(event* event){if(std::find(evlist_.begin(), evlist_.end(),event)!=evlist_.end()){mod_event(event);return;}int fd=event->getfd();struct epoll_event ev;ev.data.ptr=event;//ev.data.fd=fd;ev.events=event->getevents();if(epoll_ctl(epfd_,EPOLL_CTL_ADD,fd,&ev)==-1){perror("epoll_ctl add error");}evlist_.emplace_back(event);
    }void eventloop::del_event(event* event){if(std::find(evlist_.begin(), evlist_.end(),event)==evlist_.end()) return;int fd=event->getfd();if(epoll_ctl(epfd_,EPOLL_CTL_DEL,fd,nullptr)==-1){perror("epoll_ctl del error");}evlist_.remove(event);
    }void eventloop::mod_event(event* event){if(std::find(evlist_.begin(), evlist_.end(),event)==evlist_.end()) return;int fd=event->getfd();struct epoll_event ev;ev.data.ptr=event;ev.events=event->getevents();if(epoll_ctl(epfd_,EPOLL_CTL_MOD,fd,&ev)==-1){perror("epoll_ctl modify error");}
    }void eventloop::loop(){while (!shutdown_) {int n=epoll_wait(epfd_,events_.data(),MAX_EVENTS,timeout_);if(-1==n){if(errno==EINTR) continue;perror("epoll_wait");break;}for(int i=0;i<n;++i){auto ev=static_cast<event*>(events_[i].data.ptr);ev->setrevents(events_[i].events);ev->handle_cb();}if(n==events_.size()){events_.resize(events_.size() * 2);}if(!delque_.empty()){for(auto ev : delque_){delete ev;}delque_.clear();}}
    }//终止事件循环
    void eventloop::loopbreak(){if(shutdown_) return;write_eventfd();  //唤醒
    }void eventloop::create_eventfd(){eventfd_=eventfd(0,EFD_CLOEXEC | EFD_NONBLOCK);if(eventfd_<0){perror("eventfd create error");exit(EXIT_FAILURE);}event *ev=new event(this,eventfd_,EPOLLIN);ev->setcb(std::bind(&eventloop::read_eventfd,this),NULL,NULL);add_event(ev);
    }void eventloop::read_eventfd(){uint64_t opt=1;ssize_t n=read(eventfd_,&opt,sizeof(opt));shutdown_=true;if(n<0){if(errno==EINTR || errno == EAGAIN){return;}perror("eventfd read error");exit(EXIT_FAILURE);}
    }void eventloop::write_eventfd(){uint64_t opt=1;ssize_t n=write(eventfd_,&opt,sizeof(opt));if(n<0){if(errno==EINTR){return;}perror("eventfd write error");exit(EXIT_FAILURE);}
    }void eventloop::add_pending_del(base_event *ev) {delque_.emplace_back(ev);
    }
    
  2. event类

    #include "event.h"
    #include "eventloop.h"using namespace moon;event::event(eventloop *base,int fd,uint32_t events):loop_(base),fd_(fd),events_(events){
    }event::~event(){
    }int event::getfd() const{return fd_;
    }uint32_t event::getevents() const{return events_;
    }eventloop* event::getloop() const{return loop_;
    }void event::setcb(const Callback &rcb,const Callback &wcb,const Callback &ecb){readcb_=rcb;writecb_=wcb;eventcb_=ecb;
    }void event::setrcb(const Callback &rcb){readcb_=rcb;
    }void event::setwcb(const Callback &wcb){writecb_=wcb;
    }void event::setecb(const Callback &ecb){eventcb_=ecb;
    }void event::setrevents(const uint32_t revents){revents_=revents;
    }void event::update_ep(){loop_->mod_event(this);
    }void event::enable_events(uint32_t op){events_|=op;update_ep();
    }void event::disable_events(uint32_t op){events_&=~op;update_ep();
    }bool event::readable(){return (events_&EPOLLIN);
    }bool event::writeable(){return (events_&EPOLLOUT);
    }void event::enable_read(){enable_events(EPOLLIN);
    }void event::disable_read(){disable_events(EPOLLIN);
    }void event::enable_write(){enable_events(EPOLLOUT);
    }void event::disable_write(){disable_events(EPOLLOUT);
    }void event::enable_ET(){enable_events(EPOLLET);
    }void event::disable_ET(){disable_events(EPOLLET);
    }void event::reset_events(){events_=0;update_ep();
    }void event::del_listen(){loop_->del_event(this);
    }void event::enable_listen(){loop_->add_event(this);
    }void event::disable_cb() {readcb_= nullptr;writecb_= nullptr;eventcb_= nullptr;
    }void event::handle_cb(){if((revents_ & EPOLLIN) || (revents_ & EPOLLRDHUP) || (revents_ & EPOLLPRI)){if(readcb_) readcb_();}if(revents_&EPOLLOUT){if(writecb_) writecb_();}if(revents_ & (EPOLLERR|EPOLLHUP)){if(eventcb_) eventcb_();}
    }event::Callback event::getrcb() {return readcb_;
    }event::Callback event::getwcb() {return writecb_;
    }event::Callback event::getecb() {return eventcb_;
    }
    
  3. loopthread类

    #include "loopthread.h"
    #include "eventloop.h"using namespace moon;loopthread::~loopthread(){if(loop_){loop_->loopbreak();if(t_.joinable()) t_.join();delete loop_;}else{if(t_.joinable()) t_.join();}
    }void loopthread::_init_(){eventloop *loop=new eventloop(this,timeout_);{std::unique_lock<std::mutex> lock(mx_);loop_=loop;cv_.notify_all();}loop_->loop();
    }eventloop* loopthread::getloop(){eventloop *ep=nullptr;{std::unique_lock<std::mutex> lock(mx_);cv_.wait(lock,[&](){return loop_!=nullptr;});ep=loop_;}return ep;
    }
    
  4. looptpool类

    #include "looptpool.h"
    #include "eventloop.h"
    #include "loopthread.h"using namespace moon;looptpool::looptpool(eventloop *base)
    :baseloop_(base){
    };looptpool::~looptpool(){stop();for(auto &t:loadvec_){delete t->getbaseloop();}loadvec_.clear();t_num=0;
    }void looptpool::init_pool(int timeout){timeout_=timeout;for(int i=0;i<t_num;++i){loopthread* lt=new loopthread(timeout);loadvec_.emplace_back(lt->getloop());}
    }void looptpool::create_pool(int n,int timeout){settnum(n);init_pool(timeout);
    }eventloop* looptpool::ev_dispatch(){if(t_num==0) return baseloop_;next_=(next_+1)%t_num;return loadvec_[next_];
    }void looptpool::addloop(){loopthread* lt=new loopthread(timeout_);loadvec_.emplace_back(lt->getloop());++t_num;
    }void looptpool::stop() {for(auto& ep : loadvec_) {ep->loopbreak();ep->getbaseloop()->join();}
    }
    
  5. acceptor类

    #include "acceptor.h"
    #include "event.h"
    #include "eventloop.h"
    #include "wrap.h"using namespace moon;acceptor::acceptor(int port,eventloop *base)
    :loop_(base),shutdown_(true){if(port!=-1)init_sock(port);
    }acceptor::~acceptor(){stop();delete ev_;close(lfd_);
    }void acceptor::init_sock(int port){int fd=Socket(AF_INET,SOCK_STREAM,0);setnonblock(fd);setreuse(fd);struct sockaddr_in ser_addr;bzero(&ser_addr,sizeof(ser_addr));ser_addr.sin_family=AF_INET;ser_addr.sin_addr.s_addr=INADDR_ANY;ser_addr.sin_port=htons(port);Bind(fd,(struct sockaddr*)&ser_addr,sizeof(ser_addr));Listen(fd,SOMAXCONN);lfd_=fd;
    }void acceptor::setcb(const Callback &accept_cb){cb_=accept_cb;
    }void acceptor::handle_accept(){struct sockaddr_in cli_addr;socklen_t cli_len = sizeof(cli_addr);bzero(&cli_addr,cli_len);while(true){int cfd=accept(lfd_,(struct sockaddr*)&cli_addr,&cli_len);if (cfd == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {break; // 没有更多的连接} else {perror("accept");break;}}setnonblock(cfd);settcpnodelay(cfd);if(cb_) cb_(cfd);}}void acceptor::listen(){if(!shutdown_) return;ev_=new event(loop_,lfd_,EPOLLIN|EPOLLET);ev_->setcb(std::bind(&acceptor::handle_accept,this),NULL,NULL);loop_->add_event(ev_);shutdown_=false;
    }void acceptor::stop(){if(shutdown_) return;ev_->del_listen();shutdown_=true;
    }
    
  6. wrap.cpp

    #include "wrap.h"void perr_exit(const char *s){perror(s);exit(1);
    }void settcpnodelay(int fd){int opt=1;int ret=setsockopt(fd,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));if(-1==ret){perr_exit("setsockopt tcp nodelay error");}
    }void setreuse(int fd){int opt=1;int ret=setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));if(-1==ret){perr_exit("setsockopt ipreuse error");}ret=setsockopt(fd,SOL_SOCKET,SO_REUSEPORT,&opt,sizeof(opt));if(-1==ret){perr_exit("setsockopt portreuse error");}
    }void setnonblock(int fd){fcntl(fd,F_SETFL, fcntl(fd,F_GETFL)|O_NONBLOCK);
    }int Bind(int fd, const struct sockaddr *sa, socklen_t salen){int n;if((n=bind(fd,sa,salen))<0){perr_exit("bind error");}return n;
    }int Listen(int fd, int backlog){int n;if((n=listen(fd,backlog))<0){perr_exit("listen error");return -1;}return n;
    }int Socket(int family, int type, int protocol){int n;if((n=socket(family,type,protocol))<0){perr_exit("socket error");return -1;}return n;
    }
    

使用示例

使用示例:使用这些组件实现一个简单的主从reactor多线程模型的tcp回声服务器

#include "eventloop.h"
#include "event.h"
#include "acceptor.h"
#include "looptpool.h"
#include "wrap.h"
#include <iostream>
#include <unistd.h>
#include <cstring>using namespace moon;void handle_read(event* conn_event){char buf[1024];int fd = conn_event->getfd();while (true) {ssize_t n = read(fd, buf, sizeof(buf));if (n > 0) {// 回显数据给客户端ssize_t sent = 0;while (sent < n) {ssize_t m = write(fd, buf + sent, n - sent);if (m > 0) {sent += m;} else if (m == -1 && errno == EAGAIN) {// 写缓冲区已满,稍后再试break;} else {// 发生错误,关闭连接perror("write error");conn_event->del_listen();delete conn_event;close(fd);return;}}} else if (n == 0) {// 客户端关闭连接std::cout << "Client disconnected, fd: " << fd << std::endl;conn_event->del_listen();delete conn_event;close(fd);return;} else {if (errno == EAGAIN || errno == EWOULDBLOCK) {// 数据读取完毕break;} else {// 发生错误,关闭连接perror("read error");conn_event->del_listen();delete conn_event;close(fd);return;}}}
}int main() {// 定义监听端口int port = 5005; // 可以根据需要更改端口号// 定义工作线程数量int thread_num = 4; // 根据需要调整线程数量// 创建主事件循环(主 Reactoreventloop *main_loop = new moon::eventloop();// 创建监听器(acceptor),设置监听端口和主事件循环acceptor *acceptor = new moon::acceptor(port, main_loop);// 创建事件循环线程池(从 Reactorlooptpool *loop_pool = new moon::looptpool(main_loop);loop_pool->create_pool(thread_num);// 设置 acceptor 的回调函数,当有新连接时调用acceptor->setcb([loop_pool](int fd) {// 从线程池中获取一个工作事件循环eventloop *worker_loop = loop_pool->ev_dispatch();// 在工作事件循环中为客户端连接创建一个事件event *conn_event = new event(worker_loop, fd, EPOLLIN | EPOLLET);// 设置读事件的回调函数,实现回声功能conn_event->setrcb(std::bind(handle_read,conn_event));// 将事件添加到工作事件循环中worker_loop->add_event(conn_event);});// 开始监听acceptor->listen();// 运行主事件循环main_loop->loop();// 清理资源delete acceptor;delete loop_pool;delete main_loop;return 0;
}

MoonNet网络库

MoonNetMoonNet 是一个专注于基于 Reactor 的高性能服务端网络库,提供基于主从 Reactor 多线程模型的服务端模块。它利用 Linuxepoll 机制,结合多线程和事件驱动设计,提供高效、可扩展的网络通信能力。MoonNet 支持 TCP 和 UDP 协议,并内置信号处理和定时器功能,适用于构建高并发、低延迟的服务器应用。

关于:MoonNet是我实现的高性能专注于linux服务端的网络库,该库将扩展以上讲的所有内容和代码,可供大家学习,如果感兴趣和喜欢的可以给我点个star

地址:https://github.com/MoonforDream/MoonNet


http://www.ppmy.cn/ops/130341.html

相关文章

parted 磁盘分区

目录 磁盘格式磁盘分区文件系统挂载使用扩展 - parted、fdisk、gdisk 区别 磁盘格式 parted /dev/vdcmklabel gpt # 设置磁盘格式为GPT p # 打印磁盘信息此时磁盘格式设置完成&#xff01; 磁盘分区 开始分区&#xff1a; mkpart data_mysql # 分区名&…

django图书管理系统-计算机毕业设计源码00648

摘要 图书管理系统在数字化阅读趋势、图书馆自动化管理、用户体验需求和信息技术应用等方面具有重要的研究意义。图书馆自动化管理系统的引入和应用提高了图书借阅过程的效率和准确性&#xff0c;减少了对手工操作和纸质记录的需求。用户对系统的易用性、查询速度、借还流程有更…

配置python环境

下载Anaconda 下载Python winR 输入cmd启动命令提示符 pip换源 pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple确认已经换源 pip config list打开Anaconda Prompt conda换源 conda config --add channels https://mirrors.tuna.tsinghua.edu.c…

ceph补充介绍

SDS-ceph ceph介绍 crushmap 1、crush算法通过计算数据存储位置来确定如何存储和检索&#xff0c;授权客户端直接连接osd 2、对象通过算法被切分成数据片&#xff0c;分布在不同的osd上 3、提供很多种的bucket&#xff0c;最小的节点是osd # 结构 osd (or device) host #主…

【TextIn:开源免费的AI智能文字识别产品(通用文档智能解析识别、OCR识别、文档格式转换、篡改检测、证件识别等)】

TextIn&#xff1a;开源免费的AI智能文字识别产品&#xff08;通用文档智能解析识别、OCR识别、文档格式转换、篡改检测、证件识别等&#xff09; 产品的官网&#xff1a;TextIn官网 希望感兴趣以及有需求的小伙伴们多多了解&#xff0c;因为这篇文章也是源于管网介绍才产出的…

使用Django REST framework构建RESTful API

使用Django REST framework构建RESTful API Django REST framework简介 安装Django REST framework 创建Django项目 创建Django应用 配置Django项目 创建模型 迁移数据库 创建序列化器 创建视图 配置URL 配置全局URL 配置认证和权限 测试API 使用Postman测试API 分页 过滤和排序…

Linux通过ifconfig命令ens33没有显示ip地址解决方式

一、问题&#xff1a;登录linux输入ifconfig, ether 00:0c:29:8f:a8:72 ens33: flags4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500ether 00:0c:29:55:f5:0b txqueuelen 1000 (Ethernet)RX packets 0 bytes 0 (0.0 B)RX errors 0 dropped 0 overruns 0 frame 0…

用Python设置、更新和获取Excel单元格的值

Excel工作簿作为一款广泛使用的数据管理工具&#xff0c;与Python相结合&#xff0c;可以使得自动化处理大量数据成为可能。通过Python来设置、更新以及读取Excel单元格的值&#xff0c;不仅可以极大地提高工作效率&#xff0c;减少重复劳动&#xff0c;还能增强数据处理流程的…