Linux--构建进程池

news/2024/9/23 10:03:44/

目录

 1.进程池

1.1.我们先完成第一步,创建子进程和信道

1.2. 通过channel控制,发送任务

1.3回收管道和子进程

1.4进行测试

1.5完整代码


 1.进程池

进程池其产生原因主要是为了优化大量任务需要多进程完成时频繁创建和删除进程所带来的资源消耗,以及更好的实现多个进程间的协同。以下是关于进程池的一些关键点:

  1. 组成:进程池技术主要由两部分组成:资源进程和管理进程。资源进程是预先创建好的空闲进程,等待管理进程分发任务(maste向哪个管道写入,就是唤醒哪一个子进程处理任务)。管理进程则负责创建这些资源进程,将工作分配给空闲的资源进程处理,并在工作完成后回收这些资源进程。(父进程要进行后端任务的负载均衡)
  2. 管理:管理进程如何有效地管理资源进程是关键。这涉及到分配任务给资源进程、回收空闲资源进程等操作。管理进程和资源进程之间需要进行交互,这种交互可以通过管道(需要子进程执行一个任务我们就可以派发一个管道,连接一个子进程,执行任务)
  3. 使用场景:当我们需要并行的处理大规模任务时,比如服务器处理大量客户端的任务,进程池是一个很好的选择。它可以避免频繁创建和销毁进程的开销,提高应用的响应速度。
  4. 优化:使用进程池可以避免动态创建和销毁进程的开销,提高应用的性能和稳定性。此外,进程池还可以根据任务的执行情况尽量减少创建的进程数量,最多创建指定个数的进程。

代码实现:

        实现进程池首先我们需要提前准备好一批数量的进程,当一个任务队列被提交到线程池,每个工作进程都会不断从任务队列中取出任务并执行。


1.1.我们先完成第一步,创建子进程和信道

        

        Channel类是来管理一个子进程和与之相关联的管道写端,这个Channel类就是信道。用于一个需要管理多个子进程和它们各自管道写端的程序中。例如,你可能有一个父进程,它创建多个子进程,每个子进程都通过管道与父进程通信。在这种情况下,你可以为每个子进程和它的管道写端创建一个Channel对象,并使用这些对象来管理它们。

class Channel
{
public:Channel(int wfd, pid_t id, const std::string &name): _wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() { return _wfd; }pid_t GetProcessId() { return _subprocessid; }std::string GetName() { return _name; }void CloseChannel(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}~Channel(){}private:int _wfd;//写端fdpid_t _subprocessid;//子进程id'std::string _name;//一个信道的名称
};

        下面函数的主要逻辑是创建多个管道和相应的子进程,每个子进程都将从它自己的管道中读取数据,而父进程则保留管道的写端以便后续向子进程发送数据。同时,这个函数也维护了一个 Channel 对象的向量(std::vector<Channel>),其中每个 Channel 对象代表一个与特定子进程和管道写端相关联的通道。

        task_t是一个函数指针类型,是为了泛型,将来可以传任意不同的任务交给子进程做。

可以理解为回调函数 task 的使用降低了任务与子进程之间的耦合度。在传统的编程模型中,你可能会为每个子进程编写特定的代码块,这些代码块直接嵌入在创建子进程的函数中。这样做会导致代码的高耦合性,因为每个子进程的行为都紧密地绑定在创建它们的函数中。(关于task的应用见下文)

        dup2(pipefd[0], 0)将管道的读端,重定向到标准输入。意思就是管道的读端变成了标准输入,无需修改代码以处理文件描述符,因为以后任意的子进程去读取任务,都是从标准输入去读取的。

        在子进程中,它关闭了管道的写端,将读端重定向到标准输入,执行给定的回调函数 task,然后关闭读端并退出。

