目录
1.基于阻塞队列的生产消费模型
(1)生产消费模型实现的条件
(2)初始化模型
(3)生产和消费
(3)所有代码
①test.cc
②myBlockQueue.hpp
2.基于环形的生产消费模型、sem的应用
(1)mySem的实现
①PV操作
②mySem的封装
(2)环形队列的初始化
(3)生产和消费
(4)所有代码
①test.cc
②myRingBuffer.hpp
1.基于阻塞队列的生产消费模型
(1)生产消费模型实现的条件
要实现生产消费模型,无外乎是3种关系(生产者和生产者、消费者和消费者、生产者和消费者)、2个角色(生产者、消费者)、1个交易场所(阻塞队列、环形队列等),不同的生产消费模型交易场所不同,可以满足不同需求。
如线程池就是一个典型的生产消费模型,它的交易场所就是队列queue,在上篇博客中代码已经表明了。要维护好生产消费模型,3种关系一定要保证,即生产者和生产者之间的互斥关系,消费者和消费者的互斥关系,生产者和消费者的同步互斥关系。以线程池为例,主线程在队列尾插,线程池的线程从队头取线程,同时生产者只有一个,必然互斥。消费者之间有cond,一定互斥。于是满足生产消费模型的条件。
后续我们的设计一定要满足这“321”原则,缺一不可。
(2)初始化模型
该模型含有生产者等待队列和消费者等待队列,两个队列共用一把锁,把这些维护好,那三种关系也就不成问题了。
(3)生产和消费
这就是线程池中的采用的方法。消费者一开始就全部陷入该队列进行等待,当消费之后再唤醒生产者;同理,生产者一开始就要生产数据,生产了数据就唤醒消费者。
也就是说生产者和消费者同时进入各自的函数,生产者一来就有空间生产,而消费者此时就一定没内容消费,所以此时生产消费是互补的状态,达成了同步。生产者生产了数据消费者就会被唤醒,消费者消费了也会通知正在等待的生产者,循环往复。
(3)所有代码
①test.cc
#include "myMutex.hpp"
#include "myThread.hpp"
#include "myCond.hpp"
#include "myBlockQueue.hpp"using namespace std;myBlockQueueModule::myBlockQueue<string> bq;void fun_consumer(int num)
{while (1){string ret = bq.consumer_pop();sleep(1);printf("%d号消费者获取的数据: %s\n", num, ret.c_str());}
}void fun_productor(int num)
{while (1){string str = to_string(num);bq.productor_push(str);printf("%d号生产者已写入数据\n", num);}
}int main()
{// 消费者myThreadModule::myThread<int> thread6(fun_consumer, 6);myThreadModule::myThread<int> thread7(fun_consumer, 7);myThreadModule::myThread<int> thread8(fun_consumer, 8);myThreadModule::myThread<int> thread9(fun_consumer, 9);myThreadModule::myThread<int> thread10(fun_consumer, 10);sleep(1);// 生产者myThreadModule::myThread<int> thread1(fun_productor, 1);myThreadModule::myThread<int> thread2(fun_productor, 2);myThreadModule::myThread<int> thread3(fun_productor, 3);myThreadModule::myThread<int> thread4(fun_productor, 4);myThreadModule::myThread<int> thread5(fun_productor, 5);thread1.start();thread2.start();thread3.start();thread4.start();thread5.start();thread6.start();thread7.start();thread8.start();thread9.start();thread10.start();thread1.join();thread2.join();thread3.join();thread4.join();thread5.join();thread6.join();thread7.join();thread8.join();thread9.join();thread10.join();return 0;
}
②myBlockQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include "myMutex.hpp"
#include "myCond.hpp"using namespace std;namespace myBlockQueueModule
{const int max_capacity = 5; // 最多允许出现10个数据template <typename T>class myBlockQueue{private:bool isFull(){return _data.size() == _capacity;}bool isEmpty(){return _data.size() == 0;}public:myBlockQueue(): _capacity(max_capacity), _wait_consumer(0), _wait_productor(0){} // 已创建由阻塞队列实现的生产消费模型T consumer_pop() // 消费者从队列中消费{myMutexModule::myLockGuard LockGuard(_mutex); // RAII思想,借助临时变量的生命周期来释放锁while (isEmpty()){_wait_consumer++; // 一个消费者已进入等待队列,当前等待队列的消费线程数++_consumer_cond.wait(_mutex); // 该消费者线程进入等待队列_wait_consumer--; // 一个消费者已离开等待队列,当前等待队列的消费线程数--}T ret = _data.front(); // 不为空才能消费,走到这里一定都是不为空的_data.pop(); // 消费完就把数据扔了if (_wait_productor > 0) // 消费者刚刚消费了一个数据,所以一定有空位置,唤醒生产者来生产{_productor_cond.notify();}return ret;}void productor_push(const T &data) // 生产者生产数据{myMutexModule::myLockGuard LockGuard(_mutex); // RAII思想,借助临时变量的生命周期来释放锁while (isFull()){_wait_productor++; // 一个生产者已进入等待队列,当前等待队列的线程数++_productor_cond.wait(_mutex); // 该生产者线程进入等待队列_wait_productor--; // 一个生产者已进入等待队列,当前等待队列的线程数++}_data.push(data); // 不为满才能消费,走到这里一定都是不为满的if (_wait_productor > 0){_consumer_cond.notify();}}private:queue<T> _data; // 生产者和消费者共同访问的资源,相当于超市int _capacity; // 缓存最大容量,单位是T的个数myMutexModule::myMutex _mutex; // 一把锁,打开超市大门的唯一一把myCondModule::myCond _productor_cond; // 生产者的等待队列myCondModule::myCond _consumer_cond; // 消费者的等待队列int _wait_consumer; // 正在等待的消费者数量int _wait_productor; // 正在等待的生产者数量};}
2.基于环形的生产消费模型、sem的应用
(1)mySem的实现
①PV操作
sem信号量最重要的就是PV资源,其中P是获取资源(对应sem--),V是归还资源(对应sem++)。wait申请信号量如果成功就会继续执行下面的代码,失败的话就会被阻塞在wait函数,和mutex一样。但注意sem并不具备仅允许单线程进入的功能,只要资源足够多,sem一次性可以分发资源给多个线程。
②mySem的封装
封装原理和mutex和cond一模一样,都是封装成管理sem的类,之后通过调用这个对象的方法来进行sem操作
#pragma once#include <semaphore.h> //信号量需要的库,全局或局部都需要手动init
#include <iostream>
using namespace std;namespace mySemModule
{class mySem{public:mySem(int init_value){sem_init(&_sem, 0, init_value); // 第二个参数默认为0,第三个参数就是环形队列中剩余资源数}void P() // 申请信号量,买电影票,sem--{sem_wait(&_sem); // 申请不到就会阻塞,因此一来消费者就会被阻塞// 信号量表示要申请的资源数目,申请成功了就有资源,申请不成功就阻塞// 但它不可代替锁,因为多个线程来申请资源都可能同时申请成功// 只要有资源,信号量就不会阻塞它,这和锁是不一样的,锁是站在线程角度,sem站在资源角度}void V() // 归还信号量,sem++{sem_post(&_sem);}~mySem(){sem_destroy(&_sem);}private:sem_t _sem; // 管理sem的结构体,一个结构体描述一个sem};}
(2)环形队列的初始化
这里需要注意的是生产者的sem一来就是max_capacity,消费者的就是0,初始化的值就是当前可用资源。并且这次我创建了两把锁,因为对于环形队列来说生产者和消费者之间是解耦的,它们都只需要去申请sem即可,这和我们前面的逻辑是不一样的。
在阻塞队列中,生产者和消费者闲暇时间都在阻塞队列中,需要互相唤醒。而在这里sem即代表资源本身,生产者在没有生产空间时就被阻塞在P()中,只要消费者消费了资源,sem++,生产者就能马上去生产。生产者和消费者不用相互提醒,它们只需要对sem进行操作即可。
(3)生产和消费
生产者P()后生产数据,生产完后就++消费者的sem,同理,生产者消费者都仅需对对方的sem进行操作就能做到生产消费的循环。
sem不能充当mutex的加锁功能,因此还需要加锁,中间的环形队列毕竟是临界资源,sem也可能一次性放进来大量线程,否则会导致写入错乱。
生产者和消费者的下标相对位置一定不会变,因为sem一定会维护好的。所以我们没必要去关心环形队列本身。
(4)所有代码
①test.cc
#include "myThread.hpp"
#include "myRingBuffer.hpp"using namespace std;myRingBufferModule::myRingBuffer<string> ring_buffer;void fun_consumer(int num)
{while (1){sleep(1);string ret = ring_buffer.consumer_pop();printf("%d号消费者接收数据:%s\n", num, ret.c_str());}
}void fun_productor(int num)
{while (1){string str(to_string(num));ring_buffer.productor_push(str);printf("%d号生产者已发送数据\n", num);}
}int main()
{// 消费者myThreadModule::myThread<int> thread6(fun_consumer, 6);myThreadModule::myThread<int> thread7(fun_consumer, 7);myThreadModule::myThread<int> thread8(fun_consumer, 8);myThreadModule::myThread<int> thread9(fun_consumer, 9);myThreadModule::myThread<int> thread10(fun_consumer, 10);// 生产者myThreadModule::myThread<int> thread1(fun_productor, 1);myThreadModule::myThread<int> thread2(fun_productor, 2);myThreadModule::myThread<int> thread3(fun_productor, 3);myThreadModule::myThread<int> thread4(fun_productor, 4);myThreadModule::myThread<int> thread5(fun_productor, 5);thread1.start();thread2.start();thread3.start();thread4.start();thread5.start();thread6.start();thread7.start();thread8.start();thread9.start();thread10.start();thread1.join();thread2.join();thread3.join();thread4.join();thread5.join();thread6.join();thread7.join();thread8.join();thread9.join();thread10.join();return 0;
}
②myRingBuffer.hpp
#pragma once#include "mySem.hpp"
#include "myMutex.hpp"
#include <iostream>
#include <vector>
#include <semaphore.h> //信号量需要的库,全局或局部都需要手动init
using namespace std;namespace myRingBufferModule
{static int max_capacity = 30;template <typename T>class myRingBuffer{public:myRingBuffer(): _ring(max_capacity), // 提前开辟好空间_capacity(max_capacity), _productor_pos(0), _consumer_pos(0), // 容量下标信息_productor_space_sem(max_capacity), _consumer_data_sem(0) // 用0初始化,调用mySem构造函数{}void productor_push(const T &data) // 生产者生产数据{_productor_space_sem.P(); // 生产者申请sem,空间少一个,数据会多一个myMutexModule::myLockGuard LockGuard(_productor_mutex); // 虽然可能有多个线程都P成功,// 但必须保证单线程访问临界资源,保护临界区_ring[_productor_pos++] = data;_productor_pos %= _capacity; // 保持环状_consumer_data_sem.V(); // 生产消耗空间,数据多了一个,sem++// 这样就维护好了消费者和生产者的同步关系}T consumer_pop() // 消费者从队列中消费{_consumer_data_sem.P(); // 消费者申请sem,空间多一个,数据会少一个// 申请不到就会阻塞,因此一来消费者就会被阻塞//_productor_pos和_consumer_pos由于sem的阻塞特性永远不会再次相等// 所以我们不需要处理pos相同的情况// 先申请信号量,每个线程都把信号量申请到了,只受到锁的限制// 先锁的话就只有一个线程来申请信号量,其它线程被拦在锁外面,之后得线程还要申请锁、信号量,效率要低些// 类似于先买票,再排队进电影院和先排队,在电影院门口买票的区别,显然前者效率高myMutexModule::myLockGuard LockGuard(_consumer_mutex); // 虽然可能有多个线程都P成功,// 但必须保证单线程访问临界资源,保护临界区// 两把锁,并发逻辑,效率更高T ret = _ring[_consumer_pos++];_consumer_pos %= _capacity;_productor_space_sem.V(); // 消费消耗资源,空间多了一个,sem++// 这样就维护好了消费者和生产者的同步关系return ret;}private:vector<T> _ring; // 环形队列,作为超市int _capacity; // 缓存最大容量,单位是T的个数int _productor_pos; // 生产者生产数据保存的下标int _consumer_pos; // 生产者消费数据的下标mySemModule::mySem _productor_space_sem; // 生产者的信号量(空间信号量)mySemModule::mySem _consumer_data_sem; // 消费者的信号量(数据信号量)int _wait_consumer; // 正在等待的消费者数量int _wait_productor; // 正在等待的生产者数量myMutexModule::myMutex _productor_mutex; // 生产者队列中的锁myMutexModule::myMutex _consumer_mutex; // 消费者队列中的锁};}