【Linux】匿名管道通信场景——进程池

server/2024/12/27 2:43:54/
🔥 个人主页:大耳朵土土垚
🔥 所属专栏:Linux系统编程

这里将会不定期更新有关Linux的内容,欢迎大家点赞,收藏,评论🥳🥳🎉🎉🎉

文章目录

  • 1. 初始化进程池
  • 2. 进程池执行任务
    • 2.1 任务管理
    • 2.2 执行任务
  • 3. 清理进程池
  • 4. 封装与完整实现
  • 5. 结语

1. 初始化进程池

  进程池的实现是依靠匿名管道,通过进程间通信使得父进程能够管理多个进程任务,相当于父进程拥有了很多个进程——进程池,通过不同的进程完成指定的任务。
  所以我们需要创建多个匿名管道和子进程,进行进程间通信,发送信息给子进程让它们根据接收到的信息处理相关任务。
  因为有多个管道和子进程,为了方便父进程使用不同管道发送对应信息给子进程,我们需要将管道的文件描述符以及对应子进程的pid保存起来,我们选择将它们封装在一个Channel类中。又因为有多个匿名管道和子进程,所以将多个Channel类对象储存在C++STL中的容器vector中来方便父进程进行管理进程池。

代码如下:

int InitProcesspool(int num,std::vector<Channel>& channels)
{for(int i = 0; i < num; i++)//使用循环创建多个匿名管道和子进程{//1.创建匿名管道int pipefd[2] = {0};int n = pipe(pipefd);if(n < 0) return 2;//根据不同的返回值判断原因,也可以使用枚举来约定返回值代表的内容//2.创建子进程pid_t id = fork();if(id < 0) return 3;//3.建立通信管道,父子进程关闭读端或写端if(id == 0)//子进程{//子进程读取,关闭写端::close(pipefd[1]);//dup2dup2(pipefd[0],0);//子进程需要执行的内容Work();::exit(0);}//父进程//父进程写入,关闭读端::close(pipefd[0]);channels.emplace_back(pipefd[1],id);//保存在channel对象中并存入vector}return 0;
}

对子进程内部,我们使用dup2系统调用将匿名管道读端文件描述符与标准输入stdin交换,这样我们就不需要保存不同进程对应匿名管道的读端文件描述符,只需要统一从0号文件描述符中读取内容即可。

对于Channel类

class Channel{
public:Channel(int fd,pid_t who):_fd(fd),_who(who){_name = "Channel-"+std::to_string(fd)+"-"+std::to_string(who);}std::string GetName(){return _name;}int GetFd(){return _fd;}pid_t GetWho(){return _who;}void Send(int num)//父进程往匿名管道发送信息{::write(_fd,&num,sizeof(num));}~Channel(){}
private:int _fd;//保存匿名管道通信的文件描述符std::string _name;//名字(自己取的)pid_t _who;//子进程pid
};

对于父进程发送给子进程的信息我们选择约定一个数字对应一个任务,不同数字对应不同需要完成的任务,子进程接收到信息后就可以根据数字来确定不同的任务。

2. 进程池执行任务

2.1 任务管理

  执行任务之前我们需要先确定有哪些任务,怎么执行…所以我们需要进行任务管理,同样我们也是使用一个类TaskManager来进行任务管理:

#include<iostream>
#include<unordered_map>
#include<functional>
#include<ctime>using task_t = std::function<void()>;//函数指针//不同任务函数void Load(){std::cout<<"正在执行加载任务..."<<std::endl;}void Del(){std::cout<<"正在执行删除任务..."<<std::endl;}void Log(){std::cout<<"正在执行日志任务..."<<std::endl;}static int number = 0;
class TaskManager
{
public:TaskManager(){srand(time(nullptr));InsertTask(Load);InsertTask(Del);InsertTask(Log);}int SelectTask(){return rand()%number;}void InsertTask(task_t t){m[number++] = t;}void Excute(int num){if(m.find(num) == m.end())return;m[num]();//执行任务}~TaskManager(){}
private:std::unordered_map<int,task_t> m;//使用map封装数字与对应的任务函数指针
};TaskManager tm;

  选择新创建一个源文件Task.hpp来封装上述内容,上述任务管理类中我们使用map来保存数字与任务函数指针的相关关系,这样通过数字就可以确定是哪一个任务函数;此外选择任务使用的方法是随机数的方法,大家可以根据自己的想法确定不同的方式。

2.2 执行任务

