多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想:不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。
一. select
1. select函数介绍:
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
nfds: 监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态。
readfds: 监控读数据到达文件描述符集合,传入传出参数
writefds: 监控写数据到达文件描述符集合,传入传出参数
exceptfds: 监控异常发生到达文件描述符集合,如带外数据到达异常,传入传出参数
timeout: 定时阻塞监控时间,3种情况
1.NULL,永远等下去
2.设置timeval,等待固定时间
3.设置timeval里时间均为0,检查描述字后立即返回,轮询
2. select内核操作原理:
内核中使用位图机制来实现数据的集合,如果将对应位设置为1,表示通知内核监视对应的文件描述符。然而定义的位图并不允许我们直接使用位操作进行修改,系统专门为此提供了一套函数进行操作。
void FD_CLR(int fd, fd_set *set); //把文件描述符集合里fd位清0
int FD_ISSET(int fd, fd_set *set); //测试文件描述符集合里fd是否置1
void FD_SET(int fd, fd_set *set); //把文件描述符集合里fd位置1
void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0
socket -> setsockopt -> bind -> listen -> while(
temps = reads;
//1. temp是一个传入传出参数,传入需要监听的文件描述符,传出有读事件的文件描述符。
//2. 所以此处需要设置一个临时变量,以保留设置的需要监听的文件描述符的集合。
//3. 如果需要监听写事件,那么可以设置select的参数三,将需要监听写事件的文件描述符添加的该集合中。
ret = select(maxfd+1, &temps, NULL, NULL, NULL);//
if (FD_ISSET(lfd, &temps)) { ... } //判断是否有新连接。有则添加到reads数组中。
for(i=lfd+1; i<=maxfd; ++i) { ... } //遍历检测的文件描述符是否有读操作。
);
因为系统返回的文件描述符均符合当前系统最小可用,所以文件描述符本身的数值即为位图中的编号。
3. select 总结:
优点:
1. 跨平台。
缺点:
1. 调用select ,集合拷贝:用户态 -> 内核态 -> 用户态。
2. 遍历文件描述符的集合。
3. 支持的文件描述符少,只有1024个。
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <sys/select.h>#define SERV_PORT 8989int main(int argc, const char* argv[])
{int lfd, cfd;struct sockaddr_in serv_addr, clien_addr;int serv_len, clien_len;// 创建套接字lfd = socket(AF_INET, SOCK_STREAM, 0);// 初始化服务器 sockaddr_in memset(&serv_addr, 0, sizeof(serv_addr));serv_addr.sin_family = AF_INET; // 地址族 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IPserv_addr.sin_port = htons(SERV_PORT); // 设置端口 serv_len = sizeof(serv_addr);// 绑定IP和端口bind(lfd, (struct sockaddr*)&serv_addr, serv_len);// 设置同时监听的最大个数listen(lfd, 36);printf("Start accept ......\n");int ret;int maxfd = lfd;// reads 实时更新,temps 内核检测fd_set reads, temps;/*===============================================================*/// 记录要检测的文件描述符的数组int allfd[FD_SETSIZE]; // 1024// 记录数组中最后一个元素的下标int last_index = 0;// 初始化数组for(int i=0; i<FD_SETSIZE; ++i){allfd[i] = -1; // 无效文件描述符值}allfd[0] = lfd; // 监听的文件描述符添加到数组中/*===============================================================*/// 初始化监听的读集合FD_ZERO(&reads);FD_SET(lfd, &reads);while(1){// 每次都需要更新,否则select不会重新检测temps = reads;ret = select(maxfd+1, &temps, NULL, NULL, NULL);if(ret == -1){perror("select error");exit(1);}int i = 0;char bufip[64];// 判断是否有新连接if(FD_ISSET(lfd, &temps)){// 接受连接请求clien_len = sizeof(clien_len);int cfd = accept(lfd, (struct sockaddr*)&clien_addr, &clien_len);printf("client ip: %s, port: %d\n",inet_ntop(AF_INET, &clien_addr.sin_addr.s_addr, bufip, sizeof(bufip)),ntohs(clien_addr.sin_port));// 文件描述符放入检测集合FD_SET(cfd, &reads);// 更新最大文件描述符maxfd = maxfd < cfd ? cfd : maxfd;// cfd添加到检测数组中for(i=0; i<FD_SETSIZE; ++i){if(allfd[i] == -1){allfd[i] = cfd;break;}}// 更新数组最后一个有效值下标last_index = last_index < i ? i : last_index; }// 遍历检测的文件描述符是否有读操作for(i=lfd+1; i<=maxfd; ++i){if(FD_ISSET(i, &temps)){// 读数据char buf[1024] = {0};int len = read(i, buf, sizeof(buf));if(len == -1){perror("read error");exit(1);}else if(len == 0){// 对方关闭了连接FD_CLR(i, &reads);close(i);if(maxfd == i){maxfd--;}allfd[i] = -1;printf("对方已经关闭了连接。。。。。。\n");}else{printf("read buf = %s\n", buf);for(int j=0; j<len; ++j){buf[j] = toupper(buf[j]);}printf("--buf toupper: %s\n", buf);write(i, buf, strlen(buf)+1);}}}}close(lfd);return 0;
}
二. pselect
#include <sys/select.h>
int pselect(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, const struct timespec *timeout,const sigset_t *sigmask);
pselect 函数是一个防止信号干扰的增强型select 函数,pselect从形式向看相对于select多了一个参数,而具体区别如下:
1. pselect 使用timespec 结构,能指定到纳秒级(旧结构只能指定到微秒级)
2. pselect 增加了指向信号集的指针sigmask,表示信号屏蔽集。若sigmask为空,那么在与信号有关的方面,pselect的运行状况和select相同;否则,sigmask指向一信号屏蔽字,在调用pselect时,以原子操作的方式安装该信号屏蔽字。在返回时恢复以前的信号屏蔽字。
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/select.h>#define SIZE 128//服务端 select
int main(void)
{int ret = -1;int sockfd = -1;int connfd = -1;char buf[SIZE];struct sockaddr_in addr;struct sockaddr_in from;socklen_t len = sizeof(from);//select相关的参数int maxfd = -1;fd_set readfds;struct timespec tmo;//1. 创建套接字sockfd = socket(AF_INET, SOCK_STREAM, 0);if (-1 == sockfd){perror("sockfd"); goto err0;}//2. 绑定//指定IP和端口memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; //指定协议族addr.sin_port = htons(10086); //指定端口inet_pton(AF_INET, "192.168.73.42", (void *)&addr.sin_addr);ret = bind(sockfd, (void*)&addr, sizeof(addr));if (-1 == ret){perror("bind"); goto err1;}//3. 监听ret = listen(sockfd, 10); if (-1 == ret){perror("listen"); goto err1;}printf("服务端正在监听客户端的连接...\n");//4. 接受客户端的连接connfd = accept(sockfd, (void*)&from, &len);if (-1 == connfd){perror("accept");goto err1;}//输出连接服务端的客户端的IP和Portprintf("\033[32mclient IP: %s port: %d\033[0m\n", inet_ntop(AF_INET, (void*)&from.sin_addr, buf, SIZE), ntohs(from.sin_port));//5. 循环的接收和发送数据while(1){//设置超时的时间tmo.tv_sec = 3;tmo.tv_nsec = 0;//清空读集合FD_ZERO(&readfds); //将标准输入和套接字加入到监听读集合中FD_SET(STDIN_FILENO, &readfds); FD_SET(connfd, &readfds);//比较文件描述符maxfd = STDIN_FILENO > connfd ? STDIN_FILENO : connfd;//由内核监视对应的文件描述符ret = pselect(maxfd + 1, &readfds, NULL, NULL, &tmo, NULL);if (-1 == ret){perror("pselect"); break;}else if (0 == ret){printf("3 seconds timeout....\n"); continue;}else{//有数据可以读 标准输入有数据if (FD_ISSET(STDIN_FILENO, &readfds)) {memset(buf, 0, SIZE);fgets(buf, SIZE, stdin);//去掉最后一个换行if ('\n' == buf[strlen(buf) - 1])buf[strlen(buf) - 1] = '\0';//发送数据ret = send(connfd, buf, strlen(buf), 0);if (ret <= 0){perror("send"); break;}printf("server send %d bytes\n", ret);}//有数据可以读 套接字上有数据if (FD_ISSET(connfd, &readfds)) {memset(buf, 0, SIZE);ret = recv(connfd, buf, SIZE, 0); if (ret <= 0){perror("recv"); break;}printf("\033[31mrecv: %s\033[0m\n", buf);}}}//6. 关闭连接close(sockfd);close(connfd);return 0;
err1:close(sockfd);
err0:return 1;
}
三. poll
int poll(struct pollfd *fd, nfds_t nfds, int timeout);
struct pollfd {int fd; /* 文件描述符 */short events; /* 等待的事件 */short revents; /* 实际发生的事件 */
};
1. poll参数介绍:
pollfd:数组的地址
nfds:数组的最大长度, 数组中最后一个使用的元素下标+1
1. 内核会轮询检测fd数组的每个文件描述符
timeout:
1. == -1: 永久阻塞
2. == 0: 调用完成立即返回
3. >0: 等待的时长毫秒
2. poll评价:
优点:
1. 传入、传出事件分离。无需每次调用时,重新设定监听事件。
2. 文件描述符上限,可突破1024限制。能监控的最大上限数可使用配置文件调整。
缺点:(同select)
1. 每次调用都会出现一次从用户空间到内核空间的拷贝。
2. 每次返回都会出现一次从内核空间到用户空间的拷贝。
3. 返回后需要用户依次扫描fds数组,因此会做很多没必要的检查。
4. 不能跨平台。
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <poll.h>#define SERV_PORT 8989int main(int argc, const char* argv[])
{int lfd, cfd;struct sockaddr_in serv_addr, clien_addr;int serv_len, clien_len;// 创建套接字lfd = socket(AF_INET, SOCK_STREAM, 0);// 初始化服务器 sockaddr_in memset(&serv_addr, 0, sizeof(serv_addr));serv_addr.sin_family = AF_INET; // 地址族 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IPserv_addr.sin_port = htons(SERV_PORT); // 设置端口 serv_len = sizeof(serv_addr);// 绑定IP和端口bind(lfd, (struct sockaddr*)&serv_addr, serv_len);// 设置同时监听的最大个数listen(lfd, 36);printf("Start accept ......\n");// poll结构体struct pollfd allfd[1024];int max_index = 0;// initfor(int i=0; i<1024; ++i){allfd[i].fd = -1;allfd[i].events = POLLIN;}allfd[0].fd = lfd;while(1){int i = 0;int ret = poll(allfd, max_index+1, -1); if(ret == -1){perror("poll error");exit(1);}// 判断是否有连接请求if(allfd[0].revents & POLLIN){clien_len = sizeof(clien_addr);// 接受连接请求int cfd = accept(lfd, (struct sockaddr*)&clien_addr, &clien_len);printf("============\n");// cfd添加到poll数组for(i=0; i<1024; ++i){if(allfd[i].fd == -1){allfd[i].fd = cfd;break;}}// 更新最后一个元素的下标max_index = max_index < i ? i : max_index;}// 遍历数组for(i=1; i<=max_index; ++i){int fd = allfd[i].fd;if(fd == -1){continue;}if(allfd[i].revents & POLLIN){// 接受数据char buf[1024] = {0};int len = recv(fd, buf, sizeof(buf), 0);if(len == -1){perror("recv error");exit(1);}else if(len == 0){allfd[i].fd = -1;close(fd);printf("客户端已经主动断开连接。。。\n");}else{printf("recv buf = %s\n", buf);for(int k=0; k<len; ++k){buf[k] = toupper(buf[k]);}printf("buf toupper: %s\n", buf);send(fd, buf, strlen(buf)+1, 0);}}}}close(lfd);return 0;
}
四. epoll
epoll可以显著提高在大量并发连接中只有少量活跃的情况下的系统CPU利用率,它会复用文件描述符集合,内部维护了红黑树,只需要遍历被内核IO事件异步唤醒而加入的文件描述符集合即可。
1. epoll操作函数:与其他IO复用不同的一点,epoll需要三个操作函数来完成。
1. 创建一个红黑树的根节点:
int epoll_create(int size);
size参数用来告诉内核监听的数目,但这仅仅是一个建议,如今的系统中只要大于0即可。
2. 事件注册函数:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 感兴趣的事件和被触发的事件struct epoll_event {__uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */};// 保存触发事件的某个文件描述符相关的数据(与具体使用方式有关)typedef union epoll_data {void *ptr;int fd;__uint32_t u32;__uint64_t u64;} epoll_data_t;
op:表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从 epfd 中删除一个 fd;
fd:需要监听的文件描述符。
event:告诉内核要监听什么事件,struct epoll_event 结构。
events 可以是以下几个宏的集合:需要使用位或( | )进行操作。
EPOLLIN :表示对应的文件描述符可以读(包括对端 SOCKET 正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET :将 EPOLL 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
3. 等待事件的产生
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout );
1. epfd: epoll 专用的文件描述符,epoll_create()的返回值
2. events: 分配好的 epoll_event 结构体数组,epoll 将会把发生的事件赋值到events 数组中(events 不可以是空指针,内核只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存)。
3. maxevents: maxevents 告诉内核这个 events 有多大 。
4. timeout: 超时时间,单位为毫秒,为 -1 时,函数为阻塞。
2. epoll的应用:epoll默认为边沿触发,当没有即使处理事件时会继续触发,所以为了减少触发的次数以提高效率,应该使用水平触发,使用循环读取一次性读完所有数据。但是数据读完就进程就会阻塞,所以需要将与客户端进行通讯的文件描述符设置为非阻塞。即最终选定的模式为epoll非阻塞边沿触发,触发一次,读完所有数据。
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <sys/epoll.h>
#include <fcntl.h>#define SIZE 128
#define SIZE1 6//服务端 select
int main(void)
{int ret = -1;int sockfd = -1;int connfd = -1;int i = 0;int op = 0;char buf[SIZE];char buf1[SIZE1];struct sockaddr_in addr;struct sockaddr_in from;socklen_t len = sizeof(from);//epoll相关参数int epoll_fd = -1;int timeout = 3000;struct epoll_event event;struct epoll_event revent[SIZE]; //返回准备好的文件描述符的集合int count = 0;//1. 创建套接字sockfd = socket(AF_INET, SOCK_STREAM, 0);if (-1 == sockfd){perror("sockfd"); goto err0;}//设置端口复用op = 1;ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&op, sizeof(op));if (-1 == ret){perror("setsockopt"); goto err1;}//2. 绑定//指定IP和端口memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; //指定协议族addr.sin_port = htons(10086); //指定端口inet_pton(AF_INET, "192.168.73.42", (void *)&addr.sin_addr);ret = bind(sockfd, (void*)&addr, sizeof(addr));if (-1 == ret){perror("bind"); goto err1;}//3. 监听ret = listen(sockfd, 10); if (-1 == ret){perror("listen"); goto err1;}printf("服务端正在监听客户端的连接...\n");//4. 创建epoll文件描述符epoll_fd = epoll_create(1024);if (-1 == epoll_fd){perror("epoll_create"); goto err1;}printf("epoll_fd = %d\n", epoll_fd);//5. 上树 添加监听的文件描述符和事件event.events = EPOLLIN; //监听读事件 event.data.fd = sockfd; //监听的文件描述符ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event);if (-1 == ret){perror("epoll_ctl"); goto err2;}//6. 检测while(1){//雇佣了一个秘书//检测树中节点文件描述符是否有对应的事件发生 revent返回ret = epoll_wait(epoll_fd, revent, SIZE, timeout);if (-1 == ret){perror("epoll_wait"); break;}else if (0 == ret){printf("3 seconds timeout....\n"); continue;}else{//如果成功了 秘书就会告诉我们有多少个描述符准备好count = ret;for (i = 0; i < count; i++){//表示有数据可以读if (revent[i].events & EPOLLIN){//表示有新的客户端连接服务端if (revent[i].data.fd == sockfd){//4. 接受客户端的连接connfd = accept(sockfd, (void*)&from, &len);if (-1 == connfd){perror("accept");continue;}//输出连接服务端的客户端的IP和Portprintf("\033[32mclient IP: %s port: %d\033[0m\n", inet_ntop(AF_INET, (void*)&from.sin_addr, buf, SIZE), ntohs(from.sin_port));//设置文件描述符为非阻塞int val;//获取对应文件描述符的属性val = fcntl(connfd, F_GETFL);val |= O_NONBLOCK; //追加非阻塞属性//设置非阻塞属性fcntl(connfd, F_SETFL, val);//新节点上树event.events = EPOLLIN | EPOLLET; //读事件 设置边沿触发event.data.fd = connfd; //文件描述符ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, connfd, &event);if (-1 == ret){perror("epoll_ctl"); break;}} //sockfdelse{while(1){//接收数据 每次接收5个字节memset(buf1, 0, SIZE1); ret = recv(revent[i].data.fd, buf1, SIZE1 - 1, 0);//printf("=====>ret:%d\n", ret);if (ret < 0){perror("----->recv");break; }if (0 == ret)break;printf("\033[31mbuf: %s\033[0m", buf1); send(revent[i].data.fd, buf1, strlen(buf1), 0);}printf("\n");}}}}}//while(1)//6. 关闭连接close(epoll_fd);close(sockfd);return 0;
err2:close(epoll_fd);
err1:close(sockfd);
err0:return 1;
}