线程的加载
在内存当中我们知道还有一个关于共享区的概念,在这上面他有对库映射的虚拟地址,也有对创建的线程pthread做的管理。
我们所用的pthread_create()函数呢,其实也就是就是返回在共享区里创建的线程地址。而线程地址指向的首地址其实是线程需要第一次进入的一个函数的首地址。
而在源码里边他就设置了这些函数,所以规定了我们只能传入对应的函数参数,以及函数指针
互斥与同步
在有时不同线程会去访问进程里全局的资源,我们将这种会共同去访问的全局资源,叫做共享资源,也叫临界资源,操作临界资源的代码块我们又叫做临界区。
互斥:当线程要访问临界资源时就会相互竞争,这就叫做互斥。
同步:线程在访问临界资源时需要有序的进程访问,也就是排队访问,这就是同步。
所以对于线程访问临界资源我们是需要加锁处理的我们一般有两种锁
互斥锁
pthread_mutex_t
pthread_mutex是多线程编程中用来实现互斥的一种机制。它可以用来保护共享资源,确保在某一时刻只有一个线程可以访问该资源。
以下是pthread_mutex的几个常用函数的详细讲解:
- pthread_mutex_init
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
该函数用来初始化一个互斥锁。需要传入一个pthread_mutex_t类型的指针作为参数,用来指向要初始化的互斥锁对象。还可以通过第二个参数attr来指定互斥锁的属性,如果不需要特定属性则可以传入NULL。
- pthread_mutex_lock
int pthread_mutex_lock(pthread_mutex_t *mutex)
该函数用来加锁一个互斥锁。需要传入一个pthread_mutex_t类型的指针作为参数,指向要加锁的互斥锁对象。如果互斥锁当前处于被锁定状态,则调用线程会被阻塞,直到互斥锁被解锁。
- pthread_mutex_trylock
int pthread_mutex_trylock(pthread_mutex_t *mutex)
该函数尝试对互斥锁进行加锁。与pthread_mutex_lock不同的是,如果互斥锁当前处于被锁定状态,则函数立即返回,不会阻塞线程。如果加锁成功,则返回0;否则,返回EBUSY。
- pthread_mutex_unlock
int pthread_mutex_unlock(pthread_mutex_t *mutex)
该函数解锁一个互斥锁。需要传入一个pthread_mutex_t类型的指针作为参数,指向要解锁的互斥锁对象。如果有其他线程正在等待这个互斥锁,则其中一个线程会被唤醒并获得锁。
- pthread_mutex_destroy
int pthread_mutex_destroy(pthread_mutex_t *mutex)
该函数销毁一个互斥锁。需要传入一个pthread_mutex_t类型的指针作为参数,指向要销毁的互斥锁对象。在程序退出前应该调用该函数来销毁互斥锁。
使用pthread_mutex的一般步骤如下:
-
定义一个pthread_mutex_t类型的变量作为互斥锁。
-
在需要对共享资源进行访问的代码块中,通过调用pthread_mutex_lock函数来加锁。
-
在使用完共享资源后,通过调用pthread_mutex_unlock函数来解锁。
-
在程序退出前,通过调用pthread_mutex_destroy函数来销毁互斥锁。
通过使用互斥锁,可以确保在某一时刻只有一个线程可以访问共享资源,从而避免竞争条件和数据的不一致性问题。
加锁部分源码:
根据这个
拿抢票来举例子
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
int ticket = 3000;
pthread_mutex_t mutex;void *route(void *arg)
{char *id = (char *)arg;while (1){pthread_mutex_lock(&mutex);if (ticket > 0){usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;pthread_mutex_unlock(&mutex);}else{pthread_mutex_unlock(&mutex);break;}}
}int main(void)
{pthread_t t1, t2, t3, t4;//pthread_mutex_init(&mutex, NULL);pthread_create(&t1, NULL, route, (void *)"thread 1");pthread_create(&t2, NULL, route, (void *)"thread 2");pthread_create(&t3, NULL, route, (void *)"thread 3");pthread_create(&t4, NULL, route, (void *)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);//pthread_mutex_destroy(&mutex);
}
我们抢票时,访问资源肯定肯定会去访问临界资源的,如果我们不加锁,抢票的时候就会有这样一个结果。
不加锁情况!
因为当票为1时肯定会有多个线程去抢这个票,如果不加保护,就会使这个票减减多次。出现这样的结果。
加锁情况
加上之后,他们就可以做同步了
条件变量
pthread_cond_t是一个条件变量类型,用于线程间的同步和通信。它是一种线程间的信号机制,可以用来实现线程的等待和唤醒操作。
pthread_cond_t的使用通常需要和pthread_mutex_t配合使用,用于实现互斥访问共享资源。
下面是pthread_cond_t的一般用法:
定义和初始化条件变量:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond;
pthread_cond_init(&cond, NULL);
等待条件的线程调用pthread_cond_wait()函数来进入等待状态(释放已经持有的互斥锁,并等待条件变量的信号):
pthread_mutex_lock(&mutex); // 先加锁
while (条件不满足) {pthread_cond_wait(&cond, &mutex); // 等待条件变量的信号,并释放锁
}
pthread_mutex_unlock(&mutex); // 条件满足,解锁
满足条件的线程调用pthread_cond_signal()或pthread_cond_broadcast()函数来发送信号,告知等待的线程条件已经满足:
pthread_mutex_lock(&mutex); // 先加锁
// 修改条件、操作共享资源...
pthread_cond_signal(&cond); // 发送信号,唤醒一个线程
// pthread_cond_broadcast(&cond); // 发送信号,唤醒所有线程
pthread_mutex_unlock(&mutex); // 解锁
注意:pthread_cond_signal()函数只是唤醒一个等待的线程,而pthread_cond_broadcast()函数则是唤醒所有等待的线程。
基于阻塞队列的生产消费者模型
⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间 不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔 给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区, 平衡了⽣产者和消费者的处理能⼒。这个阻塞队列就是⽤来给⽣产者和消费者解耦的。
这样生产者与消费就可以解耦。
这时我们只需要维护好三个原则!
3种关系生产者与生产者,1.生产者与生产者(互斥),2.消费者与消费者(互斥),3.消费者与生产者(互斥加同步)。
2个角色,生产者与消费者。
1个交易场所--基于阻塞队列或环形队列的生产场所
#include<iostream>
#include<pthread.h>
#include<queue>
#include<unistd.h>
using std::cout;
using std::endl;namespace zgw
{const int gcapa=4;template<class T>class BlockQueue{public:bool Full(){return _data.size()==_capacity;}bool Empty(){return _data.empty();}BlockQueue(int capacity=gcapa):_capacity(capacity){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_consumer,nullptr);pthread_cond_init(&_productor,nullptr);}void Push(const T& getdata){pthread_mutex_lock(&_mutex);while(Full()){_pwait++;pthread_cond_wait(&_productor,&_mutex);//如果满了就进入等待队列,等待中如果遇到signal,并且有锁才会重新申请到锁_pwait--;}_data.push(getdata);if(_cwait){pthread_cond_signal(&_consumer);}pthread_mutex_unlock(&_mutex);}void Get(T*getdata){pthread_mutex_lock(&_mutex);while(Empty()){_cwait++;pthread_cond_wait(&_consumer,&_mutex);_cwait--;}*getdata=_data.front();_data.pop();if(_pwait){pthread_cond_signal(&_productor);}pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumer);pthread_cond_destroy(&_productor);}private:int _pwait=0;int _cwait=0;int _capacity;std::queue<T> _data;pthread_mutex_t _mutex;pthread_cond_t _consumer; //消费等待队列pthread_cond_t _productor; //生产者等待队列};
}
POSIX信号量
POSIX信号量(POSIX semaphore)的使用方法如下:
包含头文件:
#include <semaphore.h>
- 定义信号量变量:
sem_t semaphore;
- 初始化信号量:
sem_init(&semaphore, 0, initial_value);
其中,第一个参数是指向信号量变量的指针,第二个参数是用于指定信号量的作用范围(0表示线程间共享,非0表示进程间共享),第三个参数是信号量的初始值。
等待信号量(P操作):
sem_wait(&semaphore);
该函数会阻塞当前线程,直到信号量的值大于0。一旦信号量的值大于0,它会将信号量的值减1。
发送信号量(V操作):
sem_post(&semaphore);
该函数会将信号量的值加1。如果有其他线程正因为等待信号量而被阻塞,它会唤醒其中一个线程。
销毁信号量:
sem_destroy(&semaphore);
该函数会释放信号量变量占用的资源。在销毁信号量之前,必须确保没有线程在对该信号量进行等待操作。
需要注意的是,对于信号量的操作应该在互斥锁的保护下进行,以确保操作的原子性和正确性。以上是POSIX信号量的基本使用方法,可以根据具体的应用场景和需求进行适当的调整和扩展。
基于环形队列的⽣产消费模型
已经将信号量做了封装
#pragma once#include "Cond.hpp"
//#include"Mutex.hpp"
#include <queue>
#include <vector>
#include "Sem.hpp"
using std::cout;
using std::endl;namespace zgw
{int gcapa = 4;template <class T>class Ringqueue{public:Ringqueue(int _capa = gcapa) : _sempro(_capa), _semcon(0), _capacity(_capa), _rqueue(_capa), _curpro(0), _curcon(0){}void Set(T *data){_sempro.P();{_mutexpro.Lock();_rqueue[_curpro++] = *data;_curpro %= _capacity;_mutexpro.Unlock();}_semcon.V();}void Get(T *data){_semcon.P();{_mutexcon.Lock();*data = _rqueue[_curcon++];_curcon %=_capacity;_mutexcon.Unlock();}_sempro.V();}~Ringqueue(){}private:int _curpro;int _curcon;std::vector<T> _rqueue;int _capacity;Sem _sempro;Sem _semcon;Mymutex _mutexcon;Mymutex _mutexpro;};
}zgw::Ringqueue<int> rq;