void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{// BUG? --> fix bugfor (int i = 0; i < num; i++){// 1. 创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(1);// 2. 创建子进程pid_t id = fork();if (id == 0){if (!channels->empty()){// 第二次之后,开始创建的管道for(auto &channel : *channels) channel.CloseChannel();}// child - readclose(pipefd[1]);dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入task();close(pipefd[0]);exit(0);}// 3.构建一个channel名称std::string channel_name = "Channel-" + std::to_string(i);// 父进程close(pipefd[0]);// a. 子进程的pid b. 父进程关心的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}

提供一个任务文件,接下来让子进程执行任务:

#pragma once#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针类型void Print()
{std::cout << "I am print task" << std::endl;
}
void DownLoad()
{std::cout << "I am a download task" << std::endl;
}
void Flush()
{std::cout << "I am a flush task" << std::endl;
}task_t tasks[TaskNum];void LoadTask()
{srand(time(nullptr) ^ getpid() ^ 17777);tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}void ExcuteTask(int number)
{if (number < 0 || number > 2)return;tasks[number]();
}int SelectTask()
{return rand() % TaskNum;
}void work()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is : " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process : " << getpid() << " quit" << std::endl;break;}}
}
  • 初始化阶段:LoadTask()函数被调用,设置tasks函数指针数组以包含所有可用的任务函数,通过索引调用三个具体的任务函数:Print()DownLoad(), 和 Flush()。
  • 任务选择阶段:SelectTask()函数在需要时用于生成一个随机任务索引。
  • 任务执行阶段:在子进程中,work()函数不断监听标准输入,等待一个任务索引。一旦接收到索引,就调用ExcuteTask()来执行相应的任务。

子进程退出:当子进程从标准输入读取到0时,它知道应该退出循环并结束。


1.2. 通过channel控制,发送任务

