linux---生产者和消费者模型

server/2024/10/18 9:32:53/

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

一、堵塞队列

(1)三种关系

生产者vs生产者:互斥(加锁)

消费者vs消费者:互斥(加锁)

生产者vs消费者:互斥和同步(加锁和条件变量)

(2)代码实现

Makefile

test:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f test

BlockQueue.hpp

#include<iostream>
#include<queue>
#include<unistd.h>
#define M 10
template<class T>
class BlockQueue{
public:
BlockQueue(T cap=M)
:_capacity(M)
{pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_pcond,nullptr);pthread_cond_init(&_ccond,nullptr);}bool IsFull()
{return q.size()==_capacity;
}
bool empty()
{return q.size()==0;
}void Push( T in)
{pthread_mutex_lock(&_mutex);//if(!IsFull())//可能出现伪唤醒while(IsFull())//健壮性{pthread_cond_wait(&_pcond,&_mutex);}q.push(in);std::cout<<"push:"<<in<<std::endl;sleep(1);pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_mutex);
}void Pop()
{pthread_mutex_lock(&_mutex);while(empty())//if(!empty())//如果这里使用if判断的话,可能出现伪唤醒问题。{pthread_cond_wait(&_ccond,&_mutex);}auto n=q.front();std::cout<<"pop:"<<n<<std::endl;sleep(1);q.pop();pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_mutex);}~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}private:pthread_mutex_t _mutex;pthread_cond_t _pcond;pthread_cond_t _ccond;int _capacity=M;std::queue<T> q;};

Main.cc 

#include<pthread.h>
#include"BlockQueue.hpp"
#include<iostream>
void* consumer(void* argv)
{BlockQueue<int>* q=static_cast<BlockQueue<int>*>(argv);int i=0;while(true){  q->Pop();}return nullptr;
}
void * productor(void* argv)
{BlockQueue<int>* q=static_cast<BlockQueue<int>*>(argv);while(true){int data=rand()%10+1;q->Push(data);}
return nullptr;
}int main()
{srand((unsigned)time(NULL)^getpid()^pthread_self());pthread_t c,p;BlockQueue<int>* bq=new BlockQueue<int>();pthread_create(&c,nullptr,consumer,bq);pthread_create(&p,nullptr,productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}

(3)总结

以上是单生产和单消费,对于多生产和多消费也是可以的,因为是同一个队列,同一把锁,同一把锁就决定了,生产者和生产者,消费者和消费者之间就是互斥的,生产者和消费者的条件变量提供了同步。

队列中的数据也可以是任务,堵塞队列可以实现高并发高效率,在与每个线程都拿到了自己的任务

并且处理任务,处理任务也是需要时间的,这个决定了,每个线程拿到任务都在跑自己的任务代码,实现高并发。同时条件变量让生产者和消费者进行同步,保证了安全性。

1.消费者和生产者调度优先级

如果消费者线程先调度,队列为空,消费者就会在条件变量下进行等待,等生产者生产商品了,就会唤醒消费者进行消费。

2.如何控制生产消费的节奏

我们可以通过sleep控制。比如消费者消费进行休眠的话,可以给生产者足够的时间进行生产

3.伪唤醒

如果用if来判断队列为空可能会出现伪唤醒,有些线程处于等待堵塞,竞争锁的状态,一旦队列为空而线程竞争到了锁就会出现队列为空依然进行pop的现象。

while(true)可以提供检查,if判断可能有风险。


环形队列

(1)POSIX信号量

posix和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步.

快速认识接口:

(1)初始化

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数: pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值

(2)销毁

int sem_destroy(sem_t *sem);
 

(3)等待

功能:等待信号量,会将信号量的值减1

int sem_wait(sem_t *sem);//p()
 

(3)通知

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。int sem_post(sem_t *sem);//V()
 

三种关系

(2)对于环形队列,生产者和消费者就有两种场景会指向同一个位置。要么为空,要么为满,其他情况不会指向同一个位置。

1.如果队列为空,只能让生产者先生产,消费者不可以消费---------互斥

2.如果队列为满,只能让消费者先消费,然后到生产者生产---------同步

3.其余情况,消费者都是在生产者后面的,两个位置不同,即使pop和push同时进行,也是安全的,这个就是多线程高并发可以进入临界区的原因。

如何实现多线程中的生产者和生产者,消费者和消费者的互斥问题,对于循环队列,我们要定义两把锁,一个是push队列的锁,一个是pop队列的锁。pv操作是原子性的,让生产者和消费者进行同步其中又可以体现互斥。

代码实现

makefile

ringtest:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ringtest

Main.cc

#include<iostream>
#include<pthread.h>
#include"RingQueue.hpp"
#include<pthread.h>void* producter(void* args)
{while(true){ RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);int n=1;n=rand()%3+1;rq->Push(n);}}
void* consumer(void* argv)
{while(true)
{RingQueue<int>* q=static_cast<RingQueue<int>*>(argv);int date=0;q->Pop(&date);}}int main()
{srand(time(nullptr));pthread_t p,c;
pthread_mutex_t pm,cm;LockGuard pmutex(&pm),cmutex(&cm);RingQueue<int> ringqueue(cmutex,pmutex);
pthread_create(&p,nullptr,consumer,&ringqueue);
pthread_create(&c,nullptr,producter,&ringqueue);pthread_join(p,nullptr);pthread_join(c,nullptr);return 0;
}

RingQueue.hpp 

#include<vector>
#include<iostream>
#include"LockGuard.hpp"
#include<unistd.h>
const int Size =10;
template<class T>
class RingQueue{
private:void P(sem_t& sem){sem_wait(&sem);}void V(sem_t& sem){sem_post(&sem);}
public:
RingQueue(LockGuard c,LockGuard p,int s=Size)
:size(s),pmutex(p),cmutex(c),q(s),ppose(0),cpose(0)
{
sem_init(&psem,0,size);
sem_init(&csem,0,0);}// 生产
void Push(const T& in)
{// 先加锁,还是先申请信号量?先申请信号量,效率高。申请到资源的线程,只有竞争到锁,就可以生产了。P(psem);{pmutex;q[ppose] = in;std::cout<<"生产:"<<in<<std::endl;ppose++;ppose %=size;}V(csem);
}
void Pop(T* out)
{P(csem);{cmutex;*out = q[cpose];sleep(1);std::cout<<"消费:"<<*out<<std::endl;cpose++;cpose %=size;}V(psem);}
~RingQueue()
{sem_destroy(&psem);sem_destroy(&csem);
}private:int size;std::vector<int> q;int ppose;//生产者位置int cpose;//消费者位置sem_t psem;//生产者信号量sem_t csem;//消费者信号量LockGuard pmutex;LockGuard cmutex;};
#pragma once
#include<pthread.h>
#include <semaphore.h>
class Mutex{
public:Mutex(pthread_mutex_t* mutex):_Mutex(mutex){pthread_mutex_init(_Mutex,nullptr);}void Lock(){pthread_mutex_lock(_Mutex);}void unlock(){pthread_mutex_unlock(_Mutex);}~Mutex(){pthread_mutex_destroy(_Mutex);}private:pthread_mutex_t* _Mutex;
};class LockGuard{
public:LockGuard(pthread_mutex_t* lock):mutex(lock){mutex.Lock();}~LockGuard(){mutex.unlock();}private:Mutex mutex;};


http://www.ppmy.cn/server/47700.html

相关文章

我有点想用JDK17了

大家好呀&#xff0c;我是summo&#xff0c;JDK版本升级的非常快&#xff0c;现在已经到JDK20了。JDK版本虽多&#xff0c;但应用最广泛的还得是JDK8&#xff0c;正所谓“他发任他发&#xff0c;我用Java8”。 其实我也不太想升级JDK版本&#xff0c;感觉投入高&#xff0c;收…

2024最新python入门教程|python安装|pycharm安装

前言&#xff1a;在安装PyCharm之前&#xff0c;首先需要明确PyCharm是一款功能强大的Python集成开发环境&#xff08;IDE&#xff09;&#xff0c;由JetBrains公司开发。PyCharm旨在通过提供智能代码补全、语法高亮、代码检查、快速导航和重构等丰富的编码辅助工具&#xff0c…

【Java面试】十二、Kafka相关

文章目录 1、Kafka如何保证消息不丢失1.1 生产者发消息到Brocker丢失&#xff1a;设置异步发送1.2 消息在Broker存储时丢失&#xff1a;发送确认机制1.3 消费者从Brocker接收消息丢失1.4 同步 异步组合提交偏移量 2、Kafka如何保证消费的顺序性3、Kafka高可用机制3.1 集群模式…

编程奇境:C++之旅,从新手村到ACM/OI算法竞赛大门(武器:递归与递推)

上一期我们已经拿起了我们的第一把武器&#xff1a;排序算法&#xff0c;今天我们介绍第二把武器&#xff1a;递归与递推。 让我们用一个生活中的故事来形象地解释递归与递推这两个概念。 递归的故事&#xff1a;俄罗斯套娃 想象一下&#xff0c;你收到了一个精美的俄罗斯套…

CentOS 7基础操作07_Linux复制、删除、移动目录和文件

1、cp——复制(Copy)文件或目录 cp命令用于复制文件或目录,将需要复制的文件或目录&#xff08;源)重建一份并保存为新的文件或目录(可保存到其他目录中)。cp命令的基本使用格式如下&#xff1a; cp [选项]... 源文件或目录... 目标文件或目录 需要复制多个文件或目录…

【Tello无人机】Tello飞行控制

【Tello无人机】Tello飞行控制 上一篇介绍了Tello无人机的仿真环境搭建&#xff0c;本篇将介绍tello无人机在pybullet环境中的飞行控制&#xff0c;实现无人机的速度控制。本环境最终要实现强化学习算法下的飞行任务&#xff0c;故采用通用的gym接口进行环境搭建。 Gym环境接口…

Java:流程控制语句

文章目录 一、顺序结构二、分支结构2.1 if2.2 switch 三、循环结构3.1 for3.2 while3.3 do...while 四、流程控制4.1 break4.2 continue 五、结语 一、顺序结构 顺序结构语句是Java程序默认的执行流程&#xff0c;按照代码的先后顺序&#xff0c;从上到下依次执行。 二、分支结…

使用Rufus工具制作Ubuntu To Go——很详细

一、准备工作 准备工具&#xff1a; 1、下载Rufus(主角)软件 2、准备一个U盘或硬盘&#xff08;小白128G足够&#xff0c;装Ubuntu系统&#xff09; 3、下载Ubuntu系统镜像文件 1、下载软件Rufus 先来看一下官网介绍&#xff1a; Rufus 是一款格式化和创建 USB 启动盘的辅助工…