Linux网络:多路转接 epoll

news/2024/12/20 9:32:30/

Linux网络:多路转接 epoll

    • 系统调用
      • epoll_create
      • epoll_ctl
      • epoll_wait
    • echo server
      • SelectServer类
      • 构造函数
      • 事件循环
      • 事件派发
      • 事件处理
      • 测试
    • 原理
    • 模式
      • Level Trigger
      • Edge Trigger


多路转接是非常高效的一种IO模型,它可以在同一时间等待多个套接字,从而提高效率。Linux提供了三种系统调用实现多路转接:selectpollepoll。其中select可见博客:[Linux网络:多路转接 select],本博客讲解epoll

epoll是经过改进的poll,在Linux 2.5.44版本引入内核,并认为是Linux2.6最好的多路转接实现方案。


系统调用

epoll_create

epoll_create用于创建一个epoll模型,需要头文件<sys/epoll.h>,函数原型如下:

int epoll_create(int size);

此处的参数size已经被废弃,可以填入大于0的任何值。

返回值是一个文件描述符,通过这个文件描述符,可以操控Linux底层创建的epoll


epoll_ctl

epoll_ctl用于控制epoll模型,需要头文件<sys/epoll.h>,函数原型如下:

int epoll_ctl(int epfd, int op, int fd,struct epoll_event *_Nullable event);

参数:

  • epfd:通过epoll_create获取到的文件描述符
  • op:本次执行的操作,传入宏:
    • EPOLL_CTL_ADD:新增一个文件描述符到epoll
    • EPOLL_CTL_MOD:修改一个epoll中的文件描述符
    • EPOLL_CTL_DEL:从epoll中删除一个文件描述符
  • fd:要监听的文件的文件描述符
  • event:对文件要执行的监听类型

其中event的类型是struct epoll_event*,该结构体定义如下:

struct epoll_event {uint32_t      events;  /* Epoll events */epoll_data_t  data;    /* User data variable */
};union epoll_data {void     *ptr;int       fd;uint32_t  u32;uint64_t  u64;
};

这个结构体中,包含eventsdata两个字段:

  • events:一个位图,存储要监听的事件以及一些其它配置
    • EPOLLIN:监听读事件
    • EPOLLOUT:监听写事件
    • EPOLLERR:监听错误事件
    • EPOLLHUP:文件描述符被关闭
    • EPOLLONESHOT:只监听一次事件,本次监听完毕,文件描述符被从epoll中移除
  • data:当epoll返回时,携带的数据

此处的data是一个联合体,它可以存储四种类型的数据:ptr指针,int文件描述符,uint32_tuint64_t的整型。

当一个epoll返回已经就绪的文件时,用户其实无法得知这个文件的描述符,那么就可以通过这个data.fd获取到文件描述符,当然也可以通过其它的参数,传递更复杂的信息。


epoll_wait

epoll_wait用于等待epoll模型中的文件就绪,需要头文件<sys/epoll.h>,函数原型如下:

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

参数:

  • epfd:通过epoll_create获取到的文件描述符
  • events:输出型参数,指向一个epoll_event数组,获取之前通过epoll_ctl传入的events
  • maxevents:用户传入的events数组的最大长度
  • timeout:超时时间,以ms为单位

此处用户要传入一个epoll_event数组,这个数组用于存储本次就绪的所有文件的epoll_event,为了防止越界,所以还要传入maxevents

就是说,epoll的使用方式是通过epoll_wait获取就绪的文件,这些文件存储到epoll_event数组中。函数返回,用户可以遍历数组,获取到所有就绪的文件的epoll_event结构体。这个结构体是在epoll_ctl时传入的,从它的events字段可以得知这个文件监听的事件,从data字段可以获取之前预设的其他信息,一般会预设data.fd获取这个文件的描述符。

返回值:

  • 0:超时,指定时间内没有文件就绪
  • <0:出现错误
  • >0:就绪的文件的个数

通过此处,已经可以看出epoll相比于select的优势了:

epoll返回时,把已经就绪的文件放到数组中,后续遍历数组,每一个元素都是已经就绪的文件

