构建高效的进程池:深入解析C++实现

server/2025/1/15 10:29:03/

构建高效的进程池:深入解析C++实现

在高性能计算和服务器应用中,进程池(Process Pool)是一种重要的设计模式。它通过预先创建和维护一定数量的子进程来处理大量的任务请求,从而避免频繁地创建和销毁进程带来的开销。本文将通过一个具体的C++代码示例,详细解析如何实现一个简单但功能完善的进程池。


目录

  1. 什么是进程池
  2. 为什么使用进程池
  3. 代码概述
  4. 关键组件解析
    • Channel 类
    • 创建通道与子进程
    • 任务发送与控制
    • 任务定义与执行
  5. 运行示例
  6. 总结

什么是进程池

进程池是一种预先创建并维护一定数量的子进程的技术,这些子进程可以重复使用来执行多个任务。与每次任务都创建新进程相比,进程池能够显著减少进程创建和销毁带来的系统开销,提升系统性能和响应速度。


为什么使用进程池

使用进程池有以下几个主要优势:

  1. 性能提升:减少频繁的进程创建和销毁带来的系统开销。
  2. 资源管理:通过限制进程数量,避免系统资源被过度消耗。
  3. 响应速度:预先存在的子进程能够更快地响应任务请求。
  4. 稳定性:统一管理子进程,提高系统的稳定性和可维护性。

代码概述

本文将解析以下两个主要的C++文件:

  1. main.cc:包含进程池的核心逻辑,包括子进程的创建、任务的分发和进程的管理。
  2. task.hpp:定义具体的任务函数和任务执行逻辑。

main.cc

#include <iostream>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "task.hpp"class Channel
{
public:Channel(int wfd, pid_t subprocesspid, std::string name): _wfd(wfd), _subprocesspid(subprocesspid), _name(name){}int getWfd() const { return _wfd; }pid_t getSubprocesspid() const { return _subprocesspid; }std::string getName() const { return _name; }void Wait(){int status;if (waitpid(_subprocesspid, &status, 0) == -1){std::cerr << "waitpid failed: " << strerror(errno) << std::endl;}if (WIFEXITED(status)){std::cout << _name << " exited with status " << WEXITSTATUS(status) << std::endl;}else{std::cerr << _name << " exited abnormally" << std::endl;}}void CloseChannel(){if (close(_wfd) == -1){std::cerr << "close " << _name << " failed: " << strerror(errno) << std::endl;}}private:int _wfd;pid_t _subprocesspid;std::string _name;
};void CreateChannelsandSubprocesses(int subprocessnum, std::vector<Channel> *channels, task_t task)
{for (int i = 0; i < subprocessnum; i++){int pipefd[2];if (pipe(pipefd) == -1){std::cerr << "pipe failed: " << strerror(errno) << std::endl;exit(1);}pid_t id = fork();if (id == -1){std::cerr << "fork failed:" << strerror(errno) << std::endl;}if (id == 0){if (i != 0){for (int j = 0; j < i; j++){close((*channels)[j].getWfd());}}close(pipefd[1]);dup2(pipefd[0], STDIN_FILENO);task();close(pipefd[0]);exit(0);}close(pipefd[0]);channels->push_back(Channel(pipefd[1], id, "subprocess" + std::to_string(i)));}
}int NextChannelIndex(int size)
{static int index = 0;return index++ % size;
}void SendTask(int wfd, int tasknum)
{if (write(wfd, &tasknum, sizeof(tasknum)) == -1){std::cerr << "write failed: " << strerror(errno) << std::endl;}
}void controlProcessonce(std::vector<Channel> &channels)
{int tasknum = Selecttask();int channel_index = NextChannelIndex(channels.size());SendTask(channels[channel_index].getWfd(), tasknum);
}void controlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){controlProcessonce(channels);}}else{while (1){controlProcessonce(channels);}}
}void cleanupchannels(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();channel.Wait();}
}int main(int argc, char *argv[])
{if (argc < 2){std::cerr << "Usage:" << argv[0] << " subprocessnum" << std::endl;return 1;}int subprocessnum = atoi(argv[1]);if (subprocessnum <= 0){std::cerr << "subprocessnum must be greater than 0" << std::endl;return 1;}Loadtask();std::vector<Channel> channels;// 1. 创建子进程并建立通信通道CreateChannelsandSubprocesses(subprocessnum, &channels, work1);// 2. 控制子进程执行任务controlProcess(channels, 10);// 3. 关闭通信通道并清理资源cleanupchannels(channels);return 0;
}

task.hpp

