高级【IO】

news/2024/11/27 2:11:50/

目录

一.五种IO模型

(1)阻塞IO:

 (2)非阻塞IO

(3)信号驱动IO:

(4)IO多路转接

(5)异步IO

二.高级IO概念

1.同步通信、异步通信

2.阻塞、非阻塞

3.其它高级IO

三.非阻塞IO

1.fcntl

四.多路转接之select

五.多路转接之poll

六.多路转接之epoll

1.epoll接口

2.工作原理

3.工作方式


        前言:这一篇介绍五种IO模型,以及多路转接的select、poll和epoll,着重介绍epoll,并用epoll实现服务器。

一.五种IO模型

        IO就是:等+数据拷贝

1.等 - IO事件就绪(检测功能成分)

2.数据拷贝

        高效的IO就是:单位时间,等的比重越小,IO的效率越高

IO模型:

①阻塞式IO:钓鱼中,一直盯着鱼竿,直到鱼上钩就钓【自己等(阻塞),自己钓】

②非阻塞IO:钓鱼中,看一会手机,再看看鱼上钩没,上钩就钓【自己等(非阻塞 / 轮询),自己钓】

③信号驱动式IO:在鱼竿上放个铃铛,听到铃铛上,说明上钩,直接钓,其它时间一直看手机【没有直接等,自己钓】

④多路转接(多路复用):一次带来几百个鱼竿,一起钓,哪个上钩就钓哪个【自己等(一次检测多个),自己钓】

⑤异步IO:让别人帮自己钓鱼,但是别人钓鱼的成果要给自己【没有自己等,没有自己钓,但是要拿结果】

        5中IO模型中,效率最高的是多路转接.

        除了异步IO外,其它4个都是同步IO. 同步IO和异步IO的区别就是:有没有参与IO细节,参与了的就是同步IO,没参与就是异步IO.

        正常使用中,90%是:阻塞IO,部分是非阻塞和多路转接.

(1)阻塞IO:

        在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认都是阻塞方式.

        阻塞IO是最常见的IO模型.

 (2)非阻塞IO

        如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码

        非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对CPU来说是较大的浪费, 一般只有特定场景下才使用.

(3)信号驱动IO:

        内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作.

(4)IO多路转接

        虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件
描述符的就绪状态.

(5)异步IO

        由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据).

        任何IO过程中, 都包含两个步骤. 第一是等待, 第二是拷贝. 而且在实际的应用场景中, 等待消耗的时间往往都远远高于拷贝的时间. 让IO更高效, 最核心的办法就是让等待的时间尽量少.

二.高级IO概念

1.同步通信、异步通信

        同步和异步关注的是消息通信机制:

        所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回. 但是一旦调用返回,就得到返回值了; 换句话说,就是由调用者主动等待这个调用的结果
        异步则是相反, 调用在发出之后,这个调用就直接返回了,所以没有返回结果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用发出后, 被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用

        另外, 我们回忆在讲多进程多线程的时候, 也提到同步和互斥. 这里的同步通信和进程之间的同步是完全不想干的概念.

        进程/线程同步也是进程/线程之间直接的制约关系
        是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系. 尤其是在访问临界资源的时候

        看到同步的时候一定要先搞清楚大背景是什么. 这个同步, 是同步通信异步通信的同步, 还是同步与互斥的同步.

2.阻塞、非阻塞

        阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

        阻塞调用是指调用结果返回之前,当前线程会被挂起. 调用线程只有在得到结果之后才会返回.
        非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程

3.其它高级IO

        非阻塞IO,纪录锁,系统V流机制, I/O多路转接(也叫I/O多路复用) ,readv和writev函数以及存储映射IO(mmap),这些统称为高级IO

三.非阻塞IO

        一个文件描述符, 默认都是阻塞IO.

1.fcntl

int fcntl(int fd, int cmd, ... /* arg */ )

①第一个参数fd:要设置哪个文件描述符

②第二个参数cmd:要做什么

③第三个参数arg:要设置什么状态

        

        fcntl函数有5种功能:

复制一个现有的描述符(cmd=F_DUPFD) .
获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW.

        实现一个非阻塞IO:

main.c:

 #include "util.hpp"#include <string>#include <cstring>#include <vector>#include <functional>using namespace std;using namespace Util;using func_t = std::function<void()>;void show()
{cout << "正在处理其它的事情" << endl;
}int main(){std::vector<func_t> funcs;funcs.push_back(show);SetNonBlock(0);char buffer[1024];while(true){buffer[0] = 0;int n = scanf("%s", buffer);if(n == -1){cout << "errno: " << errno << " desc: " << strerror(errno) << endl;for(const auto &f : funcs){f();}}else{cout << "刚刚获取的内容是# " << buffer << endl;}sleep(1);}return 0;}

util.hpp:

#ifndef _UTIL_HPP_
#define _UTIL_HPP_#include <iostream>
#include <unistd.h>
#include <fcntl.h>namespace Util
{bool SetNonBlock(int sock){int flag = fcntl(sock, F_GETFL);if(flag == -1) return false;int n = fcntl(sock, F_SETFL, flag | O_NONBLOCK);if(n == -1) return false;return true;}
}#endif

结果:

四.多路转接之select

        文件描述符状态的变化:① 可读不可读的变化 ② 可写不可写的变化 ③ 异常不异常的变化

        因此,等fd,其实就是在等事件:① 读事件就绪 ② 写事件就绪 ③ 异常事件

        系统提供select函数来实现多路复用输入/输出模型

        select只负责一件事情:等(等待文件描述符事件变化)

        select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
        程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

        select函数:

int select(int nfds, fd_set *readfds, fd_set *writefds,

        fd_set *exceptfds, struct timeval *timeout)

        第一个参数nfds:所要等的众多的文件描述符中最大的文件描述符+1.

        第二、三、四个参数readfds、writedfs、exceptfds是读文件描述符集.

输入时:用户告诉OS,要注意我设置的多个fd中的读事件是否就绪.

输出时:OS告诉用户,你让我注意的多个fd中,有哪些已经就绪. 

fd_set是位图结构的.

        以读事件readfds为例:

1.比特位的位置代表fd的编号”是否“的概念

2.输入时(用户告诉内核):要注意我设置的多个fd"是否"中的事件是否就绪.

3.输出时(内核告诉用户):用户曾经让我注意的多个fd中,有哪些事件已经就绪. 

        比如readfds的位图结构为0110 1010(输入时,用户给OS的),这里需要OS注意4个fd,然后输出时readfds的位图结构为0000 0010,这就就说明OS注意的4个fd中,只有第7位的fd事件已就绪.

输入和输出用的是同一张位图.

        第五个参数timeout为结构timeval,用来设置select()的等待时间.

这里等待的策略问题也可以分为:① 阻塞等待 ② 非阻塞等待 ③ 设定deadline

        这个timeout就是设置一个deadline(一个时间),如果阻塞,超时就会立马返回,如果在时间之前完成,就返回剩余的时间值.

NULL:则表示select()没有timeout, select将一直被阻塞,直到某个文件描述符上发生了事件;
0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。
特定的时间值:如果在指定的时间段里没有事件发生, select将超时返回、

        返回值int:① > 0 :有几个fd就绪了

                           ② == 0:timeout

                           ③ <0:报错了