  • 发送任务

使用按顺序轮询的方式派发任务给不同的子进程——设置10次任务循环,先通过任务管理类中的选择函数获取任务编号,然后通过父进程进程池管理类将任务编号发送给子进程。

void ExcuteTask(std::vector<Channel>& channels)
{int n = 0;int count = 10;while(count--)//执行10次任务{//1.选择任务,获取任务编号int tasknum = tm.SelectTask();//2.选择子进程,使用轮询选择,派发任务channels[n++].Send(tasknum);n%=channels.size();std::cout<<std::endl;std::cout<<"*****成功发送"<<10-count<<"个任务*****"<<std::endl;sleep(2);//每个2s发送一个任务}
}
  • 接受并执行任务

子进程接受并执行任务——先通过匿名管道接受父进程发送的任务编号,然后通过任务管理类对象执行任务编号所对应的任务函数。

//子进程接受并执行任务
void Work()
{while(true){int num = 0;int n = ::read(0,&num,sizeof(num));if(n == sizeof(num))//读取成功tm.Excute(num);//不要发成nelse if(n == 0){break;}else{break;}}
}

3. 清理进程池

我们需要回收匿名管道的文件描述符和子进程

void CleanProcesspool(std::vector<Channel>& channels)
{for(auto& c : channels)::close(c.GetFd());for(auto& c : channels){pid_t rid = ::waitpid(c.GetWho(),nullptr,0);if(rid <= 0){std::cout<<std::endl;std::cout<<"清理子进程失败..."<<std::endl;return;}}std::cout<<std::endl;std::cout<<"成功清理子进程..."<<std::endl;
}

  注意这里不能使用一个循环来进行清理,如下面代码是错误的:

void CleanProcesspool(std::vector<Channel>& channels)
{for(auto& c : channels)//只使用一次循环{::close(c.GetFd());pid_t rid = ::waitpid(c.GetWho(),nullptr,0);if(rid <= 0){std::cout<<std::endl;std::cout<<"清理子进程失败..."<<std::endl;return;}}std::cout<<std::endl;std::cout<<"成功清理子进程..."<<std::endl;
}

这是因为在创建子进程时,子进程会继承父进程的文件描述符表,因此在第一个匿名管道创建后,例如父进程的4号文件描述符指向该匿名管道写端,那么在创建第二个子进程时,该子进程会继承4号文件描述符也指向第一个匿名管道写端,因此创建的子进程越多,前面匿名管道写端被指向的就越多,所以仅仅关闭一个进程的写端指向,还有其他的写端指向,所以读端无法读到0,也就无法退出,如下图所示:
在这里插入图片描述
在这里插入图片描述

当创建2个子进程时,第一个匿名管道写端就有两个进程指向,当创建的进程越多,该写端指向的也就越多。

如果要使用一个循环来清理回收子进程,我们可以从后往前关闭文件描述符,因为最后一个管道写端只有父进程指向。

4. 封装与完整实现

  对于父进程管理进程池我们可以封装一个类来更好的进行管理与实现:

#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include <sys/wait.h>#include "Task.hpp"
#include "Channel.hpp"
// masterclass ProcessPool
{
public:int InitProcesspool(int num){for (int i = 0; i < num; i++){// 1.创建匿名管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return 2;// 2.创建子进程pid_t id = fork();if (id < 0)return 3;// 3.建立通信管道,父子进程关闭读端或写端if (id == 0) // 子进程{// 子进程读取,关闭写端::close(pipefd[1]);// dup2dup2(pipefd[0], 0);Work();::exit(0);}// 父进程// 父进程写入,关闭读端::close(pipefd[0]);channels.emplace_back(pipefd[1], id);}return 0;}void ExcuteTask(){int n = 0;int count = 10;while (count--) // 执行10次任务{// 1.选择任务,获取任务编号int tasknum = tm.SelectTask();// 2.选择子进程,使用轮询选择,派发任务channels[n++].Send(tasknum);n %= channels.size();std::cout << std::endl;// std::cout<<"**************************"<<std::endl;std::cout << "*****成功发送" << 10 - count << "个任务*****" << std::endl;// std::cout<<"**************************"<<std::endl;// std::cout<<std::endl;sleep(3);}}void CleanProcesspool(){for (auto &c : channels)::close(c.GetFd());for (auto &c : channels){pid_t rid = ::waitpid(c.GetWho(), nullptr, 0);if (rid <= 0){std::cout << std::endl;std::cout << "清理子进程失败..." << std::endl;return;}}std::cout << std::endl;std::cout << "成功清理子进程..." << std::endl;}private:std::vector<Channel> channels;
};

main函数:

#include "ProcessPool.hpp"int main(int argc, char* argv[])
{//0.获取应该创建管道个数num个if(argc!=2){std::cout<<"请输入管道个数."<<std::endl;return 1;}int num = std::stoi(argv[1]);std::vector<Channel> channels;ProcessPool* pp = new ProcessPool;//1.初始化进程池——创建进程池pp->InitProcesspool(num);//2.执行任务pp->ExcuteTask();//3.任务执行完成,回收子进程pp->CleanProcesspool();delete pp;return 0;
}

运行结果如下:

在这里插入图片描述

5. 结语

  以上就是基于匿名管道通信实现的进程池,子进程会根据接受到的信息执行不同的任务,父进程可以看作Master来进行管理创建的多个进程。关键点在于对进程管理的封装以及回收子进程时会有多个进程指向匿名管道的读端,所以回收时要注意可能会出现的bug。


http://www.ppmy.cn/server/147584.html

相关文章

【Word 知识点】

快捷键 1.复制 Ctrlc 2.粘贴 Ctrlv 3.剪切 Ctrlx 4.全选 Ctrla 5.加粗 Ctrlb 6.打开 Ctrlo 7.新建 Ctrln 8.保存 Ctrls 9.查找 Ctrlf 10.替换 Ctrlh Word 要点 1.文档基本操作&#xff1a; 新建 打开 保存 复制 粘贴 剪切 查找 替换 2.字体&#xff1a;字体 字号 颜…

《山海经》:东山

《山海经》&#xff1a;东山 樕[朱{虫虫}(上下)]山&#xff08;鳙鳙鱼&#xff1a;形像犁牛发出猪叫&#xff09;藟山栒状山&#xff08;从从&#xff1a;形状像狗&#xff0c;六条腿&#xff09;勃亝山番条山姑儿山高氏山岳山山&#xff08;状夸父&#xff09;独山&#xff08…

计算机网络安全

从广义来说&#xff0c;凡是涉及到网络上信息的机密性、报文完整性、端点鉴别等技术和理论都是网络安全的研究领域。 机密性指仅有发送方和接收方能理解传输报文的内容&#xff0c;而其他未授权用户不能解密&#xff08;理解&#xff09;该报文报文完整性指报文在传输过程中不…

【Open-Fegin使用介绍】

文章目录 OpenFegin的介绍OpenFegin使用1.导入配置2. 开启OpenFegin3.配置OpenFeigin的类4.OpenFegin的使用 FeginClien注解 OpenFegin的介绍 OpenFeign 是一个声明式的Web服务客户端&#xff0c;它使得编写Web服务客户端变得更加容易。OpenFeign 旨在整合Ribbon和Nacos&#…

用“*”构成一个倒三角形:JAVA

输入&#xff1a;5 输出&#xff1a; ******* ***** *** * 代码&#xff1a; import java.util.Scanner; //倒三角 public class FF6 {public static void main(String[] args) {Scanner scannernew Scanner(System.in);while (scanner.hasNextInt()){int nscanner…

Ubuntu 安装Ansible ansible.cfg配置文件生成

安装后的ansible.cfg后的默认内容如下&#xff1a; rootlocalhost:/etc/ansible# cat ansible.cfg # Since Ansible 2.12 (core): # To generate an example config file (a "disabled" one with all default settings, commented out): # $ ansible-…

Wwise 使用MIDI文件、采样音频

第一种&#xff1a;当采样音频只有一个文件的时候 1.拖入MIDI文件到Interactive Music Hierarchy层级 2.拖入采样音频到Actor-Mixer Hierarchy层级 3.勾选MIDI显示出面板&#xff0c;设置Root Note与采样音频音高相同&#xff0c;这里是C#5 4.播放测试&#xff0c;成功&…

第十一课 Unity编辑器创建的资源优化_预制体和材质篇(Prefabs和Materials)详解

预制体(Prefabs) Unity中的预制体是用来存储游戏对象、子对象及其所需组件的可重用资源&#xff0c;一般来说预制体资源可充当资源模版&#xff0c;在此模版基础上可以在场景中创建新的预制体实例。 使用预制体的好处 由于预制体系统可以自动保持所有实例副本同步&#xff0c…