生产者消费者模型
- 什么是生产者消费者模型
- 生产者消费者模型的优点
- c++模拟实现生产者消费者模型
- 信号量
- 基于信号量重新对生产消费模型进行设计
- 基于环形队列的生产消费模型
什么是生产者消费者模型
生产消费模型可以总结为:123规则
123规则:
1:一个线程安全的队列 (具有互斥+同步)
2:两种角色的线程 (生成者线程+消费者线程)
3:三种关系:生产者之间互斥,消费者之间互斥,生产者和消费者之间互斥+同步
生产者消费者模型的优点
忙闲不均:如果消费者只是每隔一秒从队列中取出数据,而生产者没有限制,随时高速往队列中放数据
生产者和消费者解耦
支持高并发和高可用
c++模拟实现生产者消费者模型
先创建一个线程安全的队列
#pragma once
#include<queue>
#include<pthread.h>#define CAPACITY 1class SafeQueue
{
public:SafeQueue(){pthread_mutex_init(&lock_, NULL);pthread_cond_init(&cons_cond_, NULL);pthread_cond_init(&prod_cond_, NULL);capacity_ = CAPACITY;}~SafeQueue(){pthread_mutex_destroy(&lock_);pthread_cond_destroy(&cons_cond_);pthread_cond_destroy(&prod_cond_);}void push(int data){pthread_mutex_lock(&lock_);while(que_.size() >= capacity_){pthread_cond_wait(&prod_cond_, &lock_);}que_.push(data);pthread_mutex_unlock(&lock_);pthread_cond_signal(&cons_cond_);}void pop(int *data){pthread_mutex_lock(&lock_);while(que_.empty()){pthread_cond_wait(&cons_cond_, &lock_);}*data = que_.front();que_.pop();pthread_mutex_unlock(&lock_);pthread_cond_signal(&prod_cond_);}private:std::queue<int> que_;pthread_mutex_t lock_;pthread_cond_t cons_cond_;pthread_cond_t prod_cond_;size_t capacity_;
};
再创建一个生产消费者模型
#pragma once
#include<stdio.h>
#include<unistd.h>
//#include<pthread.h>
#include<iostream>
#include"safe_queue.hpp"class ConsAndProd
{
public:ConsAndProd(int tc = 1){sq_ = NULL;thread_count_ = tc;data_ = 0;pthread_mutex_init(&lock_d, NULL);}~ConsAndProd(){if(sq_){delete sq_;}pthread_mutex_destroy(&lock_d);}int OnInit(){sq_ = new SafeQueue();if(sq_ = NULL){return -1;}return 0;}int start(){for(int i = 0; i < thread_count_; ++i){pthread_t tid;int res = pthread_create(&tid, NULL, ConsStart, (void*)this);if(res < 0){return -1;}res = pthread_create(&tid, NULL, ProdStart, (void*)this);if(res < 0){return -1;}}}static void* ConsStart(void* arg){pthread_detach(pthread_self());ConsAndProd* cap = (ConsAndProd*)arg;while(1){int data;cap->sq_->pop(&data);printf("i am cons_thread%p, i cons%d\n", pthread_self(), data);}return NULL;}static void* ProdStart(void* arg){pthread_detach(pthread_self());ConsAndProd* cap = (ConsAndProd*)arg;while(1){pthread_mutex_lock(&cap->lock_d);cap->sq_->push(cap->data_++);pthread_mutex_unlock(&cap->lock_d);sleep(1);}return NULL;}private:SafeQueue* sq_;int data_;pthread_mutex_t lock_d;int thread_count_;
};
最后创建一个main入口函数
#include"cons2prod.hpp"int main()
{ConsAndProd* cap = new ConsAndProd(2);if(cap->OnInit() < 0){std::cout << "init safe_queue failed\n";return 0;}if(cap->start() < 0){std::cout << "Cons_prod create failed\n";return 0;}while(1){sleep(1);}delete cap;return 0;
}
信号量
信号量的本质是资源计数器+PCB等待队列,与条件变量相比,只是多了一个资源计数器。有了资源计数器后,达到无冲突的访问共享资源目的。
信号量:既能使用信号量进行互斥,又能使用信号量进行同步。
信号量的初始化
#include<semaphore.h>
int sem_init(sem_t *sem, init pshared, unsigned int value);
参数:
pshared: 0表示线程间共享,非0表示进程间共享
value:信号量初始值,即有多少个资源
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
int sem_wait(sem_t *sem);
//注意:调用等待函数,会进入PCB等待队列,并且信号量的值会减1,
//判断资源计数器的值是否小于0, 是:阻塞等待,将执行流放到PCB等待队列,否:则接口返回
发布信号量
int sem_post(sem_t *sem);
//注意:调用post函数,会将当前资源++,也就是信号量加1,表示有资源被释放,
//判断资源计数器的值是否小于等于0,是:通知PCB等待队列,否:不用通知,因为没有线程在等待
基于信号量重新对生产消费模型进行设计
创建一个线程安全的队列
#pragma once
#include<queue>
#include<semaphore.h>class safequeue
{
public:safequeue(){sem_init(&_lock, 0, 1);sem_init(&cons_sem, 0, 0);sem_init(&prod_sem, 0, 2);//初始只有一个空闲资源,现在改为2}~safequeue(){}void push(int data){//先保证对于生产者而言,是有资源的,有资源再去抢_lock//实际上就是要先保证同步sem_wait(&prod_sem);//当生产者有资源时,要保证生产者之间是互斥的,所以要抢_lock//实际上就是要保证互斥sem_wait(&_lock);_que.push(data);//当某一个生产者生产完,需要释放_lock,让其他生产者生产sem_post(&_lock);//当生产者生产完,需要通知消费者来消费sem_post(&cons_sem);}void pop(int* data){sem_wait(&cons_sem);sem_wait(&_lock);*data = _que.front();_que.pop();sem_post(&_lock);sem_post(&prod_sem);}
private:std::queue<int> _que;/** 保证多个线程互斥访问,* */sem_t _lock;//消费者信号量,初始化资源为0,后期资源个数由生产者加sem_t cons_sem;//生产者信号量,描述队列的空闲空间,初始化由程序员定,后期资源个数由消费者加sem_t prod_sem;
};
创建两个角色,保证互斥和同步
#include<stdio.h>
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include"safequeue.hpp"#define THREADCOUNT 2
int g_count = 0;void* consstart(void* arg)
{safequeue* sq = (safequeue*)arg;while(1){int data;sq->pop(&data);printf("i cons %d\n", data);}return NULL;
}
void* prodstart(void* arg)
{safequeue* sq = (safequeue*)arg;sem_t lock_count;sem_init(&lock_count, 0, 1);while(1){sq->push(g_count); //由于当前有2个生产者和消费者,对于g_count而言,就不是线程安全的。需要加锁保护sem_wait(&lock_count);g_count++;sem_post(&lock_count);//sleep(1);}sem_destroy(&lock_count);return NULL;
}int main()
{/*创建两种角色的线程*/safequeue* sq = new safequeue();if(sq == NULL){return 0;}pthread_t cons[THREADCOUNT], prod[THREADCOUNT];for(int i = 0; i < THREADCOUNT; ++i){int res = pthread_create(&cons[i], NULL, consstart, (void*)sq);if(res < 0){return 0;}res = pthread_create(&prod[i], NULL, prodstart, (void*)sq);}for(int i = 0; i < THREADCOUNT; ++i){pthread_join(cons[i], NULL);pthread_join(prod[i], NULL);}delete sq;return 0;
}
基于环形队列的生产消费模型
注意:实际上,我们看到的当前操作系统中缓冲区大多数都是用环形队列来设计的。
首先我们看下一个队列是如何工作的:生产者先向队列中push数据,顺序是0—>n,当队列中有数据时,消费者就可以从队列中pop数据。
因为队列的空间不可能无穷大,所以当消费者一直不消费时,生产者就会将队列填满,那么就如下图所示,生产者指向的是队列的尾部。
此时,若有消费者从0下标的位置进行消费,就会将0位置的元素pop,生产者就需要从队列的末尾指向队列的头部。
可问题是,怎么样才能让生产者指向队列的头部呢?
我们可以通过模运算来实现。
设置一个生产/消费下标值,生产/消费一次后,下标值++,然后将当前下标值%队列长度。
使用这样的模运算,就能让下标从队尾指向对头,看起来就像是一个环形一样,这样就实现了环形队列。
如果用上述生产消费者模型来写的话,只需要改变push和pop的部分代码即可
#pragma once
#include<queue>
#include<semaphore.h>class safequeue
{
public:safequeue(){sem_init(&_lock, 0, 1);sem_init(&cons_sem, 0, 0);sem_init(&prod_sem, 0, 4);//设置初始空闲资源为4//往队列中读和写的初始值为0pos_r = 0;pos_w = 0;}~safequeue(){}void push(int data){//先保证对于生产者而言,是有资源的,有资源再去抢_lock//实际上就是要先保证同步sem_wait(&prod_sem);//当生产者有资源时,要保证生产者之间是互斥的,所以要抢_lock//实际上就是要保证互斥sem_wait(&_lock);//原代码//_que.push(data);//修改为环形队列的代码arr[pos_w] = data;pos_w = (pos_w + 1) % 4;//更新位置//当某一个生产者生产完,需要释放_lock,让其他生产者生产sem_post(&_lock);//当生产者生产完,需要通知消费者来消费sem_post(&cons_sem);}void pop(int* data){sem_wait(&cons_sem);sem_wait(&_lock);//原代码//*data = _que.front();//_que.pop();//替换为环形队列的代码*data = arr[pos_r];pos_r = (pos_r + 1) % 4;//更新位置sem_post(&_lock);sem_post(&prod_sem);}
private://不使用queue的结构//std::queue<int> _que;//为了验证环形队列,使用数组的结构,保证空间大小固定为4int arr[4];/** 保证多个线程互斥访问,* */sem_t _lock;//消费者信号量,初始化资源为0,后期资源个数由生产者加sem_t cons_sem;//生产者信号量,描述队列的空闲空间,初始化由程序员定,后期资源个数由消费者加sem_t prod_sem;//增加了两个往队列中读和写的位置int pos_r;int pos_w;
};