color:#494949">当一个线程互斥地访问某个变量时c;它可能发现在其它线程改变状态之前c;它什么也做不了。
color:#494949">例如:一个线程访问队列时c;发现队列为空c;它只能等待c;直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
color:#fe2c24">同步:在保证数据安全的前提下c;让线程能够按照某种特定的顺序访问临界资源c;从而有效避免饥饿问题。
color:#494949">竞态条件:因为时序问题c;而导致程序异常c;我们称之为竞态条件。在线程场景下c;这种问题也不难理解。
color:#0d0016">int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
color:#0d0016">attr);
color:#0d0016">参数:
color:#0d0016">cond:要初始化的条件变量
color:#0d0016">attr:NULL
color:#0d0016">int pthread_cond_destroy(pthread_cond_t *cond);
color:#0d0016">int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
color:#0d0016">参数:
color:#0d0016">cond:要在这个条件变量上等待
color:#0d0016">mutex:互斥量c;后面详细解释
color:#0d0016">int pthread_cond_broadcast(pthread_cond_t *cond);//唤醒一批线程。
color:#0d0016">int pthread_cond_signal(pthread_cond_t *cond);//唤醒一个线程。
color:#494949">示例代码:
color:#4da8ee">makefile:
<code class="language-cpp">testCond:testCond.ccg++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean:rm -f testCondcode>
<code class="language-cpp">#include <iostream> #include <pthread.h> #include <unistd.h> #include <string>int tickets = 1000; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *start_routine(void *args) {std::string name = static_cast<const char *>(args);while (true){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex);// 判断暂时省略std::cout << name << "->" << tickets << std::endl;tickets--;pthread_mutex_unlock(&mutex);} }int main() {// 通过条件变量控制线程的执行pthread_t t[4];for (int i = 0; i < 4; i++){char *name = new char[64];snprintf(name, 64, "thread %d", i + 1);pthread_create(t + i, nullptr, start_routine, (void *)name);}while (true){sleep(1);//pthread_cond_broadcast(&cond); // 唤醒一批线程pthread_cond_signal(&cond);//唤醒一个线程std::cout << "main thread wakeup one thread... " << std::endl;}for (const auto &i : t){pthread_join(i, nullptr);}return 0; }code>
pthread_cond_signal:唤醒一个线程。 pthread_cond_broadcast:唤醒一批线程。
c="https://i-blog.csdnimg.cn/direct/b7452713278f40929fb84d56d93b4c9a.png" width="295" />
c="https://i-blog.csdnimg.cn/direct/553857376e3c449995c707df5d7e84b0.png" width="349" />
这些线程会持续等待一个条件变量的信号。主线程每隔 1 秒就会发送一个条件变量信号c;唤醒其中一个等待的线程。被唤醒的线程会输出当前剩余的票数并将票数减 1。
可以看到c;由于条件变量的存在c;输出结果变得有顺序性。
1. 保证条件检查和等待操作的原子性
在多线程环境中c;线程需要先检查某个条件是否满足c;如果不满足则进入等待状态。这个检查条件和进入等待的操作必须是原子的c;否则可能会出现竞态条件。
例如c;在生产者 - 消费者模型中c;消费者线程需要检查缓冲区是否为空c;如果为空则等待。假设没有互斥量保护c;可能会出现以下情况:
color:#494949">使用互斥量可以保证条件检查和进入等待状态这两个操作的原子性。color:#fe2c24">当线程调用pthread_cond_wait时c;它会先释放互斥量c;然后进入等待状态;当被唤醒时c;又会重新获取互斥量color:#494949">。这样就避免了上述竞态条件的发生。
2. 保护共享资源和条件变量
color:#0d0016">条件变量通常与共享资源相关联c;线程在检查条件和修改共享资源时需要保证线程安全。互斥量可以用来保护这些共享资源c;确保同一时间只有一个线程能够访问和修改它们。
color:#0d0016">在调用pthread_cond_wait之前c;线程需要先获取互斥量c;这样可以保证在检查条件和进入等待状态时c;其他线程不会同时修改共享资源和条件变量。当线程被唤醒后c;再次获取互斥量c;又可以保证在处理共享资源时的线程安全color:#494949">。
color:#0d0016">生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯c;而通过阻塞队列来进行通讯c;所以生产者生产完数据之后不用等待消费者处理c;直接扔给阻塞队列c;消费者不找生产者要数据c;而是直接从阻塞队列里取c;阻塞队列就相当于一个缓冲区c;平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
color:#494949">“321”原则(便于记忆)
c="https://i-blog.csdnimg.cn/direct/0b7a284d18b24845b90c5c9063fdb8c8.png" width="1108" />
color:#333333">在多线程编程中阻塞队列color:#333333">(Blocking Queue)color:#333333">是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于c;当队列为空时c;从队列获取元素的操作将会被阻塞c;直到队列中被放入了元素;当队列满时c;往队列里存放元素的操作也会被阻塞c;直到有元素被从队列中取出(color:#333333">以上的操作都是基于不同的线程来说的c;线程在对阻塞队列进程操作时会被阻塞)。
c="https://i-blog.csdnimg.cn/direct/c6dd26ac9cfa4f58a343f2d36d409b3c.png" width="836" />
下面我们以单生产者c;单消费者为例:
color:#4da8ee">makefile:
<code class="language-cpp">MainCp:MainCp.ccg++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean:rm -f MainCpcode>
color:#4da8ee">BlockQueue.hpp:
<code class="language-cpp">#include <iostream> #include <queue> #include <pthread.h>const int gmaxcap = 5;template <class T> class BlockQueue { public:BlockQueue(const int &maxcap = gmaxcap) : _maxcap(maxcap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}void push(const T &in) // 输入型参数c;const &;输出型参数 *;输入输出型参数 &;{pthread_mutex_lock(&_mutex);// 1.判断while (is_full())// if(is_full())//细节2:充当判断的语法必须是whilec;不能是ifc;因为在被唤醒时c;有可能存在异常或伪唤醒。{// 细节1:pthread_cond_wait是在临界区啊。// pthread_cond_wait的第二个参数c;必须是我们正在使用的互斥锁。// a.pthread_cond_wait:该函数调用的时候c;会以原子性的方式c;将锁释放c;并将自己挂起。// b.pthread_cond_wait: 该函数在被唤醒返回的时候c;会自动的重新获取你传入的锁。pthread_cond_wait(&_pcond, &_mutex); // 因为生产条件不满足c;无法生产c;生产者进行等待。}// 2.走到这里c;一定是没有满的。_q.push(in);// 3.一定能保证阻塞队列里有数据。// 细节3:pthread_cond_signal:可以放在临界区内部c;也可以放在外部。pthread_cond_signal(&_ccond); // 唤醒消费者消费。这里可以有一定的策略。pthread_mutex_unlock(&_mutex);// pthread_cond_siganl(&_ccond);}void pop(T *out){pthread_mutex_lock(&_mutex);// 1.判断while (is_empty())// if(is_empty()){pthread_cond_wait(&_ccond, &_mutex); // 因为消费条件不满足c;无法消费c;消费者进行等待。}// 2.走到这里c;一定是不为空的。*out = _q.front();_q.pop();// 3.一定能保证阻塞队列里至少有一个空位置。pthread_cond_signal(&_pcond); // 唤醒生产者生产。这里可以有一定的策略。pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}private:bool is_empty(){return _q.empty();}bool is_full(){return _q.size() == _maxcap;}private:std::queue<T> _q;int _maxcap; // 队列中元素的上限pthread_mutex_t _mutex;pthread_cond_t _pcond; // 生产者对应的条件变量pthread_cond_t _ccond; // 消费者对应的条件变量 };code>
<code class="language-cpp">#include "BlockQueue.hpp" #include <ctime> #include <sys/types.h> #include <unistd.h>void *consumer(void *bq_) {BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);while (true){// 生产活动int data;bq->pop(&data);std::cout << "消费数据:" << data << std::endl;}return nullptr; } void *productor(void *bq_) {BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);while (true){// 生产活动int data = rand() % 10 + 1; // 这里我们先用一个随机数构建一个数据。bq->push(data);std::cout << "生产数据:" << data << std::endl;}return nullptr; }int main() {srand((unsigned long)time(nullptr) ^ getpid());BlockQueue<int> *bq = new BlockQueue<int>();pthread_t c, p;// 生产消费要看到同一份资源c;也就是阻塞队列pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0; }code>
c="https://i-blog.csdnimg.cn/direct/3a2180a4aba54bd3a26a806187e84efc.png" width="165" />
让生产者每隔一秒生产一个c;消费者一直消费。那么最终的预期结果就是生产一个c;消费一个;生产一个c;消费一个。
<code class="language-cpp">void *consumer(void *bq_) {BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);while (true){// 生产活动int data;bq->pop(&data);std::cout << "消费数据:" << data << std::endl;}return nullptr; } void *productor(void *bq_) {BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);while (true){// 生产活动int data = rand() % 10 + 1; // 这里我们先用一个随机数构建一个数据。bq->push(data);std::cout << "生产数据:" << data << std::endl;sleep(1);}return nullptr; }code>
c="https://i-blog.csdnimg.cn/direct/b04b2c478e724c5a8d74f10357c26127.png" width="156" />
让消费者每隔一秒消费一个c;生产者一直生产。那么最终的预期结果就是消费一个c;生产一个;消费一个c;生产一个。
<code class="language-cpp">void *consumer(void *bq_) {BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);while (true){// 生产活动int data;bq->pop(&data);std::cout << "消费数据:" << data << std::endl;sleep(1);}return nullptr; } void *productor(void *bq_) {BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(bq_);while (true){// 生产活动int data = rand() % 10 + 1; // 这里我们先用一个随机数构建一个数据。bq->push(data);std::cout << "生产数据:" << data << std::endl;}return nullptr; } code>
c="https://i-blog.csdnimg.cn/direct/cbe96e60b7ad448bbd0a33d8e27895d5.png" width="138" />
这就是基于阻塞队列的生产消费模型。
上面我们阻塞队列里放的就是一个整形数据c;我们可以再完善一下。我们是可以直接在阻塞队列中放任务的。让生产者给消费者派发任务。
color:#4da8ee">makefile:
<code class="language-cpp">MainCp:MainCp.ccg++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean:rm -f MainCpcode>
color:#4da8ee">BlockQueue.hpp:
<code class="language-cpp">#include <iostream> #include <queue> #include <pthread.h>const int gmaxcap = 5;template <class T> class BlockQueue { public:BlockQueue(const int &maxcap = gmaxcap) : _maxcap(maxcap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}void push(const T &in) // 输入型参数c;const &;输出型参数 *;输入输出型参数 &;{pthread_mutex_lock(&_mutex);// 1.判断while (is_full())// if(is_full())//细节2:充当判断的语法必须是whilec;不能是ifc;因为在被唤醒时c;有可能存在异常或伪唤醒。eg:一个生产者十个消费者c;broadcast唤醒。{// 细节1:pthread_cond_wait是在临界区啊。// pthread_cond_wait的第二个参数c;必须是我们正在使用的互斥锁。// a.pthread_cond_wait:该函数调用的时候c;会以原子性的方式c;将锁释放c;并将自己挂起。// b.pthread_cond_wait: 该函数在被唤醒返回的时候c;会自动的重新获取你传入的锁。pthread_cond_wait(&_pcond, &_mutex); // 因为生产条件不满足c;无法生产c;生产者进行等待。}// 2.走到这里c;一定是没有满的。_q.push(in);// 3.一定能保证阻塞队列里有数据。// 细节3:pthread_cond_signal:可以放在临界区内部c;也可以放在外部。pthread_cond_signal(&_ccond); // 唤醒消费者消费。这里可以有一定的策略。pthread_mutex_unlock(&_mutex);// pthread_cond_siganl(&_ccond);}void pop(T *out){pthread_mutex_lock(&_mutex);// 1.判断while (is_empty())// if(is_empty()){pthread_cond_wait(&_ccond, &_mutex); // 因为消费条件不满足c;无法消费c;消费者进行等待。}// 2.走到这里c;一定是不为空的。*out = _q.front();_q.pop();// 3.一定能保证阻塞队列里至少有一个空位置。pthread_cond_signal(&_pcond); // 唤醒生产者生产。这里可以有一定的策略。pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}private:bool is_empty(){return _q.empty();}bool is_full(){return _q.size() == _maxcap;}private:std::queue<T> _q;int _maxcap; // 队列中元素的上限pthread_mutex_t _mutex;pthread_cond_t _pcond; // 生产者对应的条件变量pthread_cond_t _ccond; // 消费者对应的条件变量 };code>
color:#4da8ee">Task.hpp:
<code class="language-cpp">#pragma once#include <iostream> #include <cstdio> #include <functional>class Task {using func_t = std::function<int(int, int, char)>;// typedef std::function<int(int,int,char)>func_t; public:Task(){}Task(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func){}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);return buffer;}std::string toTaskString(){char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);return buffer;}private:int _x;int _y;char _op;func_t _callback; };code>
<code class="language-cpp">#include "BlockQueue.hpp" #include "Task.hpp" #include <ctime> #include <sys/types.h> #include <unistd.h>const std::string oper = "+-*/%";int mymath(int x, int y, char op) {int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}else{result = x / y;}}break;case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}else{result = x % y;}}break;}return result; }void *consumer(void *bq_) {BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(bq_);while (true){// 消费活动Task t;bq->pop(&t);std::cout << "消费任务:" << t() << std::endl;//sleep(1);}return nullptr; } void *productor(void *bq_) {BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(bq_);while (true){// 生产活动int x = rand() % 100 + 1; // 这里我们先用一个随机数构建一个数据。int y = rand() % 10;int operCode = rand() % oper.size();Task t(x, y, oper[operCode], mymath);bq->push(t);std::cout << "生产任务:" << t.toTaskString() << std::endl;sleep(1);}return nullptr; }int main() {srand((unsigned long)time(nullptr) ^ getpid());BlockQueue<Task> *bq = new BlockQueue<Task>();pthread_t c, p;// 生产消费要看到同一份资源c;也就是阻塞队列pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0; }code>
让生产者sleep1秒c;看到的结果就是生产一个任务c;消费一个任务。
c="https://i-blog.csdnimg.cn/direct/69c8901ff74a41d9946da97c3fd6f49e.png" width="416" />
让消费者sleep1秒c;看到的结果就是消费一个任务c;生产一个任务。
c="https://i-blog.csdnimg.cn/direct/08449e362ec942c9a7ee76cbea7a9a5b.png" width="423" />
这样c;我们就完成了一个线程给另一个线程派发任务:生产者给消费者派发任务。
<code class="language-cpp">// //... //... int main() {srand((unsigned long)time(nullptr) ^ getpid());BlockQueue<Task> *bq = new BlockQueue<Task>();pthread_t c, c1, p, p1, p2;// 生产消费要看到同一份资源c;也就是阻塞队列pthread_create(&p, nullptr, productor, bq);pthread_create(&p1, nullptr, productor, bq);pthread_create(&p2, nullptr, productor, bq);pthread_create(&c1, nullptr, consumer, bq);pthread_create(&c, nullptr, consumer, bq);pthread_join(c, nullptr);pthread_join(c1, nullptr);pthread_join(p, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);delete bq;return 0; }code>
c="https://i-blog.csdnimg.cn/direct/80d885983c144b9e9ffee0d3e7264655.png" width="249" />
可以看到是可以的。无论外部的线程再多c;真正进入到阻塞队列里生产或消费的线程永远只有一个。
生产者要向blockqueue里放任务c;消费者要向blockqueue里取任务。由于有锁的存在c;这个(生产过程和消费过程)过程是串行的c;也就是blockqueue里任何时刻只有一个执行流。那么:
color:#956fe7">1、对于生产者而言c;它获取数据构建任务c;是需要花时间的。
color:#956fe7">2、对于消费者而言c;它拿到任务以后c;是需要花时间处理这个任务的!
color:#fe2c24">所以c;高效并不体现在生产者把任务放进阻塞队列里高效c;或者消费者从阻塞队列里拿任务高效。而是c;体现在多个线程可以同时并发的构建或处理任务。
对于单生产单消费c;它的并发性体现在c;消费者从阻塞队列里拿任务和生产者构建任务c;或者生产者往阻塞队列里放任务和消费者处理任务的过程是并发的。
color:#fe2c24">总结:生产消费模型高效体现在c;可以在生产前c;和消费之后c;让线程并行执行。
color:#fe2c24">创建多线程生产和消费的意义:
多个线程可以并发生产c;并发消费。
以上就是线程同步和基于阻塞队列的生产者消费者模型。