【Linux】进程间通信——进程池

news/2025/2/21 12:36:24/

在这里插入图片描述

文章目录

  • 进程池
    • 什么进程池
    • 进程池的作用
  • 用代码模拟进程池
    • 管道信息
    • 任务类
    • InitProcesspool()
    • DisPatchTasks()
    • 任务的执行逻辑(Work)
    • CleanProcessPool()
  • 封装
    • main.cc
    • Channel.hpp
    • ProcessPool.hpp
    • Task.hpp
    • Makefile
  • 总结
    • 总结

进程池

什么进程池

进程池(Process Pool)是一种用于管理进程的技术,它通过预先创建一定数量的进程来避免频繁创建和销毁进程的开销。进程池通常用于需要并发执行大量任务的场景,特别是在处理CPU密集型任务时。

在这里插入图片描述

上面这种模型就是进程池,父进程通过创建多个子进程,然后通过管道连接,分别向各个子进程派发任务。
上面的父进程称为master进程,子进程称为worker进程或者slaver进程。

进程池的作用

进程池的作用主要体现在以下几个方面:

  1. 提高性能: 通过预先创建一定数量的进程,减少了频繁创建和销毁进程的开销,尤其在需要大量并发任务处理时,能有效提升整体执行效率。

  2. 减少系统资源消耗: 在系统中创建进程是一个资源密集型操作,频繁创建和销毁进程会导致资源浪费。进程池通过复用已有进程,避免了这一问题。

  3. 提升任务响应速度: 由于进程池中的进程是预先创建的,所以当需要处理任务时,可以快速分配一个空闲进程,而不需要等待进程的创建。

  4. 更好的资源管理: 进程池可以限制系统中的最大并发进程数,避免过多进程同时运行,导致系统资源(如CPU、内存)耗尽,从而保护系统的稳定性。

  5. 并行处理: 对于CPU密集型任务,进程池通过并行化处理多个任务,可以显著提升处理效率,尤其是在多核CPU的环境中。

进程池适合用于大规模并发任务的处理,如Web爬虫、数据处理、大规模计算等场景。

用代码模拟进程池

管道信息

首先实现进程池,我们要控制创建多少个进程,所以可以用cin,手动输入,但是还有一种控制创建子进程格式的方法就是通过main函数的参数进行控制子进程的创建个数

enum
{OK = 0,UsageError,PipeError,ForkError,
};
void Usage(string proc)
{cout<<"Usage: "<<proc<<"process-num"<<endl;
}
int main(int argc, char *argv[])
{if(argc != 2){Usage(argv[0]);return UsageError;}return 0;
}

main函数的第一个参数是数组元素个数,已知数组的第一个元素是可执行程序的名称(./XXXX),所以第二个元素是创建子进程个数,如果输入的命令行的格式错误我们有一个函数Usage会输出,类型错误,并且返回枚举类型UsageError。

有了需要创建的子进程的个数,接下来我们就要创建管道并且创建子进程了。

我们封装一个类用这个类来管理进程池

using work_t = function<void()>;
class ProcessPool
{
public:ProcessPool(int n,work_t w):num(n),work(w){}~ProcessPool(){}int InitProcesspool(){}void DisPatchTasks(){}void CleanProcessPool(){}
private:vector<Channel> channels;//管道int num;//进程的个数work_t work;//函数类型(工作类型(void()))
};

上面vector中的类型也是一个我们自己封装的类,用来管理管道。

class Channel
{
public://构造函数Channel(int wfd,pid_t who):_wfd(wfd),_who(who){_name="Channel-"+to_string(wfd)+'-'+to_string(who);}//获取名字string Name(){return _name;}void Send(int cmd)//cmd是任务码{write(_wfd,&cmd,sizeof(cmd));}void Close()//关闭写端{close(_wfd);}pid_t Id()//获取写端对应的子进程{return _who;}//析构函数~Channel(){}
private:int _wfd;//master的写端string _name;//对应worker的名字pid_t _who;//记录哪个子进程的管道
};

Channel的作用:保存写端的fd,记录写端对应的子进程的名字,还有子进程的pid。
除了需要管理channels的类,我们还需要管理任务的类,因为我们还没有任务。

任务类

先模拟出三种任务

void Download()
{cout<<"我是下载任务....."<<endl;
}
void Log()
{cout<<"我是日志任务....."<<endl;
}
void Sql()
{cout<<"我是数据库同步任务....."<<endl;
}