#pragma once
#include <iostream>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <string>
#include <vector>
#include <cstdlib>#define TaskNum 3
typedef void (*task_t)(); // task_t 是函数指针类型task_t task[TaskNum];void task1()
{std::cout << "task1" << std::endl;
}void task2()
{std::cout << "task2" << std::endl;
}void task3()
{std::cout << "task3" << std::endl;
}void ExecuteTask(int count)
{if (count >= 0 && count < TaskNum){task[count]();}else{std::cerr << "task index out of range" << std::endl;}
}void work()
{while (1){int count = 0;int n = read(0, &count, sizeof(count));if (n == -1){std::cerr << "read failed: " << strerror(errno) << std::endl;exit(1);}if (n == sizeof(count)){std::cout << "pid is : " << getpid() << " handling task" << std::endl;ExecuteTask(count);}if (n == 0){std::cout << "read EOF" << std::endl;exit(0);}}
}void work1()
{while (1){int count = 1;int n = read(0, &count, sizeof(count));if (n == -1){std::cerr << "read failed: " << strerror(errno) << std::endl;exit(1);}if (n == sizeof(count)){std::cout << "pid is : " << getpid() << " handling task" << std::endl;ExecuteTask(count);}if (n == 0){std::cout << "read EOF" << std::endl;exit(0);}}
}void work2()
{while (1){int count = 2;int n = read(0, &count, sizeof(count));if (n == -1){std::cerr << "read failed: " << strerror(errno) << std::endl;exit(1);}if (n == sizeof(count)){std::cout << "pid is : " << getpid() << " handling task" << std::endl;ExecuteTask(count);}if (n == 0){std::cout << "read EOF" << std::endl;exit(0);}}
}int Selecttask()
{return rand() % TaskNum;
}void Loadtask()
{task[0] = task1;task[1] = task2;task[2] = task3;
}

关键组件解析

Channel 类

Channel 类用于管理与每个子进程之间的通信通道。它包含以下成员:

  • _wfd:写端文件描述符,用于向子进程发送任务。
  • _subprocesspid:子进程的 PID,用于管理和等待子进程的结束。
  • _name:通道名称,便于调试和日志记录。
class Channel
{
public:Channel(int wfd, pid_t subprocesspid, std::string name): _wfd(wfd), _subprocesspid(subprocesspid), _name(name){}int getWfd() const { return _wfd; }pid_t getSubprocesspid() const { return _subprocesspid; }std::string getName() const { return _name; }void Wait(){int status;if (waitpid(_subprocesspid, &status, 0) == -1){std::cerr << "waitpid failed: " << strerror(errno) << std::endl;}if (WIFEXITED(status)){std::cout << _name << " exited with status " << WEXITSTATUS(status) << std::endl;}else{std::cerr << _name << " exited abnormally" << std::endl;}}void CloseChannel(){if (close(_wfd) == -1){std::cerr << "close " << _name << " failed: " << strerror(errno) << std::endl;}}private:int _wfd;pid_t _subprocesspid;std::string _name;
};

功能解析

  • 构造函数:初始化写端文件描述符、子进程 PID 和通道名称。
  • Wait():等待子进程结束,并报告其退出状态。
  • CloseChannel():关闭写端文件描述符,释放资源。

创建通道与子进程

CreateChannelsandSubprocesses 函数负责创建指定数量的子进程,并为每个子进程建立一个匿名管道用于通信。

void CreateChannelsandSubprocesses(int subprocessnum, std::vector<Channel> *channels, task_t task)
{for (int i = 0; i < subprocessnum; i++){int pipefd[2];if (pipe(pipefd) == -1){std::cerr << "pipe failed: " << strerror(errno) << std::endl;exit(1);}pid_t id = fork();if (id == -1){std::cerr << "fork failed:" << strerror(errno) << std::endl;}if (id == 0){if (i != 0){for (int j = 0; j < i; j++){close((*channels)[j].getWfd());}}close(pipefd[1]);dup2(pipefd[0], STDIN_FILENO);task();close(pipefd[0]);exit(0);}close(pipefd[0]);channels->push_back(Channel(pipefd[1], id, "subprocess" + std::to_string(i)));}
}

步骤解析

  1. 创建管道:通过 pipe(pipefd) 创建一个匿名管道,pipefd[0] 为读端,pipefd[1] 为写端。
  2. 创建子进程:使用 fork() 创建子进程。
  3. 子进程设置
    • 关闭不需要的写端文件描述符。
    • 使用 dup2 将管道的读端重定向到标准输入 (STDIN_FILENO)。
    • 调用任务函数 task(),开始处理任务。
    • 关闭读端文件描述符并退出子进程。
  4. 父进程设置
    • 关闭管道的读端,保留写端用于发送任务。
    • 将新创建的 Channel 对象添加到 channels 容器中。