select中,就绪的事件通过一张位图返回,用户需要遍历整个位图所有元素,并判断该元素是否就绪,那么就会浪费大量的时间在未就绪的文件上。

epoll返回时,不会把已经加入epoll的文件删除,而是继续监听该文件

这是另一大优势,在select中,每次返回都会重置用户传入的位图,因此用户在每次轮询都要重新把文件描述符设置到select

当然,用户也可以在epoll_ctl的时候,设置EPOLLONESHOT,那么这个文件被epoll返回后,就会从epoll中删除,也就是只监听一次事件。


echo server

接下来使用epoll系统调用,实现一个简单的echo server

总代码地址:[多路转接Epoll-EchoServer ]

SelectServer类

首先是一个错误类型的枚举,用于在遇到错误时进行简单的报错。

enum
{SOCKET_ERROR = 1, // 套接字错误BIND_ERROR,       // 绑定错误LISTEN_ERROR,     // 监听连接错误EPOLL_ERROR      // 多路转接错误
};

EpollServer类结构如下:


class EpollServer
{const static int MAX_SZ = 64;
public:EpollServer(int16_t port);void start();private:void handelEvents(int sz);void acceptClient();void serviceIO(int sockfd);private:int _listenfd;int _epollfd;struct epoll_event _events[MAX_SZ];
};

EpollServer类中,包含三个成员变量:

  • _listenfdTCP的监听套接字的文件描述符,用于监听到来的客户端连接
  • _epollfdepoll_create返回的文件描述符,用户控制epoll
  • _events:用于接收epoll_wait返回的就绪文件的数组

除此之外,还维护了一个常量MAX_SZ ,这是_events的最大长度,表示每次epoll_wait最多处理的文件数目。

构造函数:

EpollServer(int16_t port); // 构造函数

构造函数接收一个端口号,表示这个服务监听的端口。

开启服务:

void start() // 开启网络服务

这个函数用于进行死循环,每一轮循环进行一次epoll_wait的调用,获取本次就绪的文件。

事件派发:

void handelEvents(int sz); // 派发事件

这个函数用于进行事件派发,接受一个sz表示这次epoll_wait就绪的文件数目。随后在handelEvents内部就可以知道本次_events数组内有几个事件就绪了,并根据文件描述符的类型,进行不同类型的业务处理。

事件处理:

void acceptClient(); // 接收客户端连接void serviceIO(int sockfd); // 处理客户端数据

套接字包含两种类型:listensockfd用于接收客户端连接,以及一般的客户端套接字sockfd用于完成echo server,这需要两个不同的函数进行处理。

acceptClient用于接收客户端连接,而serviceIO用于与客户端通信,serviceIO接受一个参数sockfd,表示与客户端通信的套接字。


构造函数

构造函数代码如下:

EpollServer(int16_t port)
{// 创建套接字_listenfd = socket(AF_INET, SOCK_STREAM, 0);if (_listenfd < 0){std::cerr << "socket error!" << std::endl;exit(SOCKET_ERROR);}struct sockaddr_in addr;bzero(&addr, sizeof(addr));addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = INADDR_ANY;// 绑定套接字int n = bind(_listenfd, (sockaddr*)&addr, sizeof(addr));if (n < 0){std::cerr << "bind error!" << std::endl;exit(BIND_ERROR);}// 监听套接字n = listen(_listenfd, 16);if (n < 0){std::cerr << "listen error!" << std::endl;exit(LISTEN_ERROR);}// 创建epoll_epollfd  = epoll_create(1);if (_epollfd < 0){std::cerr << "epoll error!" << std::endl;exit(EPOLL_ERROR);}// 初始化epollepoll_event event;event.events = EPOLLIN;event.data.fd = _listenfd;epoll_ctl(_epollfd, EPOLL_CTL_ADD, _listenfd, &event);
}

首选是Linux中基本的创建TCP服务的流程:创建套接字绑定套接字监听套接字

当创建完TCP服务后,开始进行epoll的处理,首先通过epoll_create创建一个epoll,返回值是一个文件描述符epollfd,后续使用这个描述符操控epoll

