目录
1. 生产者消费者模型
1.1 为何要使用生产者消费者模型
1.2 生产者消费者模型优点
2.基于BlockingQueue的生产者消费者模型
2.1 BlockingQueue
2.2 C++ queue模拟阻塞队列的生产消费模型
3.POSIX信号量
4.基于环形队列的生产消费模型
后记:●由于作者水平有限,文章难免存在谬误之处,敬请读者斧正,俚语成篇,恳望指教!
——By 作者:新晓·故知
1. 生产者消费者模型
1.1 为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。1.2 生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
2.基于BlockingQueue的生产者消费者模型
2.1 BlockingQueue
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)2.2 C++ queue模拟阻塞队列的生产消费模型
代码:为了便于理解,我们以单生产者,单消费者,来进行讲解。
#include <iostream> #include <queue> #include <stdlib.h> #include <pthread.h> #define NUM 8 class BlockQueue { private:std::queue<int> q;int cap;pthread_mutex_t lock;pthread_cond_t full;pthread_cond_t empty;private:void LockQueue(){pthread_mutex_lock(&lock);}void UnLockQueue(){pthread_mutex_unlock(&lock);}void ProductWait(){pthread_cond_wait(&full, &lock);}void ConsumeWait(){pthread_cond_wait(&empty, &lock);}void NotifyProduct(){pthread_cond_signal(&full);}void NotifyConsume(){pthread_cond_signal(&empty);}bool IsEmpty(){return (q.size() == 0 ? true : false);}bool IsFull(){return (q.size() == cap ? true : false);}public:BlockQueue(int _cap = NUM) : cap(_cap){pthread_mutex_init(&lock, NULL);pthread_cond_init(&full, NULL);pthread_cond_init(&empty, NULL);}void PushData(const int &data){LockQueue();while (IsFull()){NotifyConsume();std::cout << "queue full, notify consume data, product stop." << std::endl;ProductWait();}q.push(data);// NotifyConsume();UnLockQueue();}void PopData(int &data){LockQueue();while (IsEmpty()){NotifyProduct();std::cout << "queue empty, notify product data, consume stop." << std::endl;ConsumeWait();}data = q.front();q.pop();// NotifyProduct();UnLockQueue();}~BlockQueue(){pthread_mutex_destroy(&lock);pthread_cond_destroy(&full);pthread_cond_destroy(&empty);} }; void *consumer(void *arg) {BlockQueue *bqp = (BlockQueue *)arg;int data;for (;;){bqp->PopData(data);std::cout << "Consume data done : " << data << std::endl;} } // more faster void *producter(void *arg) {BlockQueue *bqp = (BlockQueue *)arg;srand((unsigned long)time(NULL));for (;;){int data = rand() % 1024;bqp->PushData(data);std::cout << "Prodoct data done: " << data << std::endl;// sleep(1);} } int main() {BlockQueue bq;pthread_t c, p;pthread_create(&c, NULL, consumer, (void *)&bq);pthread_create(&p, NULL, producter, (void *)&bq);pthread_join(c, NULL);pthread_join(p, NULL);return 0; }
模拟实现:
BlockQueue.hpp:
#pragma once #include <iostream> #include <queue> #include <cstdlib> #include <unistd.h> #include <pthread.h> using namespace std;// 实现新需求: 我只想保存最新的5个任务,如果来了任务,老的任务,我想让他直接被丢弃(自行选择实现)const uint32_t gDefaultCap = 5; template <class T> class BlockQueue { public:BlockQueue(uint32_t cap = gDefaultCap) : cap_(cap){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&conCond_, nullptr);pthread_cond_init(&proCond_, nullptr);}~BlockQueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&conCond_);pthread_cond_destroy(&proCond_);}public://生产接口void push(const T &in) // const &: 纯输入{// 加锁// 判断->是否适合生产->bq是否为满->程序员视角的条件->1. 满(不生产) 2. 不满(生产)// if(满) 不生产,休眠// else if(不满) 生产,唤醒消费者// 解锁lockQueue();while (isFull()) // ifFull就是我们在临界区中设定的条件{// before: 当我等待的时候,会自动释放mutex_proBlockWait(); //阻塞等待,等待被唤醒。 被唤醒 != 条件被满足(概率虽然很小),被唤醒 && 条件被满足//解决伪唤醒(使用while)// after: 当我醒来的时候,我是在临界区里醒来的!!}// 条件满足,可以生产pushCore(in); //生产完成// wakeupCon(); // 唤醒消费者unlockQueue();wakeupCon(); // 唤醒消费者}//消费接口T pop(){// 加锁// 判断->是否适合消费->bq是否为空->程序员视角的条件->1. 空(不消费) 2. 有(消费)// if(空) 不消费,休眠// else if(有) 消费,唤醒生产者// 解锁lockQueue();while (isEmpty()){conBlockwait(); //阻塞等待,等待被唤醒,?}// 条件满足,可以消费T tmp = popCore();unlockQueue();wakeupPro(); // 唤醒生产者return tmp;}private:void lockQueue(){pthread_mutex_lock(&mutex_);}void unlockQueue(){pthread_mutex_unlock(&mutex_);}bool isEmpty(){return bq_.empty();}bool isFull(){return bq_.size() == cap_;}void proBlockWait() // 生产者一定是在临界区中的!{// 1. 在阻塞线程的时候,会自动释放mutex_锁pthread_cond_wait(&proCond_, &mutex_);}void conBlockwait() //阻塞等待,等待被唤醒{// 1. 在阻塞线程的时候,会自动释放mutex_锁pthread_cond_wait(&conCond_, &mutex_);// 2. 当阻塞结束,返回的时候,pthread_cond_wait,会自动帮你重新获得mutex_,然后才返回// 为什么我们上节课,写的代码,批量退出线程的时候,发现无法退出? //唤醒时, 调用pthread_cond_wait(&conCond_, &mutex_);,重新去竞争这个锁,多个线程去竞争一个,且那个拥有锁的线程退出没有释放锁,最后导致其他线程阻塞。}void wakeupPro() // 唤醒生产者{pthread_cond_signal(&proCond_);}void wakeupCon() // 唤醒消费者{pthread_cond_signal(&conCond_);}void pushCore(const T &in){bq_.push(in); //生产完成}T popCore(){T tmp = bq_.front();bq_.pop();return tmp;}private:uint32_t cap_; //容量queue<T> bq_; // blockqueuepthread_mutex_t mutex_; //保护阻塞队列的互斥锁pthread_cond_t conCond_; // 让消费者等待的条件变量pthread_cond_t proCond_; // 让生产者等待的条件变量 };
BlockQueueTest.cc:
#include "Task.hpp" #include "BlockQueue.hpp" #include <ctime>const std::string ops = "+-*/%"; // 并发,并不是在临界区中并发(一般),而是生产前(before blockqueue),消费后(after blockqueue)对应的并发 void *consumer(void *args) {BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);while (true){Task t = bqp->pop(); // 消费任务int result = t(); //处理任务 --- 任务也是要花时间的!int one, two;char op;t.get(&one, &two, &op);cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << "=" << result << endl;} } void *productor(void *args) {BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);while (true){// 1. 制作任务 --- 要不要花时间?? -- 网络,磁盘,用户int one = rand() % 50;int two = rand() % 20;char op = ops[rand() % ops.size()];Task t(one, two, op);// 2. 生产任务bqp->push(t);cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << "=?" << endl;sleep(1);} }int main() {srand((unsigned long)time(nullptr) ^ getpid());// 定义一个阻塞队列// 创建两个线程,productor, consumer// productor ----- consumer// BlockQueue<int> bq;// bq.push(10);// int a = bq.pop();// cout << a << endl;// 既然可以使用int类型的数据,我们也可以使用自己封装的类型,包括任务// BlockQueue<int> bq;BlockQueue<Task> bq;pthread_t c, p;pthread_create(&c, nullptr, consumer, &bq);pthread_create(&p, nullptr, productor, &bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0; }
Task.hpp:
#pragma once #include <iostream> #include <string>class Task { public:Task() :elemOne_(0),elemTwo_(0),operator_('0'){}Task(int one, int two, char op) :elemOne_(one),elemTwo_(two),operator_(op){}int operator() (){return run();}int run(){int result = 0;switch (operator_){case '+':result = elemOne_ + elemTwo_;break;case '-':result = elemOne_ - elemTwo_;break;case '*':result = elemOne_ * elemTwo_;break;case '/':{if (elemTwo_ == 0){std::cout << "div zero, abort" << std::endl;result = -1;}else{result = elemOne_ / elemTwo_;}}break;case '%':{if (elemTwo_ == 0){std::cout << "mod zero, abort" << std::endl;result = -1;}else{result = elemOne_ % elemTwo_;}}break;default:std::cout << "非法操作: " << operator_ << std::endl;break;}return result;}int get(int *e1, int *e2, char *op){*e1 = elemOne_;*e2 = elemTwo_;*op = operator_;} private:int elemOne_;int elemTwo_;char operator_; };
makefile:
CC=g++ FLAGS=-std=c++11 LD=-lpthread bin=blockQueue src=BlockQueueTest.cc$(bin):$(src)$(CC) -o $@ $^ $(LD) $(FLAGS) .PHONY:clean clean:rm -f $(bin)
3.POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。初始化信号量#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value); 参数:pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值
销毁信号量int sem_destroy(sem_t *sem);等待信号量功能:等待信号量,会将信号量的值减1int sem_wait(sem_t *sem);发布信号量功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。int sem_post(sem_t *sem);上面的生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序 (POSIX信号量)信号量是一个计数器,描述临界资源数量的计数器。二元信号量==互斥锁
4.基于环形队列的生产消费模型
- 环形队列采用数组模拟,用模运算来模拟环状特性
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
- 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
模拟实现:
#include <iostream> #include <vector> #include <stdlib.h> #include <semaphore.h> #include <unistd.h> #include <pthread.h> #define NUM 16 class RingQueue { private:std::vector<int> q;int cap;sem_t data_sem;sem_t space_sem;int consume_step;int product_step;public:RingQueue(int _cap = NUM) : q(_cap), cap(_cap){sem_init(&data_sem, 0, 0);sem_init(&space_sem, 0, cap);consume_step = 0;product_step = 0;}void PutData(const int &data){sem_wait(&space_sem); // Pq[consume_step] = data;consume_step++;consume_step %= cap;sem_post(&data_sem); // V}void GetData(int &data){sem_wait(&data_sem);data = q[product_step];product_step++;product_step %= cap;sem_post(&space_sem);}~RingQueue(){sem_destroy(&data_sem);sem_destroy(&space_sem);} }; void *consumer(void *arg) {RingQueue *rqp = (RingQueue *)arg;int data;for (;;){rqp->GetData(data);std::cout << "Consume data done : " << data << std::endl;sleep(1);} } // more faster void *producter(void *arg) {RingQueue *rqp = (RingQueue *)arg;srand((unsigned long)time(NULL));for (;;){int data = rand() % 1024;rqp->PutData(data);std::cout << "Prodoct data done: " << data << std::endl;// sleep(1);} } int main() {RingQueue rq;pthread_t c, p;pthread_create(&c, NULL, consumer, (void *)&rq);pthread_create(&p, NULL, producter, (void *)&rq);pthread_join(c, NULL);pthread_join(p, NULL); }
RingQueue.hpp:
![]()
#pragma once #include <iostream> #include <vector> #include <string> #include <semaphore.h> using namespace std;const int gCap = 10; template <class T> class RingQueue { public:RingQueue(int cap = gCap):ringqueue_(cap),pIndex_(0),cIndex_(0){// 生产sem_init(&roomSem_, 0, ringqueue_.size());// 消费sem_init(&dataSem_, 0, 0);pthread_mutex_init(&pmutex_ ,nullptr);pthread_mutex_init(&cmutex_ ,nullptr);}// 生产void push(const T &in){sem_wait(&roomSem_); //无法被多次的申请pthread_mutex_lock(&pmutex_);ringqueue_[pIndex_] = in; //生产的过程pIndex_++; // 写入位置后移pIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征pthread_mutex_unlock(&pmutex_);sem_post(&dataSem_);}// 消费T pop(){sem_wait(&dataSem_);pthread_mutex_lock(&cmutex_);T temp = ringqueue_[cIndex_];cIndex_++;cIndex_ %= ringqueue_.size();// 更新下标,保证环形特征pthread_mutex_unlock(&cmutex_);sem_post(&roomSem_);return temp;}~RingQueue(){sem_destroy(&roomSem_);sem_destroy(&dataSem_);pthread_mutex_destroy(&pmutex_);pthread_mutex_destroy(&cmutex_);} private:vector<T> ringqueue_; // 唤醒队列sem_t roomSem_; // 衡量空间计数器,productorsem_t dataSem_; // 衡量数据计数器,consumeruint32_t pIndex_; // 当前生产者写入的位置, 如果是多线程,pIndex_也是临界资源uint32_t cIndex_; // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源pthread_mutex_t pmutex_;pthread_mutex_t cmutex_; };
RingQueueTest.cc:
#include "RingQueue.hpp" #include <ctime> #include <unistd.h>// 我们是单生产者,单消费者 // 多生产者,多消费者??代码怎么改? // 为什么呢???多生产者,多消费者? // 不要只关心把数据或者任务,从ringqueue 放拿的过程,获取数据或者任务,处理数据或者任务,也是需要花时间的!void *productor(void *args) {RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);while(true){int data = rand()%10;rqp->push(data);cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;sleep(1);} }void *consumer(void *args) {RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);while(true){//sleep(10);int data = rqp->pop();cout << "pthread[" << pthread_self() << "]" << " 消费了一个数据: " << data << endl;} }int main() {srand((unsigned long)time(nullptr)^getpid());RingQueue<int> rq;pthread_t c1,c2,c3, p1,p2,p3;pthread_create(&p1, nullptr, productor, &rq);pthread_create(&p2, nullptr, productor, &rq);pthread_create(&p3, nullptr, productor, &rq);pthread_create(&c1, nullptr, consumer, &rq);pthread_create(&c2, nullptr, consumer, &rq);pthread_create(&c3, nullptr, consumer, &rq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);return 0; }
makefile:
![]()
CC=g++ FLAGS=-std=c++11 LD=-lpthread bin=ringQueue src=RingQueueTest.cc$(bin):$(src)$(CC) -o $@ $^ $(LD) $(FLAGS) .PHONY:clean clean:rm -f $(bin)