目录
write和read函数补充:
进程池(process pool)
第一步: 创建并初始化processpool
第二步:主进程对子进程派发任务
补充:
第三步: 子进程执行完退出进程池
回收子进程
Channel.hpp
ProcessPool.hpp
Task.hpp
main.cc
makefile
write和read函数补充:
const char *str = "hello";
write(fd, str, strlen(str)); // 写入 5 个字节 ('h', 'e', 'l', 'l', 'o'),不含 '\0'
write(fd ,str, strlen(str)) 不会将\0读入 ,strlen中不含\0
read如果读的是字符串想打印 ,需要预留一个位置再最后加\0 ,再进行打印
读的数据如果不想打印 ,不用预留位置
wirte和read都是数据流,它们的行为是严格的字节级原始数据写入和读出,完全按照用户指定的内容和长度进行操作
匿名管道的应用: 进程池
进程池(process pool)
先把进程创建出来,需要什么任务 ,派发什么任务.
让一个进程(master进程) ,给其他进程(work进程)派发任务
下面实现process pool
第一步: 创建并初始化processpool
master要管理所有的管道 创建channel
创建管道 ,创建子进程 ,用vector管理全部的channel
#include<iostream>
#include<unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include<vector>
#include<functional>//执行任一方法//typedef std::function<void()> work_t;
using work_t =std::function<void()>;//定义了函数对象类型enum
{OK=0,UsageError,PipeError,ForkError,CloseError,Dup2Error
};//先描述
class channel
{
public:channel(int wfd ,pid_t who):_wfd(wfd),_who(who){_name ="channel-"+std::to_string(wfd)+"->"+ std::to_string(who);}~channel(){}std::string Name(){return _name;}
private:int _wfd;std::string _name;//channel-3->203444pid_t _who;
};void Worker()
{//read ->stdin
}
void Download()
{//read ->stdin
}//channels是输出型参数
//work_t work 回调方法 使创建子进程与让子进程执行任务是解耦的
int InitProcessPool(std::vector<channel>& channels ,int num ,work_t work)
{//创建指定进程个数for(int i=0 ;i <num; i++){ //管道int Pipefds[2]={0};int n =::pipe(Pipefds);if(n< 0) return PipeError;//子进程pid_t id =::fork();if(id == 0){//child 读int close_ret = ::close(Pipefds[1]);//关闭写通道if(close_ret < 0) return CloseError;int dup2_ret =::dup2(Pipefds[0] ,0);//重定向,让子进程从标准输入中获取要执行的任务。不再使用Pipefds[0]了 if(dup2_ret< 0) return Dup2Error;work();::close(Pipefds[0]);//sleep(10); DebugPrint用::exit(0);}else if(id < 0){return ForkError;}else{//parentint close_ret = ::close(Pipefds[0]);//关闭读通道if(close_ret < 0) return CloseError;channels.emplace_back(Pipefds[1] , id);}}return OK;}
void DebugProcesspool(std::vector<channel>& channels)
{for( auto &c :channels){std::cout<< c.Name() <<std::endl;}}void Usage(std::string process)
{std::cout<<"Uasge:"<< process <<"process-num"<<std::endl;
}//我们自己是master
int main(int argc ,char* argv[])
{if(argc!=2){Usage(argv[0]);return UsageError;}int num =std::stoi(argv[1]);std::vector<channel> channels;//1.创建&&初始化进程池InitProcessPool(channels , num ,Worker);// InitProcessPool(channels , num ,Print);// InitProcessPool(channels , num ,Dowload);//DebugProcesspool(channels);sleep(100);return 0;
}
第二步:主进程对子进程派发任务
补充:
什么是任务? 任务码表示任务 4个字节(int)写 ,4个字节(int)读
怎么派发? 让每个子进程任务量相等
- 方法1: 轮询
- 方法2:随机
- 方法3:历史任务数
第三步: 子进程执行完退出进程池
派发完所有的任务,子进程读取完 ,都在read阻塞 ,此时关闭子进程的两种方法
- 向子进程发送退出任务
- 利用管道写端关闭,读端读到0 ,子进程会自己退出的特性(推荐)
回收子进程
这里有一个藏得很深的bug
在创建多管道时 ,子进程会继承父进程的fd表 ,就会导致管道的写端被越来越多的子进程拿到,引用计数++ ,释放master进程的全部的wfd后 ,管道的写端还链接着子进程 ,就不能使上面的方法2成功,
解决方法:
- 倒着关闭master的wfd(最后一个管道只有一个wfd(在master,子进程都没有最后一个管道的wfd) ,倒着关闭子进程 ,此子进程的wfd也会跟着关闭)
- 在创建子进程时关闭历史wfd
通过上图发现父进程的_wfd从4开始递增
子进程的_rfd都是3
Channel.hpp
#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__#include <iostream>
#include <string>
#include <unistd.h>//先描述
class channel
{
public:channel(int wfd ,pid_t who):_wfd(wfd),_who(who){_name =" channel-"+std::to_string(wfd)+"->"+ std::to_string(who);}~channel(){}void Send(int tasknum){::write(_wfd ,&tasknum ,sizeof(tasknum)); //write第二个参数是const void *buf}std::string Name(){return _name;}void Close(){::close(_wfd);}pid_t Id(){return _who;}int WFD(){return _wfd;}
private:int _wfd;std::string _name;//channel-3->203444pid_t _who;
};#endif
ProcessPool.hpp
#include<iostream>
#include<unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include<vector>
#include<functional>//执行任一方法
#include"Task.hpp"
#include"Channel.hpp"enum
{OK=0,UsageError,PipeError,ForkError,CloseError,Dup2Error
};
//typedef std::function<void()> work_t;
using work_t =std::function<void()>;//定义了函数对象类型class ProcessPool
{
public:ProcessPool(int num ,work_t w):num(num),work(w){}//channels是输出型参数
//work_t work 回调方法 使创建子进程与让子进程执行任务是解耦的
int InitProcessPool()
{//创建指定进程个数for(int i=0 ;i <num; i++){ //管道int Pipefds[2]={0};int n =::pipe(Pipefds);if(n< 0) return PipeError;//子进程pid_t id =::fork();if(id == 0){//child 读// 子进程关闭历史wfdfor(auto &c : channels){std::cout << c.WFD() << " ";c.Close();}int close_ret = ::close(Pipefds[1]);//关闭写通道if(close_ret < 0) return CloseError;int dup2_ret =::dup2(Pipefds[0] ,0);//重定向,让子进程从标准输入中获取要执行的任务。不再使用Pipefds[0]了 if(dup2_ret< 0) return Dup2Error;//子进程从管道拿任务 ,执行任务work();//work()退出后exit::close(Pipefds[0]);//sleep(10); DebugPrint用::exit(0);}else if(id < 0){return ForkError;}else{//parentint close_ret = ::close(Pipefds[0]);//关闭读通道if(close_ret < 0) return CloseError;channels.emplace_back(Pipefds[1] , id);}}return OK;}void DebugProcesspool(){for( auto &c :channels){std::cout<< c.Name() <<std::endl;}}void DispathTask()
{int who =0;//派发任务int num =20; //20个任务while(num--){//a.选择一个任务 ,整数值 ,taskmanager中选int Tasknum = TM.SelectTask();//b.选择一个子进程管道 ,channels中选channel &curr =channels[who++];who %= channels.size();std::cout<<"######################"<<std::endl;std::cout<<"send"<<Tasknum<<" "<<curr.Name()<<"任务还剩"<<num<<std::endl;std::cout<<"######################"<<std::endl;//c.通过管道派发任务curr.Send(Tasknum);sleep(1);}}void CleanProcesspool()
{// version 3 前提:子进程创建时删除了继承自master的历史wfdfor (auto &c : channels){c.Close();pid_t rid = ::waitpid(c.Id(), nullptr, 0);if (rid > 0){std::cout << "child " << rid << " wait ... success" << std::endl;}}// version 2 前提:子进程创建时没有删除继承自master的历史wfd 方法:倒着关闭master的wfd(让最后一个子进程先读到0,关闭读端 Worker退出 子进程退出)// for(int i = channels.size()-1; i >= 0; i--)// {// channels[i].Close();// pid_t rid = ::waitpid(channels[i].Id(), nullptr, 0); // 阻塞了!// if (rid > 0)// {// std::cout << "child " << rid << " wait ... success" << std::endl;// }// }//vertion1// for(auto& c :channels)// {// c.Close();// }// //回收子进程,为啥写两个循环// for(auto &c:channels)// {// int n =waitpid(c.Id() ,nullptr ,0);// if(n > 0)// {// std::cout<<"wait child:"<< n <<"sucess "<<std::endl;// }// }
}private:std::vector<channel> channels;int num ;work_t work;
};
Task.hpp
#pragma once#include<iostream>
#include<unordered_map>
#include<functional>
#include<ctime>
#include <sys/types.h>
#include <unistd.h>
using work_t =std::function<void()>;void Download()
{std::cout<<"我是Download任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Print()
{std::cout<<"我是Print任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Log ()
{std::cout<<"我是Log日志任务 "<<"我的pid是"<<getpid()<<std::endl;
}
void Sql()
{std::cout<<"我是Sql数据库同步任务 "<<"我的pid是"<<getpid()<<std::endl;
}static int num =0;//总任务个数
class TaskManager
{
public:TaskManager(){srand(time(nullptr));InsertTask(Download);InsertTask(Print);InsertTask(Log);InsertTask(Sql);}void InsertTask(work_t t){_tasks[num++] =t;}//选择任务int SelectTask(){return rand()% num;}void Excute(int number){//没有这个任务if(_tasks.find(number) == _tasks.end()) return;//有这个任务 ,执行_tasks[number]();}~TaskManager(){}private:std::unordered_map<int ,work_t> _tasks;
};TaskManager TM;//Task属于父子共享区//子进程的回调方法
void Worker()
{//read ->stdinwhile(true){int cmd =0;int n =::read( 0, &cmd ,sizeof(cmd));//读和写都是以一个int的大小if( n== sizeof(cmd))//执行任务{TM.Excute(cmd);}else if(n == 0)//读端已经读到0啦代表写端已经关闭 ,关闭子进程{std::cout<<"pid:"<<getpid()<<"quit..."<<std::endl;break;}else{}}
}
main.cc
#include"ProcessPool.hpp"
#include"Task.hpp"void Usage(std::string process)
{std::cout<<"Uasge:"<< process <<"process-num"<<std::endl;
}//我们自己是master
int main(int argc ,char* argv[])
{if(argc!=2){Usage(argv[0]);return UsageError;}int num =std::stoi(argv[1]);ProcessPool * pp = new ProcessPool(num ,Worker); //1.创建&&初始化进程池pp->InitProcessPool();//2.派发任务pp->DispathTask();//3.退出进程池 ,只需要关闭所有管道的写端即可pp->CleanProcesspool();//1.创建&&初始化进程池//InitProcessPool(channels , num ,Worker);//DebugProcesspool(channels);//2.派发任务//DispathTask(channels);//3.退出进程池 ,只需要关闭所有管道的写端即可//CleanProcesspool(channels);delete(pp);return 0;
}
makefile
BIN=processpool
CC=g++
FLAGS=-c -Wall -std=c++11
LDFLAGS= -o
#SRC=$(shell ls *.cc)
SRC=$(wildcard *.cc)
OBJ=$(SRC:.cc=.o)$(BIN):$(OBJ)$(CC) $(LDFLAGS) $@ $^
%.o:%.cc$(CC) $(FLAGS) $<.PHONY:clean
clean:rm -f $(BIN) $(OBJ).PHONY:test
test:@echo $(SRC)@echo $(OBJ)