随后对epoll进行初始化,把_listenfd添加到epoll中,这个过程代码需要解析一下:

epoll_event event;         // 创建一个事件结构体
event.events = EPOLLIN;    // 监听读事件
event.data.fd = _listenfd; // data字段存储该套接字的文件描述符// EPOLL_CTL_ADD 表示添加,把 _listenfd 以及 event 添加到 _epollfd 管理的 epoll 中
epoll_ctl(_epollfd, EPOLL_CTL_ADD, _listenfd, &event);

后续所有的epoll的操作都和以上代码类似。


事件循环

事件循环代码如下:

void start()
{while (true){int n = epoll_wait(_epollfd, _events, MAX_SZ, 1000);switch(n){case 0:std::clog << "epoll timeout..." << std::endl;break;case -1:std::cerr << "epoll error!" << std::endl;exit(EPOLL_ERROR);default:handelEvents(n);}}
}

开启循环后,进入一个while(true)死循环,每一轮循环通过epoll_wait获取本轮循环就绪的文件:

// 把本次就绪的文件对应的 event 结构体存到 _events 数组中,最多存储 MAX_SZ 个
int n = epoll_wait(_epollfd, _events, MAX_SZ, 1000);
// 超时时间不超过 1000ms 也就是 1s

随后依据返回值n进行不同处理,如果返回值为0表示超时,返回值为-1表示发生错误。>0表示正常,n为本次就绪的文件的个数,把它传给handelEvents(n)处理。


事件派发

事件派发就是判断文件描述符是_listenfd还是普通的sockfd,调用不同的函数进行处理。

void handelEvents(int sz)
{for (int i = 0; i < sz; i++){if (_events[i].data.fd == _listenfd)acceptClient();elseserviceIO(_events[i].data.fd);}
}

遍历整个_events数组,这个数组中前sz个元素是有效的,所以循环sz次。

随后进行判断,如果是_listenfd就执行acceptClient,反之执行serviceIO。如果处理的是普通的客户端连接,那么要把该客户端套接字对应的文件描述符传进serviceIO中,这已经保存在data.fd中。


事件处理

  • 处理listenfd
void acceptClient()
{// 接收连接struct sockaddr_in peer;bzero(&peer, sizeof(peer));socklen_t len;int clientfd = accept(_listenfd, (sockaddr*)&peer, &len);if (clientfd < 0){std::clog << "accept error!" << std::endl;return;}// 新连接添加到 epoll 中进行监听struct epoll_event event;event.events = EPOLLIN;event.data.fd = clientfd;epoll_ctl(_epollfd, EPOLL_CTL_ADD, clientfd, &event);
}

先通过accept接收这个连接,拿到这个连接对应的套接字的描述符clientfd

随后把这个新的连接添加到epoll中,进行后续的监听,此处的逻辑和构造函数中的是一模一样的,就是把这个描述符以及一个事件结构体event一起通过epoll_ctl添加到epoll中,此处event.data依然存储文件描述符fd,监听的事件类型为EPOLLIN

接收请求:

void acceptClient()
{// 接收连接struct sockaddr_in peer;    // 预设客户端的地址结构体bzero(&peer, sizeof(peer));socklen_t len;int clientfd = accept(_listenfd, (sockaddr*)&peer, &len); // 接收客户端连接if (clientfd < 0){std::cerr << "accept error!" << std::endl;exit(ACCEPT_ERROR);}// 把套接字插入 _sockfds 数组中
}
  • 处理客户端echo server
void serviceIO(int sockfd)
{// 接收数据char buffer[1024];int n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);// 处理数据if (n > 0){buffer[n] = '\0';std::cout << "message: " << buffer << std::endl;std::string ret = "echo: " + (std::string)buffer;send(sockfd, ret.c_str(), ret.size(), 0);}else if (n == 0){std::clog << sockfd << " exit..." << std::endl;epoll_ctl(_epollfd, EPOLL_CTL_DEL, sockfd, nullptr);close(sockfd);}else{std::cerr << sockfd << " error..." << std::endl;epoll_ctl(_epollfd, EPOLL_CTL_DEL, sockfd, nullptr);close(sockfd);}
}

