文章目录
- 1. 进程线程间的互斥相关背景概念
- 2. 互斥量mutex
- 3. 互斥量mutex接口
- 3.1 初始化互斥量
- 3.2 销毁互斥量
- 3.3 互斥量加锁和解锁
- 3.4 互斥量的封装
- 4. 线程同步
- 同步概念与竞态条件
- 条件变量
- 5. 条件变量函数
- 5.1 初始化
- 5.2 销毁
- 5.3 等待条件满足
- 5.4 唤醒等待
- 5.5 条件变量封装
- 5.6 条件变量使用规范
- 6. 生产者消费者模型
- 基于BlockingQueue的生产者消费者模型
- 7. 结语
1. 进程线程间的互斥相关背景概念
- 临界资源:多线程执行流共享的资源就叫做临界资源;
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区;
- 互斥:任何时刻,互斥保证有且只有⼀个执⾏流进⼊临界区,访问临界资源,通常对临界资源起保护作用;
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
2. 互斥量mutex
有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。然而多个线程并发的操作共享变量,会带来⼀些问题。
共享变量被保护起来后就可以称之为临界资源
例如如下代码:
// 操作共享变量会有问题的售票系统代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;void *route(void *arg)
{char *id = (char*)arg;while ( 1 ) {if ( ticket > 0 ) {usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;} else {break;}}
}
int main()
{pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, "thread 1");pthread_create(&t2, NULL, route, "thread 2");pthread_create(&t3, NULL, route, "thread 3");pthread_create(&t4, NULL, route, "thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);
}
会出现以下可能的结果:
thread 4 sells ticket:100
...
thread 4 sells ticket:1
thread 2 sells ticket:0
thread 1 sells ticket:-1
thread 3 sells ticket:-2
为什么会出现票数为负数的情况?
- 这是因为if 语句判断条件为真以后,代码可以并发的切换到其他线程,其他线程可能将最后一张票抢完后又切换回来,此时剩余票数为0,但是当前线程已经在if判断语句内了,还会进行票数-1,此时剩余票数就会减到负数;
- 此外usleep 这个模拟漫长业务的过程,在这个业务过程中,可能有很多个线程会进⼊该代码段…
- 综上所述,最根本在于
--ticket
操作本⾝就不是⼀个原子操作
要解决以上问题,需要做到三点:
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许⼀个线程进入该临界区。
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要⼀把锁。Linux上提供的这把锁叫互斥量。
如下图所示:
3. 互斥量mutex接口
3.1 初始化互斥量
初始化互斥量有两种方法:
- 方法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
- 方法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
参数:
- mutex:要初始化的互斥量
- attr:NULL
3.2 销毁互斥量
销毁互斥量需要注意:
- 使用
PTHREAD_ MUTEX_ INITIALIZER
初始化的互斥量不需要销毁 - 不要销毁⼀个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
3.3 互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
- 返回值:成功返回0,失败返回错误号
调⽤ pthread_ lock
时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么
pthread_ lock
调用会陷⼊阻塞(执行流被挂起),等待互斥量解锁。
通过对互斥量的学习,我们就可以改进上面的售票系统:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
int ticket = 100;
pthread_mutex_t mutex;
void *route(void *arg)
{char *id = (char*)arg;while ( 1 ) {pthread_mutex_lock(&mutex);if ( ticket > 0 ) {usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;pthread_mutex_unlock(&mutex);} else {pthread_mutex_unlock(&mutex);break;}}
}int main( void )
{pthread_t t1, t2, t3, t4;pthread_mutex_init(&mutex, NULL);pthread_create(&t1, NULL, route, "thread 1");pthread_create(&t2, NULL, route, "thread 2");pthread_create(&t3, NULL, route, "thread 3");pthread_create(&t4, NULL, route, "thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);pthread_mutex_destroy(&mutex);
}
通过对公共资源的加锁保护我们就可以保证访问资源的原子性,这样就不会出现之前的错误了。
3.4 互斥量的封装
通过上述互斥量mutex接口的学习,我们就可以基于上述函数对互斥量进行封装:
#pragma once
#include <iostream>
#include <string>
#include <pthread.h>namespace MutexModule
{class Mutex{public:// 删除不要的拷⻉和赋值Mutex(const Mutex &) = delete;const Mutex &operator=(const Mutex &) = delete;Mutex(){// 初始化锁int n = pthread_mutex_init(&_mutex, nullptr);if (n != 0)std::perror("初始化失败...");}void Lock(){int n = pthread_mutex_lock(&_mutex);if (n != 0)std::perror("Lock失败...");}void Unlock(){int n = pthread_mutex_unlock(&_mutex);if (n != 0)std::perror("Unlock失败...");}pthread_mutex_t *MutexPtr() // 获取原始指针{return &_mutex;}~Mutex(){int n = pthread_mutex_destroy(&_mutex);if (n != 0)std::perror("_mutex销毁失败...");}private:pthread_mutex_t _mutex;};// 采⽤RAII⻛格,进⾏锁管理class LockGuard{public:LockGuard(Mutex &mutex) : _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}
封装好互斥量后我们再采用RAII智能指针的风格,对锁进行管理。
- 抢票的代码就可以更新成为:
// 抢票的代码就可以更新成为
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include "Lock.hpp"
using namespace MutexModule;
int ticket = 1000;Mutex mutex;
void *route(void *arg)
{char *id = (char *)arg;while (1){LockGuard lockguard(mutex); // 使⽤RAII⻛格的锁if (ticket > 0){usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;}else{break;}}return nullptr;
}
int main(void)
{pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, (void *)"thread 1");pthread_create(&t2, NULL, route, (void *)"thread 2");pthread_create(&t3, NULL, route, (void *)"thread 3");pthread_create(&t4, NULL, route, (void *)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);
}
4. 线程同步
同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
条件变量
- 当⼀个线程互斥地访问某个变量时,发现在其它线程改变状态之前,它什么也做不了。例如⼀个线程访问队列时,发现队列为空,它只能等待,只到其它线程将⼀个节点添加到队列中。这种情况就需要用到条件变量。
多个线程竞争临界资源时,未争夺到的线程需要在一个地方按顺序进行等待,竞争到的线程使用完临界资源释放锁后如果需要再次使用,也需要在该地方进行等待,这就是条件变量。
5. 条件变量函数
5.1 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
参数:
- cond:要初始化的条件变量
- attr:NULL
5.2 销毁
int pthread_cond_destroy(pthread_cond_t *cond)
5.3 等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
- cond:要在这个条件变量上等待
- mutex:互斥量
✨为什么 pthread_cond_wait 需要互斥量?
- 条件等待是线程间同步的一种手段,如果只有⼀个线程,条件不满足,⼀直等下去都不会满足,所以必须要有⼀个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以⼀定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
简单来说就是线程在条件变量下等待时一定在临界资源内,当唤醒时一定需要重新持有锁,这样才能保护公共资源。
5.4 唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
测试代码如下:
#include <iostream>
#include <cstdio>
#include <string>
#include <pthread.h>
#include <unistd.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *active(void *arg)
{std::string name = static_cast<const char *>(arg);while (true){pthread_mutex_lock(&mutex);// 没有对于资源是否就绪的判定pthread_cond_wait(&cond, &mutex); // mutex??printf("%s is active!\n", name.c_str());pthread_mutex_unlock(&mutex);}
}int main()
{pthread_t tid1, tid2, tid3;pthread_create(&tid1, nullptr, active, (void *)"thread-1");pthread_create(&tid2, nullptr, active, (void *)"thread-2");pthread_create(&tid3, nullptr, active, (void *)"thread-3");sleep(1);printf("Main thread ctrl begin...\n");while (true){printf("main wakeup thread...\n");pthread_cond_signal(&cond);//唤醒单个线程//pthread_cond_broadcast(&cond);//唤醒所有线程sleep(1);}pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);
}
- 使用条件变量一次唤醒单个线程:
- 使用条件变量一次唤醒所有线程:
5.5 条件变量封装
#pragma once#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"namespace CondModule
{using namespace MutexModule;class Cond{public:Cond(){int n = ::pthread_cond_init(&_cond, nullptr);if(n!=0)std::perror("Cond初始化失败...");}void Wait(Mutex &mutex) {int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());if(n!=0)std::perror("cond wait失败...");}void Notify(){int n = ::pthread_cond_signal(&_cond);if(n!=0)std::perror("notify失败...");}void NotifyAll(){int n = ::pthread_cond_broadcast(&_cond);if(n!=0)std::perror("notifyall失败...");}~Cond(){int n = ::pthread_cond_destroy(&_cond);if(n!=0)std::perror("cond销毁失败...");}private:pthread_cond_t _cond;};
}
为了让条件变量更具有通⽤性,建议封装的时候,不要在Cond类内部引⽤对应的封装互斥量,要不然后⾯组合的时候,会因为代码耦合的问题难以初始化,因为⼀般⽽⾔Mutex和Cond基本是⼀起创建的。
5.6 条件变量使用规范
• 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
6. 生产者消费者模型
-
为何要使用生产者消费者模型
生产者消费者模式就是通过⼀个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于⼀个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是⽤来给生产者和消费者解耦的。 -
生产者消费者模型优点
• 解耦
• 支持并发
• 支持忙闲不均
生产消费者模型可以总结为
321原则
;3种关系:生产者与生产者,生产者与消费者,消费者与消费者;2种角色:生产者与消费者;1个交易场所。
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞):
代码如下:
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"namespace BlockQueueModule
{using namespace MutexModule;using namespace CondModule;static const int gcap = 10;template <typename T>class BlockQueue{private:bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.size() == 0;}public:BlockQueue(int cap = gcap): _cap(cap),_cwait_num(0),_pwait_num(0){}void Equeue(const T &in) // 生产者{LockGuard lockguard(_mutex);while(IsFull())//必须得是while{std::cout<<"生产者线程进入等待..."<<std::endl;++_pwait_num;_productor_cond.Wait(_mutex);//走到这里线程已经被唤醒,所以要--std::cout<<"生产者线程被唤醒..."<<std::endl;--_pwait_num;}//现在可以生产数据_q.push(in);// 肯定有数据!if (_cwait_num)//如果消费者那边有线程在等待{_consumer_cond.Notify();}}void Pop(T* out){LockGuard lockguard(_mutex);while(IsEmpty()){std::cout<<"消费者线程进入等待..."<<std::endl;++_cwait_num;_consumer_cond.Wait(_mutex);//线程被唤醒&&重新申请并持有锁(它会在临界区内醒来!)std::cout<<"消费者线程被唤醒..."<<std::endl;--_cwait_num;}//现在可以消费数据*out = _q.front();_q.pop();//肯定有空位给消费者生产if(_pwait_num)//如果生产者那边有线程在等待{_productor_cond.Notify();}}~BlockQueue(){}private:std::queue<T> _q;int _cap;Mutex _mutex;Cond _productor_cond;Cond _consumer_cond;int _cwait_num;int _pwait_num;};
}
这里我们使用了之前封装的锁与条件变量
- 单⽣产,单消费:
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>using namespace BlockQueueModule;void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data;// 1. 从bq拿到数据bq->Pop(&data);// 2.做处理printf("Consumer, 消费了一个数据: %d\n", data);}
}void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 0;while (true){sleep(2);bq->Equeue(data);printf("producter 生产了一个数据: %d\n", data);data++;}
}int main()
{// 交易场所,不仅仅可以用来进行传递数据// 传递任务!!!v1: 对象 v2BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源// 单生产,单消费pthread_t c1, p1; pthread_create(&c1, nullptr, Consumer, bq);pthread_create(&p1, nullptr, Productor, bq);pthread_join(c1, nullptr);pthread_join(p1, nullptr);delete bq;return 0;
}
结果如下:
- 多生产,多消费:
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>using namespace BlockQueueModule;void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data;// 1. 从bq拿到数据bq->Pop(&data);// 2.做处理printf("Consumer, 消费了一个数据: %d\n", data);}
}void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 0;while (true){sleep(2);bq->Equeue(data);printf("producter 生产了一个数据: %d\n", data);data++;}
}int main()
{// 交易场所,不仅仅可以用来进行传递数据// 传递任务!!!v1: 对象 v2BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享资源 -> 临界资源pthread_t c1, p1,c2,p2, p3;pthread_create(&c1, nullptr, Consumer, bq);pthread_create(&c2, nullptr, Consumer, bq);pthread_create(&p1, nullptr, Productor, bq);pthread_create(&p2, nullptr, Productor, bq);pthread_create(&p3, nullptr, Productor, bq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);delete bq;return 0;
}
结果如下:
7. 结语
以上就是有关线程互斥与同步有关的内容啦,线程互斥指的是多个线程访问公共资源,保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用;线程同步指的是在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。以上就是今天所有的内容啦~ 完结撒花~ 🥳🎉🎉