<生产者、消费者问题>——《Linux》

news/2025/2/21 20:12:10/

 目录

1. 生产者消费者模型

1.1 为何要使用生产者消费者模型

1.2 生产者消费者模型优点

2.基于BlockingQueue的生产者消费者模型

2.1 BlockingQueue

2.2 C++ queue模拟阻塞队列的生产消费模型

3.POSIX信号量

4.基于环形队列的生产消费模型

后记:●由于作者水平有限,文章难免存在谬误之处,敬请读者斧正,俚语成篇,恳望指教!

                                                                           ——By 作者:新晓·故知


1. 生产者消费者模型

1.1 为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

1.2 生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

2.基于BlockingQueue的生产者消费者模型

2.1 BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

2.2 C++ queue模拟阻塞队列的生产消费模型

代码:
为了便于理解,我们以单生产者,单消费者,来进行讲解。

#include <iostream>
#include <queue>
#include <stdlib.h>
#include <pthread.h>
#define NUM 8
class BlockQueue
{
private:std::queue<int> q;int cap;pthread_mutex_t lock;pthread_cond_t full;pthread_cond_t empty;private:void LockQueue(){pthread_mutex_lock(&lock);}void UnLockQueue(){pthread_mutex_unlock(&lock);}void ProductWait(){pthread_cond_wait(&full, &lock);}void ConsumeWait(){pthread_cond_wait(&empty, &lock);}void NotifyProduct(){pthread_cond_signal(&full);}void NotifyConsume(){pthread_cond_signal(&empty);}bool IsEmpty(){return (q.size() == 0 ? true : false);}bool IsFull(){return (q.size() == cap ? true : false);}public:BlockQueue(int _cap = NUM) : cap(_cap){pthread_mutex_init(&lock, NULL);pthread_cond_init(&full, NULL);pthread_cond_init(&empty, NULL);}void PushData(const int &data){LockQueue();while (IsFull()){NotifyConsume();std::cout << "queue full, notify consume data, product stop." << std::endl;ProductWait();}q.push(data);// NotifyConsume();UnLockQueue();}void PopData(int &data){LockQueue();while (IsEmpty()){NotifyProduct();std::cout << "queue empty, notify product data, consume stop." << std::endl;ConsumeWait();}data = q.front();q.pop();// NotifyProduct();UnLockQueue();}~BlockQueue(){pthread_mutex_destroy(&lock);pthread_cond_destroy(&full);pthread_cond_destroy(&empty);}
};
void *consumer(void *arg)
{BlockQueue *bqp = (BlockQueue *)arg;int data;for (;;){bqp->PopData(data);std::cout << "Consume data done : " << data << std::endl;}
}
// more faster
void *producter(void *arg)
{BlockQueue *bqp = (BlockQueue *)arg;srand((unsigned long)time(NULL));for (;;){int data = rand() % 1024;bqp->PushData(data);std::cout << "Prodoct data done: " << data << std::endl;// sleep(1);}
}
int main()
{BlockQueue bq;pthread_t c, p;pthread_create(&c, NULL, consumer, (void *)&bq);pthread_create(&p, NULL, producter, (void *)&bq);pthread_join(c, NULL);pthread_join(p, NULL);return 0;
}

模拟实现:

BlockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
using namespace std;// 实现新需求: 我只想保存最新的5个任务,如果来了任务,老的任务,我想让他直接被丢弃(自行选择实现)const uint32_t gDefaultCap = 5;
template <class T>
class BlockQueue
{
public:BlockQueue(uint32_t cap = gDefaultCap) : cap_(cap){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&conCond_, nullptr);pthread_cond_init(&proCond_, nullptr);}~BlockQueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&conCond_);pthread_cond_destroy(&proCond_);}public://生产接口void push(const T &in) // const &: 纯输入{// 加锁// 判断->是否适合生产->bq是否为满->程序员视角的条件->1. 满(不生产) 2. 不满(生产)// if(满) 不生产,休眠// else if(不满) 生产,唤醒消费者// 解锁lockQueue();while (isFull()) // ifFull就是我们在临界区中设定的条件{// before: 当我等待的时候,会自动释放mutex_proBlockWait(); //阻塞等待,等待被唤醒。 被唤醒 != 条件被满足(概率虽然很小),被唤醒 && 条件被满足//解决伪唤醒(使用while)// after: 当我醒来的时候,我是在临界区里醒来的!!}// 条件满足,可以生产pushCore(in); //生产完成// wakeupCon(); // 唤醒消费者unlockQueue();wakeupCon(); // 唤醒消费者}//消费接口T pop(){// 加锁// 判断->是否适合消费->bq是否为空->程序员视角的条件->1. 空(不消费) 2. 有(消费)// if(空) 不消费,休眠// else if(有) 消费,唤醒生产者// 解锁lockQueue();while (isEmpty()){conBlockwait(); //阻塞等待,等待被唤醒,?}// 条件满足,可以消费T tmp = popCore();unlockQueue();wakeupPro(); // 唤醒生产者return tmp;}private:void lockQueue(){pthread_mutex_lock(&mutex_);}void unlockQueue(){pthread_mutex_unlock(&mutex_);}bool isEmpty(){return bq_.empty();}bool isFull(){return bq_.size() == cap_;}void proBlockWait() // 生产者一定是在临界区中的!{// 1. 在阻塞线程的时候,会自动释放mutex_锁pthread_cond_wait(&proCond_, &mutex_);}void conBlockwait() //阻塞等待,等待被唤醒{// 1. 在阻塞线程的时候,会自动释放mutex_锁pthread_cond_wait(&conCond_, &mutex_);// 2. 当阻塞结束,返回的时候,pthread_cond_wait,会自动帮你重新获得mutex_,然后才返回// 为什么我们上节课,写的代码,批量退出线程的时候,发现无法退出?  //唤醒时, 调用pthread_cond_wait(&conCond_, &mutex_);,重新去竞争这个锁,多个线程去竞争一个,且那个拥有锁的线程退出没有释放锁,最后导致其他线程阻塞。}void wakeupPro() // 唤醒生产者{pthread_cond_signal(&proCond_);}void wakeupCon() // 唤醒消费者{pthread_cond_signal(&conCond_);}void pushCore(const T &in){bq_.push(in); //生产完成}T popCore(){T tmp = bq_.front();bq_.pop();return tmp;}private:uint32_t cap_;           //容量queue<T> bq_;            // blockqueuepthread_mutex_t mutex_;  //保护阻塞队列的互斥锁pthread_cond_t conCond_; // 让消费者等待的条件变量pthread_cond_t proCond_; // 让生产者等待的条件变量
};

