C++ IO多路复用 poll模型

ops/2024/10/18 2:27:28/

原文链接:C++ IO多路复用 poll模型

预备知识

poll模型前置需要了解的可以参考:

IO控制:fcntl库:IO控制库

多线程:C++ Linux多线程同步通信-信号量

socket:C++ Linux多进程Socket通信

select模型:C++ IO多路复用 select模型

poll模型

特性
原理

poll是对select的改进,主要与select进行比较

优点
  1. IO无数量限制:select用位图检测IO,存在一个MAX_SIZE,但是poll引入链表解决上限问题
  2. poll将监听集合与响应集合分别设置,不需要每次重置集合
缺点
  1. 没有改变select的线性遍历的轮询方法,连接数越多,耗时越长
库函数<sys/poll>
# include <poll>
#include <errno.h> int poll ( struct pollfd * fdarray, unsigned int nfds, int timeout);fdarray: 监听的IO数组nfds: fds数组的长度timeout: 0(非阻塞) -1(阻塞) >0(时限)return >0(存在事件的IO数量) 0(超出时限无IO事件) -1(错误)错误的errno类型:EBADF(存在无效IO),EFAULT(fds越界),EINTR(收到中断信号),EINVAL(fds超过上限),ENOMEM(内存不足)struct pollfd {int fd;         	// 需要监听的IOshort events;       // 等待的事件类型short revents;      // 实际发生的事件类型
} ; events(监听位标志):POLLIN 所有可读POLLRDNORM 正常数据可读POLLRDBAND 高优先级数据可读 (如缓冲区需要优先读取的数据)POLLPRI 紧急数据可读 (要求立即读取,如TCP外带指针的控制信息) 四种标志区分用于对读事件有更高区分要求的场景POLLOUT 所有可写POLLWRNORM 正常数据可写POLLWRBAND 高优先级数据可写 (特殊协议里面使用,当该IO优先数据的缓冲区可写时标志) POLLRDHUP 该IO半关闭,不会再给主机发数据,但是还可以接受数据
revents(返回位标志): 相比events,还增加了3个标志POLLERR IO错误时返回POLLHUP IO已经关闭POLLNVAL IO未打开
实例: 实现非阻塞请求响应形式的一个服务器,提供一个四则运算的API, 客户端同步(要求响应后才能再次请求)
请求响应格式
  1. 请求格式
    以http协议为例,请求包含请求头和请求体 最简单的请求头(本例实现的)包括:请求类型,请求主体的字节长度,是否保持连接,是否编码加密.

请求类型这种枚举表示可以用二进制位表示,减小数据包大小

编码加密我们使用简单的数据+10表示.解码-10即可

struct  RequestData{ //20Bstruct  RequestHeader; //8B short会对齐 struct  RequestBody; //12B
};struct  RequestHeader{short  type; // 按位表示|encode|keep-alive| 两位表示4种情况int bodylen; //
};struct  RequestBody{short method; // 按位表示: add|sub|times|divide| 两位表示4种情况int num1; int num2;
};
  1. 响应格式
struct  ResponseData{ //16Bstruct  ResponseHeader; //8Bstruct  ResponseBody; //8B
};struct  ResponseHeader{short  type; // 直接返回请求相同类型int bodylen; // 数据大小
};struct  ResponseBody{short method; // 直接返回请求相同类型int result; 
};
IO连接的处理

这种请求响应格式的服务,需要考虑IO响应后的处理,如keepalive的需要保持连接,否则直接发送完后立即断开连接(close fd,从fds中删除 TCP断开是4挥手过程,可以等响应被读出后再断开)

问题

实际写的时候有几个问题:

  1. 消息的处理时,IO复用+线程池+线程同步最关键的就是响应集合的结构:如果是哈希表,每个请求必须响应后才能再次请求(因为哈希表key唯一). 如果是队列,可以支持同一个服务器多个请求,但是队列的结构需要设计

  2. 队列消息最大的问题就是消息不能直接查找,当遍历fd时,需要每次遍历队列找到相应的消息. 当按照消息遍历每次也不能定位fd在数组哪里,并且fd不一定立即可写. 哈希表能够通过fd直接定位消息.

  3. 队列结构如何设计实现:fd可写和首个消息定位都为O(1)?

  4. 响应结束要求断开连接怎么实现?因为fds数组idx和fd对应,如果断开连接,fds的对应就变化了,因此哈希表不能用来记录索引和fd对应项.断开连接只能重新移动赋值一个新的fds

程序

实际遇到的bug:

  1. 事件判断的时候错误使用了|符号confds[0].revents|POLLIN

正确的为&按位与

  1. accept没有错误检测
    添加

  2. 循环开始时要清空所有IO的revent=0

