reactor网络模型

devtools/2025/3/30 4:08:59/

一、介绍

1.为什么需要reactor网络模型

1.1 高并发支持

  • 非阻塞I/O:Reactor模型通过非阻塞I/O操作,允许单线程处理多个连接,减少线程切换开销,提升并发能力。

  • 事件驱动:基于事件驱动机制,系统只在有事件发生时处理,避免忙等待,提高资源利用率。

1.2 资源高效

  • 减少线程开销:传统多线程模型中,每个连接需要一个线程,Reactor模型通过少量线程处理多个连接,降低线程创建和上下文切换的开销。

  • 内存占用低:由于线程数减少,内存占用也相应降低,适合资源受限的环境。

1.3 可扩展性强

  • 模块化设计:Reactor模型将事件分发与处理逻辑分离,便于扩展和维护。

  • 支持多种I/O多路复用:如select、poll、epoll等,适应不同平台和需求。

1.4 响应迅速

  • 低延迟:事件驱动机制确保事件能快速响应,减少等待时间,提升系统响应速度。

1.5 简化编程

  • 统一事件处理:所有事件通过同一接口处理,简化编程模型,降低复杂度。

  • 避免竞态条件:单线程或少量线程处理事件,减少多线程环境下的竞态问题。

1.6 适用场景广泛

  • 高并发服务器:如Web服务器、游戏服务器等。

  • 实时系统:如即时通讯、在线交易等低延迟场景。

  • 分布式系统:如消息队列、RPC框架等。

2.实现思路

根据不同的io事件,调用不同的回调函数

io                                                     event                                                       callback

listenfd                                             EPOLLIN                                                 accept_cb

clientfd                                             EPOLLIN                                                 recv_cb

clientfd                                             EPOLLOUT                                              send_cb

二、代码

1.创建服务端

int initserver(int port){int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(port);if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))){perror("bind failed!");printf("bind failed!");}listen(sockfd, 10);return sockfd;}

对传入的端口进行绑定

2.创建epoll

创建一个io的一个结构体,定义io的连接数量

struct conn{int fd;char rbuffer[BUFFERLENGTH];int rbufferlength;char wbuffer[BUFFERLENGTH];int wbufferlength;CALLBACK send_callback;union {CALLBACK accept_callback;CALLBACK recv_callback;}r_action;
};struct conn conn_list[CONNECTIONSIZE] = {0};

创建epoll将服务端的fd加入到epoll中,并设置事件类型

epfd = epoll_create(1);conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.accept_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);
int set_event(int fd, int event, int flag){struct epoll_event ev;ev.data.fd = fd;ev.events = event;if(flag){epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);}else{epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}}

3.进入mainloop,根据事件类型调用相应的回调函数

while(1){struct  epoll_event events[1024]={0};int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for(i = 0; i < nready; i++){int connfd = events[i].data.fd;printf("connfd:%d\n", connfd);//这个地方使用两个if而不是使用if-else if是因为该io可能同时存在EPOLLIN和EPOLLOUTif(events[i].events & EPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].events & EPOLLOUT){conn_list[connfd].send_callback(connfd);}}}

3.1 三个回调函数

1.accept_cb:返回监听到的clientfd
int accept_cb(int fd){struct sockaddr_in clientaddr;int len = sizeof(clientaddr);memset(&clientaddr, 0, sizeof(clientaddr));int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);if(clientfd < 0){printf("err clientf < 0");}event_register(clientfd, EPOLLIN);return clientfd;
}
2.recv_cb:接收相应客户端发送的数据
int recv_cb(int fd){memset(conn_list[fd].rbuffer, 0, sizeof(conn_list[fd].rbuffer));int count = recv(fd, conn_list[fd].rbuffer, sizeof(conn_list[fd].rbuffer), 0);if(count == 0){printf("client disconnect: %d\n", fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);close(fd);}else if(count < 0){printf("count:%d, errno:%s\n", count, strerror(errno));epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);close(fd);}memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, count);conn_list[fd].wbufferlength = count;printf("[%d]RECV: %s\n", count, conn_list[fd].rbuffer);set_event(fd, EPOLLOUT, 0);return count;
}
3.send_cb:发送数据
int send_cb(int fd){int count = 0;if(conn_list[fd].wbufferlength > 0){count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wbufferlength, 0);if(count <= 0) {printf("send failed: %s\n", strerror(errno));epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);close(fd);return -1;}conn_list[fd].wbufferlength = 0; // 清空发送缓冲区长度}set_event(fd, EPOLLIN, 0);return count;
}

4.完整代码

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/epoll.h>
#include <errno.h>#define BUFFERLENGTH 1024
#define CONNECTIONSIZE 1024typedef int (*CALLBACK)(int fd);int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);int epfd = 0;struct conn{int fd;char rbuffer[BUFFERLENGTH];int rbufferlength;char wbuffer[BUFFERLENGTH];int wbufferlength;CALLBACK send_callback;union {CALLBACK accept_callback;CALLBACK recv_callback;}r_action;
};struct conn conn_list[CONNECTIONSIZE] = {0};int set_event(int fd, int event, int flag){struct epoll_event ev;ev.data.fd = fd;ev.events = event;if(flag){epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);}else{epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}}int event_register(int fd, int event){   //将事件传入到conn_list中if(fd < 0){return -1;}conn_list[fd].fd = fd;conn_list[fd].r_action.accept_callback = recv_cb;conn_list[fd].send_callback = send_cb; memset(conn_list[fd].rbuffer, 0, BUFFERLENGTH);conn_list[fd].rbufferlength = 0;memset(conn_list[fd].wbuffer, 0, BUFFERLENGTH);conn_list[fd].wbufferlength = 0;set_event(fd, event, 1);
}int accept_cb(int fd){struct sockaddr_in clientaddr;int len = sizeof(clientaddr);memset(&clientaddr, 0, sizeof(clientaddr));int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);if(clientfd < 0){printf("err clientf < 0");}event_register(clientfd, EPOLLIN);return clientfd;
}int recv_cb(int fd){memset(conn_list[fd].rbuffer, 0, sizeof(conn_list[fd].rbuffer));int count = recv(fd, conn_list[fd].rbuffer, sizeof(conn_list[fd].rbuffer), 0);if(count == 0){printf("client disconnect: %d\n", fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);close(fd);}else if(count < 0){printf("count:%d, errno:%s\n", count, strerror(errno));epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);close(fd);}memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, count);conn_list[fd].wbufferlength = count;printf("[%d]RECV: %s\n", count, conn_list[fd].rbuffer);set_event(fd, EPOLLOUT, 0);return count;
}int send_cb(int fd){int count = 0;if(conn_list[fd].wbufferlength > 0){count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wbufferlength, 0);if(count <= 0) {printf("send failed: %s\n", strerror(errno));epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);close(fd);return -1;}conn_list[fd].wbufferlength = 0; // 清空发送缓冲区长度}set_event(fd, EPOLLIN, 0);return count;
}int initserver(int port){int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(port);if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))){perror("bind failed!");printf("bind failed!");}listen(sockfd, 10);return sockfd;}int main(){unsigned short port = 2077;int sockfd = initserver(port);epfd = epoll_create(1);conn_list[sockfd].fd = sockfd;conn_list[sockfd].r_action.accept_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);while(1){struct  epoll_event events[1024]={0};int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for(i = 0; i < nready; i++){int connfd = events[i].data.fd;printf("connfd:%d\n", connfd);//这个地方使用两个if而不是使用if-else if是因为该io可能同时存在EPOLLIN和EPOLLOUTif(events[i].events & EPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].events & EPOLLOUT){conn_list[connfd].send_callback(connfd);}}}
}

 分享一个学习链接,有学习需要的同学可以看一下:

https://xxetb.xetslk.com/s/3yNycZ


http://www.ppmy.cn/devtools/171479.html

相关文章

基于Vue.js的组件化开发技术与实践探索

一、引言 1.1 研究背景与意义 在当今数字化时代&#xff0c;互联网应用的快速发展使得前端开发面临着日益增长的挑战和机遇。随着用户对 Web 应用交互性和体验性要求的不断提高&#xff0c;传统的前端开发模式已难以满足复杂应用的需求。Vue.js 作为一种流行的 JavaScript 框…

VMware打开ubuntu正在使用中怎么解决

1.如图1所示&#xff0c;打开ubuntu&#xff0c;出现该虚拟机正在使用中的情况&#xff1b; 图1 2.如图2所示&#xff0c;找到ubuntu文件夹下.lck的文件夹&#xff0c;删除它们即可&#xff1b; 图2 3.如图3所示&#xff0c;打开虚拟机正常&#xff0c;可以启动。 图3

计算机网络基础:Windows 与 Linux 网络配置

计算机网络基础:Windows 与 Linux 网络配置 一、前言二、网络基础概念2.1 IP 地址与子网掩码2.2 网关与 DNS2.3 网络协议(TCP/IP, UDP, ICMP)2.4 网络接口与路由表三、Windows 网络配置3.1 图形界面配置3.1.1 配置 IP 地址3.1.2 配置 DNS3.1.3 配置网关3.2 命令行配置3.2.1 …

Redis设计与实现-底层实现

Redis底层实现 1、事件1.1 文件事件1.2 时间事件1.3 事件调度 2、Redis客户端2.1 客户端的相关属性2.2 客户端的创建与关闭2.2.1 普通客户端的创建2.2.2 普通客户端的关闭2.2.3 AOF的伪客户端2.2.4 Lua脚本的伪客户端 3、Redis服务端3.1 命令请求的执行过程3.1.1 客户端发送命令…

AI小白的第七天:必要的数学知识(概率)

概率 Probability 1. 概率的定义 概率是一个介于 0 和 1 之间的数&#xff0c;表示某个事件发生的可能性&#xff1a; 0&#xff1a;事件不可能发生。1&#xff1a;事件必然发生。0 到 1 之间&#xff1a;事件发生的可能性大小。 例如&#xff0c;掷一枚公平的硬币&#xf…

Redis常见阻塞原因总结

Redis常见阻塞原因总结 Redis 可能出现阻塞的情况主要包括以下几种原因&#xff0c;并针对不同的场景提供优化方案&#xff1a; 1. 慢查询阻塞 原因 执行耗时较长的命令&#xff0c;如 keys *、hgetall、smembers、flushall。查询的数据量过大&#xff0c;导致单个命令执行时…

HTML5 初探:新特性与本地存储的魔法

HTML5 初探&#xff1a;新特性与本地存储的魔法 作为一名前端新手&#xff0c;你可能听说过 HTML5 这个名词。它是 HTML 的第五代版本&#xff0c;不仅让网页变得更强大&#xff0c;还带来了许多新功能和工具。今天&#xff0c;我们就来聊聊 HTML5 的新特性&#xff0c;以及它…

Kotlin中 StateFlow 或 SharedFlow 的区别

StateFlow 和 SharedFlow 是 Kotlin 协程&#xff08;Coroutines&#xff09; 提供的两种 响应式数据流&#xff08;Reactive Streams&#xff09;&#xff0c;用于在应用程序中处理异步数据流&#xff0c;类似于 RxJava 的 Observable 或 Flowable&#xff0c;但更轻量且与 Ko…