【Linux】线程同步和生产消费模型

embedded/2024/9/25 13:25:55/

目录

条件变量

接口

简单使用

生产消费模型

生产消费模型代码


条件变量

上篇博客我们介绍了线程互斥,是通过加锁的方式把共享资源保护起来变成临界资源,同一时刻只允许一个线程访问临界资源

但是不同的线程申请锁的能力是有差别的,就拿抢票的代码举例,如果一个线程申请锁的能力很强,此时票全被一个线程抢走那也是不合理的,为了解决这种不合理的现象,我们就引入了线程同步的概念,简单来说就是线程之间去排队,让所有的线程访问临界资源具有一定的顺序性,这样线程之间就不会有饥饿问题了

那么如何实现线程同步呢?我们是引入条件变量,我们可以简单的理解为条件变量就是维护着一个等待队列(wait)和一个通知机制(signal),这个通知机制就是为了唤醒等待队列中的线程

接口

条件变量跟互斥锁一样,就是一个某种类型创建的变量,我们来看看创建和使用条件变量的一些接口

man pthread_cond_init

这些接口和互斥锁的接口可以说是大同小异,同样还是分定义局部的还是全局的条件变量,它们通过不同方法定义

man pthread_cond_wait

这个就是说让执行此接口的线程去条件变量下等待,直到有人唤醒

man pthread_cond_signal

这个就是唤醒等待的线程,broadcast是广播的意思,就是唤醒所有在这个条件变量下等待的线程,signal就是只唤醒一个等待的线程

简单使用

上面我们已经说完了接口,下面我们就来简单用一下,写一个简单的测试代码,看看能不能实现线程间同步

我们打算创建一个主线程,创建一批新线程,新线程去条件变量下等待,然后主线程负责发信号给新线程

#include <iostream>
#include <vector>
#include <string>
#include <memory>
#include <unistd.h>
#include <pthread.h>pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
void *Mastercode(void *args)
{char *name = static_cast<char *>(args);while (1){usleep(100000);pthread_cond_signal(&gcond);std::cout << name << " 唤醒了一个线程 " << std::endl;}delete[] name;
}
void *Slavercode(void *args)
{char *name = static_cast<char *>(args);while (1){pthread_mutex_lock(&gmutex);pthread_cond_wait(&gcond, &gmutex);std::cout << name << " 被唤醒 " << std::endl;pthread_mutex_unlock(&gmutex);}delete[] name;
}
void StartMaster(std::vector<pthread_t> *tids)
{pthread_t tid;int n = pthread_create(&tid, nullptr, Mastercode, (void *)"Master");if (n == 0){std::cout << "create master success" << std::endl;tids->push_back(tid);}
}
void StartSlavers(std::vector<pthread_t> *tids, int num)
{for (int i = 1; i <= num; i++){pthread_t tid;char *name = new char[64];snprintf(name, 64, "slaver-%d", i);int n = pthread_create(&tid, nullptr, Slavercode, (void *)name);if (n == 0){std::cout << "create " << name << std::endl;tids->push_back(tid);}}
}
void WaitAll(std::vector<pthread_t> &tids)
{for (auto &e : tids){pthread_join(e, nullptr);}
}
int main()
{std::vector<pthread_t> tids;StartMaster(&tids);StartSlavers(&tids, 5);WaitAll(tids);return 0;
}

我们可以看到结果就是主线程唤醒的新线程是有顺序的,就是因为新线程是在等待队列中进行排队等待的

生产消费模型

什么是生产消费模型呢?顾名思义,有生产者,有消费者,生产者生产数据给消费者。

简单来说,就是一种数据传输,并且是一种并发传递数据,因为生产者在生产任务(数据)的时候,消费者可以从缓冲区中拿任务;而我们之前比如传参或进程间通信也是一种数据的传输,只不过这时串行的传输

我们简单举一个超市的例子,超市有它不同的供应商(生产者),还有不同的顾客(消费者)

超市内的资源就是共享资源,由于生产者们之间,消费者们之间,生产者和消费者之间不能同时访问超市这种资源,也就是说它们之间是互斥的(为了方便起见这么理解,实际情况复杂更多,并且实际的生产消费模型也就是较为简单的这种情况),所以我们需要对超市进行保护,让超市中的资源变成临界资源。并且供应商们和顾客们都是线程。

超市中的资源是临界资源,那超市是什么呢?超市其实就是保存数据的内存空间,并且为了保存数据更加方便,通常用一种特定的数据结构对象来作为数据交易(生产者把数据交给消费者)的场所。

生产者和消费者之间除了互斥还有同步关系,如果超市中的资源满了,那么生产者就不要生产了,同样,没有资源了,消费者就要等待。这体现的就是一种同步关系