int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}
void SendTaskCommand(Channel &channel, int taskcommand)
{write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}
void ctrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: "<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){ctrlProcessOnce(channels);}}else{while (true){ctrlProcessOnce(channels);}}
}
  1.   NextChannel 函数是一个轮询选择器,用于在给定的通道数量(channelnum)中循环选择一个通道索引。它使用了一个静态变量 next 来跟踪上一次选择的索引,并在每次调用时递增这个索引。
  2.  SendTaskCommand 函数用于向指定的 Channel 对象(代表一个与子进程的通信通道)发送一个任务命令(taskcommand。它通过调用 Channel 对象的 GetWfd 方法获取管道的写端文件描述符,然后使用 write 系统调用来将任务命令写入管道。这样,子进程就可以从管道中读取这个命令并执行相应的任务。
  3.  ctrlProcessOnce 函数模拟了控制进程的一次操作。调用 NextChannel 函数来选择一个通道索引,并通过该索引从 channels 向量中获取一个 Channel 对象。然后,它调用 SendTaskCommand 函数向选定的通道发送任务命令。最后,它打印出相关的信息,包括任务命令、通道名称和子进程的进程ID,以便进行调试和监控。               
  4. ctrlProcess 函数是控制进程的主循环。它接受一个 channels 向量和一个可选的 times 参数。如果 times 大于0,则控制进程会循环执行 ctrlProcessOnce 函数 times 次;如果 times 小于或等于0(默认为-1),则控制进程会无限循环地执行 ctrlProcessOnce 函数

1.3回收管道和子进程

class Channel
{
public:Channel(int wfd, pid_t id, const std::string &name): _wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() { return _wfd; }pid_t GetProcessId() { return _subprocessid; }std::string GetName() { return _name; }void CloseChannel(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}~Channel(){}private:int _wfd;//写端fdpid_t _subprocessid;//子进程id'std::string _name;//一个信道的名称
};void CleanUpChannel(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}

a. 关闭所有的写端 b. 回收子进程

注意:一定是先关闭全部读端,然后统一读端,而不能关一个,等待一个。

在执行子进程创建逻辑的时候,是存在bug的。

我们在创建第一个信道的时候是没有什么问题的:

接着来看,我们创建第二个信道会发生什么:fd[4]的指向被第二个信道的子进程继承了,那么第一个管道的写端就个fd指向了,这就是坑

如果继续创建更多的信道,那么这种情况会一直累积,只有最后一个文件只有一个写端。如果我们是关一个等待一个,那么第一个信道,只会关闭一个读端,但还有其它的读端,这就会发生进程阻塞,导致管道文件一直在等待读取,无法被释放;如果统一的关闭读端,那么当关闭到最后一个信道的时候,最后一个信道读端只有一个,管道文件会被释放,然后就会递归式的逆向关闭其它信道的所有读端,最后将所有的管道文件释放,父进程只需要一 一等待,就能回收所有子进程了。


1.4进行测试

子进程执行任务成功,父进程回收子进程成功。


1.5完整代码

Task.hpp

#pragma once#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>#define TaskNum 3typedef void (*task_t)(); // task_t 函数指针类型void Print()
{std::cout << "I am print task" << std::endl;
}
void DownLoad()
{std::cout << "I am a download task" << std::endl;
}
void Flush()
{std::cout << "I am a flush task" << std::endl;
}task_t tasks[TaskNum];void LoadTask()
{srand(time(nullptr) ^ getpid() ^ 17777);tasks[0] = Print;tasks[1] = DownLoad;tasks[2] = Flush;
}void ExcuteTask(int number)
{if (number < 0 || number > 2)return;tasks[number]();
}int SelectTask()
{return rand() % TaskNum;
}void work()
{while (true){int command = 0;int n = read(0, &command, sizeof(command));if (n == sizeof(int)){std::cout << "pid is : " << getpid() << " handler task" << std::endl;ExcuteTask(command);}else if (n == 0){std::cout << "sub process : " << getpid() << " quit" << std::endl;break;}}
}

test.cc

#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"class Channel
{
public:Channel(int wfd, pid_t id, const std::string &name): _wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() { return _wfd; }pid_t GetProcessId() { return _subprocessid; }std::string GetName() { return _name; }void CloseChannel(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}~Channel(){}private:int _wfd;pid_t _subprocessid;std::string _name;
};//  task_t task: 回调函数
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{// BUG? --> fix bugfor (int i = 0; i < num; i++){// 1. 创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(1);// 2. 创建子进程pid_t id = fork();if (id == 0){if (!channels->empty()){// 第二次之后,开始创建的管道for(auto &channel : *channels) channel.CloseChannel();}// child - readclose(pipefd[1]);dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入task();close(pipefd[0]);exit(0);}// 3.构建一个channel名称std::string channel_name = "Channel-" + std::to_string(i);// 父进程close(pipefd[0]);// a. 子进程的pid b. 父进程关心的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}void SendTaskCommand(Channel &channel, int taskcommand)
{write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}
void ctrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: "<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){ctrlProcessOnce(channels);}}else{while (true){ctrlProcessOnce(channels);}}
}void CleanUpChannel(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;return 1;}int num = std::stoi(argv[1]);LoadTask();std::vector<Channel> channels;// 1. 创建信道和子进程CreateChannelAndSub(num, &channels, work);// 2. 通过channel控制子进程ctrlProcess(channels, 10);// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程CleanUpChannel(channels);// sleep(100);return 0;
}#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"class Channel
{
public:Channel(int wfd, pid_t id, const std::string &name): _wfd(wfd), _subprocessid(id), _name(name){}int GetWfd() { return _wfd; }pid_t GetProcessId() { return _subprocessid; }std::string GetName() { return _name; }void CloseChannel(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subprocessid, nullptr, 0);if (rid > 0){std::cout << "wait " << rid << " success" << std::endl;}}~Channel(){}private:int _wfd;pid_t _subprocessid;std::string _name;
};//  task_t task: 回调函数
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{// BUG? --> fix bugfor (int i = 0; i < num; i++){// 1. 创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)exit(1);// 2. 创建子进程pid_t id = fork();if (id == 0){if (!channels->empty()){// 第二次之后,开始创建的管道for(auto &channel : *channels) channel.CloseChannel();}// child - readclose(pipefd[1]);dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入task();close(pipefd[0]);exit(0);}// 3.构建一个channel名称std::string channel_name = "Channel-" + std::to_string(i);// 父进程close(pipefd[0]);// a. 子进程的pid b. 父进程关心的管道的w端channels->push_back(Channel(pipefd[1], id, channel_name));}
}int NextChannel(int channelnum)
{static int next = 0;int channel = next;next++;next %= channelnum;return channel;
}void SendTaskCommand(Channel &channel, int taskcommand)
{write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}
void ctrlProcessOnce(std::vector<Channel> &channels)
{sleep(1);// a. 选择一个任务int taskcommand = SelectTask();// b. 选择一个信道和进程int channel_index = NextChannel(channels.size());// c. 发送任务SendTaskCommand(channels[channel_index], taskcommand);std::cout << std::endl;std::cout << "taskcommand: " << taskcommand << " channel: "<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){ctrlProcessOnce(channels);}}else{while (true){ctrlProcessOnce(channels);}}
}void CleanUpChannel(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();}for (auto &channel : channels){channel.Wait();}
}int main(int argc, char *argv[])
{if (argc != 2){std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;return 1;}int num = std::stoi(argv[1]);LoadTask();std::vector<Channel> channels;// 1. 创建信道和子进程CreateChannelAndSub(num, &channels, work);// 2. 通过channel控制子进程ctrlProcess(channels, 10);// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程CleanUpChannel(channels);// sleep(100);return 0;
}


