生产消费者模式

ops/2024/10/22 10:49:50/

6. 生产消费者模式 Producer-Consumer模式

6.1 概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的

image-20240926174733001

由于多个线程都访问了同一个阻塞队列,所以会有并发问题

  • 生产者 vs 生产者:互斥
  • 消费者 vs 消费者:互斥
  • 生产者 vs 消费者:互斥,为了防止饥饿问题,也需要同步

所以这里有3种关系,2个角色(生产者和消费者),1个交易场所(特定结构的内存空间)


该模型的优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

注意,生产者生产的数据也是要花时间获取的,当有数据时,消费者做数据的加工处理,也是要花时间的

所以,不要只看到生产者生产数据到队列的过程,当阻塞队列队列满时,生产者在等待队列下等待过程中,是可以做获取数据的工作的。
同理,不要只看到消费者从队列中消费数据的过程,当阻塞队列为空时,消费者在等待队列下等待过程中,是可以做数据的加工处理动作的。
这样,这两个或者多个线程就并发高效的处理数据了,在多生产和多消费体现明显,少量的线程在等待,大量的线程在获取数据和加工数据。

6.2 基于 BlockingQueue 的生产者-消费者模式

这里是单生产者单消费者

// BlockQueue.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <queue>
using namespace std;template<class T>
class BlockQueue
{static const int NUM = 10;
public:BlockQueue(int num = NUM) : _max(num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_pCond, nullptr);pthread_cond_init(&_cCond, nullptr);}const T pop(){pthread_mutex_lock(&_mutex);// 判断临界资源是否满足也是在访问临界资源,所以要在加锁之后if(_q.size() == 0) {// 队列中没有值,不需要再删除了,让它去等待队列中去等pthread_cond_wait(&_cCond, &_mutex);	// 调用的时候会自动释放锁,因唤醒而返回时,又会重新持有锁}const T x = _q.front();_q.pop();pthread_cond_signal(&_pCond);   // 消费者已经消费数据了,此时就可以唤醒生产者了。pthread_mutex_unlock(&_mutex);return x;}void push(const T& x){   pthread_mutex_lock(&_mutex);if(_q.size() == _max) {// 已经到了最大值,不需要再添加了,让它去等待队列中去等pthread_cond_wait(&_pCond, &_mutex);}_q.push(x);pthread_cond_signal(&_cCond);   // 生产者已经生产数据了,此时就可以唤醒消费者了。pthread_mutex_unlock(&_mutex);  }~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pCond);pthread_cond_destroy(&_cCond);}private:queue<T> _q;pthread_mutex_t _mutex;pthread_cond_t _cCond;  // 消费者条件变量pthread_cond_t _pCond;  // 生产者条件变量size_t _max;    // 队列能放的最大值
};
// main.cc
#include "BlockQueue.hpp"void* Consumer(void* args)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);while(true) {int data = bq->pop();printf("消费者消费了一个数据:%d\n", data);sleep(2);}}void* Producer(void* args)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);int data = 0;while(true) {data++;bq->push(data);printf("生产者生产了一个数据:%d\n", data);}
}int main()
{BlockQueue<int>* bq = new BlockQueue<int>();pthread_t c, p;pthread_create(&c, nullptr, Consumer, bq);pthread_create(&p, nullptr, Producer, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

因为消费者消费的比较慢,所以消费者消费一个,生产者就生产一个

s

6.3 生产和消费Task

阻塞队列中存储的不仅仅可以是int,可以是自定义类型

// Task.hpp
#include <iostream>
#include <string>
using namespace std;enum TaskStatus
{DIV_ZERO=1,MOD_ZERO,UNKOWN
};char opStr[4] = {'+','-','*','/'};class Task
{
public:Task(int a, int b, char op) : _a(a), _b(b), _op(op) {}void Run(){switch (_op){case '+':_result = _a + _b;break;case '-':_result = _a - _b;break;case '*':_result = _a * _b;break;case '/':if (_b == 0)    _exitCode = DIV_ZERO;else    _result = _a / _b;break;case '%':if (_b == 0)    _exitCode = MOD_ZERO;else    _result = _a % _b;break;default:_exitCode = UNKOWN;break;}}string GetResult(){string resultStr;resultStr += to_string(_a);resultStr += _op;resultStr += to_string(_b);resultStr += '=';resultStr += to_string(_result);resultStr += "  ";if (_exitCode != 0){resultStr += "error";resultStr += "  ";switch (_exitCode){case DIV_ZERO:resultStr += "div zero";break;case MOD_ZERO:resultStr += "mod zero";break;case UNKOWN:resultStr += "unkown";break;}}elseresultStr += "ok";return resultStr;}string GetTask(){string resultStr;resultStr += to_string(_a);resultStr += _op;resultStr += to_string(_b);resultStr += '=';resultStr += "???";return resultStr;}private:int _a = 0;int _b = 0;char _op = ' ';int _exitCode = 0;int _result = 0;
};
// main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"void* Consumer(void* args)
{BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);while(true) {Task t = bq->pop();t.Run();printf("消费者获得数据: %s, 运算结果是: %s\n", t.GetTask().c_str(), t.GetResult().c_str());// sleep(1);}}void* Producer(void* args)
{BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);int x = 0, y = 0;char op = ' ';while(true) {x = rand() % 100;     // [0, 99]y = rand() % 100;     // [0, 99]op = opStr[rand() % 4];Task t(x, y, op);usleep(10);              // 模拟获取数据需要时间bq->push(t);printf("生产者生产了一个任务:%s\n", t.GetTask().c_str());// sleep(1);}
}int main()
{srand((unsigned int)time(nullptr));BlockQueue<Task>* bq = new BlockQueue<Task>();pthread_t c, p;pthread_create(&p, nullptr, Producer, bq);pthread_create(&c, nullptr, Consumer, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}

image-20241004114412458

6.4 误唤醒

假设现在有一个消费者,多个生产者在它的条件变量下正在进行等待,现在消费者刚刚消费一个数据,阻塞队列于是就恰好有一个空位置,于是该消费者就去唤醒生产者,但是使用的不是pthread_cond_signal(&_pCond);而是pthread_cond_broadcast(&_pCond);

void push(const T& x)
{   pthread_mutex_lock(&_mutex);if(_q.size() == _max) {// 已经到了最大值,不需要再添加了,让它去等待队列中去等pthread_cond_wait(&_pCond, &_mutex);}_q.push(x);pthread_cond_signal(&_cCond);   // 生产者已经生产数据了,此时就可以唤醒消费者了。pthread_mutex_unlock(&_mutex);  
}

于是,这几个被唤醒的生产者就不在条件变量下等待了,而是都跑过去去竞争锁,但只有一个能竞争成功,该生产者于是就继续向下执行,向阻塞队列中push()数据。当该生产者生产完数据,准备唤醒消费者并解锁的时候,消费者不一定竞争成功锁,因为还有那些同样在刚才竞争锁资源失败的生产者在锁资源下等着呢!如果这些生产者又拿到锁,像阻塞队列中push()数据,就会发生错误,因为阻塞队列已经到_max了!

一个线程在条件满足被唤醒的时候,但是历史上的条件满足已经其它线程处理掉了,于是该线程只能等待,当它再次被唤醒,进行数据访问,就可能出错,我们把这种现象称为该线程被误唤醒了。或者叫虚假唤醒。即本不应该被唤醒线程被唤醒了,导致程序执行结果错误。


为了防止线程被虚假唤醒,判断临界资源是否可以被消费或生产的时候要用while()循环,循环判断,当条件不满足的时候,让该线程重新去等待队列中等待,而不是一股脑的一直在竞争锁资源

const T pop() 
{// ...while(_q.size() == 0) {}// ...
}
void push(const T& x)
{// ...while(_q.size() == _max) {}// ...
}

6.5 多生产者多消费者

static const size_t C_NUM = 5;
static const size_t P_NUM = 5;
// ...
int main()
{srand((unsigned int)time(nullptr));BlockQueue<Task>* bq = new BlockQueue<Task>();pthread_t c[C_NUM], p[P_NUM];for(size_t i = 0; i < P_NUM;++i)pthread_create(c+i, nullptr, Producer, bq);for(size_t i = 0; i < C_NUM;++i)pthread_create(p+i, nullptr, Consumer, bq);for(size_t i = 0; i < P_NUM;++i)pthread_join(c[i], nullptr);for(size_t i = 0; i < C_NUM;++i)pthread_join(p[i], nullptr);delete bq;return 0;
}

image-20241004144049161

由于BlockQueue中用了一把锁,所以生产者和生产者的互斥关系,生产者和消费者的互斥关系,消费者和消费者的互斥问题,都可以解决
生产和消费者的同步问题,我们使用了两个条件变量来解决
即便是多生产者多消费者,任何时刻,只允许一个线程来访问临界资源,让它来进行生产或消费


http://www.ppmy.cn/ops/121178.html

相关文章

vue之vuex的使用及举例

Vuex是专门为Vue.js设计的集中式状态管理架构&#xff0c;它允许你将所有的组件共享状态存储在一个单独的地方&#xff0c;即“store”&#xff0c;并以相应的规则保证状态以一种可预测的方式发生变化。以下是Vuex的基本使用方法&#xff1a; 一、安装Vuex 对于Vue 2项目&…

服务器开通个人账户

给服务器添加新用户&#xff0c;然后后续的软链接操作 &#xff08;文件不要放到home&#xff09;放到data盘中 1、添加新用户 首先登录 root 账户 # 创建xxx用户 sudo useradd -m -s /bin/bash xxx # 添加密码 sudo passwd xxx # 对应位置建立文件夹xxx-dir 数据盘位置建…

Python编写的贪吃蛇小游戏

安装包 pip install pygame完整代码 import pygame import randompygame.init()# 定义颜色 white (255, 255, 255) black (0, 0, 0) red (213, 50, 80) green (0, 255, 0) blue (50, 153, 213)# 定义屏幕大小 dis_width 800 dis_height 600dis pygame.display.set_mo…

SpringBootTest Mockito 虚实结合编写测试

SpringBootTest & Mockito 虚实结合测试 起因 单一使用mockito&#xff0c;会出现很多mock困难的问题&#xff0c;导致测试编写过程太长&#xff0c;太恶心 单一使用springboottest&#xff0c;会遇到需要外部接口的地方&#xff0c;这个时候就非得去真实调用才行。也很恶…

android compose ScrollableTabRow indicator 指示器设置宽度

.requiredWidth(30.dp) Box(modifier Modifier.background(Color.LightGray).fillMaxWidth()) {ScrollableTabRow(selectedTabIndex selectedTabIndex, // 默认选中第一个标签containerColor ColorPageBg,edgePadding 1.dp, // 内容与边缘的距离indicator { tabPositions…

探索私有化聊天软件:即时通讯与音视频技术的结合

在数字化转型的浪潮中&#xff0c;企业对于高效、安全、定制化的通讯解决方案的需求日益迫切。鲸信&#xff0c;作为音视频通信技术的佼佼者&#xff0c;凭借其强大的即时通讯与音视频SDK&#xff08;软件开发工具包&#xff09;结合能力&#xff0c;为企业量身打造了私有化聊天…

Mac制作Linux操作系统启动盘

前期准备 一个 Mac 电脑 一个 U 盘&#xff08;8GB 以上&#xff09; 下载好 Linux 系统镜像&#xff08;iso 文件&#xff09; 具体步骤 挂载 U 盘 解挂 U 盘 写系统镜像到 U 盘 完成 一、挂载 U 盘 首先插入 U 盘&#xff0c;打开终端输入下面的命令查看 U 盘是否已经 m…

力扣题解 983

大家好&#xff0c;欢迎来到无限大的判断&#xff0c;祝大家国庆假期愉快 题目描述&#xff08;中等&#xff09; 最低票价 在一个火车旅行很受欢迎的国度&#xff0c;你提前一年计划了一些火车旅行。在接下来的一年里&#xff0c;你要旅行的日子将以一个名为 days 的数组给出…