所以,我们要记住生产消费模型,就要记住三种关系(生生,生消,消消),两种角色,一个交易场所即可

为什么使用生产消费模型呢?它有什么好处呢?

1.它可以提供比较好的并发度,供应商生产产品的时候,消费者可以去超市买东西;消费者使用产品时,供应商可以往超市中放东西。

2.生产和消费数据,可以进行解耦,实际就是生产任务由一个线程管,执行任务由一个线程管,它们之间关系很小

3.支持忙闲不均,这个主要是通过引入缓冲区(超市)来实现的,如果没有缓冲区,生产者一会忙,一会闲,消费者就要被迫的这样,有了缓冲区消费者就可以平稳的执行任务,其实就是平衡二者之间处理能力的差异,减小等待时间,提高整体的效率

生产消费模型代码

我们需要考虑用什么来充当临时存放数据的缓冲区,我们可以自定义一个阻塞队列,它的特点就是队列中放满了就等消费者消费,如果没有就等生产者生产,并且最好生产者生产了要通知消费者一下,如果消费者消费了最好通知生产者去生产。这个其实不就用到了条件变量中的等待和通知机制嘛,我们下面的代码就是先不用我们之前封装的pthread库,我们直接用系统调用:

//BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>template <class T>
class BlockQueue
{bool isfull(){return _q.size() == _cap;}bool isempty(){return _q.empty();}public:BlockQueue(int cap = 6): _cap(cap){_producer_wait_num = 0;_consumer_wait_num = 0;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond_consumer, nullptr);pthread_cond_init(&_cond_producer, nullptr);}void Enqueue(const T &data){pthread_mutex_lock(&_mutex);while (isfull()){_producer_wait_num++;pthread_cond_wait(&_cond_producer, &_mutex);_producer_wait_num--;}_q.push(data);if (_consumer_wait_num > 0)pthread_cond_signal(&_cond_consumer);pthread_mutex_unlock(&_mutex);}void Pop(T *out){pthread_mutex_lock(&_mutex);while (isempty()){_consumer_wait_num++;pthread_cond_wait(&_cond_consumer, &_mutex);_consumer_wait_num--;}*out = _q.front();_q.pop();if (_producer_wait_num > 0)pthread_cond_signal(&_cond_producer);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond_consumer);pthread_cond_destroy(&_cond_producer);}private:std::queue<T> _q;int _cap;pthread_mutex_t _mutex;pthread_cond_t _cond_producer;pthread_cond_t _cond_consumer;int _producer_wait_num;int _consumer_wait_num;
};//Task.hpp
#pragma once
#include<iostream>
#include <string>
#include<functional>
// using Task=std::function<void()>;// void printhello()
// {
//     std::cout<<"hello world"<<std::endl;
// }class Task
{
public:Task() {}Task(int a, int b) : _a(a), _b(b){}std::string result_to_string(){return std::to_string(_a) + " + " + std::to_string(_b) + " = " + std::to_string(_a+_b);}std::string question_to_string(){return std::to_string(_a) + " + " + std::to_string(_b) + " = ?";}private:int _a;int _b;
};//Main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <vector>
#include <unistd.h>
#include <pthread.h>
#include <ctime>
using std::cout;
using std::endl;
using typeinbp = Task;
using typeofbq = BlockQueue<typeinbp>;struct ThreadData
{ThreadData(const std::string &str = "no_name", typeofbq *pbq = nullptr): _name(str), _pbq(pbq) {}std::string _name;typeofbq *_pbq;
};void *producercode(void *args)
{ThreadData *ptd = static_cast<ThreadData *>(args);while (1){int a = rand() % 100;usleep(1234);int b = rand() % 100;typeinbp tmp(a, b);// ptd->_pbq->Enqueue(printhello);ptd->_pbq->Enqueue(tmp);cout << ptd->_name << " product a ques " << tmp.question_to_string() << endl;sleep(1);}delete ptd;return nullptr;
}
void *consumercode(void *args)
{ThreadData *ptd = static_cast<ThreadData *>(args);while (1){typeinbp tmp;ptd->_pbq->Pop(&tmp);// tmp();cout << ptd->_name << " get a message: " << tmp.result_to_string() << endl;}delete ptd;return nullptr;
}void startproducer(std::vector<pthread_t> *ppids, int num, typeofbq *pbq)
{for (int i = 1; i <= num; i++){std::string name = "producer_" + std::to_string(i);ThreadData *ptd = new ThreadData(name, pbq);pthread_t pid;pthread_create(&pid, nullptr, producercode, (void *)ptd);ppids->push_back(pid);}
}
void startconsumer(std::vector<pthread_t> *ppids, int num, typeofbq *pbq)
{for (int i = 1; i <= num; i++){std::string name = "consumer_" + std::to_string(i);ThreadData *ptd = new ThreadData(name, pbq);pthread_t pid;pthread_create(&pid, nullptr, consumercode, (void *)ptd);ppids->push_back(pid);}
}
void waitall(std::vector<pthread_t> &pids)
{for (auto &pid : pids){pthread_join(pid, nullptr);}
}
int main()
{srand(time(nullptr));std::vector<pthread_t> pids;typeofbq bq(5);startproducer(&pids, 3, &bq);startconsumer(&pids, 4, &bq);waitall(pids);return 0;
}

这段代码有几个关键点需要解释:

signal放unlock之前之后都可以,signal放之前就是先通知再解锁,一解锁别的线程就可以得到锁了;signal放之后就是先解锁再通知,这样也是可以的

这里要用while而不能用if,这其实就跟条件变量的特点有关,一个线程pthread_cond_wait之后就会释放掉锁,因为它不可能持有锁去等待,那样就没人唤醒它了。等到wait被唤醒之后此线程需要重新去竞争锁,万一被别人争到了,等此线程争到了锁后条件又不满足了,此时用if的话,它还是向下走,此时就会出错,所以要用while要重新判断条件

设置这两个变量就是不要生产了或消费了就去唤醒,万一根本就没人等待呢?所以记录一下等待的人数,如果有人等待就去唤醒,没人等待就不唤醒了

我们给阻塞队列中可以放任何东西,可以放一个对象,对象中存着任务,也可以放一个函数对象,让“消费者”去执行这个函数


http://www.ppmy.cn/embedded/94259.html

相关文章

用exceljs和file-saver插件实现纯前端表格导出Excel(支持样式配置,多级表头)

exceljs在Jquery&#xff08;HTML&#xff09;和vue项目中实现导出功能 前言Jquery&#xff08;HTML&#xff09;中实现导出第一步&#xff0c;先在项目本地中导入exceljs和file-saver包第二步&#xff0c;封装导出Excel方法&#xff08;可直接复制粘贴使用&#xff09;第三步&…

整理编程学习笔记的心得

知乎上曾经有过类似的讨论&#xff0c;程序员一般是怎么记自己的编程笔记的&#xff1f;用什么软件记录的&#xff1f;&#xff0c;这是我的答复。 现在重新编辑&#xff0c;调整一些内容&#xff0c;重新发布。 工作上的事情很繁杂&#xff0c;小事情很多&#xff0c;工作时间…

微调LLama 3.1——七月论文审稿GPT第5.5版:拿早期paper-review数据集微调LLama 3.1

前言 对于llama3&#xff0c;我们之前已经做了针对llama3 早7数据微调后的测评 去pk llama2的早7数据微调后&#xff0c;推理测试集中的早期paper&#xff1a;出来7方面review去pk gpt4推理测试集中的早期paper&#xff1a;7方面reviewground truth是早期paper的7方面人工rev…

Docker常见命令

Docker常见命令 以下是一些常见的 Docker 命令及其描述&#xff1a; 命令描述docker --version显示 Docker 的版本信息。docker pull <image>从 Docker 仓库中拉取镜像。docker build -t <name>:<tag> <path>从 Dockerfile 构建镜像并为其指定标签。…

Discourse 将主题打印成 PDF

Discourse 允许用户通过使用 打印主题&#xff08;Print topic&#xff09; 快捷键来生成 PDF 文件。这个快捷键针对操作系统的不同&#xff0c;可以通过键盘上的 ? 来进行查看。 大部分操作系统: ctrlpMacOS: ⌘p 使用快捷键后会打开一个新的浏览器窗口&#xff0c;在这个新…

Arduino学习笔记3——LED闪烁

一、Blink示例 点击“文件”——“示例”——“Basics”——“Blink”打开Blink示例程序。Blink有眨眼、闪亮之意&#xff0c;程序实现的功能也与之类似&#xff0c;可以让板子上的L灯以1秒为间隔点亮熄灭。 二、程序讲解 /*BlinkTurns an LED on for one second, then off …

怎样才算精通 Excel?

最强AI视频生成&#xff1a;小说文案智能分镜智能识别角色和场景批量Ai绘图自动配音添加音乐一键合成视频百万播放量https://aitools.jurilu.com/ 高赞回答很系统&#xff0c;但普通人这么学&#xff0c;没等精通先学废了&#xff01; 4年前&#xff0c;我为了学数据分析&#…

【Vue3】 navigator.mediaDevices 实现翻转摄像头

const openFront ref(true);//true开启前置摄像头&#xff0c;false开启后置摄像头 const video ref(null); //开启摄像头function startCamera(){loadingVideo.value true;// 检测浏览器是否支持mediaDevicesif (navigator.mediaDevices) {let params {audio: false,video…