【Linux】生产者消费者模型

news/2024/11/20 4:39:55/

生产者消费者模型

  • 什么是生产者消费者模型
  • 生产者消费者模型的优点
  • c++模拟实现生产者消费者模型
  • 信号量
  • 基于信号量重新对生产消费模型进行设计
  • 基于环形队列的生产消费模型

什么是生产者消费者模型

生产消费模型可以总结为:123规则

123规则:
1:一个线程安全的队列 (具有互斥+同步)
2:两种角色的线程 (生成者线程+消费者线程)
3:三种关系:生产者之间互斥,消费者之间互斥,生产者和消费者之间互斥+同步

生产者消费者模型的优点

忙闲不均:如果消费者只是每隔一秒从队列中取出数据,而生产者没有限制,随时高速往队列中放数据
生产者和消费者解耦
支持高并发和高可用

c++模拟实现生产者消费者模型

先创建一个线程安全的队列

#pragma once
#include<queue>
#include<pthread.h>#define CAPACITY 1class SafeQueue
{
public:SafeQueue(){pthread_mutex_init(&lock_, NULL);pthread_cond_init(&cons_cond_, NULL);pthread_cond_init(&prod_cond_, NULL);capacity_ = CAPACITY;}~SafeQueue(){pthread_mutex_destroy(&lock_);pthread_cond_destroy(&cons_cond_);pthread_cond_destroy(&prod_cond_);}void push(int data){pthread_mutex_lock(&lock_);while(que_.size() >= capacity_){pthread_cond_wait(&prod_cond_, &lock_);}que_.push(data);pthread_mutex_unlock(&lock_);pthread_cond_signal(&cons_cond_);}void pop(int *data){pthread_mutex_lock(&lock_);while(que_.empty()){pthread_cond_wait(&cons_cond_, &lock_);}*data = que_.front();que_.pop();pthread_mutex_unlock(&lock_);pthread_cond_signal(&prod_cond_);}private:std::queue<int> que_;pthread_mutex_t lock_;pthread_cond_t cons_cond_;pthread_cond_t prod_cond_;size_t capacity_;
};

再创建一个生产消费者模型

#pragma once
#include<stdio.h>
#include<unistd.h>
//#include<pthread.h>
#include<iostream>
#include"safe_queue.hpp"class ConsAndProd
{
public:ConsAndProd(int tc = 1){sq_ = NULL;thread_count_ = tc;data_ = 0;pthread_mutex_init(&lock_d, NULL);}~ConsAndProd(){if(sq_){delete sq_;}pthread_mutex_destroy(&lock_d);}int OnInit(){sq_ = new SafeQueue();if(sq_ = NULL){return -1;}return 0;}int start(){for(int i = 0; i < thread_count_; ++i){pthread_t tid;int res = pthread_create(&tid, NULL, ConsStart, (void*)this);if(res < 0){return -1;}res = pthread_create(&tid, NULL, ProdStart, (void*)this);if(res < 0){return -1;}}}static void* ConsStart(void* arg){pthread_detach(pthread_self());ConsAndProd* cap = (ConsAndProd*)arg;while(1){int data;cap->sq_->pop(&data);printf("i am cons_thread%p, i cons%d\n", pthread_self(), data);}return NULL;}static void* ProdStart(void* arg){pthread_detach(pthread_self());ConsAndProd* cap = (ConsAndProd*)arg;while(1){pthread_mutex_lock(&cap->lock_d);cap->sq_->push(cap->data_++);pthread_mutex_unlock(&cap->lock_d);sleep(1);}return NULL;}private:SafeQueue* sq_;int data_;pthread_mutex_t lock_d;int thread_count_;
};

最后创建一个main入口函数

#include"cons2prod.hpp"int main()
{ConsAndProd* cap = new ConsAndProd(2);if(cap->OnInit() < 0){std::cout << "init safe_queue failed\n";return 0;}if(cap->start() < 0){std::cout << "Cons_prod create failed\n";return 0;}while(1){sleep(1);}delete cap;return 0;
}

信号量

信号量的本质是资源计数器+PCB等待队列,与条件变量相比,只是多了一个资源计数器。有了资源计数器后,达到无冲突的访问共享资源目的。

信号量:既能使用信号量进行互斥,又能使用信号量进行同步。

在这里插入图片描述

信号量的初始化

#include<semaphore.h>
int sem_init(sem_t *sem, init pshared, unsigned int value);

参数:
pshared: 0表示线程间共享,非0表示进程间共享
value:信号量初始值,即有多少个资源

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

int sem_wait(sem_t *sem);
//注意:调用等待函数,会进入PCB等待队列,并且信号量的值会减1,
//判断资源计数器的值是否小于0, 是:阻塞等待,将执行流放到PCB等待队列,否:则接口返回

