事件处理模式--reactor原理与实现

news/2024/11/2 19:30:48/

文章目录

      • reactor
      • api
      • code

reactor

reactor是是服务器的重要模型, 是一种事件驱动的反应堆模式
通过epoll_create() 创建句柄, epoll_ctrl()提前注册好不同的事件处理函数 , 当事件到来就由 epoll_wait () 获取同时到来的多个事件,并且根据数据的不同类型将事件分发给事件处理机制 (事件处理器),通过回调函数方式实现响应的功能(如创建客户端fd, 读/写IO)

优点:

  1. 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的;
  2. 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/ 进程的切换开销
  3. 可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源
  4. 可复用性,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性

并发:
epoll 对应的主循环不用关心,主业务和网络部分隔离 ,如客户端发送的数据放到rbuffer中,由线程池处理rbuffer, 处理的结果放到wbuffer中,发送给客户端, rbuffer可以由数据包的头部确定,在内存池中申请空间

流程:

  1. 注册事件 和 对应的事件处理器
  2. 多路复用器等待事件到来
  3. 事件到来,激发事件分发器分发事件到对应的处理器
  4. 事件处理器处理事件,然后注册新的事件 (如服务器接收buffer 后 发送buffer)

水平触发和边沿触发:

水平触发:Level Triggered (LT)
只要有数据都会触发

默认为水平触发方式

ev.events = EPOLLIN;

边沿触发:Edge-Triggered(ET)
只有数据到来才触发,不管缓存区中是否还有数据

ev.events = EPOLLIN | EPOLLET;

api

  • epol_create: 创建fd
int epoll_create(int size);

创建一个epoll的句柄,size通常为1,当创建好epoll句柄后,它就是会占用一个fd

  • epoll_ctrl: epoll的事件注册函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctl向 epoll对象中添加、修改或者删除感兴趣的事件,返回0表示成功,否则返回–1,此时需要根据errno错误码判断错误类型
epfd : epoll_create()的返回值
op : 表示动作
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;

fd : 需要监听的fd

*event : 告诉内核需要监听什么事

struct epoll_event结构:

typedef union epoll_data {void *ptr;int fd;__uint32_t u32;__uint64_t u64;
} epoll_data_t;struct epoll_event {__uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */
};
  • epoll_wait: 等待事件产生,类似与select调用
 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epfd: epoll的描述符
*event : events则是分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中
maxevnets : 表示本次可以返回的最大事件数目,通常 maxevents参数与预分配的events数组的大小是相等的。
timeout : 表示在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout为0,则表示 epoll_wait在 rdllist链表中为空,立刻返回,不会等待

code

