Linux相关概念和易错知识点(33)(基于阻塞队列和环形队列的生产消费模型实现、sem的应用)

server/2025/3/4 7:02:36/

目录

1.基于阻塞队列的生产消费模型

(1)生产消费模型实现的条件

(2)初始化模型

(3)生产和消费

(3)所有代码

①test.cc

②myBlockQueue.hpp

2.基于环形的生产消费模型、sem的应用

(1)mySem的实现

①PV操作

②mySem的封装

(2)环形队列的初始化

(3)生产和消费

(4)所有代码

①test.cc

②myRingBuffer.hpp


1.基于阻塞队列的生产消费模型

(1)生产消费模型实现的条件

要实现生产消费模型,无外乎是3种关系(生产者和生产者、消费者和消费者、生产者和消费者)、2个角色(生产者、消费者)、1个交易场所(阻塞队列、环形队列等),不同的生产消费模型交易场所不同,可以满足不同需求。

线程池就是一个典型的生产消费模型,它的交易场所就是队列queue,在上篇博客中代码已经表明了。要维护好生产消费模型,3种关系一定要保证,即生产者和生产者之间的互斥关系,消费者和消费者的互斥关系,生产者和消费者的同步互斥关系。以线程池为例,主线程在队列尾插,线程池的线程从队头取线程,同时生产者只有一个,必然互斥。消费者之间有cond,一定互斥。于是满足生产消费模型的条件。

后续我们的设计一定要满足这“321”原则,缺一不可。

(2)初始化模型

该模型含有生产者等待队列和消费者等待队列,两个队列共用一把锁,把这些维护好,那三种关系也就不成问题了。

(3)生产和消费

这就是线程池中的采用的方法。消费者一开始就全部陷入该队列进行等待,当消费之后再唤醒生产者;同理,生产者一开始就要生产数据,生产了数据就唤醒消费者。

也就是说生产者和消费者同时进入各自的函数,生产者一来就有空间生产,而消费者此时就一定没内容消费,所以此时生产消费是互补的状态,达成了同步。生产者生产了数据消费者就会被唤醒,消费者消费了也会通知正在等待的生产者,循环往复。

(3)所有代码

①test.cc

#include "myMutex.hpp"
#include "myThread.hpp"
#include "myCond.hpp"
#include "myBlockQueue.hpp"using namespace std;myBlockQueueModule::myBlockQueue<string> bq;void fun_consumer(int num)
{while (1){string ret = bq.consumer_pop();sleep(1);printf("%d号消费者获取的数据: %s\n", num, ret.c_str());}
}void fun_productor(int num)
{while (1){string str = to_string(num);bq.productor_push(str);printf("%d号生产者已写入数据\n", num);}
}int main()
{// 消费者myThreadModule::myThread<int> thread6(fun_consumer, 6);myThreadModule::myThread<int> thread7(fun_consumer, 7);myThreadModule::myThread<int> thread8(fun_consumer, 8);myThreadModule::myThread<int> thread9(fun_consumer, 9);myThreadModule::myThread<int> thread10(fun_consumer, 10);sleep(1);// 生产者myThreadModule::myThread<int> thread1(fun_productor, 1);myThreadModule::myThread<int> thread2(fun_productor, 2);myThreadModule::myThread<int> thread3(fun_productor, 3);myThreadModule::myThread<int> thread4(fun_productor, 4);myThreadModule::myThread<int> thread5(fun_productor, 5);thread1.start();thread2.start();thread3.start();thread4.start();thread5.start();thread6.start();thread7.start();thread8.start();thread9.start();thread10.start();thread1.join();thread2.join();thread3.join();thread4.join();thread5.join();thread6.join();thread7.join();thread8.join();thread9.join();thread10.join();return 0;
}

②myBlockQueue.hpp