BlockQueueTest.cc: 

#include "Task.hpp"
#include "BlockQueue.hpp"
#include <ctime>const std::string ops = "+-*/%";
// 并发,并不是在临界区中并发(一般),而是生产前(before blockqueue),消费后(after blockqueue)对应的并发
void *consumer(void *args)
{BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);while (true){Task t = bqp->pop(); // 消费任务int result = t();    //处理任务 --- 任务也是要花时间的!int one, two;char op;t.get(&one, &two, &op);cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << "=" << result << endl;}
}
void *productor(void *args)
{BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);while (true){// 1. 制作任务 --- 要不要花时间?? -- 网络,磁盘,用户int one = rand() % 50;int two = rand() % 20;char op = ops[rand() % ops.size()];Task t(one, two, op);// 2. 生产任务bqp->push(t);cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << "=?" << endl;sleep(1);}
}int main()
{srand((unsigned long)time(nullptr) ^ getpid());// 定义一个阻塞队列// 创建两个线程,productor, consumer// productor -----  consumer// BlockQueue<int> bq;// bq.push(10);// int a = bq.pop();// cout << a << endl;// 既然可以使用int类型的数据,我们也可以使用自己封装的类型,包括任务// BlockQueue<int> bq;BlockQueue<Task> bq;pthread_t c, p;pthread_create(&c, nullptr, consumer, &bq);pthread_create(&p, nullptr, productor, &bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

Task.hpp:

#pragma once
#include <iostream>
#include <string>class Task
{
public:Task() :elemOne_(0),elemTwo_(0),operator_('0'){}Task(int one, int two, char op) :elemOne_(one),elemTwo_(two),operator_(op){}int operator() (){return run();}int run(){int result = 0;switch (operator_){case '+':result = elemOne_ + elemTwo_;break;case '-':result = elemOne_ - elemTwo_;break;case '*':result = elemOne_ * elemTwo_;break;case '/':{if (elemTwo_ == 0){std::cout << "div zero, abort" << std::endl;result = -1;}else{result = elemOne_ / elemTwo_;}}break;case '%':{if (elemTwo_ == 0){std::cout << "mod zero, abort" << std::endl;result = -1;}else{result = elemOne_ % elemTwo_;}}break;default:std::cout << "非法操作: " << operator_ << std::endl;break;}return result;}int get(int *e1, int *e2, char *op){*e1 = elemOne_;*e2 = elemTwo_;*op = operator_;}
private:int elemOne_;int elemTwo_;char operator_;
};

makefile: 

CC=g++
FLAGS=-std=c++11
LD=-lpthread
bin=blockQueue
src=BlockQueueTest.cc$(bin):$(src)$(CC) -o $@ $^ $(LD) $(FLAGS)
.PHONY:clean
clean:rm -f $(bin)

3.POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem);
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);
上面的生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序 (POSIX信号量)
信号量是一个计数器,描述临界资源数量的计数器。二元信号量==互斥锁