发布信号量

int sem_post(sem_t *sem);
//注意:调用post函数,会将当前资源++,也就是信号量加1,表示有资源被释放,
//判断资源计数器的值是否小于等于0,是:通知PCB等待队列,否:不用通知,因为没有线程在等待

基于信号量重新对生产消费模型进行设计

创建一个线程安全的队列

#pragma once 
#include<queue>
#include<semaphore.h>class safequeue
{
public:safequeue(){sem_init(&_lock, 0, 1);sem_init(&cons_sem, 0, 0);sem_init(&prod_sem, 0, 2);//初始只有一个空闲资源,现在改为2}~safequeue(){}void push(int data){//先保证对于生产者而言,是有资源的,有资源再去抢_lock//实际上就是要先保证同步sem_wait(&prod_sem);//当生产者有资源时,要保证生产者之间是互斥的,所以要抢_lock//实际上就是要保证互斥sem_wait(&_lock);_que.push(data);//当某一个生产者生产完,需要释放_lock,让其他生产者生产sem_post(&_lock);//当生产者生产完,需要通知消费者来消费sem_post(&cons_sem);}void pop(int* data){sem_wait(&cons_sem);sem_wait(&_lock);*data = _que.front();_que.pop();sem_post(&_lock);sem_post(&prod_sem);}
private:std::queue<int> _que;/** 保证多个线程互斥访问,* */sem_t _lock;//消费者信号量,初始化资源为0,后期资源个数由生产者加sem_t cons_sem;//生产者信号量,描述队列的空闲空间,初始化由程序员定,后期资源个数由消费者加sem_t prod_sem;
};

创建两个角色,保证互斥和同步

#include<stdio.h>
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include"safequeue.hpp"#define THREADCOUNT 2
int g_count = 0;void* consstart(void* arg)
{safequeue* sq = (safequeue*)arg;while(1){int data;sq->pop(&data);printf("i cons %d\n", data);}return NULL;
}
void* prodstart(void* arg)
{safequeue* sq = (safequeue*)arg;sem_t lock_count;sem_init(&lock_count, 0, 1);while(1){sq->push(g_count); //由于当前有2个生产者和消费者,对于g_count而言,就不是线程安全的。需要加锁保护sem_wait(&lock_count);g_count++;sem_post(&lock_count);//sleep(1);}sem_destroy(&lock_count);return NULL;
}int main()
{/*创建两种角色的线程*/safequeue* sq = new safequeue();if(sq == NULL){return 0;}pthread_t cons[THREADCOUNT], prod[THREADCOUNT];for(int i = 0; i < THREADCOUNT; ++i){int res = pthread_create(&cons[i], NULL, consstart, (void*)sq);if(res < 0){return 0;}res = pthread_create(&prod[i], NULL, prodstart, (void*)sq);}for(int i = 0; i < THREADCOUNT; ++i){pthread_join(cons[i], NULL);pthread_join(prod[i], NULL);}delete sq;return 0;
}

基于环形队列的生产消费模型

注意:实际上,我们看到的当前操作系统中缓冲区大多数都是用环形队列来设计的。

首先我们看下一个队列是如何工作的:生产者先向队列中push数据,顺序是0—>n,当队列中有数据时,消费者就可以从队列中pop数据。
在这里插入图片描述

因为队列的空间不可能无穷大,所以当消费者一直不消费时,生产者就会将队列填满,那么就如下图所示,生产者指向的是队列的尾部。
在这里插入图片描述

此时,若有消费者从0下标的位置进行消费,就会将0位置的元素pop,生产者就需要从队列的末尾指向队列的头部。

可问题是,怎么样才能让生产者指向队列的头部呢?
我们可以通过模运算来实现。
设置一个生产/消费下标值,生产/消费一次后,下标值++,然后将当前下标值%队列长度。

在这里插入图片描述

使用这样的模运算,就能让下标从队尾指向对头,看起来就像是一个环形一样,这样就实现了环形队列。

如果用上述生产消费者模型来写的话,只需要改变push和pop的部分代码即可

#pragma once 
#include<queue>
#include<semaphore.h>class safequeue
{
public:safequeue(){sem_init(&_lock, 0, 1);sem_init(&cons_sem, 0, 0);sem_init(&prod_sem, 0, 4);//设置初始空闲资源为4//往队列中读和写的初始值为0pos_r = 0;pos_w = 0;}~safequeue(){}void push(int data){//先保证对于生产者而言,是有资源的,有资源再去抢_lock//实际上就是要先保证同步sem_wait(&prod_sem);//当生产者有资源时,要保证生产者之间是互斥的,所以要抢_lock//实际上就是要保证互斥sem_wait(&_lock);//原代码//_que.push(data);//修改为环形队列的代码arr[pos_w] = data;pos_w = (pos_w + 1) % 4;//更新位置//当某一个生产者生产完,需要释放_lock,让其他生产者生产sem_post(&_lock);//当生产者生产完,需要通知消费者来消费sem_post(&cons_sem);}void pop(int* data){sem_wait(&cons_sem);sem_wait(&_lock);//原代码//*data = _que.front();//_que.pop();//替换为环形队列的代码*data = arr[pos_r];pos_r = (pos_r + 1) % 4;//更新位置sem_post(&_lock);sem_post(&prod_sem);}
private://不使用queue的结构//std::queue<int> _que;//为了验证环形队列,使用数组的结构,保证空间大小固定为4int arr[4];/** 保证多个线程互斥访问,* */sem_t _lock;//消费者信号量,初始化资源为0,后期资源个数由生产者加sem_t cons_sem;//生产者信号量,描述队列的空闲空间,初始化由程序员定,后期资源个数由消费者加sem_t prod_sem;//增加了两个往队列中读和写的位置int pos_r;int pos_w;
};

http://www.ppmy.cn/news/66808.html

相关文章

股票K线基础知识1

K线图 K线图是反映价格在某一时间周期内波动情况的图表&#xff0c;它由开盘价、收盘价、最高价、最低价四个要素构成&#xff0c;若当日收盘价高于开盘价&#xff0c;这表明价格处于上涨状态&#xff0c;此时K线图多用红色表示&#xff1b;若当日收盘价低于开盘价&#xff0c…

黎曼积分的概念

黎曼积分的概念 引入 设 f f f是闭区间 [ a , b ] [a,b] [a,b]上的非负连续函数&#xff0c; D D D是坐标系中由直线 x a xa xa&#xff0c; x b xb xb&#xff0c; x x x轴和曲线 y f ( x ) yf(x) yf(x)围成的图形。求 D D D的面积 S S S。 我们可以在 [ a , b ] [a,b] …

Java中单例(单态、原子)设计模式(饿汉式/懒汉式)

目录 友情提醒一、什么是设计模式1&#xff09;设计模式简述2&#xff09;什么是单例设计模式3&#xff09;单例设计模式思路 二、饿汉式1&#xff09;饿汉式单例设计模式的特点2&#xff09;实现一个饿汉式单例设计 三、懒汉式1&#xff09;懒汉式单例设计模式的特点2&#xf…

draw.io二次开发(3)从删删减减开始定制自己的drawio

经过克隆代码、配置IntelliJ/IDEA和Tomcat、以及本地部署&#xff08;详见前几篇&#xff09;之后&#xff0c;终于到了上手改代码的环节了。 首先需要强调的一点是&#xff1a;千万不要去改 *.min.js 文件中的代码&#xff0c;这些文件都是生成的压缩代码&#xff0c;我们一定…

分布式系统概念和设计——时间和全局状态(分布式系统中的时间问题)

分布式系统概念和设计 时间和全局状态 全局物理时间的缺乏使我们很难查明分布式程序的执行时状态。 我们经常需要知道当进程B处在某种状态依赖进程是什么状态&#xff0c;但不能通过依靠物理时钟理解一个同一个时刻到底是什么情况。 维护分布数据一致性算法检查发送给服务器的…

边缘计算盒子适合用于哪些场景?

边缘计算盒子适用于在智慧工地、智慧工厂、智慧园区和智慧城管等场景中可以实现多种算法功能&#xff0c;以下是一些应用和实现算法功能&#xff1a; 智慧工地&#xff1a;实时视频监控与分析&#xff1a;边缘计算盒子可以处理实时监控视频流&#xff0c;进行人员和车辆识别、…

Jenkins入门教程

一、开始使用 Jenkins 本导读将向您介绍使用 Jenkins、Jenkins 的主要特性和 Jenkins Pipeline 的基本知识。 本导读使用“独立”的 Jenkins 发行版&#xff0c;它可以在您自己本地的机器上运行。 准备工作 第一次使用 Jenkins&#xff0c;您需要&#xff1a; 机器要求&…

【C++进阶之路】类和对象(中)

文章目录 前言六大默认成员函数 一.构造函数性质默认构造函数构造函数(需要传参) 二.析构函数性质默认析构函数练习 三.拷贝构造函数基本性质&#xff1a;形参必须是引用默认拷贝构造浅拷贝深拷贝自定义类型 四.赋值运算符重载函数基本特征全局的运算符重载函数局部的运算符重载…