#pragma once#include <iostream>
#include <queue>
#include "myMutex.hpp"
#include "myCond.hpp"using namespace std;namespace myBlockQueueModule
{const int max_capacity = 5; // 最多允许出现10个数据template <typename T>class myBlockQueue{private:bool isFull(){return _data.size() == _capacity;}bool isEmpty(){return _data.size() == 0;}public:myBlockQueue(): _capacity(max_capacity), _wait_consumer(0), _wait_productor(0){} // 已创建由阻塞队列实现的生产消费模型T consumer_pop() // 消费者从队列中消费{myMutexModule::myLockGuard LockGuard(_mutex); // RAII思想,借助临时变量的生命周期来释放锁while (isEmpty()){_wait_consumer++;			 // 一个消费者已进入等待队列,当前等待队列的消费线程数++_consumer_cond.wait(_mutex); // 该消费者线程进入等待队列_wait_consumer--;			 // 一个消费者已离开等待队列,当前等待队列的消费线程数--}T ret = _data.front(); // 不为空才能消费,走到这里一定都是不为空的_data.pop();		   // 消费完就把数据扔了if (_wait_productor > 0) // 消费者刚刚消费了一个数据,所以一定有空位置,唤醒生产者来生产{_productor_cond.notify();}return ret;}void productor_push(const T &data) // 生产者生产数据{myMutexModule::myLockGuard LockGuard(_mutex); // RAII思想,借助临时变量的生命周期来释放锁while (isFull()){_wait_productor++;			  // 一个生产者已进入等待队列,当前等待队列的线程数++_productor_cond.wait(_mutex); // 该生产者线程进入等待队列_wait_productor--;			  // 一个生产者已进入等待队列,当前等待队列的线程数++}_data.push(data); // 不为满才能消费,走到这里一定都是不为满的if (_wait_productor > 0){_consumer_cond.notify();}}private:queue<T> _data; // 生产者和消费者共同访问的资源,相当于超市int _capacity;	// 缓存最大容量,单位是T的个数myMutexModule::myMutex _mutex;		  // 一把锁,打开超市大门的唯一一把myCondModule::myCond _productor_cond; // 生产者的等待队列myCondModule::myCond _consumer_cond;  // 消费者的等待队列int _wait_consumer;	 // 正在等待的消费者数量int _wait_productor; // 正在等待的生产者数量};}

2.基于环形的生产消费模型、sem的应用

(1)mySem的实现

①PV操作

sem信号量最重要的就是PV资源,其中P是获取资源(对应sem--),V是归还资源(对应sem++)。wait申请信号量如果成功就会继续执行下面的代码,失败的话就会被阻塞在wait函数,和mutex一样。但注意sem并不具备仅允许单线程进入的功能,只要资源足够多,sem一次性可以分发资源给多个线程。

②mySem的封装

封装原理和mutex和cond一模一样,都是封装成管理sem的类,之后通过调用这个对象的方法来进行sem操作

#pragma once#include <semaphore.h> //信号量需要的库,全局或局部都需要手动init
#include <iostream>
using namespace std;namespace mySemModule
{class mySem{public:mySem(int init_value){sem_init(&_sem, 0, init_value); // 第二个参数默认为0,第三个参数就是环形队列中剩余资源数}void P() // 申请信号量,买电影票,sem--{sem_wait(&_sem); // 申请不到就会阻塞,因此一来消费者就会被阻塞// 信号量表示要申请的资源数目,申请成功了就有资源,申请不成功就阻塞// 但它不可代替锁,因为多个线程来申请资源都可能同时申请成功// 只要有资源,信号量就不会阻塞它,这和锁是不一样的,锁是站在线程角度,sem站在资源角度}void V() // 归还信号量,sem++{sem_post(&_sem);}~mySem(){sem_destroy(&_sem);}private:sem_t _sem; // 管理sem的结构体,一个结构体描述一个sem};}

(2)环形队列的初始化

这里需要注意的是生产者的sem一来就是max_capacity,消费者的就是0,初始化的值就是当前可用资源。并且这次我创建了两把锁,因为对于环形队列来说生产者和消费者之间是解耦的,它们都只需要去申请sem即可,这和我们前面的逻辑是不一样的。

在阻塞队列中,生产者和消费者闲暇时间都在阻塞队列中,需要互相唤醒。而在这里sem即代表资源本身,生产者在没有生产空间时就被阻塞在P()中,只要消费者消费了资源,sem++,生产者就能马上去生产。生产者和消费者不用相互提醒,它们只需要对sem进行操作即可。

(3)生产和消费

生产者P()后生产数据,生产完后就++消费者的sem,同理,生产者消费者都仅需对对方的sem进行操作就能做到生产消费的循环。

sem不能充当mutex的加锁功能,因此还需要加锁,中间的环形队列毕竟是临界资源,sem也可能一次性放进来大量线程,否则会导致写入错乱。

生产者和消费者的下标相对位置一定不会变,因为sem一定会维护好的。所以我们没必要去关心环形队列本身。

(4)所有代码

①test.cc

#include "myThread.hpp"
#include "myRingBuffer.hpp"using namespace std;myRingBufferModule::myRingBuffer<string> ring_buffer;void fun_consumer(int num)
{while (1){sleep(1);string ret = ring_buffer.consumer_pop();printf("%d号消费者接收数据:%s\n", num, ret.c_str());}
}void fun_productor(int num)
{while (1){string str(to_string(num));ring_buffer.productor_push(str);printf("%d号生产者已发送数据\n", num);}
}int main()
{// 消费者myThreadModule::myThread<int> thread6(fun_consumer, 6);myThreadModule::myThread<int> thread7(fun_consumer, 7);myThreadModule::myThread<int> thread8(fun_consumer, 8);myThreadModule::myThread<int> thread9(fun_consumer, 9);myThreadModule::myThread<int> thread10(fun_consumer, 10);// 生产者myThreadModule::myThread<int> thread1(fun_productor, 1);myThreadModule::myThread<int> thread2(fun_productor, 2);myThreadModule::myThread<int> thread3(fun_productor, 3);myThreadModule::myThread<int> thread4(fun_productor, 4);myThreadModule::myThread<int> thread5(fun_productor, 5);thread1.start();thread2.start();thread3.start();thread4.start();thread5.start();thread6.start();thread7.start();thread8.start();thread9.start();thread10.start();thread1.join();thread2.join();thread3.join();thread4.join();thread5.join();thread6.join();thread7.join();thread8.join();thread9.join();thread10.join();return 0;
}

②myRingBuffer.hpp

#pragma once#include "mySem.hpp"
#include "myMutex.hpp"
#include <iostream>
#include <vector>
#include <semaphore.h> //信号量需要的库,全局或局部都需要手动init
using namespace std;namespace myRingBufferModule
{static int max_capacity = 30;template <typename T>class myRingBuffer{public:myRingBuffer(): _ring(max_capacity),											// 提前开辟好空间_capacity(max_capacity), _productor_pos(0), _consumer_pos(0), // 容量下标信息_productor_space_sem(max_capacity), _consumer_data_sem(0)		// 用0初始化,调用mySem构造函数{}void productor_push(const T &data) // 生产者生产数据{_productor_space_sem.P(); // 生产者申请sem,空间少一个,数据会多一个myMutexModule::myLockGuard LockGuard(_productor_mutex); // 虽然可能有多个线程都P成功,// 但必须保证单线程访问临界资源,保护临界区_ring[_productor_pos++] = data;_productor_pos %= _capacity; // 保持环状_consumer_data_sem.V(); // 生产消耗空间,数据多了一个,sem++// 这样就维护好了消费者和生产者的同步关系}T consumer_pop() // 消费者从队列中消费{_consumer_data_sem.P(); // 消费者申请sem,空间多一个,数据会少一个// 申请不到就会阻塞,因此一来消费者就会被阻塞//_productor_pos和_consumer_pos由于sem的阻塞特性永远不会再次相等// 所以我们不需要处理pos相同的情况// 先申请信号量,每个线程都把信号量申请到了,只受到锁的限制// 先锁的话就只有一个线程来申请信号量,其它线程被拦在锁外面,之后得线程还要申请锁、信号量,效率要低些// 类似于先买票,再排队进电影院和先排队,在电影院门口买票的区别,显然前者效率高myMutexModule::myLockGuard LockGuard(_consumer_mutex); // 虽然可能有多个线程都P成功,// 但必须保证单线程访问临界资源,保护临界区// 两把锁,并发逻辑,效率更高T ret = _ring[_consumer_pos++];_consumer_pos %= _capacity;_productor_space_sem.V(); // 消费消耗资源,空间多了一个,sem++// 这样就维护好了消费者和生产者的同步关系return ret;}private:vector<T> _ring; // 环形队列,作为超市int _capacity;	 // 缓存最大容量,单位是T的个数int _productor_pos; // 生产者生产数据保存的下标int _consumer_pos;	// 生产者消费数据的下标mySemModule::mySem _productor_space_sem; // 生产者的信号量(空间信号量)mySemModule::mySem _consumer_data_sem;	 // 消费者的信号量(数据信号量)int _wait_consumer;	 // 正在等待的消费者数量int _wait_productor; // 正在等待的生产者数量myMutexModule::myMutex _productor_mutex; // 生产者队列中的锁myMutexModule::myMutex _consumer_mutex;	 // 消费者队列中的锁};}


http://www.ppmy.cn/server/171794.html

相关文章

C/C++ | 每日一练 (4)

&#x1f4a2;欢迎来到张胤尘的技术站 &#x1f4a5;技术如江河&#xff0c;汇聚众志成。代码似星辰&#xff0c;照亮行征程。开源精神长&#xff0c;传承永不忘。携手共前行&#xff0c;未来更辉煌&#x1f4a5; 文章目录 C/C | 每日一练 (4)题目参考答案基础容器序列容器std:…

解锁 Hutool - Captcha:轻松打造图片验证码

各位开发者朋友们&#xff0c;在如今的互联网应用里&#xff0c;图片验证码可是保障系统安全、防止恶意攻击的重要手段。想象一下&#xff0c;如果没有验证码&#xff0c;那些自动化的恶意脚本就可能肆意地对我们的系统发起攻击&#xff0c;比如暴力破解密码、批量注册虚假账号…

【玩转全栈】----Django基本配置和介绍

目录 Django基本介绍&#xff1a; Django基本配置&#xff1a; 安装Django 创建项目 创建app 注册app Django配置路由URL Django创建视图 启动项目 Django基本介绍&#xff1a; Django是一个开源的、基于Python的高级Web框架&#xff0c;旨在以快速、简洁的方式构建高质量的We…

均值与标准差、标准误的关系

一、说明 本文介绍基本的统计学概念&#xff0c;标准差和标准误&#xff0c;此两个概念都与均值期望有一定联系&#xff0c;但它们之间本质上是不同的。 二、均值与标准差的标准误差&#xff1a;概述 标准差 &#xff08;SD&#xff09; 衡量从单个数据值到平均值的变异量或…

突破网络壁垒:实现 Mac SSH 访问 Windows WSL Ubuntu 的最佳实践20250301

突破网络壁垒&#xff1a;实现 Mac SSH 访问 Windows WSL Ubuntu 的最佳实践 背景与痛点 在现代开发环境中&#xff0c;开发者通常会面临不同操作系统之间的协同工作。例如&#xff1a; 主要开发环境位于 Windows 的 WSL Ubuntu 子系统需要从局域网内的 Mac 设备进行远程访问…

Android Binder 用法详解

Binder 是 Android 系统中的一种进程间通信&#xff08;IPC&#xff09;机制&#xff0c;它允许不同进程之间进行高效通信。Binder 在 Android 系统中被广泛使用&#xff0c;例如在 Activity 与 Service 的交互中。 Binder 的基本组成 实现 Binder 通信通常包含以下几个关键部…

【Leetcode 每日一题】131. 分割回文串

问题背景 给你一个字符串 s s s&#xff0c;请你将分割成一些子串&#xff0c;使每个子串都是 回文串 。返回 s s s 所有可能的分割方案。 数据约束 1 ≤ s . l e n g t h ≤ 16 1 \le s.length \le 16 1≤s.length≤16 s s s 仅由小写英文字母组成 解题过程 经典回溯题&…

微服务学习(2):实现SpringAMQP对RabbitMQ的消息收发

目录 SpringAMQP是什么 为什么采用SpringAMQP SpringAMQP应用 准备springBoot工程 实现消息发送 SpringAMQP是什么 Spring AMQP是Spring框架下用于简化AMQP&#xff08;高级消息队列协议&#xff09;应用开发的一套工具集&#xff0c;主要针对RabbitMQ等消息中间件的集成…