4.基于环形队列的生产消费模型

  • 环形队列采用数组模拟,用模运算来模拟环状特性

  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
  •  但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程

 模拟实现:

#include <iostream>
#include <vector>
#include <stdlib.h>
#include <semaphore.h>
#include <unistd.h>
#include <pthread.h>
#define NUM 16
class RingQueue
{
private:std::vector<int> q;int cap;sem_t data_sem;sem_t space_sem;int consume_step;int product_step;public:RingQueue(int _cap = NUM) : q(_cap), cap(_cap){sem_init(&data_sem, 0, 0);sem_init(&space_sem, 0, cap);consume_step = 0;product_step = 0;}void PutData(const int &data){sem_wait(&space_sem); // Pq[consume_step] = data;consume_step++;consume_step %= cap;sem_post(&data_sem); // V}void GetData(int &data){sem_wait(&data_sem);data = q[product_step];product_step++;product_step %= cap;sem_post(&space_sem);}~RingQueue(){sem_destroy(&data_sem);sem_destroy(&space_sem);}
};
void *consumer(void *arg)
{RingQueue *rqp = (RingQueue *)arg;int data;for (;;){rqp->GetData(data);std::cout << "Consume data done : " << data << std::endl;sleep(1);}
}
// more faster
void *producter(void *arg)
{RingQueue *rqp = (RingQueue *)arg;srand((unsigned long)time(NULL));for (;;){int data = rand() % 1024;rqp->PutData(data);std::cout << "Prodoct data done: " << data << std::endl;// sleep(1);}
}
int main()
{RingQueue rq;pthread_t c, p;pthread_create(&c, NULL, consumer, (void *)&rq);pthread_create(&p, NULL, producter, (void *)&rq);pthread_join(c, NULL);pthread_join(p, NULL);
}

RingQueue.hpp:

 

#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>
using namespace std;const int gCap = 10;
template <class T>
class RingQueue
{
public:RingQueue(int cap = gCap):ringqueue_(cap),pIndex_(0),cIndex_(0){// 生产sem_init(&roomSem_, 0, ringqueue_.size());// 消费sem_init(&dataSem_, 0, 0);pthread_mutex_init(&pmutex_ ,nullptr);pthread_mutex_init(&cmutex_ ,nullptr);}// 生产void push(const T &in){sem_wait(&roomSem_); //无法被多次的申请pthread_mutex_lock(&pmutex_);ringqueue_[pIndex_] = in; //生产的过程pIndex_++;   // 写入位置后移pIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征pthread_mutex_unlock(&pmutex_);sem_post(&dataSem_);}// 消费T pop(){sem_wait(&dataSem_);pthread_mutex_lock(&cmutex_);T temp = ringqueue_[cIndex_];cIndex_++;cIndex_ %= ringqueue_.size();// 更新下标,保证环形特征pthread_mutex_unlock(&cmutex_);sem_post(&roomSem_);return temp;}~RingQueue(){sem_destroy(&roomSem_);sem_destroy(&dataSem_);pthread_mutex_destroy(&pmutex_);pthread_mutex_destroy(&cmutex_);}
private:vector<T> ringqueue_; // 唤醒队列sem_t roomSem_;       // 衡量空间计数器,productorsem_t dataSem_;       // 衡量数据计数器,consumeruint32_t pIndex_;     // 当前生产者写入的位置, 如果是多线程,pIndex_也是临界资源uint32_t cIndex_;     // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源pthread_mutex_t pmutex_;pthread_mutex_t cmutex_;
};

RingQueueTest.cc:

#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>// 我们是单生产者,单消费者
// 多生产者,多消费者??代码怎么改?
// 为什么呢???多生产者,多消费者?
// 不要只关心把数据或者任务,从ringqueue 放拿的过程,获取数据或者任务,处理数据或者任务,也是需要花时间的!void *productor(void *args)
{RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);while(true){int data = rand()%10;rqp->push(data);cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;sleep(1);}
}void *consumer(void *args)
{RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);while(true){//sleep(10);int data = rqp->pop();cout << "pthread[" << pthread_self() << "]" << " 消费了一个数据: " << data << endl;}
}int main()
{srand((unsigned long)time(nullptr)^getpid());RingQueue<int> rq;pthread_t c1,c2,c3, p1,p2,p3;pthread_create(&p1, nullptr, productor, &rq);pthread_create(&p2, nullptr, productor, &rq);pthread_create(&p3, nullptr, productor, &rq);pthread_create(&c1, nullptr, consumer, &rq);pthread_create(&c2, nullptr, consumer, &rq);pthread_create(&c3, nullptr, consumer, &rq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);return 0;
}

 makefile:

 

CC=g++
FLAGS=-std=c++11
LD=-lpthread
bin=ringQueue
src=RingQueueTest.cc$(bin):$(src)$(CC) -o $@ $^ $(LD) $(FLAGS)
.PHONY:clean
clean:rm -f $(bin)

 

后记:
●由于作者水平有限,文章难免存在谬误之处,敬请读者斧正,俚语成篇,恳望指教!

                                                                           ——By 作者:新晓·故知


http://www.ppmy.cn/news/6995.html

相关文章

MARKETS AND MARKET LOGIC——The Market‘s Principles (2)

市场逻辑原则、操作程序和特点 下面这节的目标是提出一个相当直接和逻辑性的论点&#xff1a;为了理解市场结构&#xff0c;需要阅读并理解当前市场产生的信息。一旦完成这一点&#xff0c;人们就能够区分市场活动的类型以及价格。为了提供论点的实质&#xff0c;以下原则已被剥…

【云原生之Docker实战】使用Docker部署个人FireflyIII财务系统

【云原生之Docker实战】使用Docker部署个人FireflyIII财务系统 一、FireflyIII介绍1.FireflyIII简介2.FireflyIII特点二、检查本地环境1.检查系统版本2.检查docker版本3.检查docker状态4.检查docker compose版本三、下载FireflyIII镜像四、编辑docker-compose.yaml文件1.创建数…

Numpy常用操作(一)

为什么使用Numpy? Numpy是Python的基础&#xff0c;更是数据科学的通用语言&#xff0c;且与TensorFlow关系密切&#xff0c;Python本身支持list和 tuple的数据存储,但这些数据结构有很多不足&#xff0c;因列表的元素可以为任意对象&#xff0c;因此列表中所保存的是对象的指…

大话测试数据(一)

导读&#xff1a;测试数据的准备至关重要&#xff0c;无论是手工测试还是自动化测试都要以良好的测试数据准备为基础。本文为霍格沃兹测试学院特邀嘉宾&#xff0c;某互联网巨头企业资深测试技术专家刘晓光&#xff08;skytraveler&#xff09;老师对测试数据管理实践的思考总结…

Linux 管理联网 配置静态解析 域名解析

问题引入 # 我们平时在网址栏访问网址的时候&#xff0c; 比如 &#xff1a; 访问 百度 的时候&#xff0c;是怎样访问的 &#xff1f; >>> 那是不是就是在 地址栏 我们输入 www.baidu.com 随后就跳转到 百度的页面去了~&#xff01; 但实际是 &#xff1a; >&g…

经营报表-FineReport配置Oracle外接数据库(2)

1. 配置外接数据库 1.1 外接数据库配置入口 外接数据库的配置入口&#xff0c;有三种形式&#xff1a; 1&#xff09;超级管理员第一次登录数据决策系统时&#xff0c;即可为系统配置外接数据库。如下图所示&#xff1a; 2&#xff09;对于使用内置数据库的系统&#xff0c;管…

【OpenCV-Python】教程:8-2 图像修复 Image Inpainting

OpenCV Python 图像修复 【目标】 去除小噪声和笔画等&#xff1b; 【理论】 大多数人家里都会有一些旧照片&#xff0c;上面有一些黑点&#xff0c;一些笔画等。你想过把它修复回来吗?我们不能简单地在油漆工具中删除它们&#xff0c;因为它只会用白色结构取代黑色结构&a…

4.移动端布局-flex布局**

1、传统布局和flex布局 传统布局&#xff1a;PC端 兼容性好布局繁琐局限性&#xff0c;不能在移动端很好的布局 flex布局&#xff1a;PC端、移动端操作方便&#xff0c;布局简单&#xff0c;移动端应用广泛PC端浏览器支持情况较差IE11或更低版本&#xff0c;不支持或仅部分支…