文章目录
- 什么是进程间通信
- 匿名管道
- 命名管道
- system V共享内存
- system V消息队列
- 信号量
一、什么是进程间通信
首先由于进程运行是具有独立性的,具有独立的页表,PCB和虚拟地址空间等,父子进程间数据互补干扰。这就让进程间通信难度加大。由于操作系统在设计的时候,它本身就是独立的。
进程间通信的本质:需要将不同的进程看到同一份资源(内存空间)。
进程间通信的目的:
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间通信的必要性:
- 单进程无法使用并发能力,也就不能实现多进程间协同
- 比如:传输数据,同步执行流,消息通知等
- 所以进程间通信不是目的,而是手段,是为了实现多进程间协同。
进程通信的技术背景:
- 由于进程具有独立性。虚拟地址空间和页表保证进程运行的独立性(进程内核数据结构和进程的代码和数据。
- 通信的成本比较高
进程间通信的本质理解:
- 进程间通信的前提,首先需要让不同的进程看到同一份"内存"(特定的结构组成)。
- 同一份"内存"不隶属于任何一个进程,应该强调共享。
进程间通信分类:进程间通信方式的标准
- Linux原生提供---管道
- 匿名管道pipe
- 命名管道
- 单机通信---多进程---System V IPC
- System V 消息队列
- System V 共享内存
- System V 信号量
- 网络通信---多线程---POSIX IPC
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
1.管道
什么是管道?
管道,英文为pipe。这是一个我们在学习Linux命令行的时候就会引入的一个很重要的概念。它的发明人是道格拉斯.麦克罗伊,这位也是UNIX上早期shell的发明人。他在发明了shell之后,发现系统操作执行命令的时候,经常有需求要将一个程序的输出交给另一个程序进行处理,这种操作可以使用输入输出重定向加文件搞定。管道本质上就是一个文件,前面的进程以写方式打开文件,后面的进程以读方式打开。这样前面写完后面读,于是就实现了通信。实际上管道的设计也是遵循UNIX的“一切皆文件”设计原则的,它本质上就是一个文件。Linux系统直接把管道实现成了一种文件系统,借助VFS给应用程序提供操作接口。
- 只能单向通信
- 管道内传输的都是资源
2.管道的原理
管道通信背后是进程之间通过管道进行进程通信
管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”这里中间的数据资源不属于任何一个进程
注意: 这里使用who命令是查看当前云服务器的登录用户(一行显示一个用户),wc -l 用于统计当前行数。
如果当前进程创建了子进程,此时文件描述符表需要拷贝给子进程吗?而且表中指向的一堆文件要不要拷贝给子进程?
这里文件描述符表示进程与文件之间的关系,表示当前进程可以看到已经打开了哪些文件。这个是需要拷贝给子进程的。但是表中指向的文件与进程无关不需要进行拷贝!由于这个父子进程中struct files_struct是一样的,里面保存的文件指针都是指向同一个文件。(这里我们可以说明父子进程打印到显示屏上都是打印到一号文件中去)。这里我们可以发现父子间看到了同一个公共文件。这也就是管道!
这里我们可以通过fork()创建父子进程,让父进程进行只进行写入,子进程只进行读取,那么这里我们需要关闭父进程的读功能,保留写,关闭子进程的写功能,保留读功能。那么我们可以让父子进程看到同一份资源!
注意:
- Linux下一切皆文件,所以管道也是文件。由于文件属于内核的,所有的进程间通信都属于内核级别的。
- 由于进程间通信的管道必须在内存中,是一个纯内存的通信方式,如果此时需要将数据写入到磁盘中,此时通信效率降低,而且这里的数据往往都是临时数据不要将数据写入磁盘(也叫数据持久化保存)
二.匿名管道
#include <unistd.h>功能:创建一无名管道原型: int pipe(int fd[2]);参数
- fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
- fd[2]作为输出型参数,可以检验管道是否创建成功
- 返回值:成功返回0,失败返回错误代码
1.用fork来共享管道原理
通过fork让子进程继承--能够让具有血缘关系的进程进行通信--常用于父子进程,这样我们就能做到在不同的进程下,看到同一份资源。
这里需要注意:pipefd[0]表示读端,pipefd[1]表示写端。
测试代码:
#include <iostream>
#include <unistd.h>
#include <assert.h>int main()
{//1.创建管道int pipefd[2]={0};int n=pipe(pipefd);//由于在Debug条件下assert是有效的,但是在Release版本下是无效的assert(n!=-1);//强制检查管道是否创建成功(void)n;//这里是说明在Release版本下n被使用过std::cout<<"pipefd[0]"<<pipefd[0]<<std::endl;//3std::cout<<"pipefd[1]"<<pipefd[1]<<std::endl;//4return 0;
}
这里我们可以发现管道已经成功创建:
2.站在文件描述符角度-深度理解管道
实例代码:
#include <iostream>
#include <cstdio>
#include <cstring>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <assert.h>int main()
{//1.创建管道int pipefd[2]={0};int n=pipe(pipefd);//由于在Debug条件下assert是有效的,但是在Release版本下是无效的assert(n!=-1);//强制检查管道是否创建成功(void)n;//这里是说明在Release版本下n被使用过//如果实在Debug版本下就不打印
#ifdef DEBUGstd::cout<<"pipefd[0]"<<pipefd[0]<<std::endl;//3std::cout<<"pipefd[1]"<<pipefd[1]<<std::endl;//4
#endif//创建子进程pid_t id=fork();assert(id!=-1);if(id==0){//子进程//构建单向通信信道//子进程读取,关闭子进程的写端close(pipefd[1]);char buffer[1024];while(true){//从pipefd[0]中进行读取,读取到缓冲区中ssize_t s=read(pipefd[0],buffer,sizeof(buffer)-1);//表示读取成功if(s>0){buffer[s]=0;std::cout<<"child get a message ["<<getpid()<<"] Father#"<<buffer<<std::endl;}}//最后关闭子进程的读端,这里也可以不需要关闭,最终会由OS管理//close(pipefd[0]);exit(0);}//父进程//构建单向通信的信道//父进程进行写入,关闭相应的读端close(pipefd[0]);std::string message="我是父进程,我正在给你发消息";int count=0;char send_buffer[1024];while(true){//构建一个变化的字符串//将printf的内容格式化到字符串中去snprintf(send_buffer,sizeof(send_buffer),"%s[%d]:%d",message.c_str(),getpid(),count++);//进行写入操作//这里是需要写入到管道文件中所以不需要+1,因为'\0'写入到文件中没有意义write(pipefd[1],send_buffer,strlen(send_buffer));sleep(1);}pid_t ret=waitpid(id,nullptr,0);//非阻塞方式等待assert(ret>0);(void)ret;close(pipefd[1]);return 0;
}
这里我们发现父子进程都用到了一个buffer作为缓冲区,为什么不能定义成一个全局的buffer呢?
由于会发生写时拷贝的缘故。
3. 站在内核角度-管道本质
4管道总结
- 1.管道是一种进程间的通信的方式,管道是用来进行具有血缘关系的进程进行进程间通信,常用于父子间进行通信。
- 2.管道具有通过让进程间协同,提供了访问控制
- 3.管道提供的是面向流式的通信服务---面向字节流(需要对应的协议)
- 4.管道是基于文件的,文件的生命周期是随进程的,管道的生命周期是随进程的。
- 5.管道是单向通信的,就会半双工通信的一种特殊情况
上述实例代码中,父进程每隔一秒发送一条消息,但是子进程没有设置读取时间限制,但是子进程还是可以根据父进程的写入而读取,那么子进程在父进程sleep的1s中主要是在等待父进程写入资源就绪。
由于管道是一个文件,显示器也是一个文件,父子同时向显示器写入时,没有等待这一说辞,由于之前我们向显示器中打印的时候,都是交错着疯狂往显示器中打印。这种情况称之为缺乏访问控制。那么子进程等待父进程的写入这种方式我们称之为访问控制。
注意:
- 当管道里的数据满了的时候,写的乙方需要等待读的一方将数据读取完后才能进行写入。
- 当管道为空时,读取一方要等待写的一方写入。
验证代码:
#include <iostream>
#include <cstdio>
#include <cstring>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <assert.h>int main()
{//1.创建管道int pipefd[2]={0};int n=pipe(pipefd);//由于在Debug条件下assert是有效的,但是在Release版本下是无效的assert(n!=-1);//强制检查管道是否创建成功(void)n;//这里是说明在Release版本下n被使用过//如果实在Debug版本下就不打印
#ifdef DEBUGstd::cout<<"pipefd[0]"<<pipefd[0]<<std::endl;//3std::cout<<"pipefd[1]"<<pipefd[1]<<std::endl;//4
#endif//创建子进程pid_t id=fork();assert(id!=-1);if(id==0){//子进程//构建单向通信信道//子进程读取,关闭子进程的写端close(pipefd[1]);char buffer[1024];while(true){//从pipefd[0]中进行读取,读取到缓冲区中ssize_t s=read(pipefd[0],buffer,sizeof(buffer)-1);//表示读取成功if(s>0){sleep(20);buffer[s]=0;std::cout<<"child get a message ["<<getpid()<<"] Father#"<<buffer<<std::endl;}}//最后关闭子进程的读端,这里也可以不需要关闭,最终会由OS管理//close(pipefd[0]);exit(0);}//父进程//构建单向通信的信道//父进程进行写入,关闭相应的读端close(pipefd[0]);std::string message="我是父进程,我正在给你发消息";int count=0;char send_buffer[1024];while(true){//构建一个变化的字符串//将printf的内容格式化到字符串中去snprintf(send_buffer,sizeof(send_buffer),"%s[%d]:%d",message.c_str(),getpid(),count++);//进行写入操作//这里是需要写入到管道文件中所以不需要+1,因为'\0'写入到文件中没有意义write(pipefd[1],send_buffer,strlen(send_buffer));std::cout<<count<<std::endl;}pid_t ret=waitpid(id,nullptr,0);//非阻塞方式等待assert(ret>0);(void)ret;close(pipefd[1]);return 0;
}
由上图我们可以发现缓冲区满了不能继续写入了,只能代码子进程慢慢读取。
接下来我们验证写入一方fd没有关闭,如果有数据则读取,没有数据就等,写入一方fd关闭,读取一方,read返回0,表示读取到文件结尾(此时需要将缓冲区中的内容读取完毕就可以退出了)。
#include <iostream>
#include <cstdio>
#include <cstring>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <assert.h>int main()
{//1.创建管道int pipefd[2]={0};int n=pipe(pipefd);//由于在Debug条件下assert是有效的,但是在Release版本下是无效的assert(n!=-1);//强制检查管道是否创建成功(void)n;//这里是说明在Release版本下n被使用过//如果实在Debug版本下就不打印
#ifdef DEBUGstd::cout<<"pipefd[0]"<<pipefd[0]<<std::endl;//3std::cout<<"pipefd[1]"<<pipefd[1]<<std::endl;//4
#endif//创建子进程pid_t id=fork();assert(id!=-1);if(id==0){//子进程//构建单向通信信道//子进程读取,关闭子进程的写端close(pipefd[1]);char buffer[1024];while(true){//从pipefd[0]中进行读取,读取到缓冲区中ssize_t s=read(pipefd[0],buffer,sizeof(buffer)-1);//表示读取成功if(s>0){buffer[s]=0;std::cout<<"child get a message ["<<getpid()<<"] Father#"<<buffer<<std::endl;}else if(s==0){std::cout<<"writer quit(father),me quit"<<std::endl;break;}}//最后关闭子进程的读端,这里也可以不需要关闭,最终会由OS管理//close(pipefd[0]);exit(0);}//父进程//构建单向通信的信道//父进程进行写入,关闭相应的读端close(pipefd[0]);std::string message="我是父进程,我正在给你发消息";int count=0;char send_buffer[1024];while(true){//构建一个变化的字符串//将printf的内容格式化到字符串中去snprintf(send_buffer,sizeof(send_buffer),"%s[%d]:%d",message.c_str(),getpid(),count++);//进行写入操作//这里是需要写入到管道文件中所以不需要+1,因为'\0'写入到文件中没有意义write(pipefd[1],send_buffer,strlen(send_buffer));sleep(1);std::cout<<count<<std::endl;if(count==5){std::cout<<"writer quit(father)"<<std::endl;break;}}close(pipefd[1]);pid_t ret=waitpid(id,nullptr,0);//非阻塞方式等待assert(ret>0);(void)ret;return 0;
}
一般而言,内核对于管道操作进行同步与互斥管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道。
半双工通信:作为通信的一方,要么在发送数据,要么在接受数据,不能同时进行发送和接受。
全双工通信:既可以发送数据也可以接受数据。
a.写快,读慢,写满的时候就不能再写了
b.写慢,读快,管道没有数据的时候,读的这一方就必须等待
c.写关,读0,表示读到了文件结尾
d.读关,写继续写,OS终止写进程
5.进程池
建立进程池
给父进程和每一个子进程建立一个管道,并以固定大小的方式command code(4KB)向子进程发送指令。
Makefile文件
ProcessPool:ProcessPool.ccg++ -o $@ $^ -std=c++11 -DEBUG
.PHONY:clean
clean:rm -f ProcessPool
Tack.hpp
#pragma once#include <iostream>
#include <unistd.h>
#include <string>
#include <functional>
#include <vector>
#include <unordered_map>typedef std::function<void()> func;std::vector<func> callbacks;
std::unordered_map<int,std::string> desc;void readMySQL()
{std::cout<<"sub process[ "<<getpid()<<"] 执行访问数据库的任务"<<std::endl;
}void execulerUrl()
{std::cout<<"sub process[ "<<getpid()<<"] 执行url解析"<<std::endl;
}void cal()
{ std::cout<<"sub process[ "<<getpid()<<"] 执行加密任务"<<std::endl;
}void save()
{std::cout<<"sub process[ "<<getpid()<<"] 执行数据持久化任务"<<std::endl;
}void load()
{desc.insert({callbacks.size(),"readMySQL:读取数据库"});callbacks.push_back(readMySQL);desc.insert({callbacks.size(),"execulerUrl:进行url解析"});callbacks.push_back(execulerUrl);desc.insert({callbacks.size(),"cal:进行加密计算"});callbacks.push_back(cal);desc.insert({callbacks.size(),"save:进行数据持久化"});callbacks.push_back(save);
}void showHandler()
{for(const auto & iter:desc){std::cout<<iter.first<<"\t"<<iter.second<<std::endl;}
}int handlerSize()
{return callbacks.size();
}
1.手动派发任务方式ProcessPool.cc:
#include <iostream>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"#define PROCESS_NUM 5int waitCommand(int waitFd,bool &quit)
{uint32_t command=0;ssize_t s=read(waitFd,&command,sizeof(command));if(s==0){quit=true;return -1;}assert(s==sizeof(uint32_t));return command;
}void sendAndWakeup(pid_t who,int fd,uint32_t command)
{write(fd,&command,sizeof(command)); std::cout<<"main process call process "<<who<<"execute "<<desc[command]<<"through "<<fd<<std::endl;
}
int main()
{load();std::vector<std::pair<pid_t,int>> slots;//创建多个进程for(int i=0;i<PROCESS_NUM;i++){//创建管道int pipefd[2]={0};int n=pipe(pipefd);assert(n==0);(void)n;pid_t id=fork();assert(id!=-1);//子进程进行读取if(id==0){//child//关闭写端close(pipefd[1]);while(true){bool quit=false;int command=waitCommand(pipefd[0],quit);//如果不发就阻塞//执行对应的命令if(quit) break;if(command>=0&&command<handlerSize()){callbacks[command]();}else{std::cout<<"非法command"<<std::endl;}}exit(1);}//father//进行写入,关闭读端close(pipefd[0]);slots.push_back(std::pair<pid_t,int>(id,pipefd[1]));}//开始任务srand((unsigned long)time(nullptr)^getpid()^222222222222222L);while(true){int select;int command;std::cout<<"################################################"<<std::endl;std::cout<<"# 1.show functions #"<<std::endl;std::cout<<"# 2.send command #"<<std::endl;std::cout<<"################################################"<<std::endl;std::cout<<"Please Select>";std::cin>>select;if(select==1) showHandler();else if(select==2){std::cout<<"Enter Your Command> ";std::cin>>command;//选择进程int choice=rand()%slots.size();//布置任务sendAndWakeup(slots[choice].first,slots[choice].second,command);}}//关闭fd,结束所有进程,所有的子进程都会退出for(const auto &slot:slots){close(slot.second);}//回收所有的子进程信息for(const auto &slot:slots){//等待全部子进程waitpid(slot.first,nullptr,0);}return 0;
}
2.自动派发任务方式ProcessPool.cc
#include <iostream>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"#define PROCESS_NUM 5int waitCommand(int waitFd,bool &quit)
{uint32_t command=0;ssize_t s=read(waitFd,&command,sizeof(command));if(s==0){quit=true;return -1;}assert(s==sizeof(uint32_t));return command;
}void sendAndWakeup(pid_t who,int fd,uint32_t command)
{write(fd,&command,sizeof(command)); std::cout<<"main process call process "<<who<<"execute "<<desc[command]<<"through "<<fd<<std::endl;
}
int main()
{load();std::vector<std::pair<pid_t,int>> slots;//创建多个进程for(int i=0;i<PROCESS_NUM;i++){//创建管道int pipefd[2]={0};int n=pipe(pipefd);assert(n==0);(void)n;pid_t id=fork();assert(id!=-1);//子进程进行读取if(id==0){//child//关闭写端close(pipefd[1]);while(true){bool quit=false;int command=waitCommand(pipefd[0],quit);//如果不发就阻塞//执行对应的命令if(quit) break;if(command>=0&&command<handlerSize()){callbacks[command]();}else{std::cout<<"非法command"<<std::endl;}}exit(1);}//father//进行写入,关闭读端close(pipefd[0]);slots.push_back(std::pair<pid_t,int>(id,pipefd[1]));}//开始任务srand((unsigned long)time(nullptr)^getpid()^222222222222222L);while(true){int command=rand()%handlerSize();int choice=rand()%slots.size();sendAndWakeup(slots[choice].first,slots[choice].second,command);sleep(1);}//关闭fd,结束所有进程,所有的子进程都会退出for(const auto &slot:slots){close(slot.second);}//回收所有的子进程信息for(const auto &slot:slots){//等待全部子进程waitpid(slot.first,nullptr,0);}return 0;
}
6.管道读写规则
当没有数据可读时
- O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
- O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
当管道满的时候
- O_NONBLOCK disable: write调用阻塞,直到有进程读走数据
- O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
- 如果所有管道写端对应的文件描述符被关闭,则read返回0
- 如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出
- 当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
- 当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。
7.管道特点
- 只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
- 管道提供流式服务
- 一般而言,进程退出,管道释放,所以管道的生命周期随进程
- 一般而言,内核会对管道操作进行同步与互斥
- 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
三、命名管道
- 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
- 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
- 命名管道是一种特殊类型的文件
命名管道:通信的本质是要两个不同的进程看到同一份资源,才能建立起管道。磁盘上可以创建一个管道文件。管道文件可以被打开但是不会将内存数据刷新到磁盘中。管道文件存在与系统路径下,路径具有唯一性。双方进程就可以通过管道文件的路径看到同一份资源!与匿名管道的区别是文件仅仅在内存中创建了一个文件,由父子进程进行访问而命名管道的管道文件 也是内存中的文件,但是在磁盘中有一个映射,有文件目录。
命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
mkfifo filename
命名管道也可以从程序里创建,相关函数有:
int mkfifo(const char *filename,mode_t mode);
当向管道文件写写入数据后,文件处于阻塞状态。,此时需要进行读取。
通过使用读取可以解决该问题
接下来我们验证循环次数的读入
此时我们可以发现一个终端一直在写入,另外一个终端一直在读取。
通过unlink name_pipe来删除管道文件也是可以的。
1.用命名管道实现server&client通信
Makefile
.PHONY:all
all:client serverclient:client.ccg++ -o $@ $^ -std=c++11
server:server.ccg++ -o $@ $^ -std=c++11.PHONY:clean
clean:rm -f client server
log.hpp
#ifndef _LOG_H_
#define _LOG_H_#include <iostream>
#include <ctime>#define Debug 0
#define Notice 1
#define Warning 2
#define Error 3//定义日志的几种状态
const std::string msg[]={"Debug","Notice","Warning","Error"
};std::ostream &Log(std::string message,int level)
{//时间,日志信息std::cout<<"|"<<(unsigned)time(nullptr)<<"|"<<msg[level]<<"|"<<message;return std::cout;
}#endif
comm.hpp
#ifndef _COMM_H_
#define _COMM_H_#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include "log.hpp"//设置管道文件的权限
#define MODE 0666//缓冲区大小
#define SIZE 128//管道文件的路径
std::string ipcPath="./fifo.ipc";#endif
client.cc
#include "comm.hpp"int main()
{//获取管道文件,以写的方式打开int fd=open(ipcPath.c_str(),O_WRONLY);if(fd<0){perror("open");exit(1);}//ipc过程//创建需要发送的字符串std::string buffer;while(true){std::cout<<"Please Enter Message Line:> ";//将字符串放入buffer中std::getline(std::cin,buffer);//将buffer写入管道文件中write(fd,buffer.c_str(),buffer.size());}//关闭文件close(fd);return 0;
}
server.cc
#include "comm.hpp"static void getMessage(int fd)
{char buffer[SIZE];while(true){memset(buffer,'\0',sizeof buffer);ssize_t s=read(fd,buffer,sizeof(buffer)-1);if(s>0){std::cout<<"["<<getpid()<<"]"<<"client say>"<<buffer<<std::endl;}else if(s==0){//end of filestd::cerr<<"["<<getpid()<<"]"<<"read end of file,client quit,server quit too!"<<std::endl;break;}else{//read errorperror("read");break;}}
}int main()
{//1.创建管道文件if(mkfifo(ipcPath.c_str(),MODE)<0){perror("mkfifo");exit(1);}//写日志Log("创建管道文件成功",Debug)<<"step 1"<<std::endl;//2.正常的文件操作int fd=open(ipcPath.c_str(),O_RDONLY);if(fd<0){perror("open");exit(2);}Log("打开管道文件成功",Debug)<<"step 2"<<std::endl;//这里创建三个进程来读取int nums=3;for(int i=0;i<nums;i++){pid_t id=fork();if(id==0){//通信getMessage(fd);exit(2);}}//回收子进程for(int i=0;i<nums;i++){waitpid(-1,nullptr,0);//等待任意进程退出}//3.编写正常的通信代码//创建缓冲区char buffer[SIZE];while(true){//将缓冲区清空memset(buffer,'\0',sizeof(buffer));//将管道中的内容读取到缓冲区中ssize_t s=read(fd,buffer,sizeof(buffer)-1);//如果读取成功的话if(s>0){std::cout<<"["<<getpid()<<"]"<<"client say>"<<buffer<<std::endl;}else if(s==0)//如果读取到文件结尾的位置{//end of filestd::cerr<<"["<<getpid()<<"]"<<"read end of file,client quit,server quit too!"<<std::endl;break;}else{//read errorperror("read");break;}}//4.关闭文件close(fd);Log("关闭管道文件成功",Debug)<<"step 3"<<std::endl;unlink(ipcPath.c_str());//通信完成,删除文件Log("删除管道文件成功",Debug)<<"step 4"<<std::endl;return 0;
}
这里我们可以发现三个进程争抢着客户端中发的信息,谁抢到了就将信息发送出来。
2.匿名管道与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由mkfififo函数创建,打开用open
- FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完 成之后,它们具有相同的语义。
3.命名管道的打开规则
如果当前打开操作是为读而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
- O_NONBLOCK enable:立刻返回成功
如果当前打开操作是为写而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
- O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
四、system V共享内存
system V IPC提供的通信方式有以下三种:
- system V共享内存
- system V消息队列
- system V信号量
system V共享内存和system V消息队列是以传送数据为目的,而system V信号量是以进程间同步与互斥而设计。
1.system V共享内存
1.共享内存数据结构
struct shmid_ds {struct ipc_perm shm_perm; /* operation perms */int shm_segsz; /* size of segment (bytes) */__kernel_time_t shm_atime; /* last attach time */__kernel_time_t shm_dtime; /* last detach time */__kernel_time_t shm_ctime; /* last change time */__kernel_ipc_pid_t shm_cpid; /* pid of creator */__kernel_ipc_pid_t shm_lpid; /* pid of last operator */unsigned short shm_nattch; /* no. of current attaches */unsigned short shm_unused; /* compatibility */void *shm_unused2; /* ditto - used by DIPC */void *shm_unused3; /* unused */
};
2.共享内存的建立
共享内存不是属于进程的,而是属于操作系统的。共享内存提供者是操作系统。
如果每一对进程通信都需要用共享内存,那么操作系统需要对其进行先描述,再组织的方式进行管理!共享内存=共享内存块+对应的共享内存的内核数据结构
1.共享内存函数
1.shmget函数
- 功能:用来创建共享内存
- 原型
- int shmget(key_t key, size_t size, int shmflg);
- 参数
- key:这个共享内存段名字
- size:共享内存大小
- shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的
- 返回值:成功返回一个非负整数,即该共享内存段的标识码;失败返回-1
shmget函数的功能是创建并获取共享内存,这里需要注意一下shmflg为IPC_CREAT或者是IPC_EXCL,其中,IPC_CREAT就是0。
IPC_CREAT:创建共享内存的时候,如果此时系统底层中已经存在,则直接获取之,并且返回,如果不存在,就创建之,并返回。
IPC_EXCL:单独使用IPC_EXCL是没有意义的,需要将IPC_EXCL和IPC_CREAT结合起来使用,如果底层不存在,创建之,并返回,如果底层存在,出错返回。返回成功,一定是一个全新的shmid。
返回值共享内存的用户层表示符,类似于文件的fd。
key值:通过key值,能够在操作系统中唯一即可。server和client使用同一个key值,只要key值相同即可,与key值是多少无关。就能够让通信的对方进程看到的是我创建的共享内存。通过使用同样的算法规则,形成唯一的key值就可以了,只有创建的时候用key,大部分情况用户访问共享内存,都用的是shmid。通过使用ftok函数来获取key值。
2.ftok函数
- 功能:用来返回key值
- 原型
key_t ftok(const char *pathname, int proj_id);
- 参数
- pathname:指定的文件名(该文件必须存在而且可以访问)
- proj_id:子序号,虽然为int,但是只有8个比特被使用(0-255)
- 返回值:当成功执行的时候,一个key_t值将会被返回,否则-1被返回
这里我们需要传入一个路径(pathname)和项目id,ftok函数就是根据相应的算法将路径和项目id合并起来,形成一个唯一值。但是由于这里创建出来的键值底层可能也会有,所以我们这里的ftok不一定会创建成功。这里需要我们的客户端和服务端创建出来的key值相同。
Makefile
.PHONY:all
all:client serverclient:client.ccg++ -o $@ $^ -std=c++11
server:server.ccg++ -o $@ $^ -std=c++11.PHONY:clean
clean:rm -f client server
comm.hpp
#ifndef _COMM_H_
#define _COMM_H_#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include "log.hpp"#define PATH_NAME "/home/hsj"
#define PROJ_ID 0x66#endif
server.cc
#include "comm.hpp"int main()
{//1.创建公共的key值key_t k=ftok(PATH_NAME,PROJ_ID);Log("create key done",Debug)<<"Server say:"<<k<<std::endl;}
client.cc
#include "comm.hpp"int main()
{key_t k=ftok(PATH_NAME,PROJ_ID);Log("create key done",Debug)<<"Client say:"<<k<<std::endl;return 0;
}
在Linux中通过ipcs命令查看有关进程间通信设备的信息:
单独使用ipcs命令时,我们可以发现会默认列出消息队列,共享内存以及信号量的相关通信信息。如果想要查看某一个需要加上相应的选项。
- -q:列出消息队列相关信息
- -m:列出共享内存相关信息
- -s:列出信号量相关信息
ipcs命令输出每列信息含义如下:
这里需要注意:key是在内核层面上保证共享内存唯一性方式,而shmid是在用户层面上保证共享内存唯一性方式。
如何释放共享内存呢?
方式一:
我们可以使用:ipcrm -m shmid命令来释放指定id的共享内存资源。
system V IPC资源的生命周期随内核!除非重启,否则一直存在
1.手动删除
2.代码删除
也可以使用shmctl来关闭共享内存。
方式二:
我们可以使用程序释放共享内存资源。
利用shmctl来释放:
3.shmctl函数
函数原型:
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
- 功能:用于控制共享内存
- 原型
- int shmctl(int shmid, int cmd, struct shmid_ds *buf);
- 参数
- shmid:由shmget返回的共享内存标识码
- cmd:将要采取的动作(有三个可取值)
- buf:指向一个保存着共享内存的模式状态和访问权限的数据结构
- 返回值:成功返回0;失败返回-1
- 第一个参数shmid,表示所控制共享内存的用户级标识符
- 第二个参数cmd,表示具体的控制动作
- 第三个参数buf,用于获取或设置所控制共享内存的数据结构
其中,作为shmctl函数的第二个参数cmd传入的常用的选项有以下三个:
测试代码:
#include "comm.hpp"#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>int main()
{key_t key = ftok(PATH_NAME, PROJ_ID); //获取key值if (key < 0){perror("ftok");return 1;}int shm = shmget(key, SIZE, IPC_CREAT | IPC_EXCL); //创建新的共享内存if (shm < 0){perror("shmget");return 2;}printf("key: %x\n", key); //打印key值printf("shm: %d\n", shm); //打印句柄sleep(2);shmctl(shm, IPC_RMID, NULL); //释放共享内存sleep(2);return 0;
}
这里使用监控脚本:while :; do ipcs -m ; sleep 1;done
attach表示关联,detach表示不关联,n表示个数,attach表示关联,表示有多少个进程和我们的共享内存是关联的。
4.shmat函数
- 功能:将共享内存段连接到进程地址空间
- 原型
- void *shmat(int shmid, const void *shmaddr, int shmflg);
- 参数
- shmid: 共享内存标识
- shmaddr:指定连接的地址
- shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY
- 返回值:成功返回一个指针,指向共享内存第一个节;失败返回-1
shmat的参数中,shmaddr共享内存的虚拟地址,用0表示让操作系统帮我们填写,shmflg选择挂载方式,用0表示让操作系统帮我们填写。
5.shmdt函数
- 功能:将共享内存段与当前进程脱离
- 原型
- int shmdt(const void *shmaddr);
- 参数
- shmaddr: 由shmat所返回的指针
- 返回值:成功返回0;失败返回-1
- 注意:将共享内存段与当前进程脱离不等于删除共享内存段
通过创建的共享内存和我们 的程序取消关联,shmaddr就是我们共享内存的虚拟地址。
Makefile
.PHONY:all
all:client serverclient:client.ccg++ -o $@ $^ -std=c++11
server:server.ccg++ -o $@ $^ -std=c++11.PHONY:clean
clean:rm -f client server
log.hpp
#ifndef _LOG_H_
#define _LOG_H_#include <iostream>
#include <ctime>#define Debug 0
#define Notice 1
#define Warning 2
#define Error 3//定义日志的几种状态
const std::string msg[]={"Debug","Notice","Warning","Error"
};std::ostream &Log(std::string message,int level)
{//时间,日志信息std::cout<<"|"<<(unsigned)time(nullptr)<<"|"<<msg[level]<<"|"<<message;return std::cout;
}#endif
comm.hpp
#ifndef _COMM_H_
#define _COMM_H_#include <iostream>
#include <string>#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include <fcntl.h>
#include <unistd.h>
#include <cassert>
#include <cstring>
#include "log.hpp"//定义项目的路径,为了key生成
#define PATH_NAME "/home/hsj"
//定义项目的id,这里可以随意取
#define PROJ_ID 0x66//共享内存的大小,最好是以页(page:4096)的整数倍
#define SHM_SIZE 4096#endif
client.cc
#include "comm.hpp"int main()
{//创建我们上述的key值,用于我们两个进程都能相同的key值key_t k=ftok(PATH_NAME,PROJ_ID);//如果key值创建失败了,也就是我们的底层已经有了这一个key值,它会返回-1if(k<0){Log("create key failed",Error)<<"Client say:"<<k<<std::endl;exit(1);}//如果创建成功,就将其写入日志文件中Log("create key done",Debug)<<"Client say:"<<k<<std::endl;//获取共享内存//shmid(key值,需要开辟的共享内存大小,也就是定义在comm.hpp中的4096,以及创建共享内存的模式0就是我们上述的IPC_CREAT)int shmid=shmget(k,SHM_SIZE,0);//如果我们共享内存开辟失败,就返回错误信息,并且退出if(shmid<0){Log("create key failed",Error)<<"Client say:"<<k<<std::endl;exit(2);}Log("create key success",Debug)<<"Client say:"<<k<<std::endl;sleep(10);//shmat(打开的共享内存地址,链接地址shmaddr设置为nullptr,系统会帮我们填写,shmflg系统也会帮我们填写)char* shmaddr=(char*)shmat(shmid,nullptr,0);if(shmaddr==nullptr){Log("attach shm failed",Error)<<"Client say:"<<k<<std::endl;exit(3);}Log("attach shm success",Debug)<<"Client say:"<<k<<std::endl;sleep(10);//去关联//将共享内存段与当前进程脱离int n=shmdt(shmaddr);//如果脱离失败了,就打印日志assert(n!=-1);(void)n;Log("detach shm success",Debug)<<"Client say:"<<k<<std::endl;sleep(10);//client不需要释放共享内存资源,由server端释放即可return 0;
}
server.cc
#include "comm.hpp"//将k转换成十六进制输出
std::string TransToHex(key_t k)
{char buffer[32];snprintf(buffer,sizeof buffer,"0x%x",k);return buffer;
}int main()
{//1.创建公共的key值//这里创建的key值的参数和我们的客户端是一样的,所以我们能够得到一个和客户端一样的key值key_t k=ftok(PATH_NAME,PROJ_ID);//如果此时创建失败了,则断言assert(k!=-1);(void)k;Log("create key done",Debug)<<"Server say:"<<TransToHex(k)<<std::endl;//2.创建共享内存,建议要创建一个全新的共享内存//0666操作权限int shmid=shmget(k,SHM_SIZE,IPC_CREAT|IPC_EXCL|0666);//如果创建失败了就写日志,并且退出if(shmid==-1){perror("shmget");exit(1);}Log("create shm done",Debug)<<"shmid:"<<shmid<<std::endl;sleep(10);//3.将指定的共享内存挂载到自己的地址空间char* shmaddr=(char*)shmat(shmid,nullptr,0);Log("attach shm done",Debug)<<"shmid:"<<shmid<<std::endl;sleep(10);//这里是进行通信的逻辑代码//4.将指定的共享内存,从自己的地址空间去关联int n=shmdt(shmaddr);assert(n!=-1);(void)n;Log("detach shm done",Debug)<<"shmid:"<<shmid<<std::endl;sleep(10);//最后删除共享内存,IPC_RMIDn=shmctl(shmid,IPC_RMID,nullptr);assert(n!=-1);(void)n;Log("del shm done",Debug)<<"shmid:"<<shmid<<std::endl;return 0;
}
我们首先启动监控脚本来查看共享内存,然后启动服务端,然后启动客户端。这里可以发现服务端先启动开辟了共享内存,将共享内存挂接到自己的地址空间当中之后。等待10s之后,取消关联,然后等待10s后,服务端释放了共享内存的空间。而客户端主要在启动服务端之后,将共享内存挂接到自己的地址空间后等待10s之后,取消关联。这里我们可以发现nattach是从0变成1,也就是服务器关联了自己的地址空间,然后变为2,是由于客户端也关联了自己的地址空间。然后变为1了由于服务端退出,后来变为0是由于客户端也退出了。最后没有了,是由于共享内存由服务端释放了。
堆栈之间的共享区域是属于内核还是用户?
这一部分区域是属于用户空间的。也就是不需要进行系统调用,可以直接进行访问,双方进程如果想要通信,直接进行内存级的读写即可。
为什么pipe,fifo都需要通过read,write来进行通信?
由于read、write是系统调用接口,都属于系统调用。其调用的管道本质上都是文件,而文件是内核当中特定的数据结构,是由操作系统进行维护的。而我们的共享内存在堆栈之间的,是属于用户空间。上述代码主要是能让两个进程都能够看到同一份资源。
3.进程间通信
server.cc
#include "comm.hpp"//将k转换成十六进制输出
std::string TransToHex(key_t k)
{char buffer[32];snprintf(buffer,sizeof buffer,"0x%x",k);return buffer;
}int main()
{//1.创建公共的key值//这里创建的key值的参数和我们的客户端是一样的,所以我们能够得到一个和客户端一样的key值key_t k=ftok(PATH_NAME,PROJ_ID);//如果此时创建失败了,则断言assert(k!=-1);(void)k;Log("create key done",Debug)<<"Server say:"<<TransToHex(k)<<std::endl;//2.创建共享内存,建议要创建一个全新的共享内存//0666操作权限int shmid=shmget(k,SHM_SIZE,IPC_CREAT|IPC_EXCL|0666);//如果创建失败了就写日志,并且退出if(shmid==-1){perror("shmget");exit(1);}Log("create shm done",Debug)<<"shmid:"<<shmid<<std::endl;//3.将指定的共享内存挂载到自己的地址空间char* shmaddr=(char*)shmat(shmid,nullptr,0);Log("attach shm done",Debug)<<"shmid:"<<shmid<<std::endl;//这里是进行通信的逻辑代码char buffer[SHM_SIZE];while(true){//读到什么就打印什么printf("%s\n",shmaddr);if(strcmp(shmaddr,"quit")==0) break;sleep(1);}strcmp(shmaddr,"quit");//4.将指定的共享内存,从自己的地址空间去关联int n=shmdt(shmaddr);assert(n!=-1);(void)n;Log("detach shm done",Debug)<<"shmid:"<<shmid<<std::endl;//最后删除共享内存,IPC_RMIDn=shmctl(shmid,IPC_RMID,nullptr);assert(n!=-1);(void)n;Log("del shm done",Debug)<<"shmid:"<<shmid<<std::endl;return 0;
}
client.cc
#include "comm.hpp"int main()
{Log("child pid is:",Debug)<<getpid()<<std::endl;//创建我们上述的key值,用于我们两个进程都能相同的key值key_t k=ftok(PATH_NAME,PROJ_ID);//如果key值创建失败了,也就是我们的底层已经有了这一个key值,它会返回-1if(k<0){Log("create key failed",Error)<<"Client say:"<<k<<std::endl;exit(1);}//如果创建成功,就将其写入日志文件中Log("create key done",Debug)<<"Client say:"<<k<<std::endl;//获取共享内存//shmid(key值,需要开辟的共享内存大小,也就是定义在comm.hpp中的4096,以及创建共享内存的模式0就是我们上述的IPC_CREAT)int shmid=shmget(k,SHM_SIZE,0);//如果我们共享内存开辟失败,就返回错误信息,并且退出if(shmid<0){Log("create key failed",Error)<<"Client say:"<<k<<std::endl;exit(2);}Log("create key success",Debug)<<"Client say:"<<k<<std::endl;//shmat(打开的共享内存地址,链接地址shmaddr设置为nullptr,系统会帮我们填写,shmflg系统也会帮我们填写)char* shmaddr=(char*)shmat(shmid,nullptr,0);if(shmaddr==nullptr){Log("attach shm failed",Error)<<"Client say:"<<k<<std::endl;exit(3);}Log("attach shm success",Debug)<<"Client say:"<<k<<std::endl;//进程间通信逻辑//client将共享内存看做一个char类型的bufferchar a='a';for(;a<='c';a++){//我们每一次都向shmaddr[共享内存的起始地址]写入snprintf(shmaddr,SHM_SIZE-1,"hello server,我是其他进程,我的pid,%d,inc:%c\n",getpid(),a);sleep(5);}//向共享内存中拷贝一个quitstrcpy(shmaddr,"quit");//去关联//将共享内存段与当前进程脱离int n=shmdt(shmaddr);//如果脱离失败了,就打印日志assert(n!=-1);(void)n;Log("detach shm success",Debug)<<"Client say:"<<k<<std::endl;//client不需要释放共享内存资源,由server端释放即可return 0;
}
结论一:只要通信双方都使用了共享内存,一方直接向共享内存中写入数据,另一方马上就可以看到这些数据。所以共享内存是所有进程间通信(IPC)速度最快的!
因为共享内存不需要太多的拷贝,不需要将操作数据交给操作系统。
这里只有两次拷贝,是因为进程A中由于键盘中输入的数据直接放入了共享内存中,进程B直接将共享内存中的数据打印到显示器上。
对于管道来说:
- 1.从键盘到我们自己定义的缓冲区是第一次拷贝
- 2.自己定义的缓冲区到管道文件是第二次拷贝
- 3.从管道文件到到用户层缓冲区是第三次拷贝
- 4.再由用户层缓冲区打印到显示屏中是第四次拷贝。
1.手动输入,服务端输出的通信版本
client.cc
#include "comm.hpp"int main()
{Log("child pid is:",Debug)<<getpid()<<std::endl;//创建我们上述的key值,用于我们两个进程都能相同的key值key_t k=ftok(PATH_NAME,PROJ_ID);//如果key值创建失败了,也就是我们的底层已经有了这一个key值,它会返回-1if(k<0){Log("create key failed",Error)<<"Client say:"<<k<<std::endl;exit(1);}//如果创建成功,就将其写入日志文件中Log("create key done",Debug)<<"Client say:"<<k<<std::endl;//获取共享内存//shmid(key值,需要开辟的共享内存大小,也就是定义在comm.hpp中的4096,以及创建共享内存的模式0就是我们上述的IPC_CREAT)int shmid=shmget(k,SHM_SIZE,0);//如果我们共享内存开辟失败,就返回错误信息,并且退出if(shmid<0){Log("create key failed",Error)<<"Client say:"<<k<<std::endl;exit(2);}Log("create key success",Debug)<<"Client say:"<<k<<std::endl;//shmat(打开的共享内存地址,链接地址shmaddr设置为nullptr,系统会帮我们填写,shmflg系统也会帮我们填写)char* shmaddr=(char*)shmat(shmid,nullptr,0);if(shmaddr==nullptr){Log("attach shm failed",Error)<<"Client say:"<<k<<std::endl;exit(3);}Log("attach shm success",Debug)<<"Client say:"<<k<<std::endl;//进程间通信逻辑while(true){//向0号文件中读取数据,也就是从标准输入读取数据直接放入我们的缓冲区中,大小为SHM_SIZEssize_t s=read(0,shmaddr,SHM_SIZE-1);//如果我们数据读取成功了if(s>0){shmaddr[s-1]=0;if(strcmp(shmaddr,"quit")==0) break;}}//向共享内存中拷贝一个quitstrcpy(shmaddr,"quit");//去关联//将共享内存段与当前进程脱离int n=shmdt(shmaddr);//如果脱离失败了,就打印日志assert(n!=-1);(void)n;Log("detach shm success",Debug)<<"Client say:"<<k<<std::endl;//client不需要释放共享内存资源,由server端释放即可return 0;
}
server.cc
#include "comm.hpp"//将k转换成十六进制输出
std::string TransToHex(key_t k)
{char buffer[32];snprintf(buffer,sizeof buffer,"0x%x",k);return buffer;
}int main()
{//1.创建公共的key值//这里创建的key值的参数和我们的客户端是一样的,所以我们能够得到一个和客户端一样的key值key_t k=ftok(PATH_NAME,PROJ_ID);//如果此时创建失败了,则断言assert(k!=-1);(void)k;Log("create key done",Debug)<<"Server say:"<<TransToHex(k)<<std::endl;//2.创建共享内存,建议要创建一个全新的共享内存//0666操作权限int shmid=shmget(k,SHM_SIZE,IPC_CREAT|IPC_EXCL|0666);//如果创建失败了就写日志,并且退出if(shmid==-1){perror("shmget");exit(1);}Log("create shm done",Debug)<<"shmid:"<<shmid<<std::endl;//3.将指定的共享内存挂载到自己的地址空间char* shmaddr=(char*)shmat(shmid,nullptr,0);Log("attach shm done",Debug)<<"shmid:"<<shmid<<std::endl;//这里是进行通信的逻辑代码char buffer[SHM_SIZE];while(true){//读到什么就打印什么printf("%s\n",shmaddr);if(strcmp(shmaddr,"quit")==0) break;sleep(1);}strcmp(shmaddr,"quit");//4.将指定的共享内存,从自己的地址空间去关联int n=shmdt(shmaddr);assert(n!=-1);(void)n;Log("detach shm done",Debug)<<"shmid:"<<shmid<<std::endl;//最后删除共享内存,IPC_RMIDn=shmctl(shmid,IPC_RMID,nullptr);assert(n!=-1);(void)n;Log("del shm done",Debug)<<"shmid:"<<shmid<<std::endl;return 0;
}
结论二:共享内存缺乏访问控制
共享内存给我们提供了一种快速访问内存的操作机制,所以我们的共享内存没有任何关于访问控制。无论是共享内存中有没有数据,我们的server端都会不停地进行读取,甚至读取和写入方都不知道对方是否存在!不会因为没有内容就读取。这就会导致并发的问题。
如果我们不停地向服务端发送信息,没有控制的话,我们服务端读取的信息可能不完整,这就会导致数据不一致问题。
2.通过管道控制共享内存的访问控制
Makefile
.PHONY:all
all:client serverclient:client.ccg++ -o $@ $^ -std=c++11
server:server.ccg++ -o $@ $^ -std=c++11.PHONY:clean
clean:rm -f client server
log.hpp
#ifndef _LOG_H_
#define _LOG_H_#include <iostream>
#include <ctime>#define Debug 0
#define Notice 1
#define Warning 2
#define Error 3//定义日志的几种状态
const std::string msg[]={"Debug","Notice","Warning","Error"
};std::ostream &Log(std::string message,int level)
{//时间,日志信息std::cout<<"|"<<(unsigned)time(nullptr)<<"|"<<msg[level]<<"|"<<message;return std::cout;
}#endif
comm.hpp
#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include <fcntl.h>
#include <unistd.h>
#include <cassert>
#include <cstring>
#include "log.hpp"//定义项目的路径,为了key生成
#define PATH_NAME "/home/hsj"
//定义项目的id,这里可以随意取
#define PROJ_ID 0x66//共享内存的大小,最好是以页(page:4096)的整数倍
#define SHM_SIZE 4096//创建一个管道文件名
#define FIFO_NAME "./fifo"//创建一个类,用来创建管道
class Init
{
public:Init(){//将权限掩码设置为0umask(0);//创建我们的管道,分别传入管道的地址,还有我们的读写的权限int n=mkfifo(FIFO_NAME,0666);//判断我们的管道是否创建成功assert(n==0);(void)n;Log("create fifo success",Notice)<<"\n";}~Init(){//将管道进行删除unlink(FIFO_NAME);Log("remove fifo success",Notice)<<"\n";}
};//定义读取和写入模式
#define READ O_RDONLY
#define WRITE O_WRONLY//封装接口,打开我们的文件
int OpenFIFO(std::string pathname,int flags)
{//要打开的文件的路径还有打开文件的模式int fd=open(pathname.c_str(),flags);//判断是否打开成功assert(fd>=0);return fd;
}//让进程进行等待
void Wait(int fd)
{Log("等待中....",Notice)<<"\n";//将此时的tmp写入到管道中uint32_t tmp=0;//将数据从fd管道中读取到我们的tmp中,读取4个字节的大小ssize_t s=read(fd,&tmp,sizeof(uint32_t));assert(s==sizeof(uint32_t));(void)s;
}//唤醒另外一个进程
void Signal(int fd)
{uint32_t tmp=1;//将我们的1写入我们管道中ssize_t s=write(fd,&tmp,sizeof(uint32_t));assert(s==sizeof(uint32_t));(void)s;Log("唤醒中....",Notice)<<"\n";
}//关闭我们的管道文件
void CloseFifo(int fd)
{close(fd);
}
client.cc
#include "comm.hpp"int main()
{Log("child pid is:",Debug)<<getpid()<<std::endl;//创建我们上述的key值,用于我们两个进程都能相同的key值key_t k=ftok(PATH_NAME,PROJ_ID);//如果key值创建失败了,也就是我们的底层已经有了这一个key值,它会返回-1if(k<0){Log("create key failed",Error)<<"Client say:"<<k<<std::endl;exit(1);}//如果创建成功,就将其写入日志文件中Log("create key done",Debug)<<"Client say:"<<k<<std::endl;//获取共享内存//shmid(key值,需要开辟的共享内存大小,也就是定义在comm.hpp中的4096,以及创建共享内存的模式0就是我们上述的IPC_CREAT)int shmid=shmget(k,SHM_SIZE,0);//如果我们共享内存开辟失败,就返回错误信息,并且退出if(shmid<0){Log("create key failed",Error)<<"Client say:"<<k<<std::endl;exit(2);}Log("create key success",Debug)<<"Client say:"<<k<<std::endl;//shmat(打开的共享内存地址,链接地址shmaddr设置为nullptr,系统会帮我们填写,shmflg系统也会帮我们填写)char* shmaddr=(char*)shmat(shmid,nullptr,0);if(shmaddr==nullptr){Log("attach shm failed",Error)<<"Client say:"<<k<<std::endl;exit(3);}Log("attach shm success",Debug)<<"Client say:"<<k<<std::endl;int fd=OpenFIFO(FIFO_NAME,WRITE);//进程间通信逻辑while(true){//向0号文件中读取数据,也就是从标准输入读取数据直接放入我们的缓冲区中,大小为SHM_SIZEssize_t s=read(0,shmaddr,SHM_SIZE-1);//如果我们数据读取成功了if(s>0){shmaddr[s-1]=0;//客户端写入成功之后,唤醒服务端Signal(fd);if(strcmp(shmaddr,"quit")==0) break;}}CloseFifo(fd);//去关联//将共享内存段与当前进程脱离int n=shmdt(shmaddr);//如果脱离失败了,就打印日志assert(n!=-1);(void)n;Log("detach shm success",Debug)<<"Client say:"<<k<<std::endl;//client不需要释放共享内存资源,由server端释放即可return 0;
}
server.cc
#include "comm.hpp"Init init;
//将k转换成十六进制输出
std::string TransToHex(key_t k)
{char buffer[32];snprintf(buffer,sizeof buffer,"0x%x",k);return buffer;
}int main()
{//1.创建公共的key值//这里创建的key值的参数和我们的客户端是一样的,所以我们能够得到一个和客户端一样的key值key_t k=ftok(PATH_NAME,PROJ_ID);//如果此时创建失败了,则断言assert(k!=-1);(void)k;Log("create key done",Debug)<<"Server say:"<<TransToHex(k)<<std::endl;//2.创建共享内存,建议要创建一个全新的共享内存//0666操作权限int shmid=shmget(k,SHM_SIZE,IPC_CREAT|IPC_EXCL|0666);//如果创建失败了就写日志,并且退出if(shmid==-1){perror("shmget");exit(1);}Log("create shm done",Debug)<<"shmid:"<<shmid<<std::endl;//3.将指定的共享内存挂载到自己的地址空间char* shmaddr=(char*)shmat(shmid,nullptr,0);Log("attach shm done",Debug)<<"shmid:"<<shmid<<std::endl;int fd=OpenFIFO(FIFO_NAME,READ);//这里是进行通信的逻辑代码char buffer[SHM_SIZE];while(true){//等待客户端唤醒Wait(fd);//读到什么就打印什么printf("%s\n",shmaddr);if(strcmp(shmaddr,"quit")==0) break;}//4.将指定的共享内存,从自己的地址空间去关联int n=shmdt(shmaddr);assert(n!=-1);(void)n;Log("detach shm done",Debug)<<"shmid:"<<shmid<<std::endl;//最后删除共享内存,IPC_RMIDn=shmctl(shmid,IPC_RMID,nullptr);assert(n!=-1);(void)n;Log("del shm done",Debug)<<"shmid:"<<shmid<<std::endl;CloseFifo(fd);return 0;
}
上述代码就是利用了管道的特性,如果管道中没有内容,我们的读取进程就会被阻塞,如果管道已经满了,我们的管道的写入端也会被阻塞。
五、system V消息队列
1.消息队列的基本原理
消息队列实际上就是在系统当中创建了一个队列,队列当中的每个成员都是一个数据块,这些数据块都由类型和信息两部分构成,两个互相通信的进程通过某种方式看到同一个消息队列,这两个进程向对方发数据时,都在消息队列的队尾添加数据块,这两个进程获取数据块时,都在消息队列的队头取数据块,如下图:
- 其中消息队列当中的某一个数据块是由谁发送给谁的,取决于数据块的类型
总结:
- 消息队列提供了一个从一个进程向另一个进程发送数据块的方法
- 每个数据块都被认为是有一个类型的,接收者进程接收的数据块可以有不同的类型值
- 和共享内存一样,消息队列的资源也必须自行删除,否则不会自动清除,因为system V IPC资源的生命周期是随内核的
2.消息队列的数据结构
当然,系统当中也可能会存在大量的消息队列,系统一定也要为消息队列维护相关的内核数据结构。
struct msqid_ds {struct ipc_perm msg_perm;struct msg *msg_first; /* first message on queue,unused */struct msg *msg_last; /* last message in queue,unused */__kernel_time_t msg_stime; /* last msgsnd time */__kernel_time_t msg_rtime; /* last msgrcv time */__kernel_time_t msg_ctime; /* last change time */unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */unsigned long msg_lqbytes; /* ditto */unsigned short msg_cbytes; /* current number of bytes on queue */unsigned short msg_qnum; /* number of messages in queue */unsigned short msg_qbytes; /* max number of bytes on queue */__kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */__kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};
可以看到消息队列数据结构的第一个成员是msg_perm
,它和shm_perm
是同一个类型的结构体变量,ipc_perm
结构体的定义如下:
struct ipc_perm{__kernel_key_t key;__kernel_uid_t uid;__kernel_gid_t gid;__kernel_uid_t cuid;__kernel_gid_t cgid;__kernel_mode_t mode;unsigned short seq;
};
消息队列的数据结构
msqid_ds
和ipc_perm
结构体分别在/usr/include/linux/msg.h和/usr/include/linux/ipc.h中定义
3.消息队列的创建(msgget)
创建消息队列我们需要用msgget函数,msgget函数的函数原型如下:
int msgget(key_t key, int msgflg);
说明:
- 创建消息队列也需要使用ftok函数生成一个key值,这个key值作为msgget函数的第一个参数
- msgget函数的第二个参数,与创建共享内存时使用的shmget函数的第三个参数相同
- 消息队列创建成功时,msgget函数返回的一个有效的消息队列标识符(用户层标识符)
4.消息队列的释放(msgctl)
释放消息队列我们需要用msgctl函数,msgctl函数的函数原型如下:
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
说明:
msgctl函数的参数与释放共享内存时使用的shmctl函数的三个参数相同,只不过msgctl函数的第三个参数传入的是消息队列的相关数据结构
5.消息队列发送数据方法(msgsnd)
向消息队列发送数据我们需要用msgsnd函数,msgsnd函数的函数原型如下:
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
msgsnd函数的参数说明:
- 第一个参数msqid,表示消息队列的用户级标识符
- 第二个参数msgp,表示待发送的数据块
- 第三个参数msgsz,表示所发送数据块的大小
- 第四个参数msgflg,表示发送数据块的方式,一般默认为0即可
其中msgsnd函数的第二个参数必须为以下结构:
<span style="color:#273849"><span style="background-color:#ffffff"><span style="color:#657b83"><span style="background-color:#fdf6e3"><code class="language-c"><span style="color:#859900">struct</span> <span style="color:#b58900">msgbuf</span><span style="color:#586e75">{</span><span style="color:#859900">long</span> mtype<span style="color:#586e75">;</span> <span style="color:#93a1a1">/* message type, must be > 0 */</span><span style="color:#859900">char</span> mtext<span style="color:#586e75">[</span><span style="color:#268bd2">1</span><span style="color:#586e75">]</span><span style="color:#586e75">;</span> <span style="color:#93a1a1">/* message data */</span><span style="color:#93a1a1">//该结构当中的第二个成员mtext即为待发送的信息,当我们定义该结构时,mtext的大小可以自己指定</span>
<span style="color:#586e75">}</span><span style="color:#586e75">;</span></code></span></span></span></span>
msgsnd函数的返回值说明:
- msgsnd调用成功,返回0
- msgsnd调用失败,返回-1
6.消息队列获取数据方法(msgrcv)
从消息队列获取数据我们需要用msgrcv函数,msgrcv函数的函数原型如下:
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
参数:
- 第一个参数msqid,表示消息队列的用户级标识符
- 第二个参数msgp,表示获取到的数据块,是一个输出型参数
- 第三个参数msgsz,表示要获取数据块的大小
- 第四个参数msgtyp,表示要接收数据块的类型
返回值:
- msgsnd调用成功,返回实际获取到mtext数组中的字节数
- msgsnd调用失败,返回-1
六、信号量
1.信号量基本概念
- 由于进程要求共享资源,而且有些资源需要互斥使用,因此各进程间竞争使用这些资源,进程的这种关系叫做进程互斥
- 系统中某些资源一次只允许一个进程使用,称这样的资源为临界资源或互斥资源
- 在进程中涉及到临界资源的程序段叫临界区
- IPC资源必须删除,否则不会自动删除,因为system V IPC的生命周期随内核
基于对于共享内存的理解:
- 1.为了让进程间通信,首先让不同的进程看到同一份资源。
- 2.之前的通信方式,本质上都是优先解决一个问题,让我们的不同进程看到同一份资源,比如共享内存,但是也带来了一些时序问题,造成了数据不一致的问题。
临界资源:我们把多个进程(执行流)看到的公共一份资源被称之为临界资源。
临界区:我们把自己的进程,访问临界资源的代码称为临界区。
所以多个执行流互相运行的时候互相干扰,主要是我们不加保护地访问了同样的一份资源(临界资源)。在非临界区,多个执行流互相是不影响的!为了更好地进行临界区的保护,可以让多执行流在任何时候,都只有一个进程进入临界区,也就是我们所说的互斥。(就是你需要等我访问完了,你才能够访问)
互斥的本质是串行,但是串行的话,我们的多线程的效率就会降低。
举例说明:
如果我们去电影院里看电影,电影院中里面只有一个VIP放映的位置。我们都想要去座那个位置。但是这个位置在任何时刻只能由一个人座。当我坐的时候,你不能打扰我。我占有这个座位,就称为我正在执行自己的临界区代码。这就是互斥。但是,如果我们的电影院中实际上有几百个位置,一般所有人在看电影之前都需要买票,来解决谁坐在哪里的问题。
这里电影院就是我们的临界资源。我们每个人都想看电影,也就是我们每一个人都想执行自己的临界区代码,看电影一定要有座位(放映厅里面的一个资源)那么这个座位真正属于你,是不是你自己坐在这个位置上,这个座位才属于你呢?并不是!我们为了避免多个人竞争同一个位置,我们先买票。在整个放映期间,这个位置都是属于你的。也就是说,只要你买了票,你就拥有这个座位的使用权。这里的买票的本质就是座位的预定机制。
如果这些进程想要访问这个临界区资源的不同部分,只要保证这些进程在临界区中访问的是不同的部分,就能够保证这些进程并发地执行。这就好比是我们上面的例子中,有200个人看电影,每一个人都有一个不同的位置的话,这两百个人就可以同时看一场电影。我们必须让每一个进程想进入临界资源,访问临界资源中的一部分,我们不能让进程直接去使用临界资源,正如不能让用户直接去电影院占座位。也就是说你得先申请信号量(也就是说你得先买票!)
信号量的本质就是一个计数器,也及时类似于int count =n;(不准确)每一个进程想要进入临界区都需要申请信号量。
申请信号量的本质:
让信号量计数器--
只要申请信号量成功,我们临界资源内部一定给你预留了你想要的资源。
申请信号量的本质其实是对临界资源的一种预定机制
申请信号量->访问临界资源->释放信号量
释放信号量也就是让我们的信号量++
不能用一个整型去代表信号量。因为即便是父子进程,在发生n--的时候,也会发生写时拷贝,并不能同时对一个n进行操作,父子进程的n是一人一个的,没办法同时进行影响。
假设能让多个进程(整数n在共享内存中)看到同一份全局变量,大家都进行申请信号量n--也是不可以的!对于我们的client端先n--,写入数据到共享内存中n++,对于我们的server端,也先n--,读取共享内存,然后n++,但是他们同时对一个变量进行n--的时候是会出问题的!
在计算机中,进行n--,其是一种数据运算,计算机的CPU具有计算能力,而我们的数据放在n的位置上,而n在内存中。也就是计算要在CPU内,而变量在内存中。那么CPU在执行我们的指令的时候
- 1.首先需要load将内存中的数据加载到cpu内的寄存器中(读指令)
- 2.n--(分析&&执行指令)
- 3.将CPU修改完毕的n写回内存(写回结果)
执行流在执行的时候,在任何时候都可能被切换!上面的执行--操作有三步,随时可能会被切换掉。而寄存器只有一套,被所有的执行流共享,但是寄存器里面的数据属于每一个执行流,是与该执行流的上下文!进程在被切换的时候,是需要进行上下文保护&&上下文恢复的。假设这里我们的n是5,当客户端中n进入CPU中,此时我们得到n等于5,也就是我们运行到上述的第一步,我们的客户端进程就被切换掉了,我们将客户端对应的数据和需要执行第几步(n=5,第二步)都记录下来。那我们寄存器中的n还是2,我们进程客户端中的n是5。然后服务端进程到CPU上运行了,将n变为4。当我们的客户端重新加载,我们的客户端回来的时候需要将我们的原先的进程上下文信息恢复,也就是直接将我们此信号量重置为5,然后执行--操作,变成了4。也就代表着我们此时的信号量已经不能准确地表示我们的进入临界区的进程个数了!因为时序问题,而导致n中有中间状态,可能导致数据不一致!我们称这里的n为不安全的。如果一个n--操作只有一行汇编,那么该操作是原子的!(我们上面的n--并不是原子的)
原子性:要么不做,要么做完,没有中间状态,我们就将其称为原子性!
信号量计数器
- 申请信号量-》计数器-- -》P操作-》必须是原子的
- 释放信号量-》计数器++ -》V操作-》必须是原子的
总结:
- 信号量是对临界资源的预定机制!
- 信号量本身也是一份临界资源,也需要保持其原子性想要让我们所有的进程都看到这一份同样的信号量就需要进程间通信。(看到一份同样的资源)
2.信号量的数据结构
在系统当中也为信号量维护了相关的内核数据结构
信号量的数据结构如下:
struct semid_ds {struct ipc_perm sem_perm; /* permissions .. see ipc.h */__kernel_time_t sem_otime; /* last semop time */__kernel_time_t sem_ctime; /* last change time */struct sem *sem_base; /* ptr to first semaphore in array */struct sem_queue *sem_pending; /* pending operations to be processed */struct sem_queue **sem_pending_last; /* last pending operation */struct sem_undo *undo; /* undo requests on this array */unsigned short sem_nsems; /* no. of semaphores in array */
};
信号量数据结构的第一个成员也是ipc_perm
类型的结构体变量,ipc_perm
结构体的定义如下:
struct ipc_perm{__kernel_key_t key;__kernel_uid_t uid;__kernel_gid_t gid;__kernel_uid_t cuid;__kernel_gid_t cgid;__kernel_mode_t mode;unsigned short seq;
};
共享内存的数据结构
msqid_ds
和ipc_perm
结构体分别在/usr/include/linux/sem.h和/usr/include/linux/ipc.h中定义
3.信号量的创建(semget)
创建信号量集我们需要用semget函数,semget函数的函数原型如下:
int semget(key_t key, int nsems, int semflg);
说明一下:
- 创建信号量集也需要使用ftok函数生成一个key值,这个key值作为semget函数的第一个参数
- semget函数的第二个参数nsems,表示创建信号量的个数
- semget函数的第三个参数,与创建共享内存时使用的shmget函数的第三个参数相同
- 信号量集创建成功时,semget函数返回的一个有效的信号量集标识符(用户层标识符)
4.信号量的释放(semctl)
删除信号量集我们需要用semctl函数,semctl函数的函数原型如下:
int semctl(int semid, int semnum, int cmd, ...);
5.信号量的操作(semop)
对信号量集进行操作我们需要用semop函数,semop函数的函数原型如下:
int semop(int semid, struct sembuf *sops, unsigned nsops);