任务发送与控制

任务分发逻辑

controlProcessoncecontrolProcess 函数负责从任务池中选择任务,并将任务发送到子进程的通信通道中。

int NextChannelIndex(int size)
{static int index = 0;return index++ % size;
}void SendTask(int wfd, int tasknum)
{if (write(wfd, &tasknum, sizeof(tasknum)) == -1){std::cerr << "write failed: " << strerror(errno) << std::endl;}
}void controlProcessonce(std::vector<Channel> &channels)
{int tasknum = Selecttask();int channel_index = NextChannelIndex(channels.size());SendTask(channels[channel_index].getWfd(), tasknum);
}void controlProcess(std::vector<Channel> &channels, int times = -1)
{if (times > 0){while (times--){controlProcessonce(channels);}}else{while (1){controlProcessonce(channels);}}
}

功能解析

  • NextChannelIndex:使用轮询算法选择下一个子进程的通道索引,确保任务均匀分配。
  • SendTask:通过写端文件描述符向子进程发送任务编号。
  • controlProcessonce:选择一个任务并分发给下一个子进程。
  • controlProcess:根据 times 参数,循环发送指定次数的任务或无限循环发送任务。

任务定义与执行

task.hpp 文件定义了具体的任务函数,以及任务的加载和执行逻辑。

#pragma once
#include <iostream>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <string>
#include <vector>
#include <cstdlib>#define TaskNum 3
typedef void (*task_t)(); // task_t 是函数指针类型task_t task[TaskNum];void task1()
{std::cout << "task1" << std::endl;
}void task2()
{std::cout << "task2" << std::endl;
}void task3()
{std::cout << "task3" << std::endl;
}void ExecuteTask(int count)
{if (count >= 0 && count < TaskNum){task[count]();}else{std::cerr << "task index out of range" << std::endl;}
}void work()
{while (1){int count = 0;int n = read(0, &count, sizeof(count));if (n == -1){std::cerr << "read failed: " << strerror(errno) << std::endl;exit(1);}if (n == sizeof(count)){std::cout << "pid is : " << getpid() << " handling task" << std::endl;ExecuteTask(count);}if (n == 0){std::cout << "read EOF" << std::endl;exit(0);}}
}void work1()
{while (1){int count = 1;int n = read(0, &count, sizeof(count));if (n == -1){std::cerr << "read failed: " << strerror(errno) << std::endl;exit(1);}if (n == sizeof(count)){std::cout << "pid is : " << getpid() << " handling task" << std::endl;ExecuteTask(count);}if (n == 0){std::cout << "read EOF" << std::endl;exit(0);}}
}int Selecttask()
{return rand() % TaskNum;
}void Loadtask()
{task[0] = task1;task[1] = task2;task[2] = task3;
}

功能解析

  • 任务函数task1task2task3 分别代表不同的任务逻辑,这里简单地输出任务名称。
  • ExecuteTask:根据接收到的任务编号,调用对应的任务函数。
  • work1:子进程执行的主函数,循环读取来自父进程的任务编号,并执行相应任务。
  • Selecttask:随机选择一个任务编号,用于模拟任务分发。
  • Loadtask:初始化任务数组,将任务函数指针赋值给 task 数组。

清理资源

cleanupchannels 函数负责关闭所有通信通道,并等待子进程结束。

void cleanupchannels(std::vector<Channel> &channels)
{for (auto &channel : channels){channel.CloseChannel();channel.Wait();}
}

运行示例

编译代码

假设文件结构如下:

process_pool/
├── main.cc
└── task.hpp

使用以下命令编译代码:

g++ -o process_pool main.cc

运行程序

假设我们希望创建5个子进程,并分发10个任务:

./process_pool 5

预期输出

程序将创建5个子进程,每个子进程将接收并处理2个任务(总共10个任务)。输出可能如下:

pid is : 12345 handling task
task2
pid is : 12346 handling task
task2
pid is : 12347 handling task
task2
pid is : 12348 handling task
task2
pid is : 12349 handling task
task2
pid is : 12345 handling task
task2
pid is : 12346 handling task
task2
pid is : 12347 handling task
task2
pid is : 12348 handling task
task2
pid is : 12349 handling task
task2
pid is : 12345 handling task
task2
subprocess0 exited with status 0
subprocess1 exited with status 0
subprocess2 exited with status 0
subprocess3 exited with status 0
subprocess4 exited with status 0