下面编写一段select代码(只实现读):

        Sock.hpp:

#pragma once#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
#include <cerrno>
#include <cassert>class Sock
{
public:static const int gbacklog = 20;static int Socket(){int listenSock = socket(PF_INET, SOCK_STREAM, 0);if (listenSock < 0){exit(1);}int opt = 1;setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));return listenSock;}static void Bind(int socket, uint16_t port){struct sockaddr_in local; // 用户栈memset(&local, 0, sizeof local);local.sin_family = PF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;// 2.2 本地socket信息,写入sock_对应的内核区域if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0){exit(2);}}static void Listen(int socket){if (listen(socket, gbacklog) < 0){exit(3);}}static int Accept(int socket, std::string *clientip, uint16_t *clientport){struct sockaddr_in peer;socklen_t len = sizeof(peer);int serviceSock = accept(socket, (struct sockaddr *)&peer, &len);if (serviceSock < 0){// 获取链接失败return -1;}if(clientport) *clientport = ntohs(peer.sin_port);if(clientip) *clientip = inet_ntoa(peer.sin_addr);return serviceSock;}
};

        SelectServer.cc:

#include <iostream>
#include <sys/select.h>
#include "Sock.hpp"int fdsArray[sizeof(fd_set) * 8] = {0}; // 保存历史上所有的合法fd
int gnum = sizeof(fdsArray) / sizeof(fdsArray[0]);#define DFL -1using namespace std;static void showArray(int arr[], int num)
{cout << "当前合法sock list# ";for (int i = 0; i < num; i++){if (arr[i] == DFL)continue;elsecout << arr[i] << " ";}cout << endl;
}static void usage(std::string process)
{cerr << "\nUsage: " << process << " port\n"<< endl;
}
// readfds: 现在包含就是已经就绪的sock
static void HandlerEvent(int listensock, fd_set &readfds)
{for (int i = 0; i < gnum; i++){if (fdsArray[i] == DFL)continue;if (i == 0 && fdsArray[i] == listensock){// 我们是如何得知哪些fd,上面的事件就绪呢?if (FD_ISSET(listensock, &readfds)){// 具有了一个新链接cout << "已经有一个新链接到来了,需要进行获取(读取/拷贝)了" << endl;string clientip;uint16_t clientport = 0;int sock = Sock::Accept(listensock, &clientip, &clientport); // 不会阻塞if (sock < 0)return;cout << "获取新连接成功: " << clientip << ":" << clientport << " | sock: " << sock << endl;// read/write -- 不能,因为你read不知道底层数据是否就绪!!select知道!// 想办法把新的fd托管给select?如何托管??int i = 0;for (; i < gnum; i++){if (fdsArray[i] == DFL)break;}if (i == gnum){cerr << "我的服务器已经到了最大的上限了,无法在承载更多同时保持的连接了" << endl;close(sock);}else{fdsArray[i] = sock; // 将sock添加到select中,进行进一步的监听就绪事件了!showArray(fdsArray, gnum);}}} // end if (i == 0 && fdsArray[i] == listensock)else{// 处理普通sock的IO事件!if(FD_ISSET(fdsArray[i], &readfds)){// 一定是一个合法的普通的IO类sock就绪了// read/recv读取即可// TODO bugchar buffer[1024];ssize_t s = recv(fdsArray[i], buffer, sizeof(buffer), 0); // 不会阻塞if(s > 0){buffer[s] = 0;cout << "client[" << fdsArray[i] << "]# " << buffer << endl; }else if(s == 0){cout << "client[" << fdsArray[i] << "] quit, server close " << fdsArray[i] << endl;close(fdsArray[i]);fdsArray[i] = DFL; // 去除对该文件描述符的select事件监听showArray(fdsArray, gnum);}else{cout << "client[" << fdsArray[i] << "] error, server close " << fdsArray[i] << endl;close(fdsArray[i]);fdsArray[i] = DFL; // 去除对该文件描述符的select事件监听showArray(fdsArray, gnum);}}}}
}// ./SelectServer 8080
// 只关心读事件
int main(int argc, char *argv[])
{if (argc != 2){usage(argv[0]);exit(1);}// 是一种类型,位图类型,能定义变量,那么就一定有大小,就一定有上限// fd_set fds; // fd_set是用位图表示多个fd的// cout << sizeof(fds) * 8 << endl;int listensock = Sock::Socket();Sock::Bind(listensock, atoi(argv[1]));Sock::Listen(listensock);for (int i = 0; i < gnum; i++)fdsArray[i] = DFL;fdsArray[0] = listensock;while (true){// 在每次进行select的时候进行我们的参数重新设定int maxFd = DFL;fd_set readfds;FD_ZERO(&readfds);for (int i = 0; i < gnum; i++){if (fdsArray[i] == DFL)continue;                  // 1. 过滤不合法的fdFD_SET(fdsArray[i], &readfds); // 2. 添加所有的合法的fd到readfds中,方便select统一进行就绪监听if (maxFd < fdsArray[i])maxFd = fdsArray[i]; // 3. 更新出最大值}struct timeval timeout = {100, 0};// 如何看待监听socket,获取新连接的,本质需要先三次握手,前提给我发送syn -> 建立连接的本质,其实也是IO,一个建立好的// 连接我们称之为:读事件就绪!listensocket 只(也)需要关心读事件就绪!// accept: 等 + "数据拷贝"// int sock = Sock::Accept(listensock, );// 编写多路转接代码的时候,必须先保证条件就绪了,才能调用IO类函数!int n = select(maxFd + 1, &readfds, nullptr, nullptr, &timeout);switch (n){case 0:cout << "time out ... : " << (unsigned long)time(nullptr) << endl;break;case -1:cerr << errno << " : " << strerror(errno) << endl;break;default:HandlerEvent(listensock, readfds);// 等待成功// 1. 刚启动的时候,只有一个fd,listensock// 2. server 运行的时候,sock才会慢慢变多// 3. select 使用位图,采用输出输出型参数的方式,来进行 内核<->用户 信息的传递, 每一次调用select,都需要对历史数据和sock进行重新设置!!!// 4. listensock,永远都要被设置进readfds中!// 5. select 就绪的时候,可能是listen 就绪,也可能是普通的IO sock就绪啦!!break;}}return 0;
}

        结果:

        select编码特征:

① select之前要进行所有参数的重置,之后,要遍历所有的合法fd进行事件检测

② select需要用户自己维护第三方数组,来保存所有的合法fd,方便select进行批量处理

③ 一旦特定的fd事件就绪,本次读取或者写入不会被阻塞

        select优缺点:

1.优点:

        ①占用资源少,并且高效(对比之前的多进程、多线程)

2.缺点

        ① 每一次都要进行大量的重置工作,效率比较低

        ② 每一次能够坚持的fd数量是有上限的

        ③ 每一次都需要内核到用户,用户到内核传递位置参数,出现较为大量的数据拷贝工作

        ④ select编码特别不方便,需要用户自己维护数组

        ⑤ select底层需要同步遍历的方式,检测所有需要检测的fd(传入最大maxfd + 1)

五.多路转接之poll

        poll函数:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
        int fd;               /* file descriptor */
        short events;   /* requested events */
        short revents;  /* returned events */
};

        第一个参数fds是struct pollfd*(指针类型,可以当作一个数组来看)类型的,这个结构体里包括所要等的文件描述符fd,请求事件events(用户告诉内核),请求的哪些事件就绪revent(内核告诉用户). 针对事件,poll将事件进行了拆分,输入和输出进行了分离. 因此,传入的相当于一个数组,数组中的每一个元素告诉内核一个文件描述符fd的哪一个事件要关心.(每一个fd,都可能有不同的事件要关心,或者发生).。

        第二个参数nfds表示fds数组的长度。

        第三个参数timeout表示poll函数的超时事件,单位是ms(timeout是输入型参数、单位是ms,select的timeout是输入输出型,单位是s).timeout设置为1000如果没有链接到来是每1s检测一次,timeout为0是一直在检测,是非阻塞式的检测,timeout为-1则不检测,代表的是永久阻塞。

events和revents的取值:、

         这里重点是POLLIN和POLLOUT,一个是读一个是写,这些都是宏,如果想要读和写当然是可以用 异或| 来连接的。

        总结一下:可以用一个结构体来表示一个文件描述符所对应的events(用户告诉内核)、revents(内核告诉用户)相关的事件,用指针(代表数组)可以让poll关心多个文件描述符,每一个文件描述符的输入输出事件都可以使用不同的位图来表征,所以select上的参数都可以使用poll来取代了。select需要一个第三方数组,而poll则不需要了,poll可以把这个结构体数组作为全局数组,就可以让poll进行事件监听,又可以让新链接把文件描述符加进来。如果将来有一个文件描述符不关心事件了,就可以将event清空,并且将fd设置为-1,poll在底层就自动不会去关心了。

使用一下poll:

        PollServe:

#include <iostream>
#include <poll.h>
#include "Sock.hpp"#define NUM 1024
struct pollfd fdsArray[NUM] = {0};// 保存历史上所有的合法fd#define DFL -1using namespace std;static void showArray(struct pollfd arr[], int num)
{cout << "当前合法sock list# ";for (int i = 0; i < num; i++){if (arr[i].fd == DFL)continue;elsecout << arr[i].fd << " ";}cout << endl;
}static void usage(std::string process)
{cerr << "\nUsage: " << process << " port\n"<< endl;
}
// readfds: 现在包含就是已经就绪的sock
static void HandlerEvent(int listensock)
{for (int i = 0; i < NUM; i++){if (fdsArray[i].fd == DFL)continue;if (i == 0 && fdsArray[i].fd == listensock){// 我们是如何得知哪些fd,上面的事件就绪呢?if (fdsArray[i].revents & POLLIN){// 具有了一个新链接cout << "已经有一个新链接到来了,需要进行获取(读取/拷贝)了" << endl;string clientip;uint16_t clientport = 0;int sock = Sock::Accept(listensock, &clientip, &clientport); // 不会阻塞if (sock < 0)return;cout << "获取新连接成功: " << clientip << ":" << clientport << " | sock: " << sock << endl;// read/write -- 不能,因为你read不知道底层数据是否就绪!!select知道!// 想办法把新的fd托管给select?如何托管??int i = 0;for (; i < NUM; i++){if (fdsArray[i].fd == DFL)break;}if (i == NUM){cerr << "我的服务器已经到了最大的上限了,无法在承载更多同时保持的连接了" << endl;close(sock);}else{fdsArray[i].fd = sock; // 将sock添加到select中,进行进一步的监听就绪事件了!fdsArray[i].events = POLLIN;fdsArray[i].revents = 0;showArray(fdsArray, NUM);}}} // end if (i == 0 && fdsArray[i] == listensock)else{// 处理普通sock的IO事件!if(fdsArray[i].revents & POLLIN){// 一定是一个合法的普通的IO类sock就绪了// read/recv读取即可// TODO bugchar buffer[1024];ssize_t s = recv(fdsArray[i].fd, buffer, sizeof(buffer), 0); // 不会阻塞if(s > 0){buffer[s] = 0;cout << "client[" << fdsArray[i].fd << "]# " << buffer << endl; }else if(s == 0){cout << "client[" << fdsArray[i].fd << "] quit, server close " << fdsArray[i].fd << endl;close(fdsArray[i].fd);fdsArray[i].fd = DFL; // 去除对该文件描述符的select事件监听fdsArray[i].events = 0;fdsArray[i].revents = 0;showArray(fdsArray, NUM);}else{cout << "client[" << fdsArray[i].fd << "] error, server close " << fdsArray[i].fd << endl;close(fdsArray[i].fd);fdsArray[i].fd = DFL; // 去除对该文件描述符的select事件监听fdsArray[i].events = 0;fdsArray[i].revents = 0;showArray(fdsArray, NUM);}}}}
}// ./SelectServer 8080
// 只关心读事件
int main(int argc, char *argv[])
{if (argc != 2){usage(argv[0]);exit(1);}// 是一种类型,位图类型,能定义变量,那么就一定有大小,就一定有上限// fd_set fds; // fd_set是用位图表示多个fd的// cout << sizeof(fds) * 8 << endl;int listensock = Sock::Socket();Sock::Bind(listensock, atoi(argv[1]));Sock::Listen(listensock);for (int i = 0; i < NUM; i++){fdsArray[i].fd = DFL;fdsArray[i].events = 0;fdsArray[i].revents = 0;}fdsArray[0].fd = listensock;fdsArray[0].events = POLLIN;int timeout = -1;while (true){int n = poll(fdsArray, NUM, timeout);switch (n){case 0:cout << "time out ... : " << (unsigned long)time(nullptr) << endl;break;case -1:cerr << errno << " : " << strerror(errno) << endl;break;default:HandlerEvent(listensock);// 等待成功// 1. 刚启动的时候,只有一个fd,listensock// 2. server 运行的时候,sock才会慢慢变多// 3. select 使用位图,采用输出输出型参数的方式,来进行 内核<->用户 信息的传递, 每一次调用select,都需要对历史数据和sock进行重新设置!!!// 4. listensock,永远都要被设置进readfds中!// 5. select 就绪的时候,可能是listen 就绪,也可能是普通的IO sock就绪啦!!break;}}return 0;
}

        Sock.hpp:

#pragma once#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
#include <cerrno>
#include <cassert>class Sock
{
public:static const int gbacklog = 20;static int Socket(){int listenSock = socket(PF_INET, SOCK_STREAM, 0);if (listenSock < 0){exit(1);}int opt = 1;setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));return listenSock;}static void Bind(int socket, uint16_t port){struct sockaddr_in local; // 用户栈memset(&local, 0, sizeof local);local.sin_family = PF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;// 2.2 本地socket信息,写入sock_对应的内核区域if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0){exit(2);}}static void Listen(int socket){if (listen(socket, gbacklog) < 0){exit(3);}}static int Accept(int socket, std::string *clientip, uint16_t *clientport){struct sockaddr_in peer;socklen_t len = sizeof(peer);int serviceSock = accept(socket, (struct sockaddr *)&peer, &len);if (serviceSock < 0){// 获取链接失败return -1;}if(clientport) *clientport = ntohs(peer.sin_port);if(clientip) *clientip = inet_ntoa(peer.sin_addr);return serviceSock;}
};

        测试结果:

        poll去掉了select的一些缺点,优点更多了.

优点:

        ① poll的输入和输出进行了分离,pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便

        ② poll没有了最大数量限制 (但是数量过大后性能也是会下降).

缺点:

        ① 和select函数一样, poll返回后,需要轮询pollfd来获取就绪的描述符

        ② 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中

        ③ 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降

        注意:不光光是网络sock,本地的文件描述符也可以托管给多路转接,因此文件操作、管道这些技术,也是可以无缝对接到多路转接的。

六.多路转接之epoll

        学了select和poll,目前多路转接还存在这一些问题:

① select、poll都是基于多个fd进行遍历检测,来识别事件,链接多的时候,一定会引起遍历周期的增加。

② 对应事件(用户告诉内核,内核通知用户)需要使用的数据结构(数组),需要由程序员自己维护。

        因此,为了解决这些问题,epoll出现了。虽然epoll和poll的名称很像,但是实际上是完全不同的。

        epoll几乎解决了多路转接方案的所有缺点,具备之前所说的一切优点,是现在性能最好的多路IO就绪通知方法。

1.epoll接口

        不同与上面两个,epoll有3个接口。但是无论有多少个接口,核心工作都是:只负责等(包括:① 用户告诉内核 ② 内核告诉用户)。

(1)epoll_create

int epoll_create(int size);

        size参数不重要,现在一般设置为特定的一个值即可,128、256或者512等。

        重点是返回值,如果成功就返回一个文件描述符。会创建出一个epoll模型。  

(2)epoll_ctl

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

 完成用户告诉内核的工作。

        第一个参数:输入创建成功epoll模型之后,所返回的文件描述符

        第二个参数:要操作的方式(增加ADD,修改MOD,删除DEL )

        第三个参数:需要监听的fd

        第四个参数:用户告诉内核要监听的事件

总结:对epoll模型中特定的文件描述符所要关心的事件进行操作(增加、修改、删除)。

第二个参数的取值:

EPOLL_CTL_ADD :注册新的fd到epfd中;
EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
EPOLL_CTL_DEL :从epfd中删除一个fd

第四个参数的类型struct epoll_event:

struct epoll_event{

        uint32_t events;       /* Epoll events */

        epoll_data_t data;   /* User data variable */

}

        调用epoll_ctl的时候,向epoll模型中添加对应的文件描述符及其关心事件时,这个events代表的是用户告诉内核,当在epoll内返回的时候,拿到的事件是内核告诉用户。在接口上做了分离。

typedef union epoll_data

{

        void *ptr;

        int fd;

        uint32_t u32;

        uint64_t u64;
}epoll_data_t;

(3)epoll_wait

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

完成内核告诉用户的工作

        第一个参数:输入创建成功epoll模型之后,所返回的文件描述符

        第二个参数:输出型参数,请求的哪些事件就绪(内核告诉用户)

        第三个参数:告知这个events有多大

        第四个参数:与poll中的timeout完全相同

        从epoll模型中提取events,将已经就绪的事件返回。

2.工作原理

OS如何知道网络中的数据到来了呢?

        网卡先得到数据,然后会向CPU发送硬件中断,调用OS预设的中断函数,负责从外设进行数据拷贝,从外设拷贝到内核缓冲区中。

如何理解数据在硬件上流动?

        电信号的流动,高电压流到低电压,可以把高电压当作1,低电压当作0。

epoll模型:

        epoll存在一个回调机制,会针对特定的一个或者多个fd,设定对应的回调机制:当fd缓冲区中有数据的时候,进行回调。

        只要调用了这个函数(拷贝函数),底层有数据到来的时候,fd就不需要再轮询检测底层是否有数据,只要设定好回调,就会自动通知。

        回调函数作用:

① 获取就绪的fd

② 获取就绪的事件是什么

③ 构建queue_node节点

④ 将节点链入就绪队

        创建epoll模型的时候会创建一颗红黑树,是一个空树,节点字段:

用户向关心哪一个文件描述上的什么事件就由这个字段决定。

struct rb_struct

{

        int fd;          // 哪一个文件描述符由fd决定

        int events;  // 什么事件由events决定(events是位图结构,与poll中的一样)
}

        因此这个字段是为了解决用户告诉内核的问题。

        调用epoll_ctl这个函数本质就是在这个红黑树中新插入对应的节点。OS维护了这个红黑树,那么当用户历史上注册过很多的文件描述符和事件,OS就可以知道用户关心哪个文件描述符的哪些事件。

        key: fd、sockfd

        创建epoll模型还会创建一个队列结构,是一个空队列,节点字段:

内核中,在红黑树中,哪些fd上的哪些事件已经就绪由这个字段决定。

struct queue_node

{

        int fd;

        int revents;

}

        因此这个字段是为了解决内核告诉用户的问题

所以上面的三个函数的作用分别是:

        epoll_create: 创建整个epoll模型:建立好回调机制,创建一颗空的红黑树,创建一个就绪队列

        epoll_ctl: 操作红黑树:向红黑树中新增节点、查找红黑树节点找到后修改、删除红黑树中节点(这个红黑树等价于poll中的数组)

        epoll_wait: 检测就绪队列:检测就绪队列是否为空,不为空就直接从就绪队列中获取节点

        一旦建立epoll模型,本质上,哪些fd上的哪些event就绪,整个过程用户是不用关心的,都是OS自动做的。

那么epoll为什么高效呢?

        ① 采用红黑树来做管理(epoll_ctl的数据结构是红黑树,针对的场景是用户告诉内核的问题,红黑树节点是没有上限的,所以插入的文件描述符也是没有上限的。红黑树的管理成本非常低)

        ② 采用回调机制,彻底解放OS,不需要OS主动轮询,大大提高了检测效率(OS在拷贝函数时,只要有数据,就通过回调的方式获取对应的就绪文件描述符、就绪事件,构建就绪队列,链入到就绪队列,不需要OS在底层遍历每一个文件描述符了,只会为就绪的节点提供服务,未就绪的节点不需要管,就不再需要O(N)的时间遍历了)

        ③ epoll_wait调用的时候不再需要进行遍历了,直接在就绪队列中取节点即可。(就绪队列有数据时,就会可能告诉用户有就绪的数据了。有人拿节点。有人放节点,这就是生产者消费者模型,但是是不需要加锁了,epoll的代码考虑了线程安全的问题,epoll底层是多线程、多进程安全的)

实现一个简易epoll代码:

        EpollServer.hpp:

#pragma once#include <iostream>
#include <string>
#include <cstdlib>
#include <functional>
#include <cassert>
#include <sys/epoll.h>#include "Sock.hpp"
#include "Log.hpp"using namespace std;class EpollServer
{
public:static const int gsize = 128;static const int num = 256;using func_t = function<int(int)>;public:EpollServer(uint16_t port, func_t func) : port_(port), listensock_(-1), epfd_(-1), func_(func){}void InitEpollServer(){listensock_ = Sock::Socket();Sock::Bind(listensock_, port_);Sock::Listen(listensock_);// 这里直接使用原生接口epfd_ = epoll_create(gsize);if (epfd_ < 0){logMessage(FATAL, "%d:%s", errno, strerror(errno));exit(3);}logMessage(DEBUG, "创建监听套接字成功: %d", listensock_);logMessage(DEBUG, "创建epoll成功: %d", epfd_);}void HandlerEvents(struct epoll_event revs[], int n){for (int i = 0; i < n; i++){int sock = revs[i].data.fd;uint32_t revent = revs[i].events;if (revent & EPOLLIN) // 读事件就绪{if (sock == listensock_){string clientip;uint16_t clientport = 0;// 监听socket就绪, 获取新链接int sockfd = Sock::Accept(listensock_, &clientip, &clientport);if (sockfd < 0){logMessage(FATAL, "%d:%s", errno, strerror(errno));continue;}// 托管给epollstruct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = sockfd;int n = epoll_ctl(epfd_, EPOLL_CTL_ADD, sockfd, &ev);assert(n == 0);(void)n;}else{// 普通socket就绪,进行数据INPUT// bugint n = func_(sock);if (n == 0 || n < 0){// 先移除,在关闭int x = epoll_ctl(epfd_, EPOLL_CTL_DEL, sock, nullptr);assert(x == 0);(void)x;logMessage(DEBUG, "client quit: %d", sock);close(sock);}}}else{}}}void Run(){// 1. 先添加listensock_struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = listensock_;int n = epoll_ctl(epfd_, EPOLL_CTL_ADD, listensock_, &ev);assert(n == 0);(void)n;struct epoll_event revs[num];int timeout = 10000;while (true){// 关于n:就绪的fd的个数,只需要进行将底层的就绪队列中的节点,依次从0下标放入revs中即可!int n = epoll_wait(epfd_, revs, num, timeout);switch (n){case 0:cout << "time out ... : " << (unsigned long)time(nullptr) << endl;break;case -1:cerr << errno << " : " << strerror(errno) << endl;break;default:HandlerEvents(revs, n);break;}}}~EpollServer(){if (listensock_ != -1)close(listensock_);if (epfd_ != -1)close(epfd_);}private:int listensock_;int epfd_;uint16_t port_;func_t func_;
};

        main.cc:

#include "EpollServer.hpp"
#include <memory>static void usage(std::string process)
{cerr << "\nUsage: " << process << " port\n"<< endl;
}int myfunc(int sock)
{//bugchar buffer[1024];ssize_t s = recv(sock, buffer, sizeof(buffer)-1, 0); //不会被阻塞if(s > 0){buffer[s] = 0;logMessage(DEBUG, "client[%d]# %s", sock, buffer);}return s;
}int main(int argc, char *argv[])
{if (argc != 2){usage(argv[0]);exit(0);}unique_ptr<EpollServer> epollserver(new EpollServer(atoi(argv[1]), myfunc));epollserver->InitEpollServer();epollserver->Run();return 0;
}

        Sock.hpp:

#pragma once#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
#include <cerrno>
#include <cassert>class Sock
{
public:static const int gbacklog = 20;static int Socket(){int listenSock = socket(PF_INET, SOCK_STREAM, 0);if (listenSock < 0){exit(1);}int opt = 1;setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));return listenSock;}static void Bind(int socket, uint16_t port){struct sockaddr_in local; // 用户栈memset(&local, 0, sizeof local);local.sin_family = PF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;// 2.2 本地socket信息,写入sock_对应的内核区域if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0){exit(2);}}static void Listen(int socket){if (listen(socket, gbacklog) < 0){exit(3);}}static int Accept(int socket, std::string *clientip, uint16_t *clientport){struct sockaddr_in peer;socklen_t len = sizeof(peer);int serviceSock = accept(socket, (struct sockaddr *)&peer, &len);if (serviceSock < 0){// 获取链接失败return -1;}if(clientport) *clientport = ntohs(peer.sin_port);if(clientip) *clientip = inet_ntoa(peer.sin_addr);return serviceSock;}
};

        Log.hpp:

#pragma once#include <cstdio>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <cassert>
#include <cassert>
#include <cstring>
#include <cerrno>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>#define DEBUG 0
#define NOTICE 1
#define WARINING 2
#define FATAL 3const char *log_level[] = {"DEBUG", "NOTICE", "WARINING", "FATAL"};#define LOGFILE "serverTcp.log"class Log
{
public:Log():logFd(-1){}void enable(){umask(0);logFd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);assert(logFd != -1);dup2(logFd, 1);dup2(logFd, 2);}~Log(){if(logFd != -1) {fsync(logFd);close(logFd);}}
private:int logFd;
};// logMessage(DEBUG, "%d", 10);
void logMessage(int level, const char *format, ...)
{assert(level >= DEBUG);assert(level <= FATAL);char *name = getenv("USER");char logInfo[1024];va_list ap; // ap -> char*va_start(ap, format);vsnprintf(logInfo, sizeof(logInfo) - 1, format, ap);va_end(ap); // ap = NULL// 每次打开太麻烦// umask(0);// int fd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);// assert(fd >= 0);FILE *out = (level == FATAL) ? stderr : stdout;fprintf(out, "%s | %u | %s | %s\n",log_level[level],(unsigned int)time(nullptr),name == nullptr ? "unknow" : name,logInfo);fflush(out); // 将C缓冲区中的数据刷新到OSfsync(fileno(out));   // 将OS中的数据尽快刷盘// close(fd);// char *s = format;// while(s){//     case '%'://         if(*(s+1) == 'd')  int x = va_arg(ap, int);//     break;// }
}

        结果:

        定义的struct epoll_event是我们在用户空间中分配好的内存,一定是需要将内核的数
据拷贝到这个用户空间的内存中的。

3.工作方式

        epoll有2种工作方式:① 水平触发(LT) 边缘触发(ET)

水平触发(LT):如果接收缓冲区里有数据,OS就会一直发消息通知你底层有数据

边缘触发(ET):如果接收缓冲区里有数据,OS通知你一次你没管,OS就不会再通知了

        LT:只要底层有数据,就会一直通知你,这是多路转接的默认模式。程序员在上层编码的时候,可以暂时不把数据读取完毕,不用担心底层不通知你进而导致的数据丢失的问题。

        ET:只有底层在从无到有、从有到多变化的时候,才会通知你。这需要程序员一旦受到通知,就必须将自己收到的数据从内核种全部读取完成,否则可能会有数据丢失的风险。

        一般来说是ET更高效,ET通知的量最小,拷贝的数据量也是最小的。本质是让上层尽快取走数据的一种机制,有更大的窗口大小。让对方不用对于滑动窗口,流量控制做太多的控制,可以尽快将数据交付,并且对方的延迟应答等其它提高效率的策略能够在TCP层面上体现出来,进而提高效率。

我怎么知道数据被取完了呢?

        read、recv不经过读取,是无法得知的,只有读取了之后才能知道是否取完了。

while(true) read(); (errno -> break)

        在网络中,读取的所有文件描述符默认是阻塞的,阻塞的特点是有数据就直接常规返回,没有数据就调用read,此时服务器会因为没有数据而被阻塞,所以因为必须要循环读,直到读取出错或者全部读取完再读一次才能知道没有数据了。那么这最后一次读取,一定会导致read/recv阻塞住。

        因此ET模式下,所有的fd、sock必须处于非阻塞模式。LT模式设阻塞模式不会受影响,但是也建议设置为非阻塞模式。

        LT工作模式:

 epoll默认状态下就是LT工作模式

        当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.
        如果只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait. 仍然会立刻返回并通知socket读事件就绪.
        直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
        支持阻塞读写和非阻塞读写

        ET工作模式:

 如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式

        当epoll检测到socket上事件就绪时, 必须立刻处理.
        如果只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了.
        也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会.
        ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll.
        只支持非阻塞的读写

        select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET

        LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序员一次响应就绪过程中就把所有的数据都处理完.

        相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的.

        另一方面, ET 的代码复杂程度更高了.

如何写入?

        读写事件就绪

读:底层有数据 -> recv -> 底层没有数据,不会读取

写:底层有空间 -> send -> 底层如果没有空间,不应该再写入

        默认只设置了让epoll帮我们关心读事件,没有关心写事件

        为什么没有关心写事件?因为最开始的时候,写空间一定是就绪的

        运行中可能会存在条件不满足 -> 写空间被写满了

写:

        ① 如果LT模式,一定是要先检测有没有对应的空间(先打开对写事件的关心,epoll会自动进行事件派发),然后才写入。LT模式下,只要打开了写入,下面写的代码就会自动进行调用sender方法,进行发送

        ② 如果是ET模式,也可以采用上面的方法,不过,一般用ET追求高效,直接发送,通过发送是否全部发送完成,来决定是否要进行打开对写事件进行关心

        a. 先发送,发完就完了

        b. 先发送,如果没有发完,打开写事件关心,让epoll自动关闭对写事件的关心

注意:一般写事件关心,不能常打开,一定是在需要的时候,再进行打开,不需要就要关闭对写事件的关心

        当我们开启对写事件关心的时候,首次或者每次打开都会自动触发一次epoll事件就绪

Reactor:反应堆

        基于多路转接方案,当事件就绪的时候,采用回调方式进行业务处理的模式。

        下面实现的是:

        单进程:半异步半同步(Linux服务最常用的)【既负责事件派发,又负责IO(又负责业务逻辑的处理)】

还有一种 Proactor:前摄模式

        只负责事件派发,就绪的事件会推送给后端的进程、线程池等,不关心IO(业务逻辑处理)【其它平台可能会出现的模式】

        下面实现一个 基于ET模式下的epoll服务器:

 Epoller.hpp:

#pragma once
#include <iostream>
#include <cerrno>
#include <cstdlib>
#include <unistd.h>
#include <sys/epoll.h>
#include "Log.hpp"class Epoller
{
public:static const int gsize = 128;
public:static int CreateEpoller(){int epfd = epoll_create(gsize);if (epfd < 0){logMessage(FATAL, "epoll_create : %d : %s", errno, strerror(errno));exit(3);}return epfd;}static bool AddEvent(int epfd, int sock, uint32_t event){struct epoll_event ev;ev.events = event;ev.data.fd = sock;int n = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);return n == 0;}static bool ModEvent(int epfd, int sock, uint32_t event){struct epoll_event ev;ev.events = event;ev.data.fd = sock;int n = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev);return n == 0;}static bool DelEvent(int epfd, int sock){int n = epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);return n == 0;}static int LoopOnce(int epfd, struct epoll_event revs[], int num){int n = epoll_wait(epfd, revs, num, -1);if(n == -1){logMessage(FATAL, "epoll_wait : %d : %s", errno, strerror(errno));}return n;}
};

