生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
一、堵塞队列
(1)三种关系
生产者vs生产者:互斥(加锁)
消费者vs消费者:互斥(加锁)
生产者vs消费者:互斥和同步(加锁和条件变量)
(2)代码实现
Makefile
test:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f test
BlockQueue.hpp
#include<iostream>
#include<queue>
#include<unistd.h>
#define M 10
template<class T>
class BlockQueue{
public:
BlockQueue(T cap=M)
:_capacity(M)
{pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_pcond,nullptr);pthread_cond_init(&_ccond,nullptr);}bool IsFull()
{return q.size()==_capacity;
}
bool empty()
{return q.size()==0;
}void Push( T in)
{pthread_mutex_lock(&_mutex);//if(!IsFull())//可能出现伪唤醒while(IsFull())//健壮性{pthread_cond_wait(&_pcond,&_mutex);}q.push(in);std::cout<<"push:"<<in<<std::endl;sleep(1);pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_mutex);
}void Pop()
{pthread_mutex_lock(&_mutex);while(empty())//if(!empty())//如果这里使用if判断的话,可能出现伪唤醒问题。{pthread_cond_wait(&_ccond,&_mutex);}auto n=q.front();std::cout<<"pop:"<<n<<std::endl;sleep(1);q.pop();pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_mutex);}~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}private:pthread_mutex_t _mutex;pthread_cond_t _pcond;pthread_cond_t _ccond;int _capacity=M;std::queue<T> q;};
Main.cc
#include<pthread.h>
#include"BlockQueue.hpp"
#include<iostream>
void* consumer(void* argv)
{BlockQueue<int>* q=static_cast<BlockQueue<int>*>(argv);int i=0;while(true){ q->Pop();}return nullptr;
}
void * productor(void* argv)
{BlockQueue<int>* q=static_cast<BlockQueue<int>*>(argv);while(true){int data=rand()%10+1;q->Push(data);}
return nullptr;
}int main()
{srand((unsigned)time(NULL)^getpid()^pthread_self());pthread_t c,p;BlockQueue<int>* bq=new BlockQueue<int>();pthread_create(&c,nullptr,consumer,bq);pthread_create(&p,nullptr,productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
(3)总结
以上是单生产和单消费,对于多生产和多消费也是可以的,因为是同一个队列,同一把锁,同一把锁就决定了,生产者和生产者,消费者和消费者之间就是互斥的,生产者和消费者的条件变量提供了同步。
队列中的数据也可以是任务,堵塞队列可以实现高并发高效率,在与每个线程都拿到了自己的任务
并且处理任务,处理任务也是需要时间的,这个决定了,每个线程拿到任务都在跑自己的任务代码,实现高并发。同时条件变量让生产者和消费者进行同步,保证了安全性。
1.消费者和生产者调度优先级
如果消费者线程先调度,队列为空,消费者就会在条件变量下进行等待,等生产者生产商品了,就会唤醒消费者进行消费。
2.如何控制生产消费的节奏
我们可以通过sleep控制。比如消费者消费进行休眠的话,可以给生产者足够的时间进行生产
3.伪唤醒
如果用if来判断队列为空可能会出现伪唤醒,有些线程处于等待堵塞,竞争锁的状态,一旦队列为空而线程竞争到了锁就会出现队列为空依然进行pop的现象。
while(true)可以提供检查,if判断可能有风险。
环形队列
(1)POSIX信号量
posix和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步.
快速认识接口:
(1)初始化
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);参数: pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值
(2)销毁
int sem_destroy(sem_t *sem);
(3)等待
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem);//p()
(3)通知
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。int sem_post(sem_t *sem);//V()
三种关系
(2)对于环形队列,生产者和消费者就有两种场景会指向同一个位置。要么为空,要么为满,其他情况不会指向同一个位置。
1.如果队列为空,只能让生产者先生产,消费者不可以消费---------互斥
2.如果队列为满,只能让消费者先消费,然后到生产者生产---------同步
3.其余情况,消费者都是在生产者后面的,两个位置不同,即使pop和push同时进行,也是安全的,这个就是多线程高并发可以进入临界区的原因。
如何实现多线程中的生产者和生产者,消费者和消费者的互斥问题,对于循环队列,我们要定义两把锁,一个是push队列的锁,一个是pop队列的锁。pv操作是原子性的,让生产者和消费者进行同步其中又可以体现互斥。
代码实现
makefile
ringtest:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ringtest
Main.cc
#include<iostream>
#include<pthread.h>
#include"RingQueue.hpp"
#include<pthread.h>void* producter(void* args)
{while(true){ RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);int n=1;n=rand()%3+1;rq->Push(n);}}
void* consumer(void* argv)
{while(true)
{RingQueue<int>* q=static_cast<RingQueue<int>*>(argv);int date=0;q->Pop(&date);}}int main()
{srand(time(nullptr));pthread_t p,c;
pthread_mutex_t pm,cm;LockGuard pmutex(&pm),cmutex(&cm);RingQueue<int> ringqueue(cmutex,pmutex);
pthread_create(&p,nullptr,consumer,&ringqueue);
pthread_create(&c,nullptr,producter,&ringqueue);pthread_join(p,nullptr);pthread_join(c,nullptr);return 0;
}
RingQueue.hpp
#include<vector>
#include<iostream>
#include"LockGuard.hpp"
#include<unistd.h>
const int Size =10;
template<class T>
class RingQueue{
private:void P(sem_t& sem){sem_wait(&sem);}void V(sem_t& sem){sem_post(&sem);}
public:
RingQueue(LockGuard c,LockGuard p,int s=Size)
:size(s),pmutex(p),cmutex(c),q(s),ppose(0),cpose(0)
{
sem_init(&psem,0,size);
sem_init(&csem,0,0);}// 生产
void Push(const T& in)
{// 先加锁,还是先申请信号量?先申请信号量,效率高。申请到资源的线程,只有竞争到锁,就可以生产了。P(psem);{pmutex;q[ppose] = in;std::cout<<"生产:"<<in<<std::endl;ppose++;ppose %=size;}V(csem);
}
void Pop(T* out)
{P(csem);{cmutex;*out = q[cpose];sleep(1);std::cout<<"消费:"<<*out<<std::endl;cpose++;cpose %=size;}V(psem);}
~RingQueue()
{sem_destroy(&psem);sem_destroy(&csem);
}private:int size;std::vector<int> q;int ppose;//生产者位置int cpose;//消费者位置sem_t psem;//生产者信号量sem_t csem;//消费者信号量LockGuard pmutex;LockGuard cmutex;};
#pragma once
#include<pthread.h>
#include <semaphore.h>
class Mutex{
public:Mutex(pthread_mutex_t* mutex):_Mutex(mutex){pthread_mutex_init(_Mutex,nullptr);}void Lock(){pthread_mutex_lock(_Mutex);}void unlock(){pthread_mutex_unlock(_Mutex);}~Mutex(){pthread_mutex_destroy(_Mutex);}private:pthread_mutex_t* _Mutex;
};class LockGuard{
public:LockGuard(pthread_mutex_t* lock):mutex(lock){mutex.Lock();}~LockGuard(){mutex.unlock();}private:Mutex mutex;};