1.select
// sizeof(fd_set) = 128 1024
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);- 参数:- nfds : 委托内核检测的最大文件描述符的值 + 1- readfds : 要检测的文件描述符的读的集合,委托内核检测哪些文件描述符的读的属性- 一般检测读操作- 对应的是对方发送过来的数据,因为读是被动的接收数据,检测的就是读缓冲区- 是一个传入传出参数- writefds : 要检测的文件描述符的写的集合,委托内核检测哪些文件描述符的写的属性- 委托内核检测写缓冲区是不是还可以写数据(不满的就可以写)- exceptfds : 检测发生异常的文件描述符的集合- timeout : 设置的超时时间struct timeval {long tv_sec; /* seconds */long tv_usec; /* microseconds */};- NULL : 永久阻塞,直到检测到了文件描述符有变化- tv_sec = 0 tv_usec = 0, 不阻塞- tv_sec > 0 tv_usec > 0, 阻塞对应的时间- 返回值 :- -1 : 失败- >0(n) : 检测的集合中有n个文件描述符发生了变化// 将参数文件描述符fd对应的标志位设置为0
void FD_CLR(int fd, fd_set *set);
// 判断fd对应的标志位是0还是1, 返回值 : fd对应的标志位的值,0,返回0, 1,返回1
int FD_ISSET(int fd, fd_set *set);
// 将参数文件描述符fd 对应的标志位,设置为1
void FD_SET(int fd, fd_set *set);
// fd_set一共有1024 bit, 全部初始化为0
void FD_ZERO(fd_set *set);
可以帮助我们去检测一系列的文件描述符的集合,并且检测这些文件描述符的状态,这一系列的文件描述符分为两类:一类是监听的,监听的文件描述符集合;另一类是通信的,通信的文件描述符集合。在服务器端监听的文件描述符,它也就只有一个,lfd。另外一类就是通信的文件描述符,它有n个,在服务器端每次建立一个新连接,都会得到一个用于通信的文件描述符。
意味着通过select能检测的文件描述符的个数:n+1,其中n是通信的文件描述符的个数,1是一个监听的文件描述符的个数。这n+1个文件描述符,不管是读缓冲区还是写缓冲区,都可以基于select来进行检测。
【思考】为什么要让最大的文件描述符加一呢?(nfds : 委托内核检测的最大文件描述符的值 + 1)
【回答】select的内部是基于一个线性表来进行文件描述符的状态检测的,因此需要把最大的那个文件描述符指定出来。如果指定出来,那么在进行线性遍历的时候,它就是结束的一个标志。若不告诉遍历的最大值,就无法确定结束的位置。
另外,在指定第一个参数的时候,若不知道写多少,就是说找不到最大的那个文件描述符,可以写成1024,因为select能检测的最大的文件描述符的个数就是1024个。其实这和每个进程地址空间里边存储的默认的文件描述符的个数是相同的。因为只要创建一个进程,在这个进程的内核区就有一个文件描述符表。这个文件描述符表默认里边存储的文件描述符的个数是1024个。
它们按顺序是0~1023,不管是我们打开一个磁盘文件,或者说创建一个用于监听/通信的套接字,都会在这个文件描述符表里边占用一个对应的文件描述符,内核里边维护的文件描述符表默认是1024个,但是我们可以通过一些操作来修改文件描述符表里边存储的文件描述符的最大上限。但是这个select能检测的最大的文件描述符的个数是1024,一般情况下是不能改变的。
若想改变这个值,需要修改内核的源码,然后再重新编译一下这个内核。还有一个点,因为这个select是跨平台,若在linux上使用这个select函数,nfds:委托内核检测的这三个集合中最大的文件描述符 + 1,内核需要线性遍历这些集合中的文件描述符,这个值是循环结束的条件;在Windows中这个参数是无效的,指定为-1即可。
【举个栗子】
假设说这个集合里边放进去的是readfds:5、6、7、8、9;writefds:7、8、9、10;exceptfds:5、6、7、8、9、10,也就是每个文件描述符可以通过select同时检测它们的读缓冲区,写缓冲区以及它们是不是有异常。
一定要注意:是可以同时检测这个文件描述符的读写或者异常
当这三个要被检测的集合传递给select之后,select就会把这个数据给到内核,那么内核就会把这三个集合拷贝一份。就是内核里边同样有了这三份数据。那么内核就会基于这个线性表,去检测三个集合里边的文件描述符的读写或者是异常事件。当内核通过线性遍历,把这个要检测的文件描述符它们对应的读写缓冲区以及异常检测一遍之后,发现某些文件描述符的读缓冲区里边有数据了,或者说这些文件描述符的写缓冲区可写,或者说这些文件描述符发生了异常。那么它就会给我们传出对应的这三个集合。那么传出的三个集合。
假设读集合的5、6号文件描述符,它的读缓冲区里边有数据,5、6、7、8、9、10它们的写缓冲区可写,异常是没有的,所以这个异常集合啥也没有,那么内核会把这三个集合传出去。
【思考】那么传出的时候怎么给到我们用户?
它还会把这个传出的数据再次写入这个读写集合,这个指针指向的这个内存地址里边,也就是这个内存里面的数据是被内核修改了。当一个select的函数被调用成功之后,那么这三个参数指针指向的内存地址里的数据会被内核修改,那么我们就可以通过这个select函数的返回值,来判断内核检测到了哪些文件描述符。
这个readfds里边的数据就从5、6、7、8、9 变成 5、6,也就是说内核告诉我们5、6号文件描述符的读缓冲区里边有数据了。writefds里边的数据是7、8、9、10,内核告诉我们7、8、9、10号文件描述符的写缓冲区是可写的。内核检测了一遍发现没有异常,所以传入5、6、7、8、9、10,传出是空集合。
接着基于的得到的集合,比如说读集合里边有5、6号文件描述符,再判断一下这个5、6号描述符到底适用于监听的,还是用于通信的。若用于监听,就和客户端建立连接,建立连接要用accept()函数。若用于通信,就去接收数据,检测到的这个文件描述符的读缓冲区是可读的,就调用read或者recv函数可以把数据读出来;检测到的这个文件描述符的读缓冲区是可读的,就调用read或者recv函数可以把数据读出来;检测到的这个文件描述符的写缓冲区是可写的,就调用write或者send函数把这个数据发出去;对于这个异常集合,若检测到文件描述符里边有数据,就处理这个异常。若判断这个集合是空的,说明就没有任何异常,就不需要做任何处理了。
其实这三个参数就是对应的这三个集合其实是传入传出参数,传入传出意味着在指定实参的时候,需要给这个实参进行初始化,传出就是这个函数体内部对这个数据做了读操作之后,它会进行一系列的后续处理,处理完毕之后,它会把最新的数据再次写入到这个指针指向这块内存地址里面。当函数调用完毕之后,就会得到最新的数据。我们就可以基于这个新的数据进行对应的相关操作了,其实这个传出参数就相当于是返回值,通过传出参数就可以返回很多数据。这就是select函数的第2、3、4个参数是传入传出参数。
select函数的第5个参数,它是一个结构体,通过这个结构体可以指定一个时长,这个时长就是select函数的检测时长。因为通过select的函数可以检测这些集合里边的文件描述符,假设这些文件描述符没有满足条件的,就是这个异常集合里边没有异常,或者说读集合里边对应的文件描述符的读缓冲区里边都没有数据,或者说写集合里边对应的文件描述符的写缓冲区里边都是满的,不能再往里边写数据了,这就相当于条件不满足。若条件不满足,这个select就会持续的进行检测。我们可以指定检测时间timeout,假设指定为5s,那么这个select最多会检测5s,当5s之后,发现还是没有满足条件的文件描述符,那么这个select就返回了。如果说在5s之内,发现有满足条件的文件描述符,也就是说这个文件描述符的状态是就绪了,它在这个指定的时间内,检测到有就绪的文件描述符就直接返回了。若没有检测到,在5s之后,这个函数也就返回。
【并发处理流程图】
2.实现一个简易的select并发服务器
server.c
#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <string.h>
#include <fcntl.h>
#include <arpa/inet.h>// server
int main(int argc, const char *argv[]) {// 创建监听的套接字int lfd = socket(AF_INET, SOCK_STREAM, 0);if(lfd == -1) {perror("socket error");exit(1);}// 绑定struct sockaddr_in saddr;memset(&saddr, 0, sizeof(saddr));saddr.sin_family = AF_INET;saddr.sin_port = htons(9999);saddr.sin_addr.s_addr = htonl(INADDR_ANY);// 绑定到所有网卡int ret = bind(lfd,(struct sockaddr *)&saddr, sizeof(saddr));if(ret == -1) {perror("bind error");exit(1);}// 监听ret = listen(lfd, 64);if(ret == -1) {perror("listen error");exit(-1);}fd_set rdset,tmpset;FD_ZERO(&rdset);FD_SET(lfd, &rdset);int maxfd = lfd;while(1) {tmpset = rdset;int ret = select(maxfd + 1, &tmpset, NULL, NULL, NULL);// 判断是不是监听的fdif(FD_ISSET(lfd, &tmpset)) {// 接受客户端的连接int cfd = accept(lfd, NULL, NULL);FD_SET(cfd, &rdset);maxfd = maxfd > cfd ? maxfd : cfd;}for (int i = 0; i <= maxfd; i++) {if(i!=lfd && FD_ISSET(i, &tmpset)) {// 接收数据char buf[1024] = {0};int len = recv(i, buf, sizeof(buf), 0);if(len == -1) {perror("recv error");exit(1); }else if(len == 0) {printf("客户端已经断开连接...\n");close(i);FD_CLR(i, &rdset);break;}printf("read buf = : %s\n", buf);// 小写转大写for (int i = 0; i < len; i++) {buf[i] = toupper(buf[i]);}printf("after buf = %s\n",buf);// 大写串发给客户端ret = send(i, buf, strlen(buf)+1, 0);if(ret == 1) {perror("send error");exit(1);}}}}close(lfd);return 0;
}
client.c
#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <string.h>
#include <fcntl.h>
#include <arpa/inet.h>// server
int main(int argc, const char *argv[]) {// 创建监听的套接字int lfd = socket(AF_INET, SOCK_STREAM, 0);if(lfd == -1) {perror("socket error");exit(1);}// 绑定struct sockaddr_in saddr;memset(&saddr, 0, sizeof(saddr));saddr.sin_family = AF_INET;saddr.sin_port = htons(9999);saddr.sin_addr.s_addr = htonl(INADDR_ANY);// 绑定到所有网卡int ret = bind(lfd,(struct sockaddr *)&saddr, sizeof(saddr));if(ret == -1) {perror("bind error");exit(1);}// 监听ret = listen(lfd, 64);if(ret == -1) {perror("listen error");exit(1);}fd_set rdset,tmpset;FD_ZERO(&rdset);FD_SET(lfd, &rdset);int maxfd = lfd;while(1) {tmpset = rdset;int ret = select(maxfd + 1, &tmpset, NULL, NULL, NULL);// 判断是不是监听的fdif(FD_ISSET(lfd, &tmpset)) {// 接受客户端的连接int cfd = accept(lfd, NULL, NULL);FD_SET(cfd, &rdset);maxfd = maxfd > cfd ? maxfd : cfd;}for (int i = 0; i <= maxfd; i++) {if(i!=lfd && FD_ISSET(i, &tmpset)) {// 接收数据char buf[1024] = {0};int len = recv(i, buf, sizeof(buf), 0);if(len == -1) {perror("recv error");exit(1); }else if(len == 0) {printf("客户端已经断开连接...\n");FD_CLR(i, &rdset);close(i);break;}printf("read buf = %s\n", buf);// 小写转大写for (int i = 0; i < len; i++) {buf[i] = toupper(buf[i]);}printf("after buf = %s\n",buf);// 大写串发给客户端ret = send(i, buf, strlen(buf)+1, 0);if(ret == -1) {perror("send error");exit(1);}}}}close(lfd);return 0;
}
【思考】为什么初始化的是读集合?
用来监听的文件描述符是用来检测有无客户端的连接,假设客户端有一个连接请求,它会把这个数据发送过来,存储到用于监听文件描述符对应的读缓冲区里边,因此初始化的集合应该是读集合。
【思考】当检测到通信的文件描述符里边对应的读缓冲区有数据的时候,就接收数据。建立连接或者接收数据,它们是并行操作的吗?
【回答】肯定不是,因为在程序里边只有一个线程,或者说只有一个进程,那么这个进程在建立连接的时候,它就不能够去通信。它在通信的时候就不能建立连接,那它是怎么来完成一系列的任务呢?那肯定是线性的来执行的。程序在处理的时候,按照代码指定的先后顺序,如果读集合里边有监听的文件描述符,那么它就先建立连接,连接建立成功之后,再向下执行,判断一下这个读集合里面有没有通信的文件描述符。如果有的话,依次处理接收数据。虽然这种方式可以实现并发,但是它的效率相对来说还是比较低的。因为如果我们有多个客户端,同时和这个服务器端通信,这个服务器需要依次来处理客户端的请求,若客户端不够多,或者说客户端的请求足够复杂,那么服务器在处理这一系列的请求的时候,其实用的时间还是非常的长的,服务器给客户端回复数据,那么客户端这边还需要等待的时间也是比较长的。为了提高效率,可以使用多线程的方式。
3.多线程 select 并发服务器
server.c
#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <string.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <pthread.h>pthread_mutex_t mutex;typedef struct fdinfo {int fd;int *maxfd; fd_set *rdset;
}FDInfo;void* acceptConn(void* arg) {printf("子线程线程ID:%ld\n",pthread_self());FDInfo* info = (FDInfo*)arg;int cfd = accept(info->fd, NULL, NULL);pthread_mutex_lock(&mutex);FD_SET(cfd, info->rdset);*info->maxfd = *info->maxfd > cfd ? *info->maxfd : cfd;pthread_mutex_unlock(&mutex);free(info);return NULL;
}void* communication(void* arg) {printf("子线程线程ID:%ld\n",pthread_self());FDInfo* info = (FDInfo*)arg;char buf[1024] = {0};int len = recv(info->fd, buf, sizeof(buf), 0);if(len == -1) {perror("recv error");free(info);return NULL;}else if(len == 0) {printf("客户端已经断开连接...\n");pthread_mutex_lock(&mutex);FD_CLR(info->fd, info->rdset);pthread_mutex_unlock(&mutex);close(info->fd);free(info);return NULL;}printf("read buf = %s\n", buf);// 小写转大写for (int i = 0; i < len; i++) {buf[i] = toupper(buf[i]);}printf("after buf = %s\n",buf);// 大写串发给客户端int ret = send(info->fd, buf, strlen(buf)+1, 0);if(ret == -1) {perror("send error");}free(info);return NULL;
}// server
int main(int argc, const char *argv[]) {pthread_mutex_init(&mutex,NULL);// 创建监听的套接字int lfd = socket(AF_INET, SOCK_STREAM, 0);if(lfd == -1) {perror("socket error");exit(1);}// 绑定struct sockaddr_in saddr;memset(&saddr, 0, sizeof(saddr));saddr.sin_family = AF_INET;saddr.sin_port = htons(9999);saddr.sin_addr.s_addr = htonl(INADDR_ANY);// 绑定到所有网卡int ret = bind(lfd,(struct sockaddr *)&saddr, sizeof(saddr));if(ret == -1) {perror("bind error");exit(1);}// 监听ret = listen(lfd, 64);if(ret == -1) {perror("listen error");exit(-1);}fd_set rdset,tmpset;FD_ZERO(&rdset);FD_SET(lfd, &rdset);int maxfd = lfd;while(1) {pthread_mutex_lock(&mutex);tmpset = rdset;pthread_mutex_unlock(&mutex);int ret = select(maxfd + 1, &tmpset, NULL, NULL, NULL);// 判断是不是监听的fdif(FD_ISSET(lfd, &tmpset)) {// 接受客户端的连接FDInfo* info = (FDInfo*)malloc(sizeof(FDInfo));info->fd = lfd;info->maxfd = &maxfd;info->rdset = &rdset;// 创建子线程pthread_t tid;pthread_create(&tid, NULL, acceptConn, info);pthread_detach(tid);}for (int i = 0; i <= maxfd; i++) {if(i!=lfd && FD_ISSET(i, &tmpset)) {// 接收数据// 创建子线程pthread_t tid;FDInfo* info = (FDInfo*)malloc(sizeof(FDInfo));info->fd = i;info->rdset=&rdset;pthread_create(&tid, NULL, communication, info);pthread_detach(tid);}}}pthread_mutex_destroy(&mutex);close(lfd);return 0;
}
client.c
#include <stdio.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>int main(int argc,char* argv[]) {int fd = socket(AF_INET,SOCK_STREAM,0);if(fd == -1) {perror("socket");return -1;}struct sockaddr_in saddr;saddr.sin_family = AF_INET;saddr.sin_port = htons(9999);inet_pton(AF_INET,"127.0.0.1",&saddr.sin_addr.s_addr);// 连接服务器int ret = connect(fd,(struct sockaddr*)&saddr,sizeof(saddr));if(ret == -1) {perror("connect");return -1;}// 通信int num = 0;while (1) {char sendBuf[1024] = {0};// fgets(sendBuf,sizeof(sendBuf),stdin);sprintf(sendBuf,"hello,world,%d\n...",num++);write(fd,sendBuf,strlen(sendBuf) + 1);// 接收// 阻塞等待int len = read(fd,sendBuf,sizeof(sendBuf));if(len == -1) {perror("read error");return -1;}else if(len > 0) {printf("read buf = %s\n",sendBuf);}else{printf("服务器已经断开连接...\n");break;}sleep(1);}close(fd);return 0;
}
推荐我的文章:
I/O多路复用 select 、poll_呵呵哒( ̄▽ ̄)"的博客-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/132474888?spm=1001.2014.3001.5501