Log.hpp:

#pragma once#include <cstdio>
#include <ctime>
#include <cstdarg>
#include <cassert>
#include <cassert>
#include <cstring>
#include <cerrno>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>#define DEBUG 0
#define NOTICE 1
#define WARNING 2
#define FATAL 3const char *log_level[] = {"DEBUG", "NOTICE", "WARINING", "FATAL"};#define LOGFILE "serverTcp.log"class Log
{
public:Log():logFd(-1){}void enable(){umask(0);logFd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);assert(logFd != -1);dup2(logFd, 1);dup2(logFd, 2);}~Log(){if(logFd != -1) {fsync(logFd);close(logFd);}}
private:int logFd;
};// logMessage(DEBUG, "%d", 10);
void logMessage(int level, const char *format, ...)
{assert(level >= DEBUG);assert(level <= FATAL);char *name = getenv("USER");char logInfo[1024];va_list ap; // ap -> char*va_start(ap, format);vsnprintf(logInfo, sizeof(logInfo) - 1, format, ap);va_end(ap); // ap = NULL// 每次打开太麻烦// umask(0);// int fd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);// assert(fd >= 0);FILE *out = (level == FATAL) ? stderr : stdout;fprintf(out, "%s | %u | %s | %s\n",log_level[level],(unsigned int)time(nullptr),name == nullptr ? "unknow" : name,logInfo);fflush(out); // 将C缓冲区中的数据刷新到OSfsync(fileno(out));   // 将OS中的数据尽快刷盘// close(fd);// char *s = format;// while(s){//     case '%'://         if(*(s+1) == 'd')  int x = va_arg(ap, int);//     break;// }
}