server.cpp:接收请求,创建线程处理,将结果添加哈希表项,等响应时写回,并删除表项,同时对连接数组重新处理
#include<iostream>
#include<cstring>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<sys/poll.h>
#include<unordered_map>
#include<semaphore.h>
#include<thread>
#include<climits>
#include<unistd.h>
using namespace std;#define SLEEP_TIME 1
#define MAX_CONNECT 4
#define MAX_RUNNING 4#define REQUESTDATA_SIZE 20
#define REQUESTHEADER_SIZE 8
#define REQUESTBODY_SIZE 12
#define RESPONSEDATA_SIZE 16
#define RESPONSEHEADER_SIZE 8
#define RESPONSEBODY_SIZE 8#define ENCODE 1
#define ALIVE 2
#define ADD 0
#define SUB 1
#define TIMES 2
#define DIVIDE 3struct  RequestHeader{short  type; // 按位表示|encode|keep-alive| 两位表示4种情况int bodylen; //
};
struct  RequestBody{short method; // 按位表示: add|sub|times|divide| 两位表示4种情况int num1; int num2;
};
struct  RequestData{ //20Bstruct  RequestHeader header; //8B short会对齐 struct  RequestBody body; //12B
};struct  ResponseHeader{short  type; // 直接返回请求相同类型int bodylen; // 数据大小
};
struct  ResponseBody{short method; // 直接返回请求相同类型int result; // return nullptr if error 
};
struct  ResponseData{ //16Bstruct  ResponseHeader header; //8Bstruct  ResponseBody body; //8B
};int running_threads=0;unordered_map<int,struct ResponseData*> responses;sem_t sem_thread;// 线程处理请求函数
int processing_func(int fd,RequestData reqdata){struct ResponseHeader resheader;struct ResponseBody resbody;resheader.type=reqdata.header.type;resheader.bodylen=RESPONSEBODY_SIZE;resbody.method=reqdata.body.method;resbody.result=INT_MAX;struct ResponseData resdata;resdata.header=resheader;resdata.body=resbody;int result=0,num1=(reqdata.body).num1,num2=(reqdata.body).num2;if(resheader.type&ENCODE){num1-=10;num2-=10;}//处理if(resbody.method&ADD){result=num1+num2;}else if(resbody.method&SUB){result=num1-num2;}else if(resbody.method&TIMES){result=num1*num2;}else if(resbody.method&DIVIDE){if(num2==0) result=INT_MAX;else{result=num1/num2;}}else{perror("error request body.method"); }resbody.result=result;//临界区添加数据sem_wait(&sem_thread);if(!responses.count(fd)){responses[fd]=&resdata;}else{perror("responses get request from same fd twice");}sem_post(&sem_thread);return 0;
}int main(){//信号量同步sem_init(&sem_thread, 0, 1);string server_ip="127.0.0.1";uint16_t server_port=8001;int server_fd=socket(AF_INET,SOCK_STREAM,0);if(server_fd==-1){perror("socket");exit(EXIT_FAILURE);}struct sockaddr_in server_addr;server_addr.sin_family=AF_INET;server_addr.sin_port=htons(server_port);inet_pton(AF_INET,server_ip.c_str(),(struct sockaddr*)&server_addr.sin_addr.s_addr);if(bind(server_fd,(const struct sockaddr*)&server_addr,sizeof(server_addr))==-1){perror("bind");exit(EXIT_FAILURE);}if(listen(server_fd,10)==-1){perror("listen");exit(EXIT_FAILURE);}struct pollfd confds[MAX_CONNECT+1];confds[0].fd=server_fd;confds[0].events=POLLIN;struct RequestHeader reqheader;struct RequestBody reqbody;reqheader.type=0;reqheader.bodylen=REQUESTBODY_SIZE;reqbody.method=0;reqbody.num1=0;reqbody.num2=0;struct RequestData reqdata;reqdata.header=reqheader;reqdata.body=reqbody;int cur_cons=1;while(true){int nevents=poll(confds,cur_cons,0);if(nevents==-1){perror("poll");}else if(nevents==0){cout<<"no events"<<"\n";sleep(SLEEP_TIME);continue;}else ;//添加新连接if((confds[0].revents|POLLIN)&&cur_cons<MAX_CONNECT+1){int new_fd=accept(server_fd,nullptr,nullptr);confds[cur_cons].fd=new_fd;confds[cur_cons].events=POLLIN|POLLOUT;confds[cur_cons].revents=0;cur_cons++;}//遍历处理IO事件for(int i=1;i<cur_cons;i++){//遍历接受数据if((confds[i].revents&POLLIN)&&(running_threads<MAX_RUNNING)){ssize_t size=recv(confds[i].fd,&reqdata,REQUESTDATA_SIZE,0);if(size<=0){perror("recv");exit(EXIT_FAILURE);}else{thread t(processing_func,confds[i].fd,reqdata);if(t.joinable()) t.join();cout<<"recv request: request size=="<<size<<"(REQUESTDATA_SIZE=20)\n";}}//遍历发送数据if(confds[i].revents&POLLOUT){int fd=confds[i].fd;if(responses.count(fd)){ssize_t size=send(confds[i].fd,responses[fd],RESPONSEDATA_SIZE,0);if(size<=0){perror("send");}else{cout<<"send response: response size=="<<size<<"(RESPONSEDATA_SIZE=16)\n";if(!(responses[fd]->header.type&ALIVE)){if(close(fd)==-1){perror("close");}struct pollfd new_confds[MAX_CONNECT+1];//删除第i个连接memcpy(new_confds, confds, i*sizeof(struct pollfd));memcpy(new_confds + i * sizeof(struct pollfd), confds+(i+1)*sizeof(struct pollfd), (MAX_CONNECT-i-1)*sizeof(struct pollfd));//删除哈希表数据,连接数-1sem_wait(&sem_thread);responses.erase(fd);sem_post(&sem_thread);cur_cons--;}}}}}}return 0;
}
client.cpp 单个进程计算请求,阻塞,等待结果返回输出.
#include<sys/socket.h>
#include<arpa/inet.h>
#include<cstring>
#include<unistd.h>
#include<iostream>
#include<thread>
#include<unistd.h>
#include<random>
#include<unordered_map>
using namespace std;#define REQUESTDATA_SIZE 20
#define REQUESTHEADER_SIZE 8
#define REQUESTBODY_SIZE 12
#define RESPONSEDATA_SIZE 16
#define RESPONSEHEADER_SIZE 8
#define RESPONSEBODY_SIZE 8#define ENCODE 1
#define ALIVE 2
#define ADD 0
#define SUB 1
#define TIMES 2
#define DIVIDE 3struct  RequestHeader{short  type; // 按位表示|encode|keep-alive| 两位表示4种情况int bodylen; //
};
struct  RequestBody{short method; // 按位表示: add|sub|times|divide| 两位表示4种情况int num1; int num2;
};
struct  RequestData{ //20Bstruct  RequestHeader header; //8B short会对齐 struct  RequestBody body; //12B
};struct  ResponseHeader{short  type; // 直接返回请求相同类型int bodylen; // 数据大小
};
struct  ResponseBody{short method; // 直接返回请求相同类型int result; // return nullptr if error 
};
struct  ResponseData{ //16Bstruct  ResponseHeader header; //8Bstruct  ResponseBody body; //8B
};unordered_map<int,string> um={{ADD,"ADD"},{SUB,"SUB"},{TIMES,"TIMES"},{DIVIDE,"DIVIDE"}
};random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> randtype(0, 2);
uniform_int_distribution<> randmethod(0, 3);
uniform_int_distribution<> randnum(0, 100);void postfunc(string id,int server_fd,struct sockaddr_in server_addr,struct RequestData reqdata){if(connect(server_fd,(sockaddr*)&server_addr,sizeof(server_addr))==-1){perror("connect");return ;}reqdata.header.type=randtype(gen);reqdata.body.method=randmethod(gen);reqdata.body.num1=randnum(gen);reqdata.body.num2=randnum(gen);if((reqdata.header.type&ENCODE)){reqdata.body.num1+=10;reqdata.body.num2+=10;}// sendssize_t size=send(server_fd,(void*)&reqdata,REQUESTDATA_SIZE,0);if(size<0){perror("send");exit(EXIT_FAILURE);}else if(size==0){perror("connect closed");}else{cout<<"thread"<<id<<" send success"<<"\n";}struct ResponseData resdata;// recvsize=recv(server_fd,(void*)&resdata,sizeof(resdata),0);if(size<0){perror("recv");}else if(size==0){perror("connect closed");}else{cout<<"thread"<<id<<":"<<"[ENCODE="<<(reqdata.header.type&ENCODE)<<" ALIVE="<<(reqdata.header.type&ALIVE)<<"num1="<<reqdata.body.num1<<" "<<um[reqdata.body.method]<<" num2="<<reqdata.body.num2<<" Result="<<resdata.body.result<<"\n";}}int main(int argc,char* argv){string id;if(argc==0){cout<<"no id args"<<"\n";}else{id=argv[0];}//socket int server_fd=socket(AF_INET,SOCK_STREAM,0);if(server_fd==-1){perror("socket");exit(EXIT_FAILURE);}struct sockaddr_in server_addr;string server_ip="127.0.0.1";uint16_t server_port=8001;server_addr.sin_family=AF_INET;server_addr.sin_port=htons(server_port);inet_pton(AF_INET,server_ip.c_str(),&server_addr.sin_addr.s_addr);struct RequestHeader reqheader;struct RequestBody reqbody;reqheader.type=0;reqheader.bodylen=REQUESTBODY_SIZE;reqbody.method=0;reqbody.num1=0;reqbody.num2=0;struct RequestData reqdata;reqdata.header=reqheader;reqdata.body=reqbody;postfunc(id,server_fd,server_addr,reqdata);return 0;
}
multi_client.cpp 10个进程同时请求
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
#include <vector>
#include<filesystem>
using namespace std;int child_process(const char* program_name,const char* args[]){pid_t cpid=fork();if(cpid==-1){perror("fork");exit(EXIT_FAILURE);}else if(cpid>0){return cpid;}else{execve(program_name,const_cast<char* const*>(args),nullptr);perror("execve");exit(EXIT_FAILURE);}}int main()
{char* program_name=(char*)"./c.cpp";if(!std::filesystem::exists(program_name)){cout<<"file not exists\n";exit(EXIT_FAILURE);}vector<pid_t> childs(10);for(int i=0;i<10;i++){string process_id="process_"+to_string(i);const char* args[]={program_name,process_id.c_str(),nullptr};childs[i]=child_process(program_name,args);}pid_t cpid;while((cpid=wait(nullptr))>0){cout<<"child."<<cpid<<"terminated\n";}return 0;
}