任务我们用TaskManeger来表示,直接用容器unordered_map<int,task_t>来存储任务,task_t表示返回值是void的函数类型,识别到任务码直接调取函数即可,存储的就是上面三个模拟的函数。选择任务我们用随机数选取,只需要模上任务的数量即可。

static int number = 0;
class TaskManager
{
public:TaskManager(){//srand(time(nullptr));InsertTask(Download);InsertTask(Log);InsertTask(Sql);}~TaskManager(){}//插入任务void InsertTask(task_t t){//number表示有多少个任务tasks[number++]=t;}//选择任务int SelectTask(){//选择一个任务return rand() % number;}//执行任务void Excute(int number){//没找到说明方法是空的if(tasks.find(number) == tasks.end()) return;//找到了执行这个方法tasks[number]();}
private://int(任务码)--->任务种类unordered_map<int,task_t> tasks;
};TaskManager tmp;//函数作用:传进来的参数不对,这个函数就告诉我们如何传递参数
void Usage(string proc)
{cout<<"Usage: "<<proc<<"process-num"<<endl;
}

接下来我们来完成这三个函数:
在这里插入图片描述

InitProcesspool()

首先根据成员提供的需要创建的子进程的个数创建出管道,然后判断管道是否创建成功,管道创建之后,创建子进程,子进程创建失败返回错误码,如果子进程创建成功判断fork的返回值,返回值是0则是子进程,需要关闭写端,读端的fd原本是pipefd2将其重定向到0中也就是标准输入之后,进入提前定义好的work中,完成work之后,子进程直接退出exit()出来之后只可能是父进程,因为子进程在if中已经退出了,所以关闭父进程的读端,然后将数据插入到channels当中。

int InitProcesspool()
{for (int i = 0; i < num; i++){// 1. 创建管道// 管道初始化成0int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return PipeError; // 管道创建失败// 2. 创建进程pid_t id = fork();if (id < 0)return ForkError; // 子进程创建失败// 创建通信信道if (id == 0){close(pipefd[1]); // 关闭子进程的写窗口// 子进程// 原本读的是pipefd0的内容,现在重定向到标准输入,现在读端读取的就是标准输入dup2(pipefd[0], 0);work();  // 子进程需要做的工作exit(0); // 子进程干完工作直接结束}// 因为上面子进程执行完任务会直接退出,所以下面的代码只能是父进程执行close(pipefd[0]); // 关闭父进程的读窗口channels.emplace_back(pipefd[1], id);}return OK;
}

DisPatchTasks()

为了保证每个进程都在不停的工作,不会出现一个进程一直休息,一个进程一直工作的情况,我们采用轮询派发任务,也就是遍历数组依次派发任务,直到没有任务为止(这里任务数量是我们自己定的),派发完任务之后利用send发送给子进程。

void DisPatchTasks()
{int who = 0;// 派发任务int num = 20;while (num--){// a.选择一个任务,整数int task = tmp.SelectTask();// b.选择一个子进程channelChannel &curr = channels[who];who++;who %= channels.size(); // 防止数组越界cout << "########################" << endl;cout << "send" << task << "to" << curr.Name() << ",任务还剩" << num << "个" << endl;cout << "########################" << endl;// c.派发任务curr.Send(task);sleep(1);}
}

任务的执行逻辑(Work)

子进程进入Work之后不会出子进程,会一直死循环在这个Work当中,只有当写端关闭才会break掉

void Worker()
{while(true){int cmd = 0;int n = read(0,&cmd,sizeof(cmd));if(n == sizeof(cmd)) tmp.Excute(cmd);else if(n == 0)//写端关闭{cout<<"pid: "<<getpid()<<"quit..."<<endl;break;}}
}

CleanProcessPool()

最后回收管道并且回收子进程即可

 void CleanProcessPool(){for (auto &c : channels){// 调用关闭函数c.Close();}for (auto &e : channels){// 0:阻塞式等待pid_t rid = waitpid(e.Id(), nullptr, 0);if (rid > 0){cout << "child " << rid << " wait...sucess" << endl;}}}

封装

main.cc

#include "ProcessPool.hpp"
#include "Task.hpp"//我们自己就是master进程
int main(int argc, char *argv[])
{//可执行程序的名字占一个,后面的数字占一个if(argc != 2){Usage(argv[0]);return UsageError;}//将字符串转化为整数,表示一共创建多少个子进程int num = stoi(argv[1]);// vector<Channel> channels;//将所有的管道存储在容器当中// //初始化进程池// InitProcesspool(num,channels,Worker);// //派发任务// DisPatchTasks(channels);// //退出进程池// CleanProcessPool(channels);ProcessPool *pp = new ProcessPool(num,Worker);pp->InitProcesspool();pp->DisPatchTasks();pp->CleanProcessPool();delete pp;return 0;
}

Channel.hpp

#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__#include <iostream>
#include <string>
#include <unistd.h>
using namespace std;   #endif//先描述再组织
class Channel
{
public://构造函数Channel(int wfd,pid_t who):_wfd(wfd),_who(who){_name="Channel-"+to_string(wfd)+'-'+to_string(who);}//获取名字string Name(){return _name;}void Send(int cmd){write(_wfd,&cmd,sizeof(cmd));}void Close(){close(_wfd);}pid_t Id(){return _who;}//析构函数~Channel(){}
private:int _wfd;//master的写端string _name;//对应worker的名字pid_t _who;//记录哪个子进程的管道
};

ProcessPool.hpp

#include <string>
#include <unistd.h>
#include <cstdlib>
#include <vector>
#include <sys/types.h>
#include <functional>
#include <sys/wait.h>
#include "Channel.hpp"
#include "Task.hpp"// typedef function<void()> work_t;
using work_t = function<void()>;enum
{OK = 0,UsageError,PipeError,ForkError,
};class ProcessPool
{
public:ProcessPool(int n,work_t w):num(n),work(w){}~ProcessPool(){}// work_t work:回调int InitProcesspool(){for (int i = 0; i < num; i++){// 1. 创建管道// 管道初始化成0int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return PipeError; // 管道创建失败// 2. 创建进程pid_t id = fork();if (id < 0)return ForkError; // 子进程创建失败// 创建通信信道if (id == 0){close(pipefd[1]); // 关闭子进程的写窗口// 子进程// 原本读的是pipefd0的内容,现在重定向到标准输入,现在读端读取的就是标准输入dup2(pipefd[0], 0);work();  // 子进程需要做的工作exit(0); // 子进程干完工作直接结束}// 因为上面子进程执行完任务会直接退出,所以下面的代码只能是父进程执行close(pipefd[0]); // 关闭父进程的读窗口channels.emplace_back(pipefd[1], id);}return OK;}void DisPatchTasks(){int who = 0;// 派发任务int num = 20;while (num--){// a.选择一个任务,整数int task = tmp.SelectTask();// b.选择一个子进程channelChannel &curr = channels[who];who++;who %= channels.size(); // 防止数组越界cout << "########################" << endl;cout << "send" << task << "to" << curr.Name() << ",任务还剩" << num << "个" << endl;cout << "########################" << endl;// c.派发任务curr.Send(task);sleep(1);}}void CleanProcessPool(){for (auto &c : channels){// 调用关闭函数c.Close();}for (auto &e : channels){// 0:阻塞式等待pid_t rid = waitpid(e.Id(), nullptr, 0);if (rid > 0){cout << "child " << rid << " wait...sucess" << endl;}}}void DebugPrint(){for (auto e : channels){cout << e.Name() << endl;}}
private:vector<Channel> channels;int num;work_t work;
};

Task.hpp

#pragma once#include <iostream>
#include <unordered_map>
#include <functional>
#include <ctime>
using namespace std;using task_t = function<void()>;void Download()
{cout<<"我是下载任务....."<<endl;
}void Log()
{cout<<"我是日志任务....."<<endl;
}void Sql()
{cout<<"我是数据库同步任务....."<<endl;
}static int number = 0;
class TaskManager
{
public:TaskManager(){//srand(time(nullptr));InsertTask(Download);InsertTask(Log);InsertTask(Sql);}~TaskManager(){}//插入任务void InsertTask(task_t t){//number表示有多少个任务tasks[number++]=t;}//选择任务int SelectTask(){//选择一个任务return rand() % number;}//执行任务void Excute(int number){//没找到说明方法是空的if(tasks.find(number) == tasks.end()) return;//找到了执行这个方法tasks[number]();}
private://int(任务码)--->任务种类unordered_map<int,task_t> tasks;
};TaskManager tmp;//函数作用:传进来的参数不对,这个函数就告诉我们如何传递参数
void Usage(string proc)
{cout<<"Usage: "<<proc<<"process-num"<<endl;
}void Worker()
{while(true){int cmd = 0;int n = read(0,&cmd,sizeof(cmd));if(n == sizeof(cmd)) tmp.Excute(cmd);else if(n == 0)//写端关闭{cout<<"pid: "<<getpid()<<"quit..."<<endl;break;}else//读取失败{}}
}

Makefile

BIN=processpool
cc=g++
FLAGS=-c -Wall -std=c++11
LDFLAGS=-o
SRC=$(shell ls *.cc)
OBJ=$(SRC:.cc=.o)$(BIN):$(OBJ)$(cc) $(LDFLAGS) $@ $^
%.o:%.cc$(cc) $(FLAGS) $<
.PHONY:clean
clean:rm -f $(BIN) $(OBJ)
.PHONY:test
test:echo $(SRC)

在这里插入图片描述

总结

总结

本文详细介绍了进程池的概念及其在实际应用中的作用。通过代码模拟,我们展示了如何初始化进程池、分发任务、执行任务逻辑以及清理进程池。文章还涵盖了相关的封装类和文件结构,如main.ccChannel.hppProcessPool.hppTask.hppMakefile,这些内容为理解和实现进程池提供了全面的指导。

进程池是一种有效的资源管理技术,能够提高多任务处理的效率和系统性能。通过合理的设计和实现,进程池可以在复杂的系统中发挥重要作用,减少资源浪费并提升任务执行的稳定性。希望本文的内容能为读者在实际项目中应用进程池提供有价值的参考。


http://www.ppmy.cn/news/1573120.html

相关文章

(arxiv2411) CARE Transformer

作者提出了两个问题&#xff0c;问题 1&#xff1a;堆叠是充分利用局部归纳偏差和长距离信息优势的最佳方法吗&#xff1f; 问题 2&#xff1a;是否有可能同时提高线性视觉 Transformer 的效率和准确性&#xff1f; 为了解决这两个问题&#xff0c;作者提出了一种 deCoupled du…

【三维重建】FeatureGS:特征值优化的几何精度和伪影减少3DGS的重构

文章&#xff1a;https://arxiv.org/pdf/2501.17655 标题&#xff1a;FeatureGS: Eigenvalue-Feature Optimization in 3D Gaussian Splatting for Geometrically Accurate and Artifact-Reduced Reconstruction 文章目录 摘要一、引言二、相关工作&#xff1a;3D特征三、算法3…

根据deepseek模型微调训练自动驾驶模型及数据集的思路

以下是使用DeepSeek模型微调训练自动驾驶模型的详细步骤和代码示例。本流程假设你已有自动驾驶领域的数据集&#xff08;如驾驶指令、传感器数据等&#xff09;&#xff0c;并基于PyTorch框架实现。 Step 1: 环境准备 # 安装依赖库 pip install torch transformers datasets n…

网络安全要学python 、爬虫吗

网络安全其实并不复杂&#xff0c;只是比普通开发岗位要学习的内容多一点。无论是有过编程基础还是零基础的都可以学习的。网络安全目前可就业的岗位从技术上可分为两部分&#xff1a;web安全和二进制逆向安全。web安全是网络安全的入门方向&#xff0c;内容简单&#xff0c;就…

使用 Flask 和 pdfkit 生成带透明 PNG 盖章的 PDF 并上传到阿里云 OSS

在现代 Web 开发中&#xff0c;生成 PDF 文档并在其上添加盖章是常见的需求。本文将详细介绍如何使用 Flask 框架和 pdfkit 库来批量生成 PDF&#xff0c;并在其中添加透明 PNG 盖章&#xff0c;最后将生成的 PDF 上传到阿里云 OSS&#xff08;对象存储服务&#xff09;。 环境…

渲染相机设置 pyrender cameralib

目录 cameralib 设置相机 numpy获取相机参数: pyrender设置相机: hmr2渲染设置 multi_hmr获取cam_t cameralib 设置相机 cameralib安装教程: cameralib 安装-CSDN博客 import cameralibcamera = cameralib.Camera.from_fov(fov_degrees=55, imshape=(720,1280))intri…

Java 大视界 -- 开源社区对 Java 大数据发展的推动与贡献(91)

&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎来到 青云交的博客&#xff01;能与诸位在此相逢&#xff0c;我倍感荣幸。在这飞速更迭的时代&#xff0c;我们都渴望一方心灵净土&#xff0c;而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识&#xff0c;也…

tomcat html乱码

web tomcat html中文乱码 将html文件改成jsp <% page language"java" contentType"text/html; charsetUTF-8" pageEncoding"UTF-8"%>添加 <meta charset"UTF-8">