Makefile:

server:main.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f server

Protocol.hpp:

#pragma once#include <iostream>
#include <vector>
#include <cstring>
#include <string>
#include <cstdio>#define SEP 'X'
#define SEP_LEN sizeof(SEP)#define CRLF "\r\n"
#define CRLF_LEN strlen(CRLF) // 坑:sizeof(CRLF)
#define SPACE " "
#define SPACE_LEN strlen(SPACE)// bbbXcc
void PackageSplit(std::string &inbuffer, std::vector<std::string> *result)
{while (true){std::size_t pos = inbuffer.find(SEP);if (pos == std::string::npos)break;result->push_back(inbuffer.substr(0, pos));inbuffer.erase(0, pos + SEP_LEN);}
}struct Request
{int x;int y;char op;
};struct Response
{int code;int result;
};bool Parser(std::string &in, Request *req)
{// 1 + 1, 2 * 4, 5 * 9, 6 *1std::size_t spaceOne = in.find(SPACE);if (std::string::npos == spaceOne)return false;std::size_t spaceTwo = in.rfind(SPACE);if (std::string::npos == spaceTwo)return false;std::string dataOne = in.substr(0, spaceOne);std::string dataTwo = in.substr(spaceTwo + SPACE_LEN);std::string oper = in.substr(spaceOne + SPACE_LEN, spaceTwo - (spaceOne + SPACE_LEN));if (oper.size() != 1)return false;// 转成内部成员req->x = atoi(dataOne.c_str());req->y = atoi(dataTwo.c_str());req->op = oper[0];return true;
}void Serialize(const Response &resp, std::string *out)
{// "exitCode_ result_"std::string ec = std::to_string(resp.code);std::string res = std::to_string(resp.result);*out = ec;*out += SPACE;*out += res;*out += CRLF;
}

