Linux线程(四) 生产者消费者模型

news/2024/9/19 23:17:25/ 标签: 开发语言, linux, 运维, 服务器

目录

一、什么是生产者消费者模型

基本概念

优点以及应用场景

二、 基于阻塞队列的生产者消费者模型

三、POSIX信号量

四、基于环形队列的生产消费模型


一、什么是生产者消费者模型

        Linux下的生产者消费者模型是一种经典的多线程或多进程编程设计模式,它用于解决资源访问的同步问题,特别是在涉及任务分配、数据处理和资源共享的场景中。

基本概念

生产者:负责生成数据项并将其放入共享的缓冲区(队列)。当缓冲区满时,生产者可能需要等待(阻塞)直到有空间可用。

消费者:从缓冲区中取出数据项进行处理。如果缓冲区为空,消费者可能需要等待(阻塞)直到有新数据产生。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

优点以及应用场景

生产者消费者模型作为一种经典的并发设计模式,在软件开发中特别是涉及多线程或多进程协作的场景下,展现出诸多优势。

解耦:生产者和消费者之间通过共享缓冲区(如队列)进行间接通信,减少了直接的依赖关系,使得生产者和消费者的代码可以独立开发和维护,提高了模块的复用性和系统的灵活性。

支持并发:生产者和消费者通常作为独立的执行单元运行,可以并行工作,充分利用多核处理器的计算能力,提升系统整体的吞吐量和响应速度。

平衡资源利用:通过调整缓冲区的大小和管理生产者与消费者的数量,可以有效平衡生产速率和消费速率,防止生产过剩导致资源浪费或者消费过快导致资源饥饿,从而优化系统性能。

应用场景:

生产者消费者模型广泛应用于各种领域,如网络通信中的数据包处理、数据库的异步写入、GUI应用中的事件处理系统、多线程下载和处理等,任何需要解耦数据生产与数据消费过程的场景都可以考虑使用这一模式。 

二、 基于阻塞队列的生产者消费者模型

        在这个模型中,阻塞队列扮演了生产者和消费者之间的中介角色,它负责存储生产者产生的数据,并安全地传递给消费者处理。关键在于,阻塞队列能够自动管理同步问题,确保线程安全,同时提供阻塞机制来平衡生产与消费的速度。

阻塞队列属于仓库这一临界资源,而同一时刻只能有一个线程进入阻塞队列进行操作,所以要用到互斥锁,同时还要思考如果是消费者该如何知道有东西可以买了呢,如果是生产者如何知道仓库的东西不够了需要生产呢,这个时候就需要两个条件变量push_cond和pop_cond

关于条件变量在上篇文章中讲过,可以参考:

Linux线程(三)死锁与线程同步

push_cond

当生产者将阻塞队列放满时,就需要等待消费者消费完来唤醒生产者继续生产。

pop_cond

当消费者把队列消费空时,消费者会等待生产者往阻塞队列加资源后来唤醒消费者继续消费。 

接下来我们来实现一个基于阻塞队列的生产者消费者模型

访问阻塞队列一定会涉及到加锁,我们首先可以设计一个LockGuard(RAII)思想,利用类出作用域自动销毁来实现解锁,防止忘记解锁造成死锁。

LockGuard.hpp

#pragma once
#include <pthread.h>class Mutex
{
private:pthread_mutex_t* _mutex;public:Mutex(pthread_mutex_t* lock):_mutex(lock){}void Lock(){pthread_mutex_lock(_mutex);}void Unlock(){pthread_mutex_unlock(_mutex);}~Mutex(){}
};class LockGuard
{
private:Mutex _mutex;
public:LockGuard(pthread_mutex_t *lock):_mutex(lock){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}
};

随后我们来实现一个任务类,模仿消费者拿到资源:

Task.hpp:

#pragma once
#include <iostream>
#include <string>const int defaultvalue = 0;enum
{ok = 0,div_zero,mod_zero,unknow
};const std::string opers = "+-*/%)(&";class Task
{
public:Task(){}Task(int x, int y, char op): data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok){}void Run(){switch (oper){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':{if (data_y == 0)code = div_zero;elseresult = data_x / data_y;}break;case '%':{if (data_y == 0)code = mod_zero;elseresult = data_x % data_y;}break;default:code = unknow;break;}}void operator()(){Run();}std::string PrintTask(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=?";return s;}std::string PrintResult(){std::string s;s = std::to_string(data_x);s += oper;s += std::to_string(data_y);s += "=";s += std::to_string(result);s += " [";s += std::to_string(code);s += "]";return s;}~Task(){}private:int data_x;int data_y;char oper; // + - * / %int result;int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4
}; 

随后我们来实现一个阻塞队列,要注意一个时刻只能有一个线程访问,所以再push操作和pop操作时要加锁。

block_queue.hpp:

#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
#include"LockGuard.hpp"const int defaultcap=5;//默认容量为5
template<class T>
class block_queue
{
private:std::queue<T> _q;int _capacity;   //_q.size() == _capacity, 满了,不能在生产,_q.size() == 0, 空,不能消费了pthread_mutex_t _mutex;pthread_cond_t _push_cond;  //给生产者pthread_cond_t _pop_cond;  //给消费者// int _consumer_water_line;  // _consumer_water_line = _capacity / 3 * 2// int _productor_water_line; // _productor_water_line = _capacity / 3
public:block_queue(int cap=defaultcap):_capacity(cap){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_push_cond,nullptr);pthread_cond_init(&_pop_cond,nullptr);}bool IsFull(){return _q.size() == _capacity;}bool IsEmpty(){return _q.size() == 0;}void Push(const T &in){LockGuard lockguard(&_mutex);while(IsFull())pthread_cond_wait(&_push_cond,&_mutex);_q.push(in);//通知消费者可以消费了// if(_q.size() > _productor_water_line) pthread_cond_signal(&_c_cond);  // 也可以是当资源数量大于指定阈值时再通知pthread_cond_signal(&_pop_cond);}void Pop(T *out)//要取出任务{LockGuard lockgugrd(&_mutex);while(IsEmpty())pthread_cond_wait(&_pop_cond,&_mutex);*out=_q.front();_q.pop();//通知生产者可以生产了// if(_q.size() < _consumer_water_line) pthread_cond_signal(&_p_cond);      //也可以是当资源数量大于指定阈值时再通知pthread_cond_signal(&_push_cond);}~block_queue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_push_cond);pthread_cond_destroy(&_pop_cond);}};

makefile文件,当然也可手动生成可执行文件,使用这个较为方便

test_block:Main.ccg++ -o  $@ $^ -lpthread -std=c++11.PHONY:clean
clean:rm -f test_block

使用Main.cc来测试这个模型:

#include"block_queue.hpp"
#include"Task.hpp"
#include<pthread.h>
#include<ctime>
#include<sys/types.h>
#include<unistd.h>void *consumer(void *args)
{block_queue<Task> *bq=static_cast<block_queue<Task>* >(args);while(true){sleep(1);Task t;//取出任务bq->Pop(&t);t();//运行任务std::cout<<"consumer data: "<<t.PrintResult()<<std::endl;}return nullptr;
}void *producror(void *args)
{block_queue<Task> *bq=static_cast<block_queue<Task>*>(args);while(true){sleep(1);int x=rand()%10;usleep(rand()%123);int y=rand()%10;usleep(rand()%1234);char oper=opers[rand()%(opers.size())];Task t(x,y,oper);std::cout<<"productor data: "<<t.PrintTask()<<std::endl;bq->Push(t);}return nullptr;
}int main()
{srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self()); // 只是为了形成更随机的数据block_queue<Task> *bq=new block_queue<Task>();pthread_t c,p;pthread_create(&c,nullptr,consumer,bq);pthread_create(&p,nullptr,producror,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}

运行结果如下

可以看到生产者每生产一个,消费者就拿到一个。

如果让生产者不休眠

可以看到消费者将阻塞队列填满,消费者取队首元素来执行。

总体流程就是:

生产过程:生产者创建数据项,并尝试将数据放入阻塞队列。如果队列已达到其容量限制,生产者的push()操作将被阻塞,直到队列中有空间可以添加新数据。

消费过程:消费者从阻塞队列中取出数据项进行处理。当队列为空时,消费者的pop()操作也会被阻塞,直到有新的数据被生产者放入队列。

通知与唤醒:一旦队列状态发生变化(例如有数据被放入或移出),阻塞队列会自动唤醒相应等待的线程,实现高效且线程安全的同步。 

三、POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

在传统的基于阻塞队列的生产者消费者模型中,虽然使用阻塞队列自身可以避免一些复杂的同步问题,但确实存在这样一个情况:当生产者试图向满队列添加数据或消费者试图从空队列中取数据时,它们都需要对整个队列进行加锁,这实际上导致了生产者和消费者之间不必要的锁竞争。

比如生产者消费者模型,生产者只需要关注空间是否足够生产,消费者只需要关注资源是否足够消费,所以开始的时候生产者的信号量就是队列的大小,消费者的信号量就是0,当生成者生产一个资源,生产者信号量-1,消费者+1;当消费者消费一个资源, 生产者信号量+1,消费者-1。

使用POSIX信号量确实可以进一步优化这一模型,使得生产者与生产者之间、消费者与消费者之间存在锁竞争,而生产者和消费者之间不存在直接的锁竞争。这是因为信号量可以用来精确控制对资源的访问权限,而不仅仅是简单地锁定整个资源。

信号量的本质是一个计数器。

这个计数器用于跟踪某个资源(如共享内存区域、打印机等)的可用单位数。信号量机制通过这个计数器来控制多个进程或线程对共享资源的访问,确保资源的合理分配和同步。通过这个计数器的增加和减少,信号量不仅能够控制访问权限,还能协调进程间的同步,是解决并发控制问题的一种有效工具。  

 初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量(P操作)
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量(V操作)
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
上一个生产者 - 消费者的例子是基于阻塞队列 , 其空间可以动态分配 , 现在基于固定大小的环形队列重写这个程序

四、基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性
生产者只需要关注空间 spaceSem 是否足够生产,消费者只需要关注资源 dataSem 是否足够消费,所以开始的时候生产者的信号量就是队列的大小,消费者的信号量就是0,当生成者生产一个资源,生产者信号量-1,消费者+1;当消费者消费一个资源, 生产者信号量+1,消费者-1。

如图所示

代码示例,基于环形队列的生产消费模型其中的资源依旧使用Task来模拟

ringqueue.hpp

#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include "LockGuard.hpp"// 定义默认队列大小
const int defaultSize = 5;// 泛型环形队列类模板
template <typename T>
class RingQueue
{
private:// 信号量P操作,减少信号量计数,若计数<0则阻塞当前线程void P(sem_t &sem){sem_wait(&sem);}// 信号量V操作,增加信号量计数,若唤醒等待的线程void V(sem_t &sem){sem_post(&sem);}public:// 构造函数,初始化环形队列RingQueue(int size = defaultSize): _ringQueue(size), _size(size), _prodStep(0), _consStep(0){// 初始化空间信号量,初始值为队列大小,表示初始时所有空间都是空闲的sem_init(&_spaceSem, 0, size);// 初始化数据信号量,初始值为0,表示队列初始无数据sem_init(&_dataSem, 0, 0);// 初始化生产者和消费者的互斥锁,保护各自的操作步骤pthread_mutex_init(&_prodMutex, nullptr);pthread_mutex_init(&_consMutex, nullptr);}// 向队列添加元素void Push(const T &item){// 1. 减少空间信号量,尝试获取生产空间,若无空间则阻塞P(_spaceSem);{// 2. 加生产者锁,确保生产操作的原子性LockGuard lockGuard(&_prodMutex);// 执行实际的入队操作_ringQueue[_prodStep] = item;_prodStep++;        // 移动生产指针_prodStep %= _size; // 环状处理边界}V(_dataSem);}// 从队列移除元素void Pop(T *outItem){// 1. 减少数据信号量,尝试获取数据,若无数据则阻塞P(_dataSem);{// 2. 加消费者锁,确保消费操作的原子性LockGuard lockGuard(&_consMutex);// 执行实际的出队操作*outItem = _ringQueue[_consStep];_consStep++;        // 移动消费指针_consStep %= _size; // 环状处理边界}//消费者V操作时不冲突,可以解锁 信号量的P操作(wait/减)和V操作(signal/增)都是原子操作。V(_spaceSem);}// 析构函数,释放资源~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_prodMutex);pthread_mutex_destroy(&_consMutex);}private:// 环形队列底层使用std::vector存储std::vector<T> _ringQueue;int _size; // 队列大小// 生产者和消费者的步进索引int _prodStep;int _consStep;// 信号量,管理空间和数据的可用性sem_t _spaceSem;sem_t _dataSem;// 互斥锁,分别保护生产者和消费者的步骤更新pthread_mutex_t _prodMutex;pthread_mutex_t _consMutex;
};

Main.cc

#include"ringqueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <pthread.h>
#include <ctime>void *Productor(void *args)
{// sleep(5);RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// 数据怎么来的?// 1. 有数据,从具体场景中来,从网络中拿数据// 生产前,你的任务从哪里来的呢???int data1 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODOusleep(rand() % 123);int data2 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODOusleep(rand() % 123);char oper = opers[rand() % (opers.size())];Task t(data1, data2, oper);std::cout << "productor task: " << t.PrintTask() << std::endl;// rq->push();rq->Push(t);sleep(1);}
}void *Consumer(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// sleep(1);Task t;rq->Pop(&t);t();std::cout << "consumer done, data is : " << t.PrintResult() << std::endl;}
}int main()
{srand((uint64_t)time(nullptr) ^ pthread_self());pthread_t c[3], p[2];// 唤醒队列中只能放置整形???// RingQueue<int> *rq = new RingQueue<int>();RingQueue<Task> *rq = new RingQueue<Task>();pthread_create(&p[0], nullptr, Productor, rq);pthread_create(&p[1], nullptr, Productor, rq);pthread_create(&c[0], nullptr, Consumer, rq);pthread_create(&c[1], nullptr, Consumer, rq);pthread_create(&c[2], nullptr, Consumer, rq);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(c[2], nullptr);return 0;
}

运行如图所示 

环形队列中的生产者和消费者通过同步与互斥机制维持着一种动态平衡,确保数据的连续生产和消费,体现了典型的生产者-消费者问题的解决方案。 


http://www.ppmy.cn/news/1461510.html

相关文章

网站localhost和127.0.0.1可以访问,本地ip不可访问解决方案

部署了一个网站, 使用localhost和127.0.0.1加端口号可以访问, 但是使用本机的ip地址加端口号却不行. 原因可能有多种. 可能的原因: 1 首先要确认是否localhost对应的端口是通的(直接网址访问), 以及你无法访问的那个本机ip是否正确(使用ping测试)&#xff1b; 2 检查本机的防火…

FPGA - Xilinx系列高速收发器---GTX

1&#xff0c;GTX是什么&#xff1f; GT &#xff1a;Gigabit Transceiver千兆比特收发器&#xff1b; GTX &#xff1a;Xilinx 7系列FPGA的高速串行收发器&#xff0c;硬核 xilinx的7系列FPGA根据不同的器件类型&#xff0c;集成了GTP、GTX、GTH、GTZ四种串行高速收发器&am…

短视频人设定位有哪些:四川京之华锦信息技术公司

短视频人设定位有哪些&#xff1a;打造独特魅力的关键 随着短视频平台的兴起&#xff0c;越来越多的内容创作者开始涌现&#xff0c;他们凭借各自独特的魅力在网络世界中崭露头角。而在这其中&#xff0c;一个成功的短视频账号背后&#xff0c;往往有一个清晰、鲜明的人设定位…

【新手入门】Github与Git使用教程

Github与Git 一、Github基础教程 1.1 基本操作 点击代码文件可以直接查看文件的内容&#xff0c;支持在线修改文件&#xff0c;只需要点击(文件内容)右上角的编辑按钮即可进行编辑。 README.md一般介绍项目的功能&#xff0c;用法&#xff0c;注意事项&#xff1b;有时还有…

Self-attention自注意力机制

Self-attention 和 CNN&#xff08;卷积神经网络&#xff09;是深度学习中常用的两种架构&#xff0c;用于不同类型的任务。下面是它们之间的比较&#xff1a; Self-Attention&#xff08;自注意力机制&#xff09; 适用范围: Self-attention 主要用于处理序列数据&#xff0c…

力扣HOT100 - 45. 跳跃游戏 II

解题思路&#xff1a; 贪心 class Solution {public int jump(int[] nums) {int end 0;int maxPosition 0;int steps 0;for (int i 0; i < nums.length - 1; i) {maxPosition Math.max(maxPosition, i nums[i]);if (i end) {end maxPosition;steps;}}return steps;…

VSCode:隐藏工程中的文件和目录

VSCode&#xff1a;设置搜索时的排除目录_vscode全局搜索排除掉某些目录-CSDN博客 介绍了如何排除搜索目录 有时也需要隐藏工程中不必关注的文件和目录。 假设工程中的文件结构如下 $ tree . ├── doc │ └── readme.txt ├── m.cpp └── user_guide 可以通过如下方…

vueday1

1.作用&#xff1a;利用表达式进行插值&#xff0c;渲染到页面中 三元表达式、点语法、数组对应项&#xff0c;方法、点语法方法 1.使用的时候需要存在&#xff0c;需要在data里面声明&#xff0c;如果没有声明会报错 2.ifelse不能用 3.不能在标签属性中使用{{}}插值 <!DO…

前端CSS3基础1(新增长度单位,盒子模型,背景,边框,文本属性,渐变,字体,2D变换,3D变换)

前端CSS3基础1&#xff08;新增长度单位&#xff0c;盒子模型&#xff0c;背景&#xff0c;边框&#xff0c;文本属性&#xff0c;渐变&#xff0c;字体&#xff0c;2D变换&#xff0c;3D变换&#xff09; CSS3 新增长度单位CSS3 新增盒子模型相关属性box-sizing怪异盒模型box-…

golang中的类和接口

类 在 Go 语言中并没有类的概念&#xff0c;而是使用结构体来实现面向对象的特性。通过 type 关键字可以定义自定义类型&#xff0c;包括结构体类型。下面是一个简单的示例&#xff1a; package mainimport "fmt"// 定义一个结构体类型 type Person struct {Name s…

如何基于可靠事件模式实现最终一致性?

今天我们一起来探讨一个分布式环境下的常见问题,这个问题与数据的一致性有关。那么,什么是数据一致性呢?要回答这个问题,需要我们回顾一下单块系统和分布式系统中对于数据处理的不同需求。 我们知道,传统的单块系统通常都只与一个数据库进行交互,所有的数据处理过程都位于…

算法-卡尔曼滤波之卡尔曼滤波的第二个方程:预测方程(状态外推方程)

在上一节中&#xff0c;使用了静态模型&#xff0c;我们推导出了卡尔曼滤波的状态更新方程&#xff0c;但是在实际情况下&#xff0c;系统都是动态&#xff0c;预测阶段&#xff0c;前后时刻的状态是改变的&#xff0c;此时我们引入预测方程&#xff0c;也叫状态外推方程&#…

ICode国际青少年编程竞赛- Python-5级训练场-函数练习2

ICode国际青少年编程竞赛- Python-5级训练场-函数练习2 1、 def get_item(a):Spaceship.step(1)Dev.step(a)Dev.turnLeft()Dev.step(1)Spaceship.step(1)Dev.turnRight()Dev.step(-a)Spaceship.step(1) get_item(3) get_item(2) get_item(3) get_item(1) get_item(5)2、 de…

鸿蒙内核源码分析(VFS篇) | 文件系统和谐共处的基础

基本概念 | 官方定义 VFS&#xff08;Virtual File System&#xff09;是文件系统的虚拟层&#xff0c;它不是一个实际的文件系统&#xff0c;而是一个异构文件系统之上的软件粘合层&#xff0c;为用户提供统一的类Unix文件操作接口。由于不同类型的文件系统接口不统一&#x…

HTML常用标签-表单标签

表单标签 1 表单标签2 表单项标签2.1 单行文本框2.2 密码框2.3 单选框2.4 复选框2.5 下拉框2.6 按钮2.7 隐藏域2.8 多行文本框2.9 文件标签 1 表单标签 表单标签,可以实现让用户在界面上输入各种信息并提交的一种标签. 是向服务端发送数据主要的方式之一 form标签,表单标签,其内…

MySQL用SQL取三列中最大的数据值

1、有如下数据&#xff1a; ABC000097.0600330.72330.720069.650027.8827.85086.92086.92219.42219.4219.41 需要展示为如下形式&#xff1a; ABC结果列0000097.06097.060330.72330.72330.7200669.65009.6527.8827.85027.8886.92086.9286.92219.42219.4219.41219.42 解决办…

git仓库使用

git仓库是会限制空间大小限制的 git网络库的容量限制_github仓库大小限制-CSDN博客 git是用于管理github的工具 电脑左下角搜索git打开GitBash.exe 进入到要下载到本地的目录 下载到本地的文件不要更改&#xff01; 如果要使用请务必把文件复制到别的空间去再在这个别的空间…

Linux基础知识面试题

1. 请描述Linux操作系统的安装过程&#xff0c;并说明其中的关键步骤。 Linux操作系统的安装过程通常涉及以下几个关键步骤&#xff1a; 准备安装介质&#xff1a;需要从官网或者其他可靠来源下载Linux发行版的ISO镜像文件&#xff0c;并制作一个启动U盘或者烧录到DVD中。现在…

C++自定义脚本文件执行

FunctionCall.h&#xff1a; #include <sstream> #include <string> #include <vector> // 函数调用 class FunctionCall { public: FunctionCall(); ~FunctionCall(); std::string call(const st…

Kotlin协程中调度器Dispatchers的介绍

关于Kotlin中协程使用的调度器&#xff08;Dispatchers&#xff09;的介绍 viewLifecycleOwner.lifecycleScope.launch(Dispatchers.IO) {// todo }在 Kotlin 的协程中&#xff0c;Dispatchers 是用于指定协程运行的调度器&#xff08;dispatcher&#xff09;&#xff0c;它决…