http://www.ppmy.cn/news/1463737.html

相关文章

查询指定会话免打扰

查询指定用户&#xff08;requestId) 为指定会话&#xff08;targetId&#xff09;的设置的免打扰状态。 提示 该设置为用户级别设置。对应的设置接口详见设置指定会话免打扰。 请求方法 POST&#xff1a; https://数据中心域名/conversation/notification/get.json 频率限…

白酒:不同产地白酒的风格特点与比较

云仓酒庄豪迈白酒&#xff0c;作为中国白酒的一部分&#xff0c;其风格特点深受产区的影响。不同产地的白酒&#xff0c;由于自然环境、酿造工艺等因素的差异&#xff0c;形成了各自与众不同的风味和特点。下面让云仓酒庄豪迈白酒来比较一下不同产地白酒的风格特点。 首先&…

VPN的详细理解

VPN&#xff08;Virtual Private Network&#xff0c;虚拟私人网络&#xff09;是一种在公共网络上建立加密通道的技术&#xff0c;通过这种技术可以使远程用户访问公司内部网络资源时&#xff0c;实现安全的连接和数据传输。以下是对VPN的详细介绍&#xff1a; 选择代理浏览器…

【夏之以寒-Kafka专栏 01】Kafka的消息是采用Pull模式还是Push模式?

作者名称&#xff1a;夏之以寒 作者简介&#xff1a;专注于Java和大数据领域&#xff0c;致力于探索技术的边界&#xff0c;分享前沿的实践和洞见 文章专栏&#xff1a;夏之以寒-kafka专栏 专栏介绍&#xff1a;本专栏旨在以浅显易懂的方式介绍Kafka的基本概念、核心组件和使用…

【文生漫画系统】小说推文快速生成漫画短视频,搭建一款属于自己的系统,在使用的同时又能运营。

当前热门小说推文的推广方式&#xff0c;又更新了。 从传统的解压视频或者跑酷视频&#xff0c;到现在的漫画形式。涨粉的速度是非常快速的。 所以在做小说推广项目的可以了解一下这款系统。 只需三步就可以生成漫画短视频了 一、文生漫画短视频怎么生成&#xff1f; 二、系…

832. 翻转图像 - 力扣

1. 题目 给定一个 n x n 的二进制矩阵 image &#xff0c;先 水平 翻转图像&#xff0c;然后 反转 图像并返回 结果 。 水平翻转图片就是将图片的每一行都进行翻转&#xff0c;即逆序。 例如&#xff0c;水平翻转 [1,1,0] 的结果是 [0,1,1]。 反转图片的意思是图片中的 0 全部被…

基于Freertos的工训机器人

一. 工训机器人 V1 1. 实物 将自制的F4开发板放置车底板下方&#xff0c;节省上方空间&#xff0c;且能保证布线方便整齐。 2. SW仿真 使用SolidWorks进行仿真&#xff0c;且绘制3D打印件。 工训仿真 3.3D打印爪测试 机械爪测试 二. 工训机器人 V2 1. 实物 工训机器人V2不同于…

详解布隆过滤器(含面试考点)

Bloom Filter 底层逻辑主要代码实现解析&#xff08;以C为例&#xff09;优缺点应用场景面试常问问题1&#xff1a;什么是布隆过滤器&#xff1f;问题2&#xff1a;布隆过滤器如何处理误报&#xff1f;问题3&#xff1a;如何设计布隆过滤器以最小化误报率&#xff1f;问题4&…