Service.hpp:

#pragma once#include "Protocol.hpp"
#include <functional>using service_t = std::function<Response (const Request &req)>;static Response calculator(const Request &req)
{Response resp = {0, 0};switch (req.op){case '+':resp.result = req.x + req.y;break;case '-':resp.result = req.x - req.y;break;case '*':resp.result = req.x * req.y;break;case '/':{ // x_ / y_if (req.y == 0)resp.code = -1; // -1. 除0elseresp.result = req.x / req.y;}break;case '%':{ // x_ / y_if (req.y == 0)resp.code = -2; // -2. 模0elseresp.result = req.x % req.y;}break;default:resp.code = -3; // -3: 非法操作符break;}return resp;
}

Sock.hpp:

#pragma once#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
#include <cerrno>
#include <cassert>class Sock
{
public:static const int gbacklog = 20;static int Socket(){int listenSock = socket(PF_INET, SOCK_STREAM, 0);if (listenSock < 0){exit(1);}int opt = 1;setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));return listenSock;}static void Bind(int socket, uint16_t port){struct sockaddr_in local; // 用户栈memset(&local, 0, sizeof local);local.sin_family = PF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;// 2.2 本地socket信息,写入sock_对应的内核区域if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0){exit(2);}}static void Listen(int socket){if (listen(socket, gbacklog) < 0){exit(3);}}static int Accept(int socket, std::string *clientip, uint16_t *clientport){struct sockaddr_in peer;socklen_t len = sizeof(peer);int serviceSock = accept(socket, (struct sockaddr *)&peer, &len);if (serviceSock < 0){// 获取链接失败return -1;}if(clientport) *clientport = ntohs(peer.sin_port);if(clientip) *clientip = inet_ntoa(peer.sin_addr);return serviceSock;}
};

TcpServer.hpp:

#pragma once#include <iostream>
#include <string>
#include <vector>
#include <cerrno>
#include <unordered_map>
#include <functional>
#include "Sock.hpp"
#include "Epoller.hpp"
#include "Log.hpp"
#include "Util.hpp"
#include "Protocol.hpp"// 基于Reactor模式,编写一个充分读取和写入的,EPOLL(ET)的Serverclass Connection;
class TcpServer;using func_t = std::function<int(Connection *)>;
using callback_t = std::function<int(Connection *, std::string &)>;// event
class Connection
{
public:// 文件描述符int sock_;TcpServer *R_;// 自己的接受和发送缓冲区std::string inbuffer_;std::string outbuffer_;// 回调函数func_t recver_; func_t sender_;func_t excepter_;// 有效报文的集合// std::vector<std::string> requests;// status// int status;public:Connection(int sock, TcpServer *r) : sock_(sock), R_(r){}void SetRecver(func_t recver) { recver_ = recver; }void SetSender(func_t sender) { sender_ = sender; }void SetExcepter(func_t excepter) { excepter_ = excepter; }~Connection() {}
};//Reactor
//单进程:半异步半同步 -- Reactor -- Linux服务最常用 -- 几乎没有之一
// Reactor(tcp)服务器, 即负责事件派发, 又负责IO [又负责业务逻辑的处理]// Proactor: 前摄模式 -- 其他平台可能出现的模式
// 只负责负责事件派发,就绪的事件推送给后端的进程、线程池, 不关心 负责IO [又负责业务逻辑的处理]class TcpServer
{
public:TcpServer(callback_t cb, int port = 8080) : cb_(cb){revs_ = new struct epoll_event[revs_num];// 网络功能listensock_ = Sock::Socket();Util::SetNonBlock(listensock_);Sock::Bind(listensock_, port);Sock::Listen(listensock_);// 多路转接epfd_ = Epoller::CreateEpoller();// 添加listensock匹配的connectionAddConnection(listensock_, EPOLLIN | EPOLLET,std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);// // 添加listensock到epoll// Epoller::AddEvent(epfd_, listensock_, EPOLLIN | EPOLLET);// // 将listensock匹配的Connection也添加到当前的unordered_map中// Connection *conn = new Connection(listensock_, this);// conn->SetRecver(std::bind(&TcpServer::Accepter, this, std::placeholders::_1));// connections_.insert(std::make_pair(listensock_, conn));}void AddConnection(int sockfd, uint32_t event, func_t recver, func_t sender, func_t excepter){if (event & EPOLLET)Util::SetNonBlock(sockfd);// 添加sockfd到epollEpoller::AddEvent(epfd_, sockfd, event);// 将sockfd匹配的Connection也添加到当前的unordered_map中Connection *conn = new Connection(sockfd, this);conn->SetRecver(recver);conn->SetSender(sender);conn->SetExcepter(excepter);// conn->SetRecver(std::bind(&TcpServer::TcpRecver, this, std::placeholders::_1));// conn->SetSender(std::bind(&TcpServer::TcpSender, this, std::placeholders::_1));// conn->SetExcepter(std::bind(&TcpServer::TcpExcepter, this, std::placeholders::_1));connections_.insert(std::make_pair(sockfd, conn));logMessage(DEBUG, "添加新链接到connections成功: %d", sockfd);}int Accepter(Connection *conn){// demo - listensock 也是工作在ET,来一个连接,对应就有事件就绪,那么如何来一批呢?while (true){std::string clientip;uint16_t clientport = 0;int sockfd = Sock::Accept(conn->sock_, &clientip, &clientport);if (sockfd < 0){if (errno == EINTR)continue;else if (errno == EAGAIN || errno == EWOULDBLOCK)break;else{logMessage(WARNING, "accept error");return -1;}}logMessage(DEBUG, "get a new link: %d", sockfd);// 注意:默认我们只设置了让epoll帮我们关心读事件,没有关心写事件// 为什么没有关注写事件:因为最开始的时候,写空间一定是就绪的!// 运行中可能会存在条件不满足 -- 写空间被写满了AddConnection(sockfd, EPOLLIN | EPOLLET,std::bind(&TcpServer::TcpRecver, this, std::placeholders::_1),std::bind(&TcpServer::TcpSender, this, std::placeholders::_1),std::bind(&TcpServer::TcpExcepter, this, std::placeholders::_1));}return 0;}int TcpRecver(Connection *conn){// XXXXXXX\3XXXXXX\3while (true){char buffer[1024];ssize_t s = recv(conn->sock_, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0;conn->inbuffer_ += buffer;}else if (s == 0){logMessage(DEBUG, "client quit");conn->excepter_(conn);break;}else{if (errno == EINTR)continue;else if (errno == EAGAIN || errno == EWOULDBLOCK)break;else{// 出错了logMessage(DEBUG, "recv error: %d:%s", errno, strerror(errno));conn->excepter_(conn);break;}}}// 将本轮全部读取完毕std::vector<std::string> result;PackageSplit(conn->inbuffer_, &result);for (auto &message : result){cb_(conn, message);}return 0;}int TcpSender(Connection *conn){while(true){ssize_t n = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);if(n > 0){// 去除已经成功发送的数据conn->outbuffer_.erase(0, n);}else{if(errno == EINTR) continue;else if(errno == EAGAIN || errno == EWOULDBLOCK) break; //发完了,一定是outbuffer清空了吗?不一定(EPOLLOUT打开)else{conn->excepter_(conn);logMessage(DEBUG, "send error: %d:%s", errno, strerror(errno));break;}}}// if(conn->outbuffer_.empty()) EnableReadWrite(conn->sock_, true, false);// else EnableReadWrite(conn->sock_, true, true);return 0;}int TcpExcepter(Connection *conn){// 0.if(!IsExists(conn->sock_)) return -1;// 所有的服务器异常,都会被归类到这里// 坑:一定要先从epoll中移除,然后再关闭fd// 1.Epoller::DelEvent(epfd_, conn->sock_);logMessage(DEBUG, "remove epoll event!");// 2.close(conn->sock_);logMessage(DEBUG, "close fd: %d", conn->sock_);// 3. delete conn;delete connections_[conn->sock_];logMessage(DEBUG, "delete connection object done");// 4. connections_.erase(conn->sock_);logMessage(DEBUG, "erase connection from connections");return 0;}bool IsExists(int sock){auto iter = connections_.find(sock);if (iter == connections_.end())return false;elsereturn true;}// 打开或者关闭对于特定socket是否要关心读或者写//EnableReadWrite(sock, true, false);//EnableReadWrite(sock, true, true);void EnableReadWrite(int sock, bool readable, bool writeable){uint32_t event = 0;event |= (readable ? EPOLLIN : 0);event |= (writeable ? EPOLLOUT : 0);Epoller::ModEvent(epfd_, sock, event);}// 根据就绪事件,将事件进行事件派发void Dispatcher(){int n = Epoller::LoopOnce(epfd_, revs_, revs_num);for (int i = 0; i < n; i++){int sock = revs_[i].data.fd;uint32_t revent = revs_[i].events;if(revent & EPOLLHUP) revent |= (EPOLLIN|EPOLLOUT);if(revent & EPOLLERR) revent |= (EPOLLIN|EPOLLOUT);if (revent & EPOLLIN){if (IsExists(sock) && connections_[sock]->recver_)connections_[sock]->recver_(connections_[sock]);}if (revent & EPOLLOUT){if (IsExists(sock) && connections_[sock]->sender_)connections_[sock]->sender_(connections_[sock]);}}}void Run(){while (true){Dispatcher();}}~TcpServer(){if (listensock_ != -1)close(listensock_);if (epfd_ != -1)close(epfd_);delete[] revs_;}private:static const int revs_num = 64;// 1. 网络socketint listensock_;// 2. epollint epfd_;// 3. 将epoll和上层代码进行结合std::unordered_map<int, Connection *> connections_;// 4. 就绪事件列表struct epoll_event *revs_;// 5. 设置完整报文的处理方法callback_t cb_;// "http", httpservice;// std::unordered_map<std::string, callback_t> cbs_;
};

Util.hpp:

#pragma once#include <iostream>
#include <string>
#include <unistd.h>
#include <fcntl.h>class Util
{
public:static void SetNonBlock(int fd){int fl = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, fl | O_NONBLOCK);}
};

main.cc

#include "TcpServer.hpp"
#include "Service.hpp"
#include <memory>using namespace std;static void usage(std::string process)
{cerr << "\nUsage: " << process << " port\n"<< endl;
}
int BeginHandler(Connection *conn, std::string &message, service_t service)
{// 我们能保证,message一定是一个完整的报文,因为我们已经对它进行了解码Request req;// 反序列化,进行处理的问题if (!Parser(message, &req)){// 写回错误消息return -1;// 可以直接关闭连接// conn->excepter_(conn);}// 业务逻辑Response resp = service(req);std::cout << req.x << " " << req.op << " " << req.y << std::endl;std::cout << resp.code << " " << resp.result << std::endl;// 序列化std::string sendstr;Serialize(resp, &sendstr);// 处理完毕的结果,发送回给clientconn->outbuffer_ += sendstr;conn->sender_(conn);if(conn->outbuffer_.empty()) conn->R_->EnableReadWrite(conn->sock_, true, false);else conn->R_->EnableReadWrite(conn->sock_, true, true);// conn->R_->EnableReadWrite(conn->sock_, true, true);// conn->sender_()// 能不能直接调用send方法呢?// 谈谈多路转接的发送问题?std::cout << "这里就是上次的业务逻辑啦 --- end" << std::endl;// 如果我们处理完了结果了,如何返回呢???// conn->outbuffer_ += result;// epollreturn 0;
}// 1 + 1X2 + 3X5 + 6X8 -> 1 + 1
int HandlerRequest(Connection *conn, std::string &message)
{return BeginHandler(conn, message, calculator);
}int main(int argc, char *argv[])
{if (argc != 2){usage(argv[0]);exit(0);}// http.XXX("GET", "/aaa");unique_ptr<TcpServer> svr(new TcpServer(HandlerRequest, atoi(argv[1])));svr->Run();return 0;
}

http://www.ppmy.cn/news/61868.html

相关文章

Scala Option类型,异常处理,IO,高阶函数

Option类型 实际开发中, 在返回一些数据时, 难免会遇到空指针异常(NullPointerException), 遇到一次就处理一次相对来讲还是比较繁琐的. 在Scala中, 我们返回某些数据时&#xff0c;可以返回一个Option类型的对象来封装具体的数据&#xff0c;从而实现有效的避免空指针异常。S…

SpringBoot 自定义注解实现Redis缓存功能

背景 最近小A的公司要做一个大屏可视化平台&#xff0c;主要是给领导看的&#xff0c;领导说这个项目要给领导演示&#xff0c;效果好不好直接关系到能不能拿下这个项目&#xff0c;领导还补了一句“这项目至少是百万级的&#xff0c;大伙要全力以赴”&#xff0c;早上小A还想…

js高级知识汇总一

目录 1.怎么理解闭包&#xff1f; 2.闭包的作用&#xff1f; 3.闭包可能引起的问题&#xff1f; 4.变量提升 5.函数动态参数 6.剩余参数 ...&#xff08;实际开发中提倡使用&#xff09; 7.展开运算符 8.箭头函数 9.解构赋值&#xff08;数组、对象&#xff09; 10 创…

道德经-第十六章

致虚极&#xff0c;守静笃。 万物并作&#xff0c;吾以观复。 夫物芸芸&#xff0c;各复归其根。 归根曰静&#xff0c;是谓复命。 复命曰常&#xff0c;知常曰明。 不知常&#xff0c;妄作&#xff0c;凶。 知常容&#xff0c;容乃公。 公乃王&#xff0c;王乃天。 天乃道&…

云原生时代崛起的编程语言Go并发编程实战

文章目录 概述基础理论并发原语协程-Goroutine通道-Channel多路复用-Select通道使用超时-Timeout非阻塞通道操作关闭通道通道迭代 定时器-TimerAndTicker工作池-Worker Pools等待组-WaitGroup原子操作-Atomic互斥锁-Mutex读写互斥锁-RWMutex有状态协程单执行-Once条件-Cond上下…

使用prometheus时发现mongodb exporter的/metrics数据展示很慢,延迟高

项目场景&#xff1a; 使用prometheusgrafana搭建对mongoDB集群的监控。 问题描述 使用prometheus时发现mongodb exporter的/metrics数据展示接口很慢&#xff0c;延迟高。 看了一下大概是10s 原因分析&#xff1a; 由于是在云服务器上进行搭建的。 经过尝试之后发现创建mo…

Visual Studio Code 1.78 发布

VS Code 1.78 已发布&#xff0c;此版本一些主要亮点包括&#xff1a; 辅助功能改进 - 更好的屏幕阅读器支持、新的音频提示。新的颜色主题 - “Modern” 浅色和深色主题默认设置。 配置文件模板 - Python、Java、数据科学等的内置模板。 新版本提供了配置文件模板&#xff0…

第7章链接:引言

链接&#xff08;linking&#xff09;是将各种代码和数据部分收集起来并组成称为一个单一文件的过程&#xff0c;这个文件可被加载&#xff08;或拷贝&#xff09;到存储器并执行。 链接可在如下三个阶段执行&#xff1a; 编译时&#xff08;complile time&#xff09;&#…