reactor 模型:
1.事件驱动 EPOLLIN/ EPOLLOUT
2. 回调:
listenfd --> accept_cb
clientfd–>recv_cb
clientfd–>send_cb

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>#include <sys/epoll.h>
#include <sys/epoll.h>#include <errno.h>
#include <fcntl.h>
#include <pthread.h>#define BUFFER_LENGTH 1024
#define EVENT_LENGTH 1024//回调函数
typedef int (*ZVCALLBACK) (int fd, int events, void *arg);// 定义连接客户端所对应的fd, 每个客户端对应一个rbuffer和一个写buffer
typedef struct zv_connect_s {int fd;ZVCALLBACK cb;char rbuffer[BUFFER_LENGTH];char wbuffer[BUFFER_LENGTH];int count;int rc;int wc;
}typedef struct zv_connblock_s {zv_connect_t *block;int blkcnt; //第几块struct zv_connblock_s *next;
}zv_connblock_t;//创建reactor
typedef struct zv_reactor_s {int epfd; // pool 所对应的fd//struct epoll_event_events[EVENT_LENGTH];zv_connblock_t *blockheader; // connblock的头指针
}zv_reactor_t;int zv_connect_block(zv_reactor * reactor) {if(!reactor) return NULL;// 尾插法zv_connblock _t *blk = reactor->blockheader;while(blk->next != NUll ) blk = blk->next;//malloc blockzv_connblock_t * connblock = malloc(sizeof(zv_connblock_t) + 1024 * sizeof(zv_connect_t));connblock->block = (zv_connect_t *)(reactor->blockheader +1);connblock->next = NULL;blk->next = connblock;reactor->blkcnt++;
}zv_connect_t *zv_connnet_idx(zv_reactor,int fd) {if(!reactor ) return NULL;int blkidx = fd / EVENT_LENGH; // 放到第几块while(blkidx >= reactor->blkcnt) {//malloc 新政加一块zv_connect_block(reactor);}int i = 0;zv_connblock_t *blk = reactor->blockheader;while(i++ <blkidx) {blk = blk->next; }return blk->block[fd % EVENT_LENGTH];
}// 初始化reactor
int zv_init_reactor(zv_reactor_t * reactor) {if(!reactor) return -1;// 分配connblock头节点空间 + block 所对应的空间(1024个block, 对应1024个用户)reactor->blockheader = malloc(sizeof(zv_connblock_t) + 1024 * sizeof(zv_connect_t));if(reactor->blockhead  == NULL) return -1;reactor->blockheader->block = (zc_connect_t *)reactor->blockhead + 1 ;  // (1 == sizeof(zv_connblock_t));reactor->blkcnt = 1;rector->blockhead->next = NULL;reactor->epfd = epoll_create(1);
}// 销毁reactor
void zv_dest_reactor(zv_reactor_t * reactor) {if(!reactor ) return -1;if(reactor->blockheader) free(reactor->blockheader);close(reactor->epfd);
}//申明这些事件处理器函数
int recv_cb(int fd, int events, void *arg);
int accept_cb(int fd, int events, void* arg);
int send_cb(int fd, int evnts, void* arg);int set_listener(zv_reactor_t * reactor, int fd , ZVCALLBACK cb) {if(!reactor || !reactor->blockheader))return -1;reactor->blockheader->block[fd].fd = fd;reactor->blockheader->block[fd].cb = cb;// epoll_event 时库函数定义,相当于一个袋子,装fdstruct epoll_event ev;ev.events = EPOLLIN | EPOLLET;ev.data.fd = fd;epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev);}// 发送数据
int send_cb(int fd, int events, void* arg) {zv_reactor_t *reactor = (zv_reactor_t *)arg;zv_connect_t * conn =zv_connetct_idx(reactor,fd);send(fd,conn->wbuffer,conn->wc,0);conn->cb = recv_cb;struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
}int recv_cb(int fd, int events, void *arg) {zv_reactor_t *reactor = (zv_reactor_t *)arg;zv_connect_t * conn = zv_connetct_idx(reactor,fd);int ret = recv(fd, conn->rbuffer + conn->rc, conn->count, 0);if(ret < 0) {return -1;} else if (ret = 0) {//清空数据,置为不可用conn->fd = -1;conn->rc = 0;conn->wc = 0;//epoll_ctrl 移除epoll_ctrl(reactor->epfd,EPOLL_CTL_DEL,fd,NULL);close(fd);return -1;}conn->rc +=ret;//printf("rbuffer:%s\n ret:%d\n ",conn->rbuffer,conn->rc);memcpy(conn->wbuffer,conn->rbuffer,conn->rc);conn->wc = conn->rc;// 读完之后写conn->cb = send_cb;struct epoll_event ev;ev.events = EPOLLOUT;ev.data.fd = fd;epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);}//处理新的连接, 将客户端的fd设置到block所对应的位置,同时,加入epoll,调用实在main 函数 while循环中的conn->cb进行调用// 第一次调用通过set_lisenten(accept_cb)进行调用
int accept_cb(int fd, int events, void* arg) {struct sockaddr_in clientaddr;memset(&clientaddr, 0, sizeof(clientaddr));socklen_t cli_len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &cli_len);if (clientfd  < 0) return -1;printf("clientfd : %d\n",clientfd);// 找到fd, conn用来存储信息zv_reactor_t *reactor = (zv_reactor_t *)arg;zv_connect_t * conn = zv_connetct_idx(reactor,fd);conn->fd = clientfd;conn->cb = recv_cb; //clientfd 可读进行回调conn->count = BUFFER_LENGTH;//注册读事件处理器struct epoll_event ev;ev.events = EPOLLIN | EPOLLET;ev.data.fd = clientfd;epoll_ctl(reactor->epfd,EPOLL_CTL_ADD, clientfd, &ev)return clientfd;
}// 初始化server
int init_server(short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serv_addr;int port = atoi(argv[1]);//确定服务端协议地址簇memset(&serv_addr, 0, sizeof(serv_addr));serv_addr.sin_family = AF_INET;serv_addr.sin_addr.s_addr = INADDR_ANY;serv_addr.sin_port = htons(port);//进行绑定if (-1 == bind(sockfd, (SA*)&serv_addr, sizeof(serv_addr))) {fprintf(stderr, "bind error");return 2;}if (-1 == listen(sockfd, 5)) {fprintf(stderr, "listen error");return 3;}return sockfd;
}
int main(int argc, char* argv[]) {if (argc != 2) {return -1;}int port = atoi(argv[1]);int sockfd = init_server(port);zv_reactor_t reactor;zv_init_reactor(&reactor);  // 初始化epoll//为sockfd绑定事件set_listener(&reactor,sockfd,accept_cb);struct epoll_event events[1024] = {0};while(1) {int nready = epoll_wait(reactor.epfd, events, 512, -1);int i = 0;//循环分发所有的IO事件给处理器for (i = 0; i < nready; ++i) {// 通过events中的fd 设置reactor 中的fd// events中的fd 通过 sockfd 设置int connfd = events[i].data.fd;zv_connect_t *conn = zv_connect_idx(&reactor,connfd);//有事件,调用对应的回调函数if (events[i].events & EPOLLIN) {// 通过reactor头指针和fd 定位到资源所在的位置// cb为函数指针,cb调用,进行回调conn->cb(ev->fd,events[i].events,&reactor);}if (events[i].events & EPOLLOUT) {conn->cb(ev->fd,events[i].events,&reactor);}}}return 0;
}

http://www.ppmy.cn/news/1450533.html

相关文章

unity制作app(3)--gps定位

1.unity中定位Unity之GPS定位&#xff08;高德解析&#xff09;_unity gps定位-CSDN博客 代码需要稍微修改一下&#xff0c;先把脚本绑到一个button上试一试&#xff01; 2.先去高德地图认证&#xff08;app定位&#xff09; 创建应用和 Key-Web服务 API | 高德地图API (ama…

预训练模型介绍

一、什么是GPT GPT 是由人工智能研究实验室 OpenAI 在2022年11月30日发布的全新聊天机器人模型, 一款人工智能技术驱动的自然语言处理工具 它能够通过学习和理解人类的语言来进行对话, 还能根据聊天的上下文进行互动,能完成撰写邮件、视频脚本、文案、翻译、代码等任务 二、 为…

Ubuntu Kylin创建桌面快捷方式【以Pycharm为例】

安装Pycharm专业版后想创建桌面快捷方式以方便访问而不用去调用命令访问&#xff08;安装Pycharm专业版教程请访问笔者的另一篇文章&#xff1a;http://t.csdnimg.cn/bRVHw&#xff09; 以Ubuntu Kylin 22.04系统为例&#xff0c;在此附上优麒麟官方新闻分享&#xff1a;干货分…

EMP.DLL是什么东西?游戏提示EMP.DLL文件缺失怎么解决

emp.dll文件是Windows操作系统中的一种动态链接库文件&#xff0c;它被设计为可以被多个程序共享使用的模块化文件。这种设计旨在提高系统效率&#xff0c;减少内存消耗&#xff0c;并简化软件的维护和更新。DLL文件通常包含了一系列相关的函数和变量&#xff0c;这些函数和变量…

docker 获取离线镜像包

docker 获取离线镜像包 1、问题背景2、问题分析 3、解决方法 1、问题背景 在内网服务器上因为不能访问互联网&#xff0c;不能使用docker pull命令拉取镜像包&#xff0c;怎么创建docker容器呢&#xff1f; 2、问题分析 在docker hub官网上没有提供下载镜像包的功能&#xf…

什么是网络安全等级保护测评(等保测评)?

什么是网络安全等级保护测评&#xff08;等保测评&#xff09;呢&#xff1f;今天永恒无限就为大家介绍下网络安全等级保护测评&#xff08;等保测评&#xff09; 网络安全等级保护测评&#xff08;等保测评&#xff09;是指对信息和信息系统按照重要性等级进行的保护测评。它…

深度学习的核心数学知识点

深度学习的数学知识点包括但不限于以下几个方面&#xff1a; 线性代数&#xff1a; 标量、向量、矩阵和张量&#xff1a;这些是线性代数的基础元素。标量是一个单独的数&#xff0c;向量是有序的数字列表&#xff0c;矩阵是二维数字网格&#xff0c;而张量则是更高维度的数据容…

【python】调整图像大小_自定义调整、等高宽调整

【python】调整图像大小_自定义调整、等高宽调整 【先赞后看养成习惯】求点赞+关注+收藏😀 文章目录 【python】调整图像大小_自定义调整、等高宽调整1、安装 Pillow 库:2、加载图像:3、使用 resize 方法调整图像大小:4、保持图像宽高比:5、保存调整大小后的图像:在Py…