http://www.ppmy.cn/ops/126359.html

相关文章

C#从零开始学习(Head First C#)

想要开发游戏&#xff0c;C#是unity用的编程语言,所以想系统的巩固和学习一下&#xff0c;在此记录自己的学习笔记&#xff0c;来和大家共同学习&#xff0c;同时也希望能够帮助一些想入门的同学&#xff0c;因此我会使用Head First C#这本书籍,从最开始的章节记录。给自己定个…

【环境搭建】远程服务器搭建ElasticSearch

参考&#xff1a; 非常详细的阿里云服务器安装ElasticSearch过程..._阿里云服务器使用elasticsearch-CSDN博客 服务器平台&#xff1a;AutoDL 注意&#xff1a; 1、切换为非root用户&#xff0c;su 新用户名&#xff0c;否则ES无法启动 2、安装过程中没有出现设置账号密码…

使用shell实现高精度时间日志记录与时间跳变检测

文章目录 0. 概述1. 使用说明1.1. 参数说明1.2. 运行脚本 2. 脚本详细解析2.1. 参数初始化2.2. 参数解析与验证2.3 主循环条件2.4 时间跳变检测与处理2.5. 日志轮转机制2.6. 睡眠时间计算 0. 概述 之前写过单线程版本的高精度时间日志记录小程序&#xff1a;C编程&#xff1a;…

