『 Linux 』基于阻塞队列的生产者消费者模型

news/2025/3/15 16:40:27/

文章目录

    • 生产者-消费者模型概述
    • 生产者消费者模型的高效性
    • 虚假唤醒
    • 信号丢失
    • 生产者消费者模型的模拟实现
    • 参考代码


生产者-消费者模型概述

请添加图片描述

生产者消费者模型是一种多线程设计模式,常见于解决多个生产者线程和多个消费者线程之间如何安全有效地共享数据;

该模型中存在三种关系,两个角色和一个交易场所;

两种角色分别为 消费者生产者 ;

  • 生产者

    生产者用于生产数据或任务,并将其放入共享区域中;

  • 消费者

    消费者负责从共享区域中读取数据或任务并进行处理;

一个交易场所指的是一块特定结构的内存空间,该区域用于充当生产者和消费者之间的中介,用于暂存数据,其中该空间可以是有限的也可以是无限的;

三种关系分别为 生产者与生产者 , 消费者与消费者 , 生产者与消费者 ;

  • 生产者与生产者

    生产者之间必须是互斥关系;

    多个生产者同时向共享空间写入数据时,需要互斥访问以避免共享空间状态的竞争和数据损坏;

    可通过互斥锁确保同一时刻只能有一个生产者向共享空间中写入数据;

  • 消费者与消费者

    消费者之间必须是互斥关系;

    多个消费者同时从缓冲区中读取数据时需要互斥访问以避免共享空间状态的竞争和数据损坏;

    可通过互斥锁确保同一时刻只有一个消费者可以从共享空间读取数据;

  • 生产者与消费者

    生产者与消费者需要既存在互斥关系也存在同步关系;

    • 互斥关系

      生产者和消费者都需要互斥的访问共享空间以避免数据竞争和数据不一致;

      通过互斥锁确保当一个线程(生产者或消费者)正在访问共享空间时其他线程不能同时访问;

    • 同步关系

      生产者和消费者需要在某些条件下等待对方的操作完成;

      例如当共享空间中数据高与一定数量时生产者需要等待消费者消费数据,当共享空间内数据低于一个数量时消费者需要等待生产者生产数据;

      需通过条件变量实现线程之间的同步,使生产者和消费者在需要等待时等待并在条件满足时被唤醒;

这种模型的设计的优点为:

  • 支持忙闲不均

    生产者和消费者可以以不同的速率进行工作,例如生产者写入数据或任务的速率大于消费者或者相反;

    其中共享空间使得生产者和消费者的速率不必严格匹配从而增强了系统应对负载波动的能力;

  • 对生产者和消费者进行解耦

    解耦意味着生产者和消费者不需要直接相互依赖或协调,他们通过共享缓冲区间接相互交互;

    不需要直接依赖对方的实现,是系统更加模块化和灵活,同时易于拓展和维护;


生产者消费者模型的高效性

请添加图片描述

生产者消费者模型是一种高效的设计模型;

其高效性不体现在于在加锁时生产者和消费者对共享资源的串型访问;

而是对于生产者生产数据前需先接收数据,消费者在消费数据后需要对数据进行加工处理;

本质上是对于非临界资源的处理,即可能当生产者在接收数据(访问非临界资源)时消费者正在消费数据(访问临界资源),或是生产者在生产数据(访问临界资源)时,消费者在处理加工数据(访问非临界资源);

而对于非临界资源的访问与处理也是具有时间开销的;

当一个生产者或是消费者正在访问非临界资源时不影响对端访问临界资源从而并发操作,提高了模型整体的高效性;

主要体现在以下方面:

  • 解耦生产和消费过程允许生产和消费以不同速率进行
  • 通过缓冲区平衡生产和消费的速度差异提高整体吞吐量
  • 允许生产者和消费者在各自的非临界区并行工作(一端访问临界,一端访问非临界)
  • 支持多个生产者和消费者并发操作进一步提高并行度


虚假唤醒

请添加图片描述

虚假唤醒指的是一个线程在没有收到明确的唤醒信号的情况下从条件变量的等待状态中被唤醒;

这种唤醒不是由程序逻辑触发的而是由系统或底层实现导致的;

void *threadRouding(void *args){// ... 其他操作pthread_mutex_lock(&mutex_);if (条件不满足) {pthread_cond_wait(&c_cond_, &mutex_);}// ... 其他操作pthread_mutex_unlock(&mutex_);
}

以该段代码为例,在多线程环境中,线程在进入函数时首先获取互斥锁,利用if判断是否条件不满足,若是条件不满足则会加入至条件变量的等待队列中;

