进程通信(进程池的模拟实现) read write函数复习 Linux ─── 第23课

server/2025/3/28 3:55:40/

目录

write和read函数补充:

进程池(process pool)

第一步: 创建并初始化processpool

第二步:主进程对子进程派发任务

补充:

第三步: 子进程执行完退出进程池

回收子进程

Channel.hpp

ProcessPool.hpp

Task.hpp

main.cc

makefile


write和read函数补充:

const char *str = "hello";
write(fd, str, strlen(str));  // 写入 5 个字节 ('h', 'e', 'l', 'l', 'o'),不含 '\0'

write(fd ,str, strlen(str)) 不会将\0读入 ,strlen中不含\0

read如果读的是字符串想打印 ,需要预留一个位置再最后加\0 ,再进行打印

        读的数据如果不想打印 ,不用预留位置

wirte和read都是数据流,它们的行为是严格的字节级原始数据写入和读出,完全按照用户指定的内容和长度进行操作

匿名管道的应用: 进程池

进程池(process pool)

先把进程创建出来,需要什么任务 ,派发什么任务.

让一个进程(master进程) ,给其他进程(work进程)派发任务 

下面实现process pool

第一步: 创建并初始化processpool

master要管理所有的管道 创建channel

创建管道 ,创建子进程 ,用vector管理全部的channel 

