Linux网络:多路转接 epoll
- 系统调用
- epoll_create
- epoll_ctl
- epoll_wait
- echo server
- SelectServer类
- 构造函数
- 事件循环
- 事件派发
- 事件处理
- 测试
- 原理
- 模式
- Level Trigger
- Edge Trigger
多路转接是非常高效的一种IO模型,它可以在同一时间等待多个套接字,从而提高效率。Linux
提供了三种系统调用实现多路转接:select
、poll
、epoll
。其中select
可见博客:[Linux网络:多路转接 select],本博客讲解epoll
。
epoll
是经过改进的poll
,在Linux 2.5.44
版本引入内核,并认为是Linux2.6
最好的多路转接实现方案。
系统调用
epoll_create
epoll_create
用于创建一个epoll
模型,需要头文件<sys/epoll.h>
,函数原型如下:
int epoll_create(int size);
此处的参数size
已经被废弃,可以填入大于0
的任何值。
返回值是一个文件描述符,通过这个文件描述符,可以操控Linux
底层创建的epoll
。
epoll_ctl
epoll_ctl
用于控制epoll
模型,需要头文件<sys/epoll.h>
,函数原型如下:
int epoll_ctl(int epfd, int op, int fd,struct epoll_event *_Nullable event);
参数:
epfd
:通过epoll_create
获取到的文件描述符op
:本次执行的操作,传入宏:EPOLL_CTL_ADD
:新增一个文件描述符到epoll
中EPOLL_CTL_MOD
:修改一个epoll
中的文件描述符EPOLL_CTL_DEL
:从epoll
中删除一个文件描述符
fd
:要监听的文件的文件描述符event
:对文件要执行的监听类型
其中event
的类型是struct epoll_event*
,该结构体定义如下:
struct epoll_event {uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */
};union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;
};
这个结构体中,包含events
和data
两个字段:
events
:一个位图,存储要监听的事件以及一些其它配置EPOLLIN
:监听读事件EPOLLOUT
:监听写事件EPOLLERR
:监听错误事件EPOLLHUP
:文件描述符被关闭EPOLLONESHOT
:只监听一次事件,本次监听完毕,文件描述符被从epoll
中移除
data
:当epoll
返回时,携带的数据
此处的data
是一个联合体,它可以存储四种类型的数据:ptr
指针,int
文件描述符,uint32_t
和uint64_t
的整型。
当一个epoll
返回已经就绪的文件时,用户其实无法得知这个文件的描述符,那么就可以通过这个data.fd
获取到文件描述符,当然也可以通过其它的参数,传递更复杂的信息。
epoll_wait
epoll_wait
用于等待epoll
模型中的文件就绪,需要头文件<sys/epoll.h>
,函数原型如下:
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
参数:
epfd
:通过epoll_create
获取到的文件描述符events
:输出型参数,指向一个epoll_event
数组,获取之前通过epoll_ctl
传入的events
maxevents
:用户传入的events
数组的最大长度timeout
:超时时间,以ms
为单位
此处用户要传入一个epoll_event
数组,这个数组用于存储本次就绪的所有文件的epoll_event
,为了防止越界,所以还要传入maxevents
。
就是说,epoll
的使用方式是通过epoll_wait
获取就绪的文件,这些文件存储到epoll_event
数组中。函数返回,用户可以遍历数组,获取到所有就绪的文件的epoll_event
结构体。这个结构体是在epoll_ctl
时传入的,从它的events
字段可以得知这个文件监听的事件,从data
字段可以获取之前预设的其他信息,一般会预设data.fd
获取这个文件的描述符。
返回值:
0
:超时,指定时间内没有文件就绪<0
:出现错误>0
:就绪的文件的个数
通过此处,已经可以看出epoll
相比于select
的优势了:
epoll
返回时,把已经就绪的文件放到数组中,后续遍历数组,每一个元素都是已经就绪的文件
在select
中,就绪的事件通过一张位图返回,用户需要遍历整个位图所有元素,并判断该元素是否就绪,那么就会浪费大量的时间在未就绪的文件上。
epoll
返回时,不会把已经加入epoll
的文件删除,而是继续监听该文件
这是另一大优势,在select
中,每次返回都会重置用户传入的位图,因此用户在每次轮询都要重新把文件描述符设置到select
。
当然,用户也可以在epoll_ctl
的时候,设置EPOLLONESHOT
,那么这个文件被epoll
返回后,就会从epoll
中删除,也就是只监听一次事件。
echo server
接下来使用epoll
系统调用,实现一个简单的echo server
。
总代码地址:[多路转接Epoll-EchoServer ]
SelectServer类
首先是一个错误类型的枚举,用于在遇到错误时进行简单的报错。
enum
{SOCKET_ERROR = 1, // 套接字错误BIND_ERROR, // 绑定错误LISTEN_ERROR, // 监听连接错误EPOLL_ERROR // 多路转接错误
};
EpollServer
类结构如下:
class EpollServer
{const static int MAX_SZ = 64;
public:EpollServer(int16_t port);void start();private:void handelEvents(int sz);void acceptClient();void serviceIO(int sockfd);private:int _listenfd;int _epollfd;struct epoll_event _events[MAX_SZ];
};
在EpollServer
类中,包含三个成员变量:
_listenfd
:TCP
的监听套接字的文件描述符,用于监听到来的客户端连接_epollfd
:epoll_create
返回的文件描述符,用户控制epoll
_events
:用于接收epoll_wait
返回的就绪文件的数组
除此之外,还维护了一个常量MAX_SZ
,这是_events
的最大长度,表示每次epoll_wait
最多处理的文件数目。
构造函数:
EpollServer(int16_t port); // 构造函数
构造函数接收一个端口号,表示这个服务监听的端口。
开启服务:
void start() // 开启网络服务
这个函数用于进行死循环,每一轮循环进行一次epoll_wait
的调用,获取本次就绪的文件。
事件派发:
void handelEvents(int sz); // 派发事件
这个函数用于进行事件派发,接受一个sz
表示这次epoll_wait
就绪的文件数目。随后在handelEvents
内部就可以知道本次_events
数组内有几个事件就绪了,并根据文件描述符的类型,进行不同类型的业务处理。
事件处理:
void acceptClient(); // 接收客户端连接void serviceIO(int sockfd); // 处理客户端数据
套接字包含两种类型:listensockfd
用于接收客户端连接,以及一般的客户端套接字sockfd
用于完成echo server
,这需要两个不同的函数进行处理。
acceptClient
用于接收客户端连接,而serviceIO
用于与客户端通信,serviceIO
接受一个参数sockfd
,表示与客户端通信的套接字。
构造函数
构造函数代码如下:
EpollServer(int16_t port)
{// 创建套接字_listenfd = socket(AF_INET, SOCK_STREAM, 0);if (_listenfd < 0){std::cerr << "socket error!" << std::endl;exit(SOCKET_ERROR);}struct sockaddr_in addr;bzero(&addr, sizeof(addr));addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = INADDR_ANY;// 绑定套接字int n = bind(_listenfd, (sockaddr*)&addr, sizeof(addr));if (n < 0){std::cerr << "bind error!" << std::endl;exit(BIND_ERROR);}// 监听套接字n = listen(_listenfd, 16);if (n < 0){std::cerr << "listen error!" << std::endl;exit(LISTEN_ERROR);}// 创建epoll_epollfd = epoll_create(1);if (_epollfd < 0){std::cerr << "epoll error!" << std::endl;exit(EPOLL_ERROR);}// 初始化epollepoll_event event;event.events = EPOLLIN;event.data.fd = _listenfd;epoll_ctl(_epollfd, EPOLL_CTL_ADD, _listenfd, &event);
}
首选是Linux
中基本的创建TCP
服务的流程:创建套接字
、绑定套接字
、监听套接字
。
当创建完TCP
服务后,开始进行epoll
的处理,首先通过epoll_create
创建一个epoll
,返回值是一个文件描述符epollfd
,后续使用这个描述符操控epoll
。
随后对epoll
进行初始化,把_listenfd
添加到epoll
中,这个过程代码需要解析一下:
epoll_event event; // 创建一个事件结构体
event.events = EPOLLIN; // 监听读事件
event.data.fd = _listenfd; // data字段存储该套接字的文件描述符// EPOLL_CTL_ADD 表示添加,把 _listenfd 以及 event 添加到 _epollfd 管理的 epoll 中
epoll_ctl(_epollfd, EPOLL_CTL_ADD, _listenfd, &event);
后续所有的epoll
的操作都和以上代码类似。
事件循环
事件循环代码如下:
void start()
{while (true){int n = epoll_wait(_epollfd, _events, MAX_SZ, 1000);switch(n){case 0:std::clog << "epoll timeout..." << std::endl;break;case -1:std::cerr << "epoll error!" << std::endl;exit(EPOLL_ERROR);default:handelEvents(n);}}
}
开启循环后,进入一个while(true)
死循环,每一轮循环通过epoll_wait
获取本轮循环就绪的文件:
// 把本次就绪的文件对应的 event 结构体存到 _events 数组中,最多存储 MAX_SZ 个
int n = epoll_wait(_epollfd, _events, MAX_SZ, 1000);
// 超时时间不超过 1000ms 也就是 1s
随后依据返回值n
进行不同处理,如果返回值为0
表示超时,返回值为-1
表示发生错误。>0
表示正常,n
为本次就绪的文件的个数,把它传给handelEvents(n)
处理。
事件派发
事件派发就是判断文件描述符是_listenfd
还是普通的sockfd
,调用不同的函数进行处理。
void handelEvents(int sz)
{for (int i = 0; i < sz; i++){if (_events[i].data.fd == _listenfd)acceptClient();elseserviceIO(_events[i].data.fd);}
}
遍历整个_events
数组,这个数组中前sz
个元素是有效的,所以循环sz
次。
随后进行判断,如果是_listenfd
就执行acceptClient
,反之执行serviceIO
。如果处理的是普通的客户端连接,那么要把该客户端套接字对应的文件描述符传进serviceIO
中,这已经保存在data.fd
中。
事件处理
- 处理
listenfd
:
void acceptClient()
{// 接收连接struct sockaddr_in peer;bzero(&peer, sizeof(peer));socklen_t len;int clientfd = accept(_listenfd, (sockaddr*)&peer, &len);if (clientfd < 0){std::clog << "accept error!" << std::endl;return;}// 新连接添加到 epoll 中进行监听struct epoll_event event;event.events = EPOLLIN;event.data.fd = clientfd;epoll_ctl(_epollfd, EPOLL_CTL_ADD, clientfd, &event);
}
先通过accept
接收这个连接,拿到这个连接对应的套接字的描述符clientfd
。
随后把这个新的连接添加到epoll
中,进行后续的监听,此处的逻辑和构造函数中的是一模一样的,就是把这个描述符以及一个事件结构体event
一起通过epoll_ctl
添加到epoll
中,此处event.data
依然存储文件描述符fd
,监听的事件类型为EPOLLIN
。
接收请求:
void acceptClient()
{// 接收连接struct sockaddr_in peer; // 预设客户端的地址结构体bzero(&peer, sizeof(peer));socklen_t len;int clientfd = accept(_listenfd, (sockaddr*)&peer, &len); // 接收客户端连接if (clientfd < 0){std::cerr << "accept error!" << std::endl;exit(ACCEPT_ERROR);}// 把套接字插入 _sockfds 数组中
}
- 处理客户端
echo server
:
void serviceIO(int sockfd)
{// 接收数据char buffer[1024];int n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);// 处理数据if (n > 0){buffer[n] = '\0';std::cout << "message: " << buffer << std::endl;std::string ret = "echo: " + (std::string)buffer;send(sockfd, ret.c_str(), ret.size(), 0);}else if (n == 0){std::clog << sockfd << " exit..." << std::endl;epoll_ctl(_epollfd, EPOLL_CTL_DEL, sockfd, nullptr);close(sockfd);}else{std::cerr << sockfd << " error..." << std::endl;epoll_ctl(_epollfd, EPOLL_CTL_DEL, sockfd, nullptr);close(sockfd);}
}
首先通过recv
接收来自客户端的数据,最后进行数据的处理。
如果n > 0
,那么说明收到的数据,直接把数据通过send
返回客户端。
如果n == 0
,说明客户端发起了关闭连接的请求,或者n < 0
说明发生异常。这两种情况下通过close
关闭连接,并通过epoll_ctl
把这个套接字移出epoll
。移除时通过EPOLL_CTL_DEL
,并且event
这个参数设置为空指针,因为删除无需进行event
的配置
测试
最后通过一个main
函数启动这个EpollServer
:
#include <iostream>#include "EpollServer.hpp"int main(int argc, char* argv[])
{if (argc != 2){std::cout << "Usage: " << argv[0] << " port" << std::endl;exit(-1);}uint16_t port = std::stoi(argv[1]);EpollServer svr(port);svr.start(); return 0;
}
运行效果:
左侧是EpolltServer
,右侧是telnet
客户端。起初没有数据到来,一直触发timeout
,当telnet
发起连接,此时触发listenfd
的事件,5 add to epoll
表示新的连接建立成功,并被epoll
开始监听了。
随后用户发送hello
、world
都正常得到了响应,message: hello
表示成功处理了客户端请求。
原理
在Linux 2.6.26
原码中,epoll
通过一个eventpoll
结构体管理,定义如下:
struct eventpoll {// 保护 eventpoll 结构体的自旋锁spinlock_t lock; // 保护整个 epoll 实例的互斥锁struct mutex mtx; // 供 epoll_wait() 使用的等待队列wait_queue_head_t wq; // 供 file->poll() 使用的等待队列wait_queue_head_t poll_wait;// 存储已就绪文件描述符的链表struct list_head rdllist; // 存储被监视的文件描述符的红黑树struct rb_root rbr; // 临时存储新事件的单链表struct epitem *ovflist;
};
这个地方还是有点复杂的,内部包含了两把锁,两个链表,两个队列,以及一颗红黑树。
rbr
与rdllist
:
先介绍红黑树rbr
与就绪链表rdllist
,这两个数据结构,每个节点都是结构体epitem
:
struct epitem {struct rb_node rbn;struct list_head rdllink;struct epitem *next;struct epoll_filefd ffd;int nwait;struct list_head pwqlist;struct eventpoll *ep;struct list_head fllink;struct epoll_event event;
};
这个结构体描述了一个被监听的文件,比如ffd
描述了这个文件的相关信息,event
则是epoll_ctl
的时候传入的event
,描述了这个文件所监听的事件。
除此之外,rbn
是红黑树节点,rdllink
是就绪链表节点,说明这个epitem
可以连接到红黑树rbr
中,也可以连接到就绪链表rdllist
中。
那么这个红黑树和就绪链表有啥用?
所有被加入
epoll
文件,都会进入红黑树rbr
进行统一管理
此处使用红黑树,是因为它搜索的复杂度为logN
,算是比较高效,相比于哈希表占用的内存更少。当一个网络报文到达,epoll
就可以快速到红黑树内部进行搜索,看看这个报文对应的套接字的文件在不在epoll
中。
当一个被监听的事件触发,内核就会把红黑树中对应的节点,连接到就绪链表
rdllink
中
刚才说到,如果网络报文到达,epoll
会去红黑树搜索,查看对应的文件信息ffd
是否匹配,以及监听的事件event
是否匹配,如果都匹配,那么说明epoll
监听的某个文件触发了,就会把这个epitem
连接到rdlink
中。
当用户调用epoll_wait
的时候,直接访问rdlink
就可以拿到所有已经就绪的事件,非常高效。
wq
:
这是一个等待队列,当用户调用 epoll_wait
且没有就绪事件时,调用线程会被添加到这个队列中,并且进入睡眠状态。
当rdlink
就绪链表中有新的事件就绪了,会唤醒等待队列中的所有线程,让它们去读取rdlink
中已经就绪的事件。
那么问题来了,被唤醒的多个线程同时去访问一个就绪链表,这不就出现了线程安全问题吗?因此需要引入一把锁,来完成互斥。
lock
:
这是一把自旋锁,当线程访问eventpoll
结构体之前,都要去争用这把锁。只有持有这把锁的线程才能访问eventpoll
,进而访问到就绪链表rdlink
。
当用户调用epoll_wait
的时候,线程会去争用lock
这把锁。随后先读取rdlink
查看是否有事件就绪,如果没有事件就绪,就去wq
中进行等待,并释放locl
这把锁,直到自己超时,或者再次被唤醒。
如果线程在超时之前被唤醒,那么就说明rdlink
有事件就绪了,此时所有唤醒的线程同时争用一把lock
锁,争到锁的线程才能拿到已经就绪的事件。
当一个线程正在读取rdlink
,此时它持有lock
这把锁,刚好内核中有新的事件来了,那么又有问题来了:内核应该直接把新事件添加到rdlink
吗?内核也要争用lock
这把锁吗?
这个地方,Linux
做了特殊处理,如果用户正在读取rdlink
,内核同时给这个rdlink
添加新的数据,那么就会再次造成线程安全问题。
ovflist
:
这是一个单链表,用于临时存放已经就绪的事件。当用户正在访问rdlink
的时候,又有新的事件就绪,如果内核去争用lock
这把锁,就会导致内核效率降低。为此,内核在有用户访问rdlink
的时候,把新的事件添加到ovflist
中,当rdlink
空闲了,再把ovflist
的数据转移到rdlink
中。这样内核就可以在不争用lock
锁的情况下,快速把已经就绪的事件写入。
mtx
:
这是一把互斥锁,它的用途与lock
不太相同,mtx
主要任务是对文件资源进行管理。比如用户调用poll_ctl
向epoll
中添加或者删除文件的时候,内核需要持有mtx
这把锁,防止其它线程同时来修改epoll
导致错误。
至此,已经讲解完了绝大部分eventpoll
的内容,简单总结如下:
lock
(自旋锁):保护eventpoll
结构体,特别是rdllist
。当内核需要操作rdllist
链表时,需要获取这个锁,主要用于同一个进程内的并发控制mtx
(互斥锁):当一个线程试图修改epoll
,如通过epoll_ctl
添加或删除文件描述符,需要持有这个锁,主要用于防止同一进程的不同线程同时修改epoll
实例wq
(等待队列):供epoll_wait
使用,当一个线程调用epoll_wait
且没有就绪事件时,该线程会将自己添加到这个等待队列中,,然后进入睡眠状态,直到有事件发生或超时rdllist
(就绪链表):存储已经就绪的文件,当一个被监视的文件描述符上发生对应的事件时,相应的epitem
会被添加到这个链表中,当进程调用epoll_wait
时,内核会将这个链表中的事件复制到用户空间rbr
(红黑树):存储所有被监视的文件描述符,每个被监视的文件描述符对应一个epitem
,这些epitem
按照文件描述符的值排序存储在红黑树中,方便快速地查找和操作被监视的文件描述符。ovflist
(单链表):在向用户空间传输就绪事件的过程中,临时存储新发生的事件。这是为了支持内核不持有lock
情况下写入就绪事件,从而提高并发性能。
模式
思考一个问题:如果用户通过epoll
检测到某个socket
的事件已经就绪了,但是这个用户没有处理这个事情,下一次epoll_wait
还要不要返回这一个事件?
就是基于这个问题,衍生了两种epoll
工作模式:LT
模式与ET
模式
Level Trigger
LT
模式下,当用户没有处理事件,那么事件就一直保留在就绪链表rdlink
中,每次调用epoll_wait
都会返回这个事件
这种模式是epoll
的默认模式。
用户接收到事件后,可能某个报文太长了,一次读不完。那么LT
模式下一次还会进行通知,用户可以把剩下的报文读完。但是这就可能导致一个报文,需要调用更多次的epoll_wait
。
Edge Trigger
ET
模式下,当用户通过epoll_wait
拿到事件后,事件直接从rdlink
中删除,下一次不再进行通知
这种模式比LT
更加高效,这可以从两个角度解读:
- 这种模式下,一个报文只需要调用一次
epolll_wait
,因此效率高一点 - 这倒逼程序员必须一次性把报文读完,那么就会更快的进行业务处理,报文响应速度也更快
这里主要是第二点比较重要,当一个报文太长了,但是ET
模式下只进行一次通知。那么程序员收到通知后,就需要用一个while
循环一直读取套接字,直到读不出数据为止。这样一次通知程序员就能拿到完整报文,进而更早的进行业务处理,更早响应。而且提早把数据读走,内核的缓冲区也会被空出来,接收更多的新数据。
默认情况下,从文件读取文件是阻塞的,当最后一次while
循环读取不出内容了,程序就会阻塞住。因此这种情况下,要把文件读取改为非阻塞读取,如果读不出内容直接返回。
但是这也导致ET
的程序会比LT
更加复杂,实际开发中需要进行权衡。