基于BlockingQueue的生产者消费者模型

embedded/2024/9/23 11:19:19/

在这里插入图片描述

文章目录

  • 引言
  • 理解生产者消费者模型
  • 基于BlockingQueue的生产者消费者模型
    • 单生产,单消费模型
    • 多生产、多消费模型

引言

生产者消费者模型一般可以在超市中听到,例如如下是一个专门卖方便面的超市,这个超市有自己供应商,也有客户来买,客户称之为消费者。超市起到一个缓存作用,供应商放假的时候,短时间内超市依然有对应的商品,消费者依然可以消费;相同的,如果短时间内消费者不来买东西,供应商依然可以供应给超市。也就是说,供应商生产产品比较慢,可以先生成一批产品放在超市中;供应商如果供应比较快,可以等消费者消费一段时间再去供应产品,协调忙线不均。现实生活中,在人口密集的地方肯定会有超市,生产者消费者模型效率高,有了超市这个巨大的缓存,可以使得消费者和生产者并发起来。
在这里插入图片描述

个别消费者不想买方便面不会影响到供应商,个别供应商出现了问题,不会影响消费者买方便面,这就做到了生产者和消费者的解耦

理解生产者消费者模型

上述例子对应到计算机中,供应商和消费者就是线程,超市是一段内存空间,方便面是数据。生产线程将数据交到一段内存空间中,消费线程从内存空间中将数据拿走。

“321原则”:

  1. 一个交易场所(特定数据结构的形式存在的一段内存空间)
  2. 两种角色:生产者、消费者,也就是生产线程和消费线程
  3. 三种关系:生产和生产(互斥关系)、消费和消费(互斥关系)、生产和消费(互斥关系、同步关系)

实现生产者消费者模型本质就是通过代码实现“321原则”,用锁和条件变量(或者其他形式)来实现三种关系。

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
在这里插入图片描述

单生产,单消费模型

//BlockQueue.hpp
#pragma once#include<iostream>
#include<string>
#include<queue>
#include<pthread.h>const static int defaultcap=5;template<typename T>
class BlockQueue
{
private:bool isFull(){return _block_queue.size()==_max_cap;}bool isEmpty(){return _block_queue.empty();}public:BlockQueue(int cap= defaultcap):_max_cap(cap){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_P_cond,nullptr);pthread_cond_init(&_C_cond,nullptr);}void Pop(T *out){pthread_mutex_lock(&_mutex);while(isEmpty()){pthread_cond_wait(&_C_cond,&_mutex);}*out=_block_queue.front();_block_queue.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_P_cond);  //唤醒生产者}void Equeue(const T &in){pthread_mutex_lock(&_mutex);while(isFull()) //阻塞队列满{//满了生产者不能再生产,必须等待pthread_cond_wait(&_P_cond,&_mutex); //被调用的时候,除了让自己继续排队等待,还会释放自己传递的锁//函数返回时,会返回在临界区,必须先参与锁的竞争,重新加上锁,该函数才会返回,依然是持有锁的状态}//阻塞队列未满或者被唤醒_block_queue.push(in);  //生产数据到阻塞队列pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_C_cond);  //唤醒消费者}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_P_cond);pthread_cond_destroy(&_C_cond);}private:std::queue<T> _block_queue;  //临界资源int _max_cap;pthread_mutex_t _mutex;pthread_cond_t _P_cond;  //生产者条件变量pthread_cond_t _C_cond;  //消费者条件变量
};

Pop 函数:从队列中取出元素,并将其存储在 out 指针指向的地址中。步骤如下:

  • 锁定互斥量:通过 pthread_mutex_lock(&_mutex) 确保对队列的操作是线程安全的。
  • 等待条件变量:如果队列为空,使用 pthread_cond_wait(&_C_cond, &_mutex) 等待消费者条件变量被信号唤醒。
  • 取出元素:从队列中取出前面的元素,并将其弹出。
  • 解锁互斥量:通过 pthread_mutex_unlock(&_mutex) 解锁。
  • 唤醒生产者:使用 pthread_cond_signal(&_P_cond) 唤醒可能被阻塞的生产者线程。