假设共享资源不停在变化,在某一刻时使用pthread_cond_broadcast()唤醒了所有线程后,此时应只有一个线程成功获取锁并向下执行;

而其他线程被唤醒后应因条件不满足而继续等待,但在该段代码中若是其他线程也被唤醒后不会再次对条件变量条件状态进行检查而直接向下执行,此时其他线程则是一种虚假唤醒;

或者是共享资源在不停变化,当一个线程被pthread_cond_signal()唤醒时其共享资源中的状态已经不满足条件变量状态,但此时并未对条件变量状态进行重新检查,当线程被唤醒后向下执行时共享资源的状态已经发生了变化从而导致虚假唤醒;

故在使用条件变量来判断共享资源状态时应用while()循环来判断,使得当一个或若干个线程被虚假唤醒时能循环判断条件是否满足而决定是否继续向下执行;


信号丢失

请添加图片描述

信号丢失是指一个线程向一个条件变量发送线程唤醒信号时其条件变量中的等待队列并不含任何线程;

当该条件变量中的等待队列进入线程时该信号已经丢失,从而导致在条件应唤醒对应线程时没有任何线程能够接受到对应的唤醒信号从而依旧保持等待状态;

// 线程A(信号发送者)
pthread_mutex_lock(&mutex);
// 改变共享状态
pthread_cond_signal(&cond);  // 发送信号,但此时可能没有线程在等待
pthread_mutex_unlock(&mutex);// 线程B(潜在的接收者,但尚未进入等待状态)
// ... 一些其他操作 ...
pthread_mutex_lock(&mutex);
while (!condition) {pthread_cond_wait(&cond, &mutex);  // 当这里执行时,信号可能已经发送并丢失
}
pthread_mutex_unlock(&mutex);

以该段代码为例,其中 线程A 因条件变量条件满足时向条件变量发送状态表示条件满足需要唤醒其中一个或多个线程;

但此时条件变量中的等待队列中不含有任何线程;

信号丢失一般情况下会发生在线程在进入等待队列中的这个时间间隙,没有及时进入等待队列导致信号丢失;

  • 信号丢失与虚假唤醒的区别

    • 信号丢失

      信号丢失是有实际信号发出,但是没有线程成功接收;

    • 虚假唤醒

      虚假唤醒是线程在没有实际唤醒信号的情况下被唤醒;

信号丢失可能导致程序死锁或功能错误;

在设计时使用条件变量的多线程程序时必须考虑并防止信号丢失;


生产者消费者模型的模拟实现

请添加图片描述

以单消费者单生产者为例:

/* BlockQueue.hpp */#ifndef BLOCK_QUEUE_HPP
#define BLOCK_QUEUE_HPP#include <pthread.h>
#include <unistd.h>#include <iostream>
#include <queue>
template <class T>
class BlockQueue {static const int defaultnum = 10; // 设置初始最大容量public:BlockQueue(int maxcap = defaultnum) : maxcap_(maxcap) {pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&c_cond_, nullptr);pthread_cond_init(&p_cond_, nullptr);low_water_ = maxcap_ / 3;hight_water_ = maxcap_ * 2 / 3;}T pop() {  // 消费 Consumerpthread_mutex_lock(&mutex_);while (q_.size() == 0) { // 使用 while 循环防止线程虚假唤醒pthread_cond_wait(&c_cond_, &mutex_);}T out = q_.front();q_.pop();pthread_cond_signal(&p_cond_);pthread_mutex_unlock(&mutex_);return out;}void push(const T& in) {  // 生产 Productorpthread_mutex_lock(&mutex_); // 访问临界资源前进行上锁while (q_.size() == maxcap_) { // 进行条件判断 条件满足时pthread_cond_wait(&p_cond_, &mutex_);}q_.push(in);pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);}~BlockQueue() {pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond_);pthread_cond_destroy(&p_cond_);}private:std::queue<T> q_;  // 表示临界资源/* (STL容器不被保护 用户若是需要保护则需要自己上锁) */int maxcap_;  // 极值 表示该队列所能容纳的最大数据量pthread_mutex_t mutex_;  // 用于保护临界资源的互斥锁pthread_cond_t c_cond_;  // 条件变量pthread_cond_t p_cond_;int low_water_;    // 低水平位线int hight_water_;  // 高水平位线/*可通过加入水平位线条件控制生产者和消费者的生产消费策略如资源大于多少时通知消费者消费资源小于多少时通知生产者生产*/
};
#endif
  • 类的定义

    • BlockQueue<T>

      一个模板类,实现了一个线程安全的阻塞队列;

    • T

      队列中存储的元素类型;

  • 成员变量

    • q_

      一个STL队列容器,用于存储数据;

    • maxcap_

      一个极值,用于表明当前队列的最大容量;

    • mutex_

      互斥锁,用于保护临界资源;

    • c_cond_

      消费者条件变量,用于将不满足条件的消费者线程加载进对应的等待队列中;

    • p_cond_

      生产者条件变量,用于将不满足条件的生产者线程加载进对应的等待队列中;

    • low_water_hight_water_

      高低水平位线,可定义控制生产和消费策略,如容器数据大于多少时通知消费者消费,容器数据小于多少时通知生产者生产(该代码中未使用);

  • 构造函数

    初始化互斥锁和条件变量,设置最大容量和水平位线;

  • pop()

    该函数用于消费者进行消费动作,即从队列中获取一个数据;

    • 获取当前互斥锁
    • 队列为空时等待消费者条件变量
    • 取出队首元素并返回
    • 发信号给生产者条件变量通知生产者生产
    • 解锁互斥锁
  • push()

    该函数用于生产者进行生产动作,即将数据加入至队列容器中;

    • 获取互斥锁
    • 当队列满时等待生产者条件变量
    • 将元素加入队列
    • 发送信号给消费者条件变量通知消费者消费
    • 解锁互斥锁
  • 析构函数

    销毁互斥锁和条件变量;