#include<iostream>
#include<unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include<vector>
#include<functional>//执行任一方法//typedef std::function<void()> work_t;
using work_t =std::function<void()>;//定义了函数对象类型enum 
{OK=0,UsageError,PipeError,ForkError,CloseError,Dup2Error
};//先描述
class channel
{
public:channel(int wfd ,pid_t who):_wfd(wfd),_who(who){_name ="channel-"+std::to_string(wfd)+"->"+ std::to_string(who);}~channel(){}std::string Name(){return _name;}
private:int _wfd;std::string _name;//channel-3->203444pid_t _who;
};void Worker()
{//read ->stdin
}
void Download()
{//read ->stdin
}//channels是输出型参数
//work_t work 回调方法   使创建子进程与让子进程执行任务是解耦的
int InitProcessPool(std::vector<channel>& channels ,int num ,work_t work)
{//创建指定进程个数for(int i=0 ;i <num; i++){   //管道int Pipefds[2]={0};int n =::pipe(Pipefds);if(n< 0) return PipeError;//子进程pid_t id =::fork();if(id == 0){//child 读int close_ret = ::close(Pipefds[1]);//关闭写通道if(close_ret < 0) return CloseError;int dup2_ret =::dup2(Pipefds[0] ,0);//重定向,让子进程从标准输入中获取要执行的任务。不再使用Pipefds[0]了      if(dup2_ret< 0) return Dup2Error;work();::close(Pipefds[0]);//sleep(10);  DebugPrint用::exit(0);}else if(id < 0){return ForkError;}else{//parentint close_ret = ::close(Pipefds[0]);//关闭读通道if(close_ret < 0) return CloseError;channels.emplace_back(Pipefds[1] , id);}}return OK;}
void DebugProcesspool(std::vector<channel>& channels)
{for( auto &c :channels){std::cout<< c.Name() <<std::endl;}}void Usage(std::string process)
{std::cout<<"Uasge:"<< process <<"process-num"<<std::endl;
}//我们自己是master
int main(int argc ,char* argv[])
{if(argc!=2){Usage(argv[0]);return UsageError;}int num  =std::stoi(argv[1]);std::vector<channel> channels;//1.创建&&初始化进程池InitProcessPool(channels , num ,Worker);// InitProcessPool(channels , num ,Print);// InitProcessPool(channels , num ,Dowload);//DebugProcesspool(channels);sleep(100);return 0;
}

第二步:主进程对子进程派发任务

补充:

        什么是任务? 任务码表示任务  4个字节(int)写 ,4个字节(int)读  

        怎么派发? 让每个子进程任务量相等   

  • 方法1: 轮询
  • 方法2:随机
  • 方法3:历史任务数

第三步: 子进程执行完退出进程池

派发完所有的任务,子进程读取完 ,都在read阻塞 ,此时关闭子进程的两种方法

  1. 向子进程发送退出任务
  2. 利用管道写端关闭,读端读到0 ,子进程会自己退出的特性(推荐)

回收子进程

这里有一个藏得很深的bug

        在创建多管道时 ,子进程会继承父进程的fd表 ,就会导致管道的写端被越来越多的子进程拿到,引用计数++ ,释放master进程的全部的wfd后 ,管道的写端还链接着子进程 ,就不能使上面的方法2成功,

解决方法: 

  1. 倒着关闭master的wfd(最后一个管道只有一个wfd(在master,子进程都没有最后一个管道的wfd) ,倒着关闭子进程 ,此子进程的wfd也会跟着关闭)
  2. 在创建子进程时关闭历史wfd

通过上图发现父进程的_wfd从4开始递增

子进程的_rfd都是3

Channel.hpp


#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__#include <iostream>
#include <string>
#include <unistd.h>//先描述
class channel
{
public:channel(int wfd ,pid_t who):_wfd(wfd),_who(who){_name =" channel-"+std::to_string(wfd)+"->"+ std::to_string(who);}~channel(){}void Send(int tasknum){::write(_wfd ,&tasknum ,sizeof(tasknum)); //write第二个参数是const void *buf}std::string Name(){return _name;}void Close(){::close(_wfd);}pid_t Id(){return _who;}int WFD(){return _wfd;}
private:int _wfd;std::string _name;//channel-3->203444pid_t _who;
};#endif

ProcessPool.hpp

#include<iostream>
#include<unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include<vector>
#include<functional>//执行任一方法
#include"Task.hpp"
#include"Channel.hpp"enum 
{OK=0,UsageError,PipeError,ForkError,CloseError,Dup2Error
};
//typedef std::function<void()> work_t;
using work_t =std::function<void()>;//定义了函数对象类型class ProcessPool
{
public:ProcessPool(int num ,work_t w):num(num),work(w){}//channels是输出型参数
//work_t work 回调方法   使创建子进程与让子进程执行任务是解耦的
int InitProcessPool()
{//创建指定进程个数for(int i=0 ;i <num; i++){   //管道int Pipefds[2]={0};int n =::pipe(Pipefds);if(n< 0) return PipeError;//子进程pid_t id =::fork();if(id == 0){//child 读// 子进程关闭历史wfdfor(auto &c : channels){std::cout  << c.WFD() << " ";c.Close();}int close_ret = ::close(Pipefds[1]);//关闭写通道if(close_ret < 0) return CloseError;int dup2_ret =::dup2(Pipefds[0] ,0);//重定向,让子进程从标准输入中获取要执行的任务。不再使用Pipefds[0]了      if(dup2_ret< 0) return Dup2Error;//子进程从管道拿任务 ,执行任务work();//work()退出后exit::close(Pipefds[0]);//sleep(10);  DebugPrint用::exit(0);}else if(id < 0){return ForkError;}else{//parentint close_ret = ::close(Pipefds[0]);//关闭读通道if(close_ret < 0) return CloseError;channels.emplace_back(Pipefds[1] , id);}}return OK;}void DebugProcesspool(){for( auto &c :channels){std::cout<< c.Name() <<std::endl;}}void DispathTask()
{int who =0;//派发任务int num =20; //20个任务while(num--){//a.选择一个任务 ,整数值  ,taskmanager中选int Tasknum = TM.SelectTask();//b.选择一个子进程管道  ,channels中选channel &curr =channels[who++];who %= channels.size();std::cout<<"######################"<<std::endl;std::cout<<"send"<<Tasknum<<" "<<curr.Name()<<"任务还剩"<<num<<std::endl;std::cout<<"######################"<<std::endl;//c.通过管道派发任务curr.Send(Tasknum);sleep(1);}}void CleanProcesspool()
{// version 3  前提:子进程创建时删除了继承自master的历史wfdfor (auto &c : channels){c.Close();pid_t rid = ::waitpid(c.Id(), nullptr, 0);if (rid > 0){std::cout << "child " << rid << " wait ... success" << std::endl;}}// version 2  前提:子进程创建时没有删除继承自master的历史wfd   方法:倒着关闭master的wfd(让最后一个子进程先读到0,关闭读端 Worker退出 子进程退出)// for(int i = channels.size()-1; i >= 0; i--)// {//     channels[i].Close();//     pid_t rid = ::waitpid(channels[i].Id(), nullptr, 0); // 阻塞了!//     if (rid > 0)//     {//         std::cout << "child " << rid << " wait ... success" << std::endl;//     }// }//vertion1// for(auto& c :channels)// {//     c.Close();// }// //回收子进程,为啥写两个循环// for(auto &c:channels)// {//     int n =waitpid(c.Id() ,nullptr ,0);//     if(n > 0)//     {//         std::cout<<"wait child:"<< n <<"sucess  "<<std::endl;//     }// }
}private:std::vector<channel> channels;int num ;work_t work;
};

Task.hpp

#pragma once#include<iostream>
#include<unordered_map>
#include<functional>
#include<ctime>
#include <sys/types.h>
#include <unistd.h>
using work_t =std::function<void()>;void Download()
{std::cout<<"我是Download任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Print()
{std::cout<<"我是Print任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Log ()
{std::cout<<"我是Log日志任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Sql()
{std::cout<<"我是Sql数据库同步任务 "<<"我的pid是"<<getpid()<<std::endl;
}static int  num =0;//总任务个数
class TaskManager
{
public:TaskManager(){srand(time(nullptr));InsertTask(Download);InsertTask(Print);InsertTask(Log);InsertTask(Sql);}void InsertTask(work_t t){_tasks[num++] =t;}//选择任务int SelectTask(){return rand()% num;}void Excute(int number){//没有这个任务if(_tasks.find(number) == _tasks.end()) return;//有这个任务 ,执行_tasks[number]();}~TaskManager(){}private:std::unordered_map<int ,work_t> _tasks;
};TaskManager TM;//Task属于父子共享区//子进程的回调方法
void Worker()
{//read ->stdinwhile(true){int cmd =0;int n =::read( 0, &cmd ,sizeof(cmd));//读和写都是以一个int的大小if( n== sizeof(cmd))//执行任务{TM.Excute(cmd);}else if(n == 0)//读端已经读到0啦代表写端已经关闭 ,关闭子进程{std::cout<<"pid:"<<getpid()<<"quit..."<<std::endl;break;}else{}}
}

main.cc

#include"ProcessPool.hpp"
#include"Task.hpp"void Usage(std::string process)
{std::cout<<"Uasge:"<< process <<"process-num"<<std::endl;
}//我们自己是master
int main(int argc ,char* argv[])
{if(argc!=2){Usage(argv[0]);return UsageError;}int num  =std::stoi(argv[1]);ProcessPool * pp = new ProcessPool(num ,Worker); //1.创建&&初始化进程池pp->InitProcessPool();//2.派发任务pp->DispathTask();//3.退出进程池 ,只需要关闭所有管道的写端即可pp->CleanProcesspool();//1.创建&&初始化进程池//InitProcessPool(channels , num ,Worker);//DebugProcesspool(channels);//2.派发任务//DispathTask(channels);//3.退出进程池 ,只需要关闭所有管道的写端即可//CleanProcesspool(channels);delete(pp);return 0;
}

makefile

BIN=processpool
CC=g++
FLAGS=-c -Wall -std=c++11
LDFLAGS= -o
#SRC=$(shell ls *.cc)
SRC=$(wildcard *.cc)
OBJ=$(SRC:.cc=.o)$(BIN):$(OBJ)$(CC) $(LDFLAGS) $@ $^
%.o:%.cc$(CC) $(FLAGS)  $<.PHONY:clean
clean:rm -f $(BIN) $(OBJ).PHONY:test
test:@echo $(SRC)@echo $(OBJ)


http://www.ppmy.cn/server/179166.html

相关文章

LeetCode热题100精讲——Top4:移动零【双指针】

你好&#xff0c;我是安然无虞。 文章目录 题目背景移动零C解法Python解法 题目背景 如果大家对于 双指针 的概念并不熟悉, 可以先看我之前为此专门写的算法详解: 蓝桥杯算法竞赛系列第七章——六道力扣经典带你刷爆双指针 移动零 题目链接&#xff1a;移动零 解题思路&…

笔记本+移动端维修全套教程

今天分享的是笔记本移动端维修全套教程&#xff08;免费视频资料大全&#xff09; 当自己手机或者电脑坏了&#xff0c;很多人都会想着去维修店铺修&#xff0c;但价格不透明&#xff0c;容易被坑&#xff0c;当自己了解一些之后&#xff0c;即使不会修&#xff0c;也可以对手…

计算机体系结构及存储系统入门

1.2 计算机体系结构 计算机体系结构(computer architecture)是指计算机的概念性结构、功能和性能特性&#xff0c;它从一个更高的层次对计算机的结构和特征等宏观特性进行研究。计算机体系结构分类如下所述: 从计算机体系架构的宏观视角出发&#xff0c;依据处理机数量&#x…

6.3 模拟专题:LeetCode 6. Z 字形变换

1. 题目链接 LeetCode 6. Zigzag Conversion 2. 题目描述 将一个给定字符串 s 按照指定的行数 numRows 进行 Z 字形排列后&#xff0c;逐行读取并返回新的字符串。 示例&#xff1a; 输入&#xff1a;s "PAYPALISHIRING", numRows 3 → 输出&#xff1a;"P…

蓝之洋科技以AI智能制造引领变革,推动移动电源产业迈向高端智能化!

在全球智能制造加速发展的背景下&#xff0c;深圳市蓝之洋科技有限公司凭借十余年的行业深耕、先进的生产体系、严格的质量标准及持续的技术创新&#xff0c;已成为移动电源领域的领先制造商。作为众多国际品牌的长期合作伙伴&#xff0c;蓝之洋科技不仅在生产制造方面树立了行…

雕马快租:直播设备租赁新趋势,低成本重构传统营销模式的破局之道

引言&#xff1a;直播电商浪潮下的设备成本困局 随着直播电商渗透率突破60%&#xff08;艾媒咨询2025年数据&#xff09;&#xff0c;企业与个人主播对直播设备的需求呈指数级增长。然而&#xff0c;一套专业直播设备的采购成本动辄数万元&#xff0c;叠加设备迭代周期缩短至1…

软件开发过程中常用的调试工具(gdb)

gdb 因为我们公司其中脚本中有rk的gdb调试工具脚本&#xff0c;内部只需要将其打开后进行编译即可&#xff1a; 需要将编译出来的cvr_app 第一种&#xff1a;使用gdb将app给跑起来&#xff1a;gdb cvr_app 然后在出现问题时&#xff1a; 输入bt&#xff0c;可以打印出当前…

从零构建大语言模型全栈开发指南:第二部分:模型架构设计与实现-2.2.3实战案例:在笔记本电脑上运行轻量级LLM

👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 实战案例:在笔记本电脑上运行轻量级LLM2.2.3 模型架构设计与实现1. 环境与工具准备1.1 硬件要求1.2 软件栈选择2. 轻量级模型架构设计2.1 模型参数配置2.2 关键技术优化3. 实战流程3.1 数据准备流程3.2…