Equeue 函数:将元素 in 插入队列。步骤如下:

  • 锁定互斥量:通过 pthread_mutex_lock(&_mutex) 确保对队列的操作是线程安全的。
  • 等待条件变量:如果队列已满,使用 pthread_cond_wait(&_P_cond, &_mutex) 等待生产者条件变量被信号唤醒。
  • 插入元素:将新元素插入到队列中。
  • 解锁互斥量:通过 pthread_mutex_unlock(&_mutex) 解锁。
  • 唤醒消费者:使用 pthread_cond_signal(&_C_cond) 唤醒可能被阻塞的消费者线程。

为了体现阻塞队列的特点,分别设计了两种测试代码:

  1. 生产一个,消费一个
#include"BlockQueue.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>void *Consumer(void *args)
{BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);while(true){//获取数据int data=0;bq->Pop(&data);//处理数据std::cout<<"Coumer -> "<<data<<std::endl;}
}void *Productor(void *args)
{srand(time(nullptr)^getpid());BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);while(true){sleep(2);//构建数据int data=rand()%10+1;  // [1,10]//生产数据bq->Equeue(data);std::cout<<"Productor -> "<<data<<std::endl;}
}int main()
{BlockQueue<int> *bq=new BlockQueue<int>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}

在这里插入图片描述

  1. 先生产一批数据,直到队列开始阻塞,然后消费一个,生产一个
#include"BlockQueue.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>void *Consumer(void *args)
{BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);while(true){sleep(2);//获取数据int data=0;bq->Pop(&data);//处理数据std::cout<<"Coumer -> "<<data<<std::endl;}
}void *Productor(void *args)
{srand(time(nullptr)^getpid());BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);while(true){//构建数据int data=rand()%10+1;  // [1,10]//生产数据bq->Equeue(data);std::cout<<"Productor -> "<<data<<std::endl;}
}int main()
{BlockQueue<int> *bq=new BlockQueue<int>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}

在这里插入图片描述

上述测试代码是传递一个int类型的数据到阻塞队列中,也可以传递其他类型,在传递struct或者class类型时,可以封装成一个个的任务传递到阻塞队列中。

  • 传递任务:
//Task.hpp
#pragma once
#include<iostream>
#include<string>class Task
{public:Task(){}Task(int x,int y):_x(x),_y(y){}void Excute(){_result=_x+_y;}std::string debug(){std::string msg=std::to_string(_x)+"+"+std::to_string(_y)+"=?";return msg;}std::string result(){std::string msg=std::to_string(_x)+"+"+std::to_string(_y)+"="+std::to_string(_result);return msg;}private:int _x;int _y;int _result;
};
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>void *Consumer(void *args)
{BlockQueue<Task> *bq=static_cast<BlockQueue<Task> *>(args);while(true){sleep(2);//获取数据Task t;bq->Pop(&t);// bq->Pop(&data);//处理数据t.Excute();std::cout<<"Coumer -> "<<t.result()<<std::endl;}
}void *Productor(void *args)
{srand(time(nullptr)^getpid());BlockQueue<Task> *bq=static_cast<BlockQueue<Task> *>(args);while(true){//构建数据int x=rand()%10+1;usleep(x*1000);int y=rand()%10+1;Task t(x,y);//生产数据bq->Equeue(t);std::cout<<"Productor -> "<<t.debug()<<std::endl;}
}int main()
{BlockQueue<Task> *bq=new BlockQueue<Task>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}

在这里插入图片描述

  • 传递函数任务:
//Task.hpp
#pragma once
#include<iostream>
#include<string>
#include<functional>using task_t=std::function<void()>;void Download()
{std::cout<<"I am Download task"<<std::endl;
}
//main.cc
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>void *Consumer(void *args)
{BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);while(true){sleep(2);//获取数据task_t t;bq->Pop(&t);//处理数据t();}
}void *Productor(void *args)
{srand(time(nullptr)^getpid());BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);while(true){bq->Equeue(Download);std::cout<<"Productor -> Download "<<std::endl;}
}int main()
{BlockQueue<task_t> *bq=new BlockQueue<task_t>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}

在这里插入图片描述

多生产、多消费模型

创建两个消费者线程 c1c2,它们会并行地从队列中取出任务并处理。创建三个生产者线程 p1p2p3,它们会并行地将任务放入队列中。

//main.cc#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>void *Consumer(void *args)
{BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);while(true){//获取数据task_t t;bq->Pop(&t);t();}
}void *Productor(void *args)
{srand(time(nullptr)^getpid());BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);while(true){bq->Equeue(Download);std::cout<<"Productor -> Download "<<std::endl;sleep(1);}
}int main()
{BlockQueue<task_t> *bq=new BlockQueue<task_t>();pthread_t c1,c2,p1,p2,p3;pthread_create(&c1,nullptr,Consumer,bq);pthread_create(&c2,nullptr,Consumer,bq);pthread_create(&p1,nullptr,Productor,bq);pthread_create(&p1,nullptr,Productor,bq);pthread_create(&p3,nullptr,Productor,bq);pthread_join(c1,nullptr);pthread_join(c2,nullptr);pthread_join(p1,nullptr);pthread_join(p2,nullptr);pthread_join(p3,nullptr);return 0;
}

在这里插入图片描述

在这里插入图片描述


http://www.ppmy.cn/embedded/97965.html

相关文章

鸿蒙内核源码分析(寄存器篇) | 宇宙最忙存储器

寄存器的本质 寄存器从大一的计算机组成原理就开始听到它&#xff0c;感觉很神秘&#xff0c;如梦如雾多年.揭开本质后才发现&#xff0c;寄存器就是一个32位的存储空间&#xff0c;一个int变量而已&#xff0c;但它的厉害之处在于极高频率的使用&#xff0c;让人不敢相信是怎…

白酒品鉴的艺术:品味与感悟的很好结合

白酒&#xff0c;作为中国千年的文化瑰宝&#xff0c;其品鉴不仅是一种味觉上的享受&#xff0c;更是一种精神上的追求。在品鉴白酒的过程中&#xff0c;品味与感悟如同双翼&#xff0c;共同构筑起一场艺术与文化的盛宴。今天&#xff0c;我们就来探讨一下白酒品鉴的艺术&#…

什么是线程和应用?线程和进程区别是什么?

一、线程和应用的定义 1. 线程&#xff1a; • 线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中&#xff0c;是进程中的实际运作单位。 • 一个线程指的是进程中一个单一顺序的控制流&#xff0c;一个进程中可以并发多个线程&#xff0c;每条线程并行执行不同…

Haskell爬虫中日志记录:监控HTTP请求与响应

在当今信息爆炸的时代&#xff0c;数据抓取成为了获取信息的重要手段。Haskell&#xff0c;以其强大的类型系统和函数式编程特性&#xff0c;成为了编写高效、可靠爬虫的理想选择。然而&#xff0c;随着爬虫的运行&#xff0c;监控其行为变得尤为重要。本文将探讨如何在Haskell…

SQLserver中的exists

在 SQL Server 中&#xff0c;EXISTS 是一个布尔子句&#xff0c;用于检查子查询是否返回任何行。如果子查询返回至少一行数据&#xff0c;EXISTS 将返回 TRUE&#xff1b;如果子查询没有返回任何行&#xff0c;EXISTS 将返回 FALSE。EXISTS 通常用于 WHERE 或 HAVING 子句中&a…

Redisson

一、分布式锁与本地锁 在高并发环境下&#xff0c;本地锁只能锁住当前的进程&#xff0c;无法锁住其他进程。 在分布式环境下&#xff0c;本地锁只能锁住当前服务&#xff0c;无法锁住其他服务器上部署的服务&#xff0c;集群环境需要共享一把锁&#xff0c;因此需要使用分布…

根据《广东省制造业高质量发展促进条例》规定,支持___ 投资制造业领域,加强技术改造与创新。

根据《广东省制造业高质量发展促进条例》规定&#xff0c;支持___ 投资制造业领域&#xff0c;加强技术改造与创新。点击查看答案 A、国有资本 B、外商 C、民营企业 D、各类市场主体 根据《广州市支持民营经济发展条例》规定&#xff0c;每年___为广州民营经济服务周&#xff…

生产环境中MapReduce的最佳实践

目录 MapReduce跑的慢的原因 MapReduce常用调优参数 1. MapTask相关参数 2. ReduceTask相关参数 3. 总体调优参数 4. 其他重要参数 调优策略 MapReduce数据倾斜问题 1. 数据预处理 2. 自定义Partitioner 3. 调整Reduce任务数 4. 小文件问题处理 5. 二次排序 6. 使用…