手搓线程池
- 线程池工作原理和实现
- 线程池工作原理
- 1. 线程池的基本组成:
- 2. 线程池的基本执行流程:
- 3. 线程池的核心参数:
- 4. 线程池的生命周期:
- 5. 线程池的执行策略:
- 相关知识点
- 线程与进程的比较
- 读写锁
- 互斥锁
- 基于C语言的线程池设计与实现
- 基于C++的线程池设计与实现
线程池工作原理和实现
线程池工作原理
线程池(Thread Pool)是一种预先创建和管理一组工作线程的技术,用来优化并发任务的执行。通过复用这些线程来执行多个任务,线程池可以减少线程创建和销毁的开销,提高系统的性能和响应速度。
1. 线程池的基本组成:
- 任务队列(Task Queue):存放待执行任务的队列。当有新的任务提交时,它会进入任务队列,等待可用线程来执行。
- 工作线程(Worker Threads):线程池中的一组线程,用来处理任务队列中的任务。线程池启动时会创建一定数量的工作线程。
- 线程池管理器(Thread Pool Manager):负责管理线程池的大小、任务分配、线程的创建和销毁。
2. 线程池的基本执行流程:
- 提交任务:用户向线程池提交任务,通常是实现了
Runnable
或Callable
接口的对象。任务被放入任务队列中。 - 任务分配:线程池中的某个空闲工作线程会从任务队列中取出一个任务,并开始执行。任务队列是线程安全的,确保多个线程可以同时取任务而不发生冲突。
- 任务执行:工作线程运行并执行任务中的代码。线程池中的工作线程是循环的,每个线程在完成一个任务后会继续取下一个任务。
- 复用线程:任务完成后,线程不会被销毁,而是回到线程池中成为“空闲线程”,等待下一个任务。
- 动态调整线程数量:如果任务数量激增,线程池可以根据策略(如扩展线程池的大小)创建新的线程来处理更多的任务;如果任务减少,线程池也可能会销毁一些线程以节省资源。
3. 线程池的核心参数:
- 核心线程数(corePoolSize):线程池中保持活跃的核心线程数量,即使线程处于空闲状态也不会被回收。
- 最大线程数(maximumPoolSize):线程池中允许的最大线程数量。当任务数量超过核心线程数时,线程池会根据需求创建更多的线程,直到达到最大线程数。
- 任务队列(workQueue):用于存放等待执行的任务。如果当前所有线程都在忙碌,新的任务会被放入任务队列中。
- 线程存活时间(keepAliveTime):线程池中超过核心线程数的空闲线程在等待新任务的最长等待时间,超过这个时间后将被销毁。
- 线程工厂(ThreadFactory):用于创建新线程,方便自定义线程的属性,如线程的名称、优先级等。
- 拒绝策略(RejectedExecutionHandler):当任务过多而无法处理时(任务队列满了且线程池的线程数已达到上限),线程池会执行拒绝策略,常见策略包括丢弃任务、抛出异常、或由调用者线程直接执行。
4. 线程池的生命周期:
- 运行状态(RUNNING):线程池处于运行状态,可以接受任务并处理任务。
- 关闭状态(SHUTDOWN):线程池不再接受新任务,但会继续处理已经提交的任务。
- 停止状态(STOP):线程池不再接受任务,且会中断正在执行的任务。
- 终止状态(TERMINATED):所有任务执行完毕,线程池中的线程全部销毁,线程池彻底关闭。
5. 线程池的执行策略:
- 先使用核心线程:线程池优先利用核心线程来处理任务,只有在核心线程全部繁忙的情况下,才会将任务放入任务队列。
- 任务队列满了,创建新线程:如果任务队列已满且所有核心线程都在工作,线程池会创建新的线程,直到达到
maximumPoolSize
。 - 拒绝任务:如果任务队列满了,且线程池中的线程数量已经达到
maximumPoolSize
,根据配置的拒绝策略处理新任务。
相关知识点
线程与进程的比较
- 线程启动速度快,轻量级
- 线程使用有一定难度,需要处理数据一致性问题
- 同一线程共享的有堆、全局变量、静态变量、指针等,而独自占有栈
- 线程是调度的基本单位(PC、状态码、通用寄存器、线程栈及栈指针);进程是拥有资源的基本单位(打开文件、堆、代码段等)
- 一个进程内多个线程可以并发;多个进程可以并发
- 拥有资源:线程不拥有系统资源,但一个进程的多个线程可以共享隶属进程的资源;进程是拥有资源的独立单位
- 线程的系统开销小,线程创建销毁只需要处理PC值,状态码,通用寄存器值,线程栈及栈指针即可;进程创建和销毁需要重新分配及销毁task_struct结构。
读写锁
- 多个读者可以同时进行读
- 写者必须互斥(只允许一个写者写,也不能读者写者同时进行)
- 写者优先于读者(一旦有写者,则后续读者必须等待,唤醒时优先考虑写者)
互斥锁
一次只能一个线程拥有互斥锁,其他线程只有等待。
互斥锁是在抢锁失败的情况下主动放弃CPU进入睡眠状态直到锁的状态改变时再唤醒,而操作系统负责线程调度,为了实现锁的状态发生改变时唤醒阻塞的线程或者进程,需要把锁交给操作系统管理,所以互斥锁在加锁操作时设计上下文的切换。互斥锁实际的效率还是可以让人接受的,加锁的时间大概100ns左右,而实际上互斥锁的一种可能的实现是先自旋一段时间,当自旋的时间超过阈值之后再将线程投入到睡眠中,因此在并发运算中使用互斥锁(每次占用锁的时间很短)的效果可能不亚于使用自旋锁。
互斥锁属于sleep-waiting
类型的锁。例如在一个双核的机器上有两个线程A和B,它们分别运行在core 0和core 1上。假设线程A想要通过pthread_mutex_lock
操作去得到一个临界区的锁,而此时这个锁正被线程B所持有,那么线程A就会被阻塞,此时会通过上下文切换将线程A置于等待队列中,此时core 0就可以运行其他的任务(如线程C)。
基于C语言的线程池设计与实现
任务队列
typedef struct Task
{void (*function) (void* arg); // void*是一个泛型,能够接收各种各样的数据类型void* arg;
}Task;
线程池定义
struct ThreadPool
{// 任务队列Task* taskQ; // 队列数组int queueCapacity; // 容量int queueSize; // 当前任务个数int queueFront; // 队头,取数据int queueRear; // 队尾,放数据pthread_t managerID; // 管理者线程IDpthread_t *threadIDs; // 工作的线程IDint minNum; // 最小的线程数int maxNum; // 最大的线程数int busyNum; // 工作的线程个数int liveNum; // 存活的线程个数int exitNum; // 要杀死的线程个数pthread_mutex_t mutexpool; // 锁住整个线程池pthread_mutex_t mutexBusy; // 锁住busyNum变量pthread_cond_t notFull; // 任务队列是否满了pthread_cond_t notEmpty; // 任务队列是否空了int shutdown; // 是否销毁线程池,销毁为1,不销毁为0
};
头文件声明
#ifndef _THREADPOOL_H
#define _THREADPOOL_Htypedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize);// 销毁线程池
int threadPoolDestory(ThreadPool* pool);// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);void* worker(void* arg);void* manager(void* arg);void threadExit(ThreadPool* pool);#endif
源文件定义
#include "threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<stdio.h>const int NUMBER = 2;// 任务结构体
typedef struct Task
{ // void*是一个泛型,能够接收各种各样的数据类型void (*function) (void* arg); // 函数指针void* arg;
}Task;// 线程池结构体
struct ThreadPool
{// 任务队列Task* taskQ; // 队列数组int queueCapacity; // 容量int queueSize; // 当前任务个数int queueFront; // 队头,取数据int queueRear; // 队尾,放数据pthread_t managerID; // 管理者线程IDpthread_t *threadIDs; // 工作的线程IDint minNum; // 最小的线程数int maxNum; // 最大的线程数int busyNum; // 工作的线程个数int liveNum; // 存活的线程个数int exitNum; // 要杀死的线程个数pthread_mutex_t mutexpool; // 锁住整个线程池pthread_mutex_t mutexBusy; // 锁住busyNum变量pthread_cond_t notFull; // 任务队列是否满了,用于阻塞生产者pthread_cond_t notEmpty; // 任务队列是否空了,用于阻塞消费者int shutdown; // 是否销毁线程池,销毁为1,不销毁为0
};ThreadPool* threadPoolCreate(int min, int max, int queueSize) {ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));do {if(pool == NULL) {printf("malloc threadpool fail....\n");break;}pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);if(pool->threadIDs == NULL) {printf("malloc threadIDs fail....\n");break;}memset(pool->threadIDs, 0, sizeof(pthread_t) * max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min; // 和最小个数相等pool->exitNum = 0;if(pthread_mutex_init(&pool->mutexpool, NULL) != 0 ||pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0) {printf("mutex or condif init fail....\n");break;}// 任务队列pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);pool->queueCapacity = queueSize;pool->queueSize = 0;pool->queueFront = 0;pool->queueRear = 0;pool->shutdown = 0;// 创建管理者线程和工作者线程pthread_create(&pool->managerID, NULL, manager, pool); // 第三个参数为管理者线程的任务函数for(int i = 0; i < min; i++) {pthread_create(&pool->threadIDs[i], NULL, worker, pool); // 第三个参数为工作的线程的任务函数}return pool;} while(0);// 释放资源if(pool->threadIDs) free(pool->threadIDs);if(pool->taskQ) free(pool->taskQ);if(pool) free(pool);return NULL;
}int threadPoolDestory(ThreadPool* pool) {if(pool == NULL) {return -1;}// 关闭线程池pool->shutdown = 1;// 阻塞回收管理者线程pthread_join(pool->managerID, NULL);// 唤醒阻塞的消费者线程for(int i = 0; i < pool->liveNum; i++) {pthread_cond_signal(&pool->notEmpty);}// 释放堆内存if(pool->taskQ) {free(pool->taskQ);}if(pool->threadIDs) {free(pool->threadIDs);}pthread_mutex_destroy(&pool->mutexpool);pthread_mutex_destroy(&pool->mutexBusy);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);free(pool);pool = NULL;return 0;
}void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg) {pthread_mutex_lock(&pool->mutexpool);while(pool->queueSize == pool->queueCapacity && !pool->shutdown) {// 阻塞生产者线程pthread_cond_wait(&pool->notFull, &pool->mutexpool);}if(pool->shutdown) {pthread_mutex_unlock(&pool->mutexpool);return;}// 添加任务pool->taskQ[pool->queueRear].function = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pool->queueSize++;pthread_cond_signal(&pool->notEmpty); // 唤醒阻塞在条件变量里的工作的线程,即生产者生产后唤醒消费者pthread_mutex_unlock(&pool->mutexpool);
}int threadPoolBusyNum(ThreadPool* pool) {pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}int threadPoolAliveNum(ThreadPool* pool) {pthread_mutex_lock(&pool->mutexpool);int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexpool);return liveNum;
}void* worker(void* arg) {ThreadPool* pool = (ThreadPool*)arg;while(1) {pthread_mutex_lock(&pool->mutexpool); // 访问线程池之前加锁// 当前任务队列是否为空while(pool->queueSize == 0 && !pool->shutdown) { // 任务队列为空并且线程池没有被关闭// 阻塞工作线程pthread_cond_wait(&pool->notEmpty, &pool->mutexpool);// 判断是不是要销毁线程if(pool->exitNum > 0) {pool->exitNum--;if(pool->liveNum > pool->minNum) {pool->liveNum--;pthread_mutex_unlock(&pool->mutexpool); // 阻塞的时候已经获得锁,已经锁上,如果不解开就会死锁threadExit(pool);}}}// 判断线程池是否被关闭了if(pool->shutdown) {pthread_mutex_unlock(&pool->mutexpool); // 避免死锁threadExit(pool);}// 从任务队列中取出一个任务Task task;task.function = pool->taskQ[pool->queueFront].function;task.arg = pool->taskQ[pool->queueFront].arg;// 移动头结点pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; // 循环队列pool->queueSize--;pthread_cond_signal(&pool->notFull); // 消费者消费完产品后唤醒生产者pthread_mutex_unlock(&pool->mutexpool); // 用完之后解锁printf("thread %ld start working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);task.function(task.arg);free(task.arg);task.arg = NULL;printf("thread %ld end working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}return NULL;
}void* manager(void* arg) {ThreadPool* pool = (ThreadPool*)arg;while(!pool->shutdown) {// 每隔三秒钟检测一次sleep(3);// 取出线程池中任务的数量和当前线程的数量pthread_mutex_lock(&pool->mutexpool);int queueSize = pool->queueSize;int liveNumber = pool->liveNum;pthread_mutex_unlock(&pool->mutexpool);// 取出忙的线程的数量pthread_mutex_lock(&pool->mutexBusy);int busyNumber = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);// 添加线程// 任务的个数 大于 存活的线程个数 && 存活的线程数 小于 最大线程数if(queueSize > liveNumber && liveNumber < pool->maxNum) {pthread_mutex_lock(&pool->mutexpool);int count = 0;for(int i = 0; i < pool->maxNum && count < NUMBER && liveNumber < pool->maxNum; i++) {if(pool->threadIDs[i] == 0) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);count++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexpool);}// 销毁线程// 忙的线程 * 2 小于 存活的线程数 && 存活的线程 大于 最小线程数// 之所以不用对pool->minNum加锁,是因为这个值是固定的,只需要读不需要写if(busyNumber * 2 < liveNumber && liveNumber > pool->minNum) {pthread_mutex_lock(&pool->mutexpool);pool->exitNum = NUMBER;pthread_mutex_unlock(&pool->mutexpool);// 让工作的线程自杀for(int i = 0; i < NUMBER; i++) {pthread_cond_signal(&pool->notEmpty);}}}
}void threadExit(ThreadPool* pool) {pthread_t tid = pthread_self(); // 获得线程自身的ID。for(int i = 0; i < pool->maxNum; i++) {if(pool->threadIDs[i] == tid) { // tid对应的线程要退出了pool->threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);
}
测试代码
#include<stdio.h>
#include "threadpool.h"
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>void taskFunc(void* arg) {int num = *(int*)arg;printf("thread %ld is working, number = %d\n", pthread_self(), num);sleep(1);
}int main() {// 创建线程池ThreadPool* pool = threadPoolCreate(3, 10, 100);for(int i = 0; i < 100; i++) {int* num = (int*)malloc(sizeof(int));*num = i + 100;threadPoolAdd(pool, taskFunc, num);}sleep(30); // 等待子线程把任务处理完毕threadPoolDestory(pool);return 0;
}
基于C++的线程池设计与实现
因为在C++中,delete task.arg时候,由于delete void*类型是有危险的,因为void*指针只占四个字节,因此有可能不能全部地被释放,为了知道在程序中void*实际上是什么类型,因此在C++中可以使用模板来解决这一问题,因为模板可以传递类型。
任务队列声明
#pragma once
#include<queue>
#include<pthread.h>using callback = void (*) (void* arg);// 任务结构体
template <typename T>
struct Task
{ Task<T>() {function = nullptr;arg = nullptr;}Task<T>(callback f, void* arg) {this->arg = (T*)arg;function = f;}callback function; // 函数指针T* arg;
};template <typename T>
class TaskQueue
{
private:std::queue <Task<T>> m_taskQ;pthread_mutex_t m_mutex; // 互斥锁
public:TaskQueue(/* args */);~TaskQueue();// 添加任务void addTask(Task<T> task);void addTask(callback f, void* arg);// 取出一个任务Task<T> takeTask();// 获取当前任务的个数inline size_t taskNumber() { // 没有if判断什么的,比较简单的直接写成内联函数比较好return m_taskQ.size();}
};
任务队列定义
#include "TaskQueue.h"template <typename T>
TaskQueue<T>::TaskQueue(/* args */)
{pthread_mutex_init(&m_mutex, NULL);
}template <typename T>
TaskQueue<T>::~TaskQueue()
{pthread_mutex_destroy(&m_mutex);
}template <typename T>
void TaskQueue<T>::addTask(Task<T> task)
{pthread_mutex_lock(&m_mutex);m_taskQ.push(task);pthread_mutex_unlock(&m_mutex);
}template <typename T>
void TaskQueue<T>::addTask(callback f, void* arg)
{pthread_mutex_lock(&m_mutex);m_taskQ.push(Task<T>(f, arg));pthread_mutex_unlock(&m_mutex);
}template <typename T>
Task<T> TaskQueue<T>::takeTask()
{Task<T> task;pthread_mutex_lock(&m_mutex);if(!m_taskQ.empty()) {task = m_taskQ.front();m_taskQ.pop();}pthread_mutex_unlock(&m_mutex);return task;
}
线程池声明
#pragma once
#include "TaskQueue.h"
#include "TaskQueue.cpp"template <typename T>
class ThreadPool
{
public:ThreadPool(int min, int max);~ThreadPool();// 添加任务void addTask(Task<T> task);// 获取忙线程的个数int getBusyNumber();// 获取活着的线程个数int getAliveNumber();private:// 工作的线程的任务函数static void* worker(void* arg);// 管理者线程的任务函数static void* manager(void* arg);void threadExit();static const int NUMBER = 2;pthread_mutex_t mutexPool;pthread_cond_t notEmpty;pthread_t* threadIDs;pthread_t managerID;TaskQueue<T>* taskQ;int minNum;int maxNum;int busyNum;int liveNum;int exitNum;bool shutdown = false;
};
线程池定义
#include "ThreadPool.h"
#include <iostream>
#include <string.h>
#include <string>
#include <unistd.h>using namespace std;template <typename T>
ThreadPool<T>::ThreadPool(int min, int max) {// 实例化任务队列do {taskQ = new TaskQueue<T>;if(taskQ == nullptr) {cout << "malloc threadIDs fail...\n";break;}threadIDs = new pthread_t[max];if(threadIDs == nullptr) {cout << "malloc threadIDs fail....\n";break;}memset(threadIDs, 0, sizeof(pthread_t) * max);minNum = min;maxNum = max;busyNum = 0;liveNum = min; // 和最小个数相等exitNum = 0;if(pthread_mutex_init(&mutexPool, NULL) != 0 ||pthread_cond_init(¬Empty, NULL) != 0) {cout << "mutex or condif init fail....\n";break;}shutdown = false;/*为什么要把this传给manager呢?因为manager是一个静态方法,静态方法它只能访问类里边的静态成员变量,它不能访问类的非静态成员变量。因此如果我们想要访问这些非静态成员变量,就必须要给这个静态方法传进去一个实例化对象,通过传进去的这个实例化对象来访问里边的非静态成员函数和变量*/// 创建管理者线程和工作者线程pthread_create(&managerID, NULL, manager, this); // 第三个参数为管理者线程的任务函数for(int i = 0; i < min; i++) {pthread_create(&threadIDs[i], NULL, worker, this); // 第三个参数为工作的线程的任务函数}return;} while(0);// 释放资源if(threadIDs) delete []threadIDs;if(taskQ) delete taskQ;
}template <typename T>
ThreadPool<T>::~ThreadPool() {// 关闭线程池shutdown = true;// 阻塞回收管理者线程pthread_join(managerID, NULL);// 唤醒阻塞的消费者线程for(int i = 0; i < liveNum; i++) { // 因为到了最后没有任务了,所以活着的线程都需要关闭pthread_cond_signal(¬Empty);}// 释放堆内存if(taskQ) {delete taskQ;}if(threadIDs) {delete []threadIDs;}pthread_mutex_destroy(&mutexPool);pthread_cond_destroy(¬Empty);
}template <typename T>
void ThreadPool<T>::addTask(Task<T> task) {if(shutdown) {return;}// 添加任务taskQ->addTask(task);pthread_cond_signal(¬Empty); // 唤醒阻塞在条件变量里的工作的线程,即生产者生产后唤醒消费者
}template <typename T>
int ThreadPool<T>::getBusyNumber() {pthread_mutex_lock(&mutexPool);int busyNum = this->busyNum;pthread_mutex_unlock(&mutexPool);return busyNum;
}template <typename T>
int ThreadPool<T>::getAliveNumber() {pthread_mutex_lock(&mutexPool);int liveNum = this->liveNum;pthread_mutex_unlock(&mutexPool);return liveNum;
}template <typename T>
void* ThreadPool<T>::worker(void* arg) {ThreadPool* pool = static_cast<ThreadPool*>(arg); // static_cast相当于c里面的强制类型转换while(1) {pthread_mutex_lock(&pool->mutexPool); // 访问线程池之前加锁// 当前任务队列是否为空while(pool->taskQ->taskNumber() == 0 && !pool->shutdown) { // 任务队列为空并且线程池没有被关闭// 阻塞工作线程pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);// 判断是不是要销毁线程if(pool->exitNum > 0) {pool->exitNum--;if(pool->liveNum > pool->minNum) {pool->liveNum--;pthread_mutex_unlock(&pool->mutexPool); // 阻塞的时候已经获得锁,已经锁上,如果不解开就会死锁pool->threadExit();}}}// 判断线程池是否被关闭了if(pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool); // 避免死锁pool->threadExit();}// 从任务队列中取出一个任务Task<T> task = pool->taskQ->takeTask();pool->busyNum++;// 因为任务队列可以无限大,所以生产者可以不用唤醒pthread_mutex_unlock(&pool->mutexPool); // 用完之后解锁printf("thread %ld start working...\n", pthread_self());task.function(task.arg);delete task.arg; // task.arg = nullptr;printf("thread %ld end working...\n", pthread_self());pthread_mutex_lock(&pool->mutexPool);pool->busyNum--;pthread_mutex_unlock(&pool->mutexPool);}return NULL;
}template <typename T>
void* ThreadPool<T>::manager(void* arg) {ThreadPool* pool = static_cast<ThreadPool*>(arg);while(!pool->shutdown) {// 每隔三秒钟检测一次sleep(3);// 取出线程池中任务的数量和当前线程的数量pthread_mutex_lock(&pool->mutexPool);int queueSize = pool->taskQ->taskNumber();int liveNumber = pool->liveNum;int busyNumber = pool->busyNum; // 取出忙的线程的数量pthread_mutex_unlock(&pool->mutexPool);// 添加线程// 任务的个数 大于 存活的线程个数 && 存活的线程数 小于 最大线程数if(queueSize > liveNumber && liveNumber < pool->maxNum) {pthread_mutex_lock(&pool->mutexPool);int count = 0;for(int i = 0; i < pool->maxNum && count < NUMBER && liveNumber < pool->maxNum; i++) {if(pool->threadIDs[i] == 0) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);count++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}// 销毁线程// 忙的线程 * 2 小于 存活的线程数 && 存活的线程 大于 最小线程数// 之所以不用对pool->minNum加锁,是因为这个值是固定的,只需要读不需要写if(busyNumber * 2 < liveNumber && liveNumber > pool->minNum) {pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;pthread_mutex_unlock(&pool->mutexPool);// 让工作的线程自杀for(int i = 0; i < NUMBER; i++) {// 唤醒已经被阻塞的工作的线程,因为工作的线程无事可做,肯定是被阻塞在条件变量中pthread_cond_signal(&pool->notEmpty); }}}
}template <typename T>
void ThreadPool<T>::threadExit() {pthread_t tid = pthread_self(); // 获得当前线程的线程ID。for(int i = 0; i < maxNum; i++) {if(threadIDs[i] == tid) { // tid对应的线程要退出了threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);
}
测试
#include "ThreadPool.h"
#include "ThreadPool.cpp"
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>void taskFunc(void* arg) {int num = *(int*)arg;printf("thread %ld is working, number = %d\n", pthread_self(), num);sleep(1);
}int main() {// 创建线程池ThreadPool<int> pool(3, 10);for(int i = 0; i < 100; i++) {int* num = new int(i + 100);pool.addTask(Task<int>(taskFunc, num));}sleep(20); // 等待子线程把任务处理完毕return 0;
}
g++ main.cpp -o main -lpthread