注意

  • work1 函数中,子进程固定处理任务编号 1,即 task2
  • 若需要子进程处理不同任务,可调整任务分发逻辑或子进程的工作函数。

总结

本文通过一个具体的C++代码示例,详细解析了如何实现一个简单的进程池。关键步骤包括:

  1. 创建通信通道:使用匿名管道 (pipe) 实现父子进程间的通信。
  2. 创建子进程:通过 fork 创建子进程,并在子进程中执行任务处理函数。
  3. 任务分发:父进程通过写端文件描述符向子进程发送任务编号,实现任务的分发与调度。
  4. 资源管理:通过 Channel 类管理通信通道和子进程,确保资源的正确释放和进程的正常结束。

这种基于进程的任务调度模型适用于需要隔离任务执行环境、处理CPU密集型任务的场景。通过合理的进程池设计,可以显著提升系统的性能和稳定性。

在实际应用中,进程池的实现可以根据具体需求进行优化和扩展,例如:

  • 动态调整子进程数量:根据系统负载动态增加或减少子进程数量。
  • 任务队列:引入任务队列机制,支持任务的排队和优先级调度。
  • 错误处理与重启机制:增强进程池的鲁棒性,自动重启异常终止的子进程。

希望本文能对理解和实现进程池提供有价值的参考


http://www.ppmy.cn/server/158522.html

相关文章

创建和管理表

第十章: 创建和管理表 1.基础知识 1.1一条数据的存储规则 MySQL数据库从大到小依次是数据库函数,数据库,数据表,数据表的行与列 1.2标识符命名规则 数据库名 表名不得超过30个字符,变量名限制为29个 必须只能包括A-Z,a-z,0-9,_共63个字符 数据库名,表名,字段名等对象名中间…

Level2逐笔成交逐笔委托毫秒记录:今日分享优质股票数据20250114

逐笔成交逐笔委托下载 链接: https://pan.baidu.com/s/18YtQiLnt06cPQP1nRXor0g?pwd4k3h 提取码: 4k3h Level2逐笔成交逐笔委托数据分享下载 基于Level2的逐笔成交和逐笔委托数据&#xff0c;这种毫秒级别的记录能分析出许多关键信息&#xff0c;如庄家意图、虚假动作&#…

vue实现淘宝web端,装饰淘宝店铺APP,以及后端设计成能快速响应前端APP

一、前端实现 实现一个类似于淘宝店铺的装饰应用&#xff08;APP&#xff09;是一个复杂的任务&#xff0c;涉及到前端界面设计、拖放功能、模块化组件、数据管理等多个方面。为了简化这个过程&#xff0c;我们可以创建一个基本的 Vue 3 应用&#xff0c;允许用户通过拖放来添…

PHP:构建高效Web应用的强大工具

在当今的数字化时代&#xff0c;Web应用已经成为各行各业不可或缺的一部分。而在众多的编程语言中&#xff0c;PHP&#xff08;Hypertext Preprocessor&#xff09;凭借其简单易学、功能强大、兼容性高等特点&#xff0c;成为了构建Web应用的热门选择。本文将深入探讨PHP的优势…

uniapp 绘制五星评分 精确到半星

uniapp 绘制五星评分 精确到半星 对比上一篇博客 这个很简单上组件 <template><div :class"[imgTypeother?image-box:image-box1]"><img :class"[imgTypeother?image:image1]" v-for"(el,i) in value" :key"i" :sr…

Java 面试中的高频算法题详解

&#x1f496; 欢迎来到我的博客&#xff01; 非常高兴能在这里与您相遇。在这里&#xff0c;您不仅能获得有趣的技术分享&#xff0c;还能感受到轻松愉快的氛围。无论您是编程新手&#xff0c;还是资深开发者&#xff0c;都能在这里找到属于您的知识宝藏&#xff0c;学习和成长…

字符串算法篇——字里乾坤,算法织梦,解构字符串的艺术(下)

文章目录 前言第一章&#xff1a;最长公共前缀1.1 题目链接&#xff1a;https://leetcode.cn/problems/longest-common-prefix/description/1.2 题目分析&#xff1a;1.3 思路讲解&#xff1a;1.4 代码实现&#xff1a; 第二章&#xff1a;最长回文子串2.1 题目链接&#xff1a…

GET 和 POST 请求的详细区别及代码示例

文章目录 前言一、请求参数的处理方式二、安全性和幂等性三、缓存机制四、数据类型支持五、请求体的区别六、示例代码结语 前言 GET 和 POST 是 HTTP 协议中两种最常用的请求方法&#xff0c;它们在如何发送数据、安全性、幂等性等方面有着显著的不同。下面将更深入地探讨这两…