该段代码为该生产者消费者模型的核心代码,数据类型既可传入内置类型也可传入自定义类型完成一系列任务;

假设存在一个任务类:

/* Task.hpp */#ifndef TASK_HPP
#define TASK_HPP
#include <iostream>// 定义错误代码枚举
enum { DIV_ERR = 1, MOD_ERR, NONE };class Task {public:// 构造函数:初始化所有成员变量Task(int num1, int num2, char oper): num1_(num1), num2_(num2), exit_code_(0), result_(0), oper_(oper) {}// 析构函数(当前为空)~Task() {}// 执行任务的主要函数void run() {switch (oper_) {case '+':result_ = num1_ + num2_;break;case '-':result_ = num1_ - num2_;break;case '*':result_ = num1_ * num2_;break;case '/': {if (num2_ == 0) {exit_code_ = DIV_ERR;  // 设置除零错误result_ = -1;          // 除零时结果设为-1} elseresult_ = num1_ / num2_;break;}case '%': {if (num2_ == 0) {exit_code_ = MOD_ERR;  // 设置模零错误result_ = -1;          // 模零时结果设为-1} elseresult_ = num1_ % num2_;break;}default:exit_code_ = NONE;  // 未知操作符break;}}// 重载()运算符,使对象可以像函数一样被调用void operator()() { run(); }// 获取计算结果int getresult() { return result_; }// 获取退出代码int getexitcode() { return exit_code_; }// 获取第一个操作数int getnum1() { return num1_; }// 获取第二个操作数int getnum2() { return num2_; }// 获取操作符char getoper() { return oper_; }private:int num1_;      // 第一个操作数int num2_;      // 第二个操作数int exit_code_; // 退出代码,用于表示操作是否成功int result_;    // 计算结果char oper_;     // 操作符
};#endif

这段代码定义了一个Task类,用于表示和执行简单的算术运算任务;

该类包含两个操作数和一个操作符,计算结果和退出代码;

构造函数初始化所有成员变量;

run()方法是核心功能,将根据操作符执行相应的算术运算;

对于除法和取模特别处理了除数为0的情况并设置相应的错误码;

重载了()运算符进行仿函数设置;

提供了几个getter方法来访问私有成员变量;

对应的测试函数为如下:

#include <string>
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace std;#include <unistd.h>
#include <ctime>// 定义可能的运算符
string opers = "+-*/%";// 消费者线程函数
void *Consumer(void *args) {// 将传入的参数转换为 BlockQueue<Task> 指针BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while (true) {// 从队列中取出一个任务Task task = bq->pop();// 执行任务task();// 打印任务执行结果printf("The thread-%3lu handled a task , %2d %c %2d = %3d , exit code : %d\n",pthread_self() % 1000, task.getnum1(), task.getoper(),task.getnum2(), task.getresult(), task.getexitcode());cout << "------------------------------------" << endl;}return nullptr;
}// 生产者线程函数
void *Productor(void *args) {int len = opers.size();// 将传入的参数转换为 BlockQueue<Task> 指针BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while (true) {// 休眠0.5秒,控制生产速度usleep(500000);// 随机生成两个操作数和一个运算符int data1 = rand() % 10;int data2 = rand() % 10;char op = opers[rand() % len];// 创建新任务Task t(data1, data2, op);// 将任务推入队列bq->push(t);// 打印生成的任务信息printf("The thread-%3lu push a task ,%2d %c %2d = ? \n", pthread_self() % 1000,data1, op, data2);}return nullptr;
}int main() {// 初始化随机数生成器srand(time(nullptr));// 创建一个 BlockQueue<Task> 对象作为共享队列BlockQueue<Task> *bq = new BlockQueue<Task>();// 声明线程ID变量pthread_t c_tid, p_tid;// 创建消费者线程pthread_create(&c_tid, nullptr, Consumer, bq);// 创建生产者线程pthread_create(&p_tid, nullptr, Productor, bq);// 等待线程结束(实际上是无限等待)pthread_join(c_tid, nullptr);pthread_join(p_tid, nullptr);// 清理资源(实际上不会执行到这里)delete bq;return 0;
}

该函数主要用于测试生产者消费者模型,其中生产者消费者所生产与消费的数据为Task任务;

  • 头文件和局部变量

    包含了必要的头文件以及定义的"BlockQueue.hpp""Task.hpp";

    定义了一个string对象为opers包含所有可能的运算符;

  • 消费者函数Consumer

    在接收一个指向BlockQueue<Task>的指针作为参数;

    在一个无限循环中进行以下动作:

    • 从队列中取出一个任务(bq->pop())
    • 执行任务(task())
    • 打印任务执行结果,包括线程id,操作数,运算符,结果以及退出码
  • 生产者函数Productor

    接收一个指向BlockQueue<Task>的指针;

    在一个无限循环中进行以下动作:

    • 每隔0.5susleep(500000)生成一个新任务
    • 随机生成两个09的操作数和一个随机运算符
    • 创建一个新的Task对象并将其推入队列(bq->push(t))
    • 打印生成的任务信息
  • 主函数main

    初始化一个随机数生成器(种一个随机数种子);

    创建一个BlockQueue<Task>对象同时创建一个消费者线程和一个生产者线程并向其传入BlockQueue指针;

    等待两个线程结束(此处为无限循环,即无限等待);

    最后删除对应的BlockQueue对象;

总体流程为生产者线程持续生成随机算术任务并将其加入共享队列(临界资源区)中,消费者线程从该队列取出任务并执行随后打印执行结果;

其中BlockQueue类负责处理线程同步,确保生产者和消费者能够安全的访问共享队列;

对应的执行结果为:

$ ./blockqueue 
The thread-448 push a task , 6 -  9 = ? 
The thread-152 handled a task ,  6 -  9 =  -3 , exit code : 0
------------------------------------
The thread-448 push a task , 2 /  8 = ? 
The thread-152 handled a task ,  2 /  8 =   0 , exit code : 0
------------------------------------
The thread-448 push a task , 6 +  8 = ? 
The thread-152 handled a task ,  6 +  8 =  14 , exit code : 0
------------------------------------
The thread-448 push a task , 3 %  1 = ? 
The thread-152 handled a task ,  3 %  1 =   0 , exit code : 0
------------------------------------
The thread-448 push a task , 5 +  9 = ? 
The thread-152 handled a task ,  5 +  9 =  14 , exit code : 0
------------------------------------
The thread-448 push a task , 7 /  8 = ? 
The thread-152 handled a task ,  7 /  8 =   0 , exit code : 0
------------------------------------
...
...
  • 多生产者多消费者

    当前BlockQueue生产者消费者模型支持多生产者多消费者的情况,对应只需将代码修改为:

    int main() {// 初始化随机数生成器srand(time(nullptr));// 创建一个 BlockQueue<Task> 对象作为共享队列BlockQueue<Task> *bq = new BlockQueue<Task>();// 创建消费者线程pthread_t c_tids[3], p_tids[3];for (int i = 0; i < 3; ++i) {pthread_create(c_tids + i, nullptr, Consumer, bq);}// 创建生产者线程for (int i = 0; i < 3; ++i) {pthread_create(p_tids + i, nullptr, Productor, bq);}// 等待线程结束(实际上是无限等待)for (int i = 0; i < 3; ++i) pthread_join(c_tids[i], nullptr);for (int i = 0; i < 3; ++i) pthread_join(p_tids[i], nullptr);// 清理资源(实际上不会执行到这里)delete bq;return 0;
    }
    

    该代码中创建了多个生产者与多个消费者线程,其中使用了c_tids[3]p_tids[3]数组来保存管理线程;

    其余代码可不变;

    对应的运行结果为(删除分隔符-------的打印):

    $ ./blockqueue 
    The thread-392 push a task , 3 +  2 = ? 
    The thread-208 handled a task ,  3 +  2 =   5 , exit code : 0
    The thread- 96 push a task , 8 +  7 = ? 
    The thread-688 push a task , 2 %  3 = ? 
    The thread-800 handled a task ,  8 +  7 =  15 , exit code : 0
    The thread-504 handled a task ,  2 %  3 =   2 , exit code : 0
    The thread-392 push a task , 3 %  3 = ? 
    The thread-504 handled a task ,  3 %  3 =   0 , exit code : 0
    The thread-688 push a task , 4 -  8 = ? 
    The thread-208 handled a task ,  0 -  9 =  -9 , exit code : 0
    The thread- 96 push a task , 0 -  9 = ? 
    The thread-800 handled a task ,  4 -  8 =  -4 , exit code : 0
    The thread-392 push a task , 6 %  4 = ? 
    The thread-208 handled a task ,  6 *  5 =  30 , exit code : 0
    The thread- 96 push a task , 6 %  8 = ? 
    The thread-688 push a task , 6 *  5 = ? 
    The thread-504 handled a task ,  6 %  4 =   2 , exit code : 0
    The thread-800 handled a task ,  6 %  8 =   6 , exit code : 0
    ...
    ...
    

    可在运行前在消费者函数或是生产者函数利用usleep()sleep()控制其生产者与消费者的生产消费速率;

    该模型生产者和消费者的生产消费速率不需严格控制,其对端将自行同步;