前端布局,y轴超出滚动、x轴超出展示方案

想要实现布局效果&#xff0c;红区高度固定可滑动可收起。红区引用绿区组件。 一般会想到如下方案&#xff0c;红区样式&#xff1a; width&#xff1a;200px; height: 100%; overflow-y: auto; overflow-x: visible; 但是效果并不好&#xff0c;绿区直接隐藏了 最终采用布局方…

数字后端零基础入门系列 | Innovus零基础LAB学习Day1

一 Floorplan 数字IC后端设计如何从零基础快速入门&#xff1f;(内附数字IC后端学习视频&#xff09; Lab5-1这个lab学习目标很明确——启动Innovus工具并完成设计的导入。 在进入lab之前&#xff0c;我们需要进入我们的FPR工作目录。 其中ic062为个人服务器账户。比如你端…

C++初阶学习第七弹——string的模拟实现

C初阶学习第六弹------标准库中的string类_c语言返回string-CSDN博客 通过上篇我们已经学习到了string类的基本使用&#xff0c;这里我们就试着模拟实现一些&#xff0c;我们主要实现一些常用到的函数。 目录 一、string类的构造 二、string类的拷贝构造 三、string类的析构函…

JDBC增删改查操作的基本步骤

JDBC&#xff08;Java Database Connectivity&#xff09;是一种用于执行数据库操作的Java API。以下是使用JDBC进行增删改查&#xff08;CRUD&#xff09;操作的基本步骤和代码示例。 步骤&#xff1a; 加载数据库驱动&#xff1a;确保JDBC驱动程序类被加载。建立数据库连接…

WPF入门_02依赖属性

1、依赖属性主要有以下三个优点 1)依赖属性加入了属性变化通知、限制、验证等功能。这样可以使我们更方便地实现应用,同时大大减少了代码量 2)节约内存:在WinForm中,每个UI控件的属性都赋予了初始值,这样每个相同的控件在内存中都会保存一份初始值。而WPF依赖属性很好地…