I/O 复用使得程序能够同时监听多个文件描述符,从而提高程序的性能。I/O 复用本身是阻塞的。Linux 下实现 I/O 复用的系统调用主要有 select
、poll
和 epoll
。
select 系统调用
select API
select
系统调用:在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写、异常事件:
#include <sys/select.h>
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
参数:
nfds
:指定监听的文件描述符的总数,通常是select
监听的所有文件描述符中的最大值+1。(因为文件描述符从 0 开始计数)。readfds
,writefds
,exceptfds
:分别指向可读、可写、异常等事件对应文件描述符集合。应用调用select
时,我们通过这3个参数传入自己感兴趣的文件描述符,当select
函数返回时,内核将修改它们来通知应用进程哪些文件描述符已经就绪。
-
fd_set
结构体:
#include <typesizes.h>
#define __FD_SETSIZE 1024#include <sys/select.h>
#define FD_SETSIZE __FD_SETSIZE
typedef long int __fd_mask;
#undef __NFDBITS
#define __NFDBITS (8 * (int) sizeof(__fd_mask))
typedef struct
{
#ifdef __USE_XOPEN__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
#define __FDS_BITS(set) ((set)->fds_bits);
#else__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
#define __FDS_BITS(set) ((set)->__fds_bits);
} fd_set;
fd_set
能容纳的文件描述符数量由FD_SETSIZE
指定
timeout
:设置select
函数的超时时间。
struct timeval{long tv_sec; //秒数long tv_usec; //微秒数
}
返回值:
select
成功时返回就绪(可读、可写、异常)文件描述符的总数,如果在超时时间内没有任何文件描述符就绪,select
返回0。select
失败返回会-1并设置errno
,如果在select
等待期间,程序收到信号,select
立刻返回-1并设置errno
为 EINTR
。
文件描述符就绪条件
哪些情况下文件描述符可以被认为是可读、可写或者出现异常:
下列情况socket可读
- socket内核接收缓冲区字节数大于等于低水位标记
SO_RCVLOWAT
; - socket通信对方关闭连接,此时socket读操作返回0;
- 监听socket上有新的连接请求;
- socket 上有未处理错误,可用
getsockopt
读取和清除错误。
下列情况socket可写
- socket内核发送缓冲区可用字节数大于等于低水位标记
SO_SNDLOWAT
; - socket通信写操作被关闭,对写操作关闭的socket执行写操作会触发
SIGPIPE
信号; - socket使用非阻塞
connect
连接成功或失败(超时)之后; - socket上有未处理错误,可用
getsockopt
读取和清除错误。
socket能处理的异常
- 只有一种情况:socket接收到带外数据。
处理带外数据
socket上接收到普通数据和带外数据都将使select函数返回,但 socket 处于不同的就绪状态:前者处于可读状态,后者处于异常状态。
下面的代码清单描述了 select
如何同时处理二者:
/*
#include ...
*/int main(int argc, char *argv[]) {// 省略其他操作while (1) {memset(buf, '\0', sizeof(buf));// 设置fdset的位fdFD_SET(connfd, &read_fds);FD_SET(connfd, &exception_fds);ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);if (ret < 0) {printf("selection failure\n");break;}// 可读事件,采用普通的recv函数读取数据if (FD_ISSET(connfd, &read_fds)) {ret = recv(connfd, buf, sizeof(buf) - 1, 0);if (ret <= 0) {break;}printf("get %d bytes of normal data: %s\n", ret, buf);} // 异常事件,采用带MSG_OOB标志的recv函数读取带外数据else if (FD_ISSET(connfd, &exception_fds)) {ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);if (ret <= 0) {break;}printf("get %d bytes of oob data: %s\n", ret, buf);}}close(connfd);close(listenfd);return 0;
}
recv
调用的描述见:TCP 数据读写
poll 系统调用
在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。
#include <poll.h>
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
参数:
fds
:指定所有我们感兴趣的文件描述符上发生的可读、可写、异常事件。
struct pollfd{int fd; // 文件描述符short events; // events成员告诉poll函数监听fd成员上的哪些事件,它是一系列事件的按位或short revents; // 由内核修改,以通知应用程序fd上实际发生了哪些事件。
};
nfds
:指定被监听事件集合fds
参数数组的大小
typedef unsigned long int nfds_t
timeout
:指定poll
的超时值,单位是 ms。timeout == -1
则poll
调用会一直阻塞,直到某个事件发生;当timeout == 0
,poll
调用立刻返回。
poll
支持的事件类型:
自Linux内核2.6.17开始,GNU为poll
系统调用增加了POLLRDHUP
事件,它在socket上接收到对方关闭连接的请求后触发。
返回值:
- 与 select 相同。
select
成功时返回就绪(可读、可写、异常)文件描述符的总数,如果在超时时间内没有任何文件描述符就绪,select
返回0。select
失败返回会-1并设置errno
,如果在select
等待期间,程序收到信号,select
立刻返回-1并设置errno
为 EINTR
。
epoll 系列系统调用
内核事件表
epoll
函数是Linux特有的I/O复用函数,它在实现和使用上与select
、poll
函数有很大差异:
epoll
函数使用一组函数来完成任务,而非单个函数;epoll
函数把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select
和poll
函数那样每次调用都要重复传入文件描述符集或事件集;epoll
需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表,这个文件描述符使用epoll_create
函数来创建:
#include <sys/epoll.h>
int epoll_create(int size);
该函数返回的文件描述符将用作其他所有epoll
系统调用的第1个参数,以指定要访问的内核事件表。
操作内核事件表 epoll_ctl
:
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
fd
:要操作的文件描述符;op
:指定操作类型。
操作类型有如下三种:
-
EPOLL_CTL_ADD
:往事件表中注册fd
上的事件。EPOLL_CTL_MOD
:修改fd
上的注册事件。EPOLL_CTL_DEL
:删除fd
上注册事件。
event
:指定事件。
struct epoll_event{__uin32_t events; // epoll事件epoll_data_t data; // 用户数据
};
events
成员描述事件类型,epoll
函数支持的事件类型和poll
函数基本相同,表示epoll
事件类型的宏是在poll
对应的宏前加上E
,如epoll
的数据可读事件是EPOLLIN
,但epoll
有两个额外的事件类型EPOLLET
和EPOLLONESHOT
。epoll_data_t
定义如下:
typedef union epoll_data{void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;
epoll_data_t
是一个联合体,其4个成员中使用最多的是fd
成员,它指定事件所从属的文件描述符。prt
成员是指向用户定义数据的指针,但由于epoll_data_t
是一个联合体,我们不能同时使用其ptr
成员和fd
成员。
返回值:
epoll_ctl
函数成功时返回0,失败时返回-1并设置errno
。
epoll_wait 函数
epoll
系列系统调用的主要接口是epoll_wait
,它在一段超时时间内等待一组文件描述符上的事件:
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
参数:
timeout
:设置epoll_wait
函数的超时时间;maxevents
:指定最多监听多少个事件,必须大于 0;events
:epoll_wait
函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd
参数指定)中复制到它的第二个参数events
指向的数组中,这个数组只用于输出epoll_wait
函数检测到的就绪事件,而不像select
和poll
函数的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件,这样就极大地提高了应用进程索引就绪文件描述符的效率。
例如,我们要索引 poll
返回的就绪文件描述符:
int ret = poll(fds, MAX_EVENT_NUMBER, -1);// 遍历所有已注册文件描述符并找到其中的就绪者(当然可用ret来稍做优化)
for (int i = 0; i < MAX_EVENT_NUMBER; ++i) {if ( fds[i].revents & POLLIN ) { // 判断第i个文件描述符是否就绪int sockfd = fds[i].fd;// 处理socket}
}
而索引 epoll
返回的就绪文件描述符:
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
// 仅遍历就绪的ret个文件描述符
for (int i = 0; i < ret; ++i) {int sockfd = events[i].data.fd;// socketfd肯定就绪,直接处理
}
返回值:
成功时返回就绪的文件描述符个数,失败时返回-1
并设置errno
。
LT 模式和 ET 模式
epoll
对文件描述符的操作有两种模式:LT(Level Trigger,电平触发)模式和ET(Edge Trigger,边沿触发)模式。
LT模式是默认的工作模式,这种模式下epoll
相当于一个效率较高的poll
。当往epoll
内核事件表中注册一个文件描述符上的EPOLLET
事件时,epoll
将以ET模式来操作该文件描述符。ET模式是epoll
的高效工作模式。
对于采用LT工作模式的文件描述符,当epoll_wait
函数检测到其上有事件发生并将此事件通知应用进程后,应用进程可以不立即处理该事件,这样,当应用进程下次调用epoll_wait
时,epoll_wait
函数还会再次向应用进程通告此事件,直到该事件被处理。
而对于采用ET工作模式的文件描述符,当epoll_wait
函数检测到其上有事件发生并将此事件通知应用进程后,应用进程应立即处理该事件,因为后续的epoll_wait
调用将不再向应用进程通知这一事件。可见ET模式降低了同一个epoll
事件被重复触发的次数,因此效率比LT模式高。以下代码体现了LT和ET在工作方式上的差异(这里我只给出了主要的代码段,想了解完整代码的请参阅 《Linux 高性能服务器编程》第九章 P154-157):
LT 模式的工作流程:
void lt(epoll_event *events, int number, int epollfd, int listenfd) {char buf[BUFFER_SIZE];for (int i = 0; i < number; ++i) {int sockfd = events[i].data.fd;if (sockfd == listenfd) {struct sockaddr_in client_address;socklen_t client_addrlength = sizeof(client_address);int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);addfd(epollfd, connfd, false);} else if (events[i].events & EPOLLIN) {/* 只要socket读缓存中还有未读出的数据,这段代码就被触发 */printf("event trigger once\n");memset(buf, '\0', BUFFER_SIZE);int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);if (ret <= 0) {close(sockfd);continue;}printf("get %d bytes of content: %s\n", ret, buf);} else {printf("something else happened\n");}}
}
ET 模式工作流程:
void et(epoll_event *events, int number, int epollfd, int listenfd) {char buf[BUFFER_SIZE];for (int i = 0; i < number; ++i) {int sockfd = events[i].data.fd;if (sockfd == listenfd) {struct sockaddr_in client_address;socklen_t client_addrlength = sizeof(client_address);int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);addfd(epollfd, connfd, true);} else if (events[i].events & EPOLLIN) {// 这段代码不会被重复触发,所以需要循环读取数据printf("event trigger once\n");while (1) {memset(buf, '\0', BUFFER_SIZE);int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);if (ret < 0) {if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {printf("read later\n");break;}close(sockfd);break;} else if (ret == 0) {close(sockfd);} else {printf("get %d bytes of content: %s\n", ret, buf);}}} else {printf("something else happened\n");}}
}
ET模式下事件被触发的次数比LT模式下少很多。
使用ET模式的文件描述符应该是非阻塞的。如果文件描述符是阻塞的,那么读或写操作将会因为没有后续事件而一直处于阻塞状态。
EPOLLONESHOT 事件
即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次,这在并发程序中会引起问题,比如一个线程(或进程,下同)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN
再次被触发),此时另一个线程被唤醒来读取这些新数据,于是就出现了两个线程同时操作一个socket的局面,这不是我们所期望的,我们期望的是一个socket连接在任一时刻都只被一个线程处理,这可用EPOLLONESHOT
事件实现。
对于注册了EPOLLONESHOT
事件的文件描述符,操作系统最多触发其上注册的一个可读、可写、异常事件,且只触发一次,除非我们使用epoll_ctl
函数重置该文件描述符上注册的EPOLLONESHOT
事件,这样,当一个线程在处理某个socket时,其他线程不可能有机会操作socket。注册了EPOLLONESHOT
事件的socket一旦被某个线程处理完毕,该线程就应立即重置这个socket上的EPOLLONESHOT
事件,以确保这个socket下次可读时,其EPOLLIN
事件能触发,从而让其他线程有机会处理这个socket。
以下代码展示了EPOLLONESHOT
事件的使用:
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024struct fds {int epollfd;int sockfd;
};int setnonblocking(int fd) {int old_option = fcntl(fd, F_GETFL);int new_option = old_option | O_NONBLOCK;fcntl(fd, F_SETFL, new_option);return old_option;
}// 将fd参数上的EPOLLIN和EPOLLET事件注册到epollfd参数指示的内核事件表中
// 参数oneshot指定是否注册fd参数上的EPOLLONESHOT事件
void addfd(int epollfd, int fd, bool oneshot) {epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET;if (oneshot) {event.events |= EPOLLONESHOT;}epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);setnonblocking(fd);
}// 重置fd参数上的事件,这样操作后,可以再次触发fd参数上的事件
void reset_oneshot(int epollfd, int fd) {epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}// 工作线程
void *worker(void *arg) {int sockfd = ((fds *)arg)->sockfd;int epollfd = ((fds *)arg)->epollfd;printf("start new thread to receive data on fd: %d\n", sockfd);char buf[BUFFER_SIZE];memset(buf, '\0', BUFFER_SIZE);// 循环读取sockfd上的数据,直到遇到EAGAIN错误while (1) {int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);if (ret == 0) {close(sockfd);printf("foreiner closed the connection\n");break;} else if (ret < 0) {if (errno == EAGAIN) {reset_oneshot(epollfd, sockfd);printf("read later\n");break;}} else {printf("get content: %s\n", buf);// 休眠5s,模拟数据处理过程sleep(5);}}printf("end thread receiving data on fd: %d\n", sockfd);
}int main(int argc, char *argv[]) {if (argc != 3) {printf("usage: %s ip_address port_number\n", basename(argv[0]));return 1;}const char *ip = argv[1];int port = atoi(argv[2]);int ret = 0;struct sockaddr_in address;bzero(&address, sizeof(address));address.sin_family = AF_INET;inet_pton(AF_INET, ip, &address.sin_addr);address.sin_port = htons(port);int listenfd = socket(PF_INET, SOCK_STREAM, 0);assert(listenfd >= 0);ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));assert(ret != -1);ret = listen(listenfd, 5);assert(ret != -1);epoll_event events[MAX_EVENT_NUMBER];int epollfd = epoll_create(5);assert(epollfd != -1);// 监听socket上不能注册EPOLLONESHOT事件,否则只能处理一个客户连接// 后续的连接请求将不再触发listenfd上的EPOLLIN事件addfd(epollfd, listenfd, false);while (1) {int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);if (ret < 0) {printf("epoll failure\n");break;}for (int i = 0; i < ret; ++i) {int sockfd = events[i].data.fd;if (sockfd == listenfd) {struct sockaddr_in client_address;socklen_t client_addrlength = sizeof(client_address);int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);// 对每个非监听文件描述符都注册EPOLLONESHOT事件addfd(epollfd, connfd, true);} else if (events[i].events & EPOLLIN) {pthread_t thread;fds fds_for_new_worker;fds_for_new_worker.epollfd = epollfd;fds_for_new_worker.sockfd = sockfd;// 对每个客户请求都启动一个工作线程为其服务pthread_create(&thread, NULL, worker, (void *)&fds_for_new_worker);} else {printf("something else happened\n");}}}close(listenfd);return 0;
}
从工作线程函数worker
来看,如果一个工作线程处理完某个socket上的一次请求(我们用休眠5秒来模拟此过程)之后,又接收到该socket上新的客户请求,则该线程将继续为这个socket服务,并且由于该socket上注册了EPOLLONESHOT
事件,主线程中epoll_wait
函数不会返回该描述符的可读事件,从而不会有其他线程读这个socket,如果工作线程等待5秒后仍没收到该socket上的下一批客户数据,则它将放弃为该socket服务,同时调用reset_oneshot
来重置该socket上的注册事件,这将使epoll
有机会再次检测到该socket上的EPOLLIN
事件,进而使得其他线程有机会为该socket服务。
有了EPOLLONESHOT
,尽管一个socket在不同时间可能被不同的线程处理,但同一时刻肯定只有一个线程在为它服务,这就保证了连接的完整性,从而避免了很多可能的竞态条件。
三组 I/O 复用函数的比较
事件集
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
select
函数的参数类型fd_set
没有将文件描述符和事件绑定,它仅仅是一个文件描述符集合,因此select
函数需要3个fd_set
类型参数来分别传入和输出可读、可写、异常事件,这使得select
函数不能处理更多类型的事件,另一方面,由于内核对fd_set
集合的修改,应用进程下次调用select
前不得不重置这3个fd_set
集合。
int poll(struct pollfd* fds, nfds_t nfds, int timeout);struct pollfd{int fd; // 文件描述符short events; // events成员告诉poll函数监听fd成员上的哪些事件,它是一系列事件的按位或short revents; // 由内核修改,以通知应用程序fd上实际发生了哪些事件。
};
poll
函数的参数类型pollfd
把文件描述符和事件都定义其中,任何事件都被统一处理,从而使得编程接口简洁地多,且内核每次修改的是pollfd
结构体的revents
成员,而events
成员保持不变,因此下次调用poll
时应用进程无需重置pollfd
类型的事件集参数。
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
由于每次select
和poll
函数都返回整个用户注册的事件集合(包括就绪和未就绪的),所以应用索引就绪文件描述符的时间复杂度为O(n)
。epoll
则采用与select
和poll
函数不同的方式来管理用户注册的事件,它在内核中维护一个事件表,并提供一个独立的系统调用epoll_ctl
来往内核事件表中添加、删除、修改事件,这样,每次epoll_wait
调用都直接从内核表中取得用户注册的事件,而无须反复从用户空间读入这些事件,epoll_wait
函数的events
参数仅用来返回就绪的事件,这使得应用进程索引就绪文件描述符的时间复杂度达到O(1)
。
最大支持的文件描述符数
poll
和epoll_wait
函数分别用nfds
和maxevents
参数指定最多监听多少文件描述符和事件,这两个数值都能达到系统允许打开的最大文件描述符数目,即65535(cat /proc/sys/fd/file-max)。而select
函数允许监听的最大文件描述符数量通常有限制,虽然用户可以修改这个限制,但这可能导致不可预期的后果。
工作模式
select
和poll
函数只能工作在相对低效的LT模式,而epoll
函数能工作在高效的ET模式,且epoll
函数还支持EPOLLONESHOT
事件,该事件能进一步减少可读、可写、异常事件触发的次数。
具体实现
从实现原理上说,select
和poll
函数采用的都是轮询方式,即每次调用都要扫描整个注册文件描述符集合,因此它们检测就绪事件的算法时间复杂度是O(n)
。
而epoll_wait
函数采用回调的方式,内核检测到就绪的文件描述符时,将触发回调函数,回调函数将该文件描述符上对应的事件插入内核就绪事件队列,然后内核在适当的时机将该就绪事件队列中的内容拷贝到用户空间,因此epoll_wait
函数无须轮询整个文件描述符集合来检测哪些事件已经就绪,其算法时间复杂度为O(1)
。
当活动连接比较多时,epoll_wait
的回调函数被触发的过于频繁,会导致 epoll_wait
的效率反而降低,因此 epoll_wait
适用于连接数量多,但是活动连接较少的情况。
I/O 复用的高级应用一:非阻塞 connect
connect
系统调用的man手册中有如下一段内容:
这段话描述了connect
函数出错时的一种errno
值(EINPROGRESS
),这种错误发生在对非阻塞的socket调用connect
,而连接又没有立即建立时,此时,我们可以调用select
、poll
等函数来监听这个连接失败的socket上的可写事件,当select
、poll
等函数返回后,再利用getsockopt
函数来读取错误码并清除该socket上的错误,如果错误码是0,表示连接成功建立,否则连接建立失败。
通过非阻塞connect
,我们就能同时发起多个连接并一起等待。但是这种方法存在几处移植性问题。
I/O 复用的高级应用二:聊天室程序
像ssh这样的登录服务通常需要同时处理套接字描述符和用户输入输出描述符,这可用I/O复用来实现,下面用poll
函数为例实现一个简单的聊天室程序,该聊天室程序能让所有用户同时在线群聊,它分为客户端和服务器两部分。
客户端有两个功能
服务器的功能是接收客户数据,并把客户数据发送给每个登录到该服务器上的客户端(数据发送者除外)。
客户端程序使用poll
函数同时监听用户输入和网络连接,并利用splice
函数将用户输入内容直接定向到网络连接上发送,从而实现数据零拷贝,提高了程序执行效率。splice函数
服务器使用poll
函数同时管理监听socket和连接socket,且使用牺牲空间换取事件的策略来提高服务器性能。
I/O 复用的高级应用三:同时处理 TCP 和 UDP 服务
以上讨论的服务器程序只监听一个端口,实际应用中,有些服务器程序能同时监听多个端口,如超级服务器inetd和android的调试服务adbd。
从bind
系统调用的参数来看,一个socket只能与一个socket地址绑定,即一个socket只能用来监听一个端口,因此,如果服务器要同时监听多个端口,就必须创建多个socket,并将它们分别绑定到各个端口上,这样,服务器就需要同时管理多个监听socket,这可使用I/O复用技术实现。另外,即使是同一个端口,如果服务器要同时处理该端口上的TCP和UDP请求,也需要创建两个不同的socket,一个是流socket,另一个是数据报socket,并将它们都绑定到该端口上。
超级服务 xinetd
Linux因特网服务inetd是超级服务,它同时管理着多个子服务,即监听多个端口,现在Linux上使用的inetd服务程序通常是其升级版本xinetd,xinetd程序的原理与inetd的相同,但增加了一些控制选项,并提高了安全性。