参考代码

请添加图片描述

(供参考) CSDN - Dio夹心小面包 / Gitee - 半介莽夫


http://www.ppmy.cn/news/1503557.html

相关文章

传统自然语言处理(NLP)与大规模语言模型(LLM)详解

自然语言处理&#xff08;NLP&#xff09;和大规模语言模型&#xff08;LLM&#xff09;是理解和生成人类语言的两种主要方法。本文将介绍传统NLP和LLM的介绍、运行步骤以及它们之间的比较&#xff0c;帮助新手了解这两个领域的基础知识。 传统自然语言处理&#xff08;NLP&…

JavaScript异步编程的Promise

目录 1.对Promise的了解 &#xff08;1&#xff09;介绍 &#xff08;2&#xff09;Promise 的优缺点 2.Promise的基本用法 &#xff08;1&#xff09;创建Promise对象 &#xff08;2&#xff09;Promise方法then() &#xff08;3&#xff09;Promise方法catch() &…

C语言实现三子棋

通过一段时间的学习&#xff0c;我们已经能够较为熟练地使用分支语句&#xff0c;循环语句&#xff0c;创建函数&#xff0c;创建数组&#xff0c;创建随机数等。之前我们做过一个扫雷游戏&#xff0c;今天让我们再尝试创作一个三子棋游戏吧~ 一、三子棋游戏的思路 三子棋的游…

语音交互、AI问答,等你来体验!

功能背景 在实际大屏应用中&#xff0c;用户向大屏直接下达语音指令显的越来越便捷&#xff0c;其中体现的交互感也比通过动作指令来的更加强烈&#xff0c;给用户带来更高效的服务体验。目前EasyV平台开发的自定义事件交互已经很完善&#xff0c;组件之间可以进行触发联动。 …

Spring Boot集成udp通讯

Spring Boot集成udp通讯 加入依赖编辑配置文件配置相关属性具体业务类客户端调试 加入依赖 <!--加入UDP通信所需依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId&…

Spring Security 概述,鸟瞰 Spring Security 及其功能

在本文中&#xff0c;我们将从鸟瞰的角度了解 Spring Security 的用途以及它能为我们提供什么。网络上的任何东西都可能是攻击的潜在受害者。不幸的是&#xff0c;在这个即使是最富有、最具创新性的技术公司也会受到黑客攻击的世界里&#xff0c;保护 Web 应用程序并实现授权和…

【常用库】【pytorch】基本部件

基本元件 1. 卷积 2. batchnorm loss函数 torch.nn.MSELoss() >>> a torch.rand(3) >>> a tensor([0.2161, 0.2227, 0.9175]) >>> b torch.rand(3) >>> b tensor([0.6976, 0.9149, 0.4918]) >>> mse torch.nn.MSELOSS() &…

APACHE安装与应用

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…