首先通过recv接收来自客户端的数据,最后进行数据的处理。

如果n > 0,那么说明收到的数据,直接把数据通过send返回客户端。

如果n == 0,说明客户端发起了关闭连接的请求,或者n < 0说明发生异常。这两种情况下通过close关闭连接,并通过epoll_ctl把这个套接字移出epoll。移除时通过EPOLL_CTL_DEL,并且event这个参数设置为空指针,因为删除无需进行event的配置


测试

最后通过一个main函数启动这个EpollServer

#include <iostream>#include "EpollServer.hpp"int main(int argc, char* argv[])
{if (argc != 2){std::cout << "Usage: " << argv[0] << " port" << std::endl;exit(-1);}uint16_t port = std::stoi(argv[1]);EpollServer svr(port);svr.start(); return 0;
}

运行效果:

在这里插入图片描述

左侧是EpolltServer,右侧是telnet客户端。起初没有数据到来,一直触发timeout,当telnet发起连接,此时触发listenfd的事件,5 add to epoll表示新的连接建立成功,并被epoll开始监听了。

随后用户发送helloworld都正常得到了响应,message: hello表示成功处理了客户端请求。


原理

Linux 2.6.26原码中,epoll通过一个eventpoll结构体管理,定义如下:

struct eventpoll {// 保护 eventpoll 结构体的自旋锁spinlock_t lock;  // 保护整个 epoll 实例的互斥锁struct mutex mtx; // 供 epoll_wait() 使用的等待队列wait_queue_head_t wq; // 供 file->poll() 使用的等待队列wait_queue_head_t poll_wait;// 存储已就绪文件描述符的链表struct list_head rdllist; // 存储被监视的文件描述符的红黑树struct rb_root rbr; // 临时存储新事件的单链表struct epitem *ovflist; 
};

这个地方还是有点复杂的,内部包含了两把锁,两个链表,两个队列,以及一颗红黑树。

  • rbrrdllist

先介绍红黑树rbr与就绪链表rdllist,这两个数据结构,每个节点都是结构体epitem

struct epitem {struct rb_node rbn;struct list_head rdllink;struct epitem *next;struct epoll_filefd ffd;int nwait;struct list_head pwqlist;struct eventpoll *ep;struct list_head fllink;struct epoll_event event;
};

这个结构体描述了一个被监听的文件,比如ffd描述了这个文件的相关信息,event则是epoll_ctl的时候传入的event,描述了这个文件所监听的事件。

除此之外,rbn是红黑树节点,rdllink是就绪链表节点,说明这个epitem可以连接到红黑树rbr中,也可以连接到就绪链表rdllist中。

在这里插入图片描述

那么这个红黑树和就绪链表有啥用?

所有被加入epoll 文件,都会进入红黑树rbr进行统一管理

此处使用红黑树,是因为它搜索的复杂度为logN,算是比较高效,相比于哈希表占用的内存更少。当一个网络报文到达,epoll就可以快速到红黑树内部进行搜索,看看这个报文对应的套接字的文件在不在epoll中。

当一个被监听的事件触发,内核就会把红黑树中对应的节点,连接到就绪链表rdllink

刚才说到,如果网络报文到达,epoll会去红黑树搜索,查看对应的文件信息ffd是否匹配,以及监听的事件event是否匹配,如果都匹配,那么说明epoll监听的某个文件触发了,就会把这个epitem连接到rdlink中。

用户调用epoll_wait的时候,直接访问rdlink就可以拿到所有已经就绪的事件,非常高效

  • wq

这是一个等待队列,当用户调用 epoll_wait 且没有就绪事件时,调用线程会被添加到这个队列中,并且进入睡眠状态。

rdlink就绪链表中有新的事件就绪了,会唤醒等待队列中的所有线程,让它们去读取rdlink中已经就绪的事件。

那么问题来了,被唤醒的多个线程同时去访问一个就绪链表,这不就出现了线程安全问题吗?因此需要引入一把锁,来完成互斥。

  • lock

这是一把自旋锁,当线程访问eventpoll结构体之前,都要去争用这把锁。只有持有这把锁的线程才能访问eventpoll,进而访问到就绪链表rdlink

当用户调用epoll_wait的时候,线程会去争用lock这把锁。随后先读取rdlink查看是否有事件就绪,如果没有事件就绪,就去wq中进行等待,并释放locl这把锁,直到自己超时,或者再次被唤醒。

如果线程在超时之前被唤醒,那么就说明rdlink有事件就绪了,此时所有唤醒的线程同时争用一把lock锁,争到锁的线程才能拿到已经就绪的事件。

当一个线程正在读取rdlink,此时它持有lock这把锁,刚好内核中有新的事件来了,那么又有问题来了:内核应该直接把新事件添加到rdlink吗?内核也要争用lock这把锁吗?

这个地方,Linux做了特殊处理,如果用户正在读取rdlink,内核同时给这个rdlink添加新的数据,那么就会再次造成线程安全问题。

  • ovflist

这是一个单链表,用于临时存放已经就绪的事件。当用户正在访问rdlink的时候,又有新的事件就绪,如果内核去争用lock这把锁,就会导致内核效率降低。为此,内核在有用户访问rdlink的时候,把新的事件添加到ovflist中,当rdlink空闲了,再把ovflist的数据转移到rdlink。这样内核就可以在不争用lock锁的情况下,快速把已经就绪的事件写入。

  • mtx

这是一把互斥锁,它的用途与lock不太相同,mtx主要任务是对文件资源进行管理。比如用户调用poll_ctlepoll中添加或者删除文件的时候,内核需要持有mtx这把锁,防止其它线程同时来修改epoll导致错误。

至此,已经讲解完了绝大部分eventpoll的内容,简单总结如下:

  1. lock (自旋锁):保护 eventpoll 结构体,特别是 rdllist。当内核需要操作rdllist链表时,需要获取这个锁,主要用于同一个进程内的并发控制
  2. mtx (互斥锁):当一个线程试图修改 epoll ,如通过 epoll_ctl 添加或删除文件描述符,需要持有这个锁,主要用于防止同一进程的不同线程同时修改 epoll 实例
  3. wq (等待队列):供 epoll_wait 使用,当一个线程调用 epoll_wait 且没有就绪事件时,该线程会将自己添加到这个等待队列中,,然后进入睡眠状态,直到有事件发生或超时
  4. rdllist (就绪链表):存储已经就绪的文件,当一个被监视的文件描述符上发生对应的事件时,相应的 epitem 会被添加到这个链表中,当进程调用 epoll_wait 时,内核会将这个链表中的事件复制到用户空间
  5. rbr (红黑树):存储所有被监视的文件描述符,每个被监视的文件描述符对应一个 epitem,这些 epitem 按照文件描述符的值排序存储在红黑树中,方便快速地查找和操作被监视的文件描述符。
  6. ovflist (单链表):在向用户空间传输就绪事件的过程中,临时存储新发生的事件。这是为了支持内核不持有 lock情况下写入就绪事件,从而提高并发性能。

模式

思考一个问题:如果用户通过epoll检测到某个socket的事件已经就绪了,但是这个用户没有处理这个事情,下一次epoll_wait还要不要返回这一个事件?

就是基于这个问题,衍生了两种epoll工作模式:LT模式与ET模式

Level Trigger

LT模式下,当用户没有处理事件,那么事件就一直保留在就绪链表rdlink中,每次调用epoll_wait都会返回这个事件

这种模式是epoll的默认模式

用户接收到事件后,可能某个报文太长了,一次读不完。那么LT模式下一次还会进行通知,用户可以把剩下的报文读完。但是这就可能导致一个报文,需要调用更多次的epoll_wait


Edge Trigger

ET模式下,当用户通过epoll_wait拿到事件后,事件直接从rdlink中删除,下一次不再进行通知

这种模式比LT更加高效,这可以从两个角度解读:

  1. 这种模式下,一个报文只需要调用一次epolll_wait,因此效率高一点
  2. 这倒逼程序员必须一次性把报文读完,那么就会更快的进行业务处理,报文响应速度也更快

这里主要是第二点比较重要,当一个报文太长了,但是ET模式下只进行一次通知。那么程序员收到通知后,就需要用一个while循环一直读取套接字,直到读不出数据为止。这样一次通知程序员就能拿到完整报文,进而更早的进行业务处理,更早响应。而且提早把数据读走,内核的缓冲区也会被空出来,接收更多的新数据。

默认情况下,从文件读取文件是阻塞的,当最后一次while循环读取不出内容了,程序就会阻塞住。因此这种情况下,要把文件读取改为非阻塞读取,如果读不出内容直接返回

但是这也导致ET的程序会比LT更加复杂,实际开发中需要进行权衡。



http://www.ppmy.cn/news/1556619.html

相关文章

大模型与呼叫中心结合的呼出机器人系统

大模型与呼叫中心结合的呼出机器人系统 原作者&#xff1a;开源呼叫中心FreeIPCC&#xff0c;其Github&#xff1a;https://github.com/lihaiya/freeipcc 随着人工智能技术的发展&#xff0c;特别是大模型&#xff08;large language models, LLMs&#xff09;的进步&#xf…

自动化立体仓库堆垛机SRM控制系统提升控制功能块开发设计

1、堆垛机SRM控制系统硬件组态如下图 提升控制G120变频器,通信报文111 G120变频器配置调试 2、堆垛机SRM控制系统HMI屏幕页面如下图 运行、起升、货叉相关参数设定 3、堆垛机SRM控制系统中相关变量定义如下图 其中包含起升控制相关变量:提升条码位置反馈、提升条码速度反馈…

国标GB28181网页直播平台EasyGBS:网络摄像机中的音频及音频编码技术解析

在网络摄像机领域&#xff0c;音频质量及其编码方式对于视频监控系统的整体性能至关重要。音频作为视频监控系统的重要组成部分&#xff0c;不仅能够提供现场的声音信息&#xff0c;增强监控的实时性和准确性&#xff0c;还能在事件发生后为调查提供宝贵的语音证据。 一、网络摄…

Kubernetes中subPath

在Kubernetes&#xff08;K8s&#xff09;中&#xff0c;当容器关闭时&#xff0c;其使用的subPath目录的清理行为取决于几个因素。首先&#xff0c;subPath允许Pod中的容器挂载共享卷的一个子目录&#xff0c;而不是整个卷。这种机制有助于避免不同容器间的文件冲突。 容器关闭…

澳门某客户:通过HAP平台整合18个系统,节省20%仓储成本

案例摘要 中国澳门某客户面临系统分散、数据整合难、仓储管理复杂及跨地区文件管理低效等挑战&#xff0c;Nocoly的伙伴明日数据科技有限公司利用HAP平台将18个系统成功整合&#xff0c;实现了数据统一管理&#xff0c;降低了约20%的仓储成本&#xff0c;同时文件审批流程得到…

[python]使用flask-caching缓存数据

简介 Flask-Caching 是 Flask 的一个扩展&#xff0c;为任何 Flask 应用程序添加了对各种后端的缓存支持。它基于 cachelib 运行&#xff0c;并通过统一的 API 支持 werkzeug 的所有原始缓存后端。开发者还可以通过继承 flask_caching.backends.base.BaseCache 类来开发自己的…

卡尔曼网络 针对部分已知动力学的神经网络辅助卡尔曼滤波法

KalmanNet: Neural Network Aided Kalman Filtering for Partially Known Dynamics 摘要 卡尔曼滤波器(Kalman Filter, KF)是在线性高斯系统中进行状态估计的主要工具,前提是系统动态已知。然而,当模型动态部分未知或存在误差时,KF 的性能可能会显著下降。在本文中,我们…

36.在 Vue 3 中使用 OpenLayers 上传包含 SHP 的 ZIP 文件并显示图形

在 Web 开发中&#xff0c;地图相关的应用需求越来越多。尤其是在地理信息系统&#xff08;GIS&#xff09;领域&#xff0c;如何在 Web 中展示地理数据、上传文件并显示图形成为了开发中的一个常见需求。今天&#xff0c;我将为大家介绍如何使用 Vue 3 和 OpenLayers 来实现一…