从头开始:手搓一个基于C/C++的线程池

ops/2024/9/25 15:01:50/

手搓线程池

  • 线程池工作原理和实现
    • 线程池工作原理
        • 1. 线程池的基本组成:
        • 2. 线程池的基本执行流程:
        • 3. 线程池的核心参数:
        • 4. 线程池的生命周期:
        • 5. 线程池的执行策略:
    • 相关知识点
      • 线程与进程的比较
      • 读写锁
      • 互斥锁
    • 基于C语言的线程池设计与实现
    • 基于C++的线程池设计与实现

线程池工作原理和实现

线程池工作原理

线程池(Thread Pool)是一种预先创建和管理一组工作线程的技术,用来优化并发任务的执行。通过复用这些线程来执行多个任务,线程池可以减少线程创建和销毁的开销,提高系统的性能和响应速度。

1. 线程池的基本组成:
  • 任务队列(Task Queue):存放待执行任务的队列。当有新的任务提交时,它会进入任务队列,等待可用线程来执行。
  • 工作线程(Worker Threads):线程池中的一组线程,用来处理任务队列中的任务。线程池启动时会创建一定数量的工作线程。
  • 线程池管理器(Thread Pool Manager):负责管理线程池的大小、任务分配、线程的创建和销毁。
2. 线程池的基本执行流程:
  1. 提交任务:用户向线程池提交任务,通常是实现了 RunnableCallable 接口的对象。任务被放入任务队列中。
  2. 任务分配:线程池中的某个空闲工作线程会从任务队列中取出一个任务,并开始执行。任务队列是线程安全的,确保多个线程可以同时取任务而不发生冲突。
  3. 任务执行:工作线程运行并执行任务中的代码。线程池中的工作线程是循环的,每个线程在完成一个任务后会继续取下一个任务。
  4. 复用线程:任务完成后,线程不会被销毁,而是回到线程池中成为“空闲线程”,等待下一个任务。
  5. 动态调整线程数量:如果任务数量激增,线程池可以根据策略(如扩展线程池的大小)创建新的线程来处理更多的任务;如果任务减少,线程池也可能会销毁一些线程以节省资源。
3. 线程池的核心参数:
  • 核心线程数(corePoolSize):线程池中保持活跃的核心线程数量,即使线程处于空闲状态也不会被回收。
  • 最大线程数(maximumPoolSize):线程池中允许的最大线程数量。当任务数量超过核心线程数时,线程池会根据需求创建更多的线程,直到达到最大线程数。
  • 任务队列(workQueue):用于存放等待执行的任务。如果当前所有线程都在忙碌,新的任务会被放入任务队列中。
  • 线程存活时间(keepAliveTime):线程池中超过核心线程数的空闲线程在等待新任务的最长等待时间,超过这个时间后将被销毁。
  • 线程工厂(ThreadFactory):用于创建新线程,方便自定义线程的属性,如线程的名称、优先级等。
  • 拒绝策略(RejectedExecutionHandler):当任务过多而无法处理时(任务队列满了且线程池的线程数已达到上限),线程池会执行拒绝策略,常见策略包括丢弃任务、抛出异常、或由调用者线程直接执行。
4. 线程池的生命周期:
  • 运行状态(RUNNING):线程池处于运行状态,可以接受任务并处理任务。
  • 关闭状态(SHUTDOWN):线程池不再接受新任务,但会继续处理已经提交的任务。
  • 停止状态(STOP):线程池不再接受任务,且会中断正在执行的任务。
  • 终止状态(TERMINATED):所有任务执行完毕,线程池中的线程全部销毁,线程池彻底关闭。
5. 线程池的执行策略:
  • 先使用核心线程:线程池优先利用核心线程来处理任务,只有在核心线程全部繁忙的情况下,才会将任务放入任务队列。
  • 任务队列满了,创建新线程:如果任务队列已满且所有核心线程都在工作,线程池会创建新的线程,直到达到 maximumPoolSize
  • 拒绝任务:如果任务队列满了,且线程池中的线程数量已经达到 maximumPoolSize,根据配置的拒绝策略处理新任务。

相关知识点

线程与进程的比较

  • 线程启动速度快,轻量级
  • 线程使用有一定难度,需要处理数据一致性问题
  • 同一线程共享的有堆、全局变量、静态变量、指针等,而独自占有栈
  • 线程是调度的基本单位(PC、状态码、通用寄存器、线程栈及栈指针);进程是拥有资源的基本单位(打开文件、堆、代码段等)
  • 一个进程内多个线程可以并发;多个进程可以并发
  • 拥有资源:线程不拥有系统资源,但一个进程的多个线程可以共享隶属进程的资源;进程是拥有资源的独立单位
  • 线程的系统开销小,线程创建销毁只需要处理PC值,状态码,通用寄存器值,线程栈及栈指针即可;进程创建和销毁需要重新分配及销毁task_struct结构。

读写锁

  • 多个读者可以同时进行读
  • 写者必须互斥(只允许一个写者写,也不能读者写者同时进行)
  • 写者优先于读者(一旦有写者,则后续读者必须等待,唤醒时优先考虑写者)

互斥锁

一次只能一个线程拥有互斥锁,其他线程只有等待。

互斥锁是在抢锁失败的情况下主动放弃CPU进入睡眠状态直到锁的状态改变时再唤醒,而操作系统负责线程调度,为了实现锁的状态发生改变时唤醒阻塞的线程或者进程,需要把锁交给操作系统管理,所以互斥锁在加锁操作时设计上下文的切换。互斥锁实际的效率还是可以让人接受的,加锁的时间大概100ns左右,而实际上互斥锁的一种可能的实现是先自旋一段时间,当自旋的时间超过阈值之后再将线程投入到睡眠中,因此在并发运算中使用互斥锁(每次占用锁的时间很短)的效果可能不亚于使用自旋锁。

互斥锁属于sleep-waiting类型的锁。例如在一个双核的机器上有两个线程A和B,它们分别运行在core 0和core 1上。假设线程A想要通过pthread_mutex_lock操作去得到一个临界区的锁,而此时这个锁正被线程B所持有,那么线程A就会被阻塞,此时会通过上下文切换将线程A置于等待队列中,此时core 0就可以运行其他的任务(如线程C)。

基于C语言的线程池设计与实现

任务队列

typedef struct Task
{void (*function) (void* arg); // void*是一个泛型,能够接收各种各样的数据类型void* arg;
}Task;

线程池定义

struct ThreadPool
{// 任务队列Task* taskQ; // 队列数组int queueCapacity;  // 容量int queueSize;      // 当前任务个数int queueFront;     // 队头,取数据int queueRear;      // 队尾,放数据pthread_t managerID;    // 管理者线程IDpthread_t *threadIDs;   // 工作的线程IDint minNum;             // 最小的线程数int maxNum;             // 最大的线程数int busyNum;            // 工作的线程个数int liveNum;            // 存活的线程个数int exitNum;            // 要杀死的线程个数pthread_mutex_t mutexpool;  // 锁住整个线程池pthread_mutex_t mutexBusy;  // 锁住busyNum变量pthread_cond_t notFull;     // 任务队列是否满了pthread_cond_t notEmpty;    // 任务队列是否空了int shutdown;               // 是否销毁线程池,销毁为1,不销毁为0
};

头文件声明

#ifndef _THREADPOOL_H
#define _THREADPOOL_Htypedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize);// 销毁线程池
int threadPoolDestory(ThreadPool* pool);// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);void* worker(void* arg);void* manager(void* arg);void threadExit(ThreadPool* pool);#endif 

源文件定义

#include "threadpool.h"
#include<pthread.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<stdio.h>const int NUMBER = 2;// 任务结构体
typedef struct Task
{   // void*是一个泛型,能够接收各种各样的数据类型void (*function) (void* arg); // 函数指针void* arg;
}Task;// 线程池结构体
struct ThreadPool
{// 任务队列Task* taskQ; // 队列数组int queueCapacity;  // 容量int queueSize;      // 当前任务个数int queueFront;     // 队头,取数据int queueRear;      // 队尾,放数据pthread_t managerID;    // 管理者线程IDpthread_t *threadIDs;   // 工作的线程IDint minNum;             // 最小的线程数int maxNum;             // 最大的线程数int busyNum;            // 工作的线程个数int liveNum;            // 存活的线程个数int exitNum;            // 要杀死的线程个数pthread_mutex_t mutexpool;  // 锁住整个线程池pthread_mutex_t mutexBusy;  // 锁住busyNum变量pthread_cond_t notFull;     // 任务队列是否满了,用于阻塞生产者pthread_cond_t notEmpty;    // 任务队列是否空了,用于阻塞消费者int shutdown;               // 是否销毁线程池,销毁为1,不销毁为0
};ThreadPool* threadPoolCreate(int min, int max, int queueSize) {ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));do {if(pool == NULL) {printf("malloc threadpool fail....\n");break;}pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);if(pool->threadIDs == NULL) {printf("malloc threadIDs fail....\n");break;}memset(pool->threadIDs, 0, sizeof(pthread_t) * max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min; // 和最小个数相等pool->exitNum = 0;if(pthread_mutex_init(&pool->mutexpool, NULL) != 0 ||pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0) {printf("mutex or condif init fail....\n");break;}// 任务队列pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);pool->queueCapacity = queueSize;pool->queueSize = 0;pool->queueFront = 0;pool->queueRear = 0;pool->shutdown = 0;// 创建管理者线程和工作者线程pthread_create(&pool->managerID, NULL, manager, pool); // 第三个参数为管理者线程的任务函数for(int i = 0; i < min; i++) {pthread_create(&pool->threadIDs[i], NULL, worker, pool); // 第三个参数为工作的线程的任务函数}return pool;} while(0);// 释放资源if(pool->threadIDs) free(pool->threadIDs);if(pool->taskQ) free(pool->taskQ);if(pool) free(pool);return NULL;
}int threadPoolDestory(ThreadPool* pool) {if(pool == NULL) {return -1;}// 关闭线程池pool->shutdown = 1;// 阻塞回收管理者线程pthread_join(pool->managerID, NULL);// 唤醒阻塞的消费者线程for(int i = 0; i < pool->liveNum; i++) {pthread_cond_signal(&pool->notEmpty);}// 释放堆内存if(pool->taskQ) {free(pool->taskQ);}if(pool->threadIDs) {free(pool->threadIDs);}pthread_mutex_destroy(&pool->mutexpool);pthread_mutex_destroy(&pool->mutexBusy);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);free(pool);pool = NULL;return 0;
}void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg) {pthread_mutex_lock(&pool->mutexpool);while(pool->queueSize == pool->queueCapacity && !pool->shutdown) {// 阻塞生产者线程pthread_cond_wait(&pool->notFull, &pool->mutexpool);}if(pool->shutdown) {pthread_mutex_unlock(&pool->mutexpool);return;}// 添加任务pool->taskQ[pool->queueRear].function = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pool->queueSize++;pthread_cond_signal(&pool->notEmpty); // 唤醒阻塞在条件变量里的工作的线程,即生产者生产后唤醒消费者pthread_mutex_unlock(&pool->mutexpool);
}int threadPoolBusyNum(ThreadPool* pool) {pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}int threadPoolAliveNum(ThreadPool* pool) {pthread_mutex_lock(&pool->mutexpool);int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexpool);return liveNum;
}void* worker(void* arg) {ThreadPool* pool = (ThreadPool*)arg;while(1) {pthread_mutex_lock(&pool->mutexpool); // 访问线程池之前加锁// 当前任务队列是否为空while(pool->queueSize == 0 && !pool->shutdown) { // 任务队列为空并且线程池没有被关闭// 阻塞工作线程pthread_cond_wait(&pool->notEmpty, &pool->mutexpool);// 判断是不是要销毁线程if(pool->exitNum > 0) {pool->exitNum--;if(pool->liveNum > pool->minNum) {pool->liveNum--;pthread_mutex_unlock(&pool->mutexpool); // 阻塞的时候已经获得锁,已经锁上,如果不解开就会死锁threadExit(pool);}}}// 判断线程池是否被关闭了if(pool->shutdown) {pthread_mutex_unlock(&pool->mutexpool); // 避免死锁threadExit(pool);}// 从任务队列中取出一个任务Task task;task.function = pool->taskQ[pool->queueFront].function;task.arg = pool->taskQ[pool->queueFront].arg;// 移动头结点pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; // 循环队列pool->queueSize--;pthread_cond_signal(&pool->notFull); // 消费者消费完产品后唤醒生产者pthread_mutex_unlock(&pool->mutexpool); // 用完之后解锁printf("thread %ld start working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);task.function(task.arg);free(task.arg);task.arg = NULL;printf("thread %ld end working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}return NULL;
}void* manager(void* arg) {ThreadPool* pool = (ThreadPool*)arg;while(!pool->shutdown) {// 每隔三秒钟检测一次sleep(3);// 取出线程池中任务的数量和当前线程的数量pthread_mutex_lock(&pool->mutexpool);int queueSize = pool->queueSize;int liveNumber = pool->liveNum;pthread_mutex_unlock(&pool->mutexpool);// 取出忙的线程的数量pthread_mutex_lock(&pool->mutexBusy);int busyNumber = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);// 添加线程// 任务的个数 大于 存活的线程个数 && 存活的线程数 小于 最大线程数if(queueSize > liveNumber && liveNumber < pool->maxNum) {pthread_mutex_lock(&pool->mutexpool);int count = 0;for(int i = 0; i < pool->maxNum && count < NUMBER && liveNumber < pool->maxNum; i++) {if(pool->threadIDs[i] == 0) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);count++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexpool);}// 销毁线程// 忙的线程 * 2 小于 存活的线程数 && 存活的线程 大于 最小线程数// 之所以不用对pool->minNum加锁,是因为这个值是固定的,只需要读不需要写if(busyNumber * 2 < liveNumber && liveNumber > pool->minNum) {pthread_mutex_lock(&pool->mutexpool);pool->exitNum = NUMBER;pthread_mutex_unlock(&pool->mutexpool);// 让工作的线程自杀for(int i = 0; i < NUMBER; i++) {pthread_cond_signal(&pool->notEmpty);}}}
}void threadExit(ThreadPool* pool) {pthread_t tid = pthread_self(); // 获得线程自身的ID。for(int i = 0; i < pool->maxNum; i++) {if(pool->threadIDs[i] == tid) { // tid对应的线程要退出了pool->threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);
}

测试代码

#include<stdio.h>
#include "threadpool.h"
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>void taskFunc(void* arg) {int num = *(int*)arg;printf("thread %ld is working, number = %d\n", pthread_self(), num);sleep(1);
}int main() {// 创建线程池ThreadPool* pool = threadPoolCreate(3, 10, 100);for(int i = 0; i < 100; i++) {int* num = (int*)malloc(sizeof(int));*num = i + 100;threadPoolAdd(pool, taskFunc, num);}sleep(30); // 等待子线程把任务处理完毕threadPoolDestory(pool);return 0;
}

image-20240922170244684

基于C++的线程池设计与实现

因为在C++中,delete task.arg时候,由于delete void*类型是有危险的,因为void*指针只占四个字节,因此有可能不能全部地被释放,为了知道在程序中void*实际上是什么类型,因此在C++中可以使用模板来解决这一问题,因为模板可以传递类型。

任务队列声明

#pragma once
#include<queue>
#include<pthread.h>using callback = void (*) (void* arg);// 任务结构体
template <typename T>
struct Task
{   Task<T>() {function = nullptr;arg = nullptr;}Task<T>(callback f, void* arg) {this->arg = (T*)arg;function = f;}callback function; // 函数指针T* arg;
};template <typename T>
class TaskQueue
{
private:std::queue <Task<T>> m_taskQ;pthread_mutex_t m_mutex; // 互斥锁
public:TaskQueue(/* args */);~TaskQueue();// 添加任务void addTask(Task<T> task);void addTask(callback f, void* arg);// 取出一个任务Task<T> takeTask();// 获取当前任务的个数inline size_t taskNumber() { // 没有if判断什么的,比较简单的直接写成内联函数比较好return m_taskQ.size();}
};

任务队列定义

#include "TaskQueue.h"template <typename T>
TaskQueue<T>::TaskQueue(/* args */)
{pthread_mutex_init(&m_mutex, NULL);
}template <typename T>
TaskQueue<T>::~TaskQueue()
{pthread_mutex_destroy(&m_mutex);
}template <typename T>
void TaskQueue<T>::addTask(Task<T> task)
{pthread_mutex_lock(&m_mutex);m_taskQ.push(task);pthread_mutex_unlock(&m_mutex);
}template <typename T>
void TaskQueue<T>::addTask(callback f, void* arg)
{pthread_mutex_lock(&m_mutex);m_taskQ.push(Task<T>(f, arg));pthread_mutex_unlock(&m_mutex);
}template <typename T>
Task<T> TaskQueue<T>::takeTask() 
{Task<T> task;pthread_mutex_lock(&m_mutex);if(!m_taskQ.empty()) {task = m_taskQ.front();m_taskQ.pop();}pthread_mutex_unlock(&m_mutex);return task;
}

线程池声明

#pragma once
#include "TaskQueue.h"
#include "TaskQueue.cpp"template <typename T>
class ThreadPool
{
public:ThreadPool(int min, int max);~ThreadPool();// 添加任务void addTask(Task<T> task);// 获取忙线程的个数int getBusyNumber();// 获取活着的线程个数int getAliveNumber();private:// 工作的线程的任务函数static void* worker(void* arg);// 管理者线程的任务函数static void* manager(void* arg);void threadExit();static const int NUMBER = 2;pthread_mutex_t mutexPool;pthread_cond_t notEmpty;pthread_t* threadIDs;pthread_t managerID;TaskQueue<T>* taskQ;int minNum;int maxNum;int busyNum;int liveNum;int exitNum;bool shutdown = false;
};

线程池定义

#include "ThreadPool.h"
#include <iostream>
#include <string.h>
#include <string>
#include <unistd.h>using namespace std;template <typename T>
ThreadPool<T>::ThreadPool(int min, int max) {// 实例化任务队列do {taskQ = new TaskQueue<T>;if(taskQ == nullptr) {cout << "malloc threadIDs fail...\n";break;}threadIDs = new pthread_t[max];if(threadIDs == nullptr) {cout << "malloc threadIDs fail....\n";break;}memset(threadIDs, 0, sizeof(pthread_t) * max);minNum = min;maxNum = max;busyNum = 0;liveNum = min; // 和最小个数相等exitNum = 0;if(pthread_mutex_init(&mutexPool, NULL) != 0 ||pthread_cond_init(&notEmpty, NULL) != 0) {cout << "mutex or condif init fail....\n";break;}shutdown = false;/*为什么要把this传给manager呢?因为manager是一个静态方法,静态方法它只能访问类里边的静态成员变量,它不能访问类的非静态成员变量。因此如果我们想要访问这些非静态成员变量,就必须要给这个静态方法传进去一个实例化对象,通过传进去的这个实例化对象来访问里边的非静态成员函数和变量*/// 创建管理者线程和工作者线程pthread_create(&managerID, NULL, manager, this); // 第三个参数为管理者线程的任务函数for(int i = 0; i < min; i++) {pthread_create(&threadIDs[i], NULL, worker, this); // 第三个参数为工作的线程的任务函数}return;} while(0);// 释放资源if(threadIDs) delete []threadIDs;if(taskQ) delete taskQ;
}template <typename T>
ThreadPool<T>::~ThreadPool() {// 关闭线程池shutdown = true;// 阻塞回收管理者线程pthread_join(managerID, NULL);// 唤醒阻塞的消费者线程for(int i = 0; i < liveNum; i++) { // 因为到了最后没有任务了,所以活着的线程都需要关闭pthread_cond_signal(&notEmpty);}// 释放堆内存if(taskQ) {delete taskQ;}if(threadIDs) {delete []threadIDs;}pthread_mutex_destroy(&mutexPool);pthread_cond_destroy(&notEmpty);
}template <typename T>
void ThreadPool<T>::addTask(Task<T> task) {if(shutdown) {return;}// 添加任务taskQ->addTask(task);pthread_cond_signal(&notEmpty); // 唤醒阻塞在条件变量里的工作的线程,即生产者生产后唤醒消费者
}template <typename T>
int ThreadPool<T>::getBusyNumber() {pthread_mutex_lock(&mutexPool);int busyNum = this->busyNum;pthread_mutex_unlock(&mutexPool);return busyNum;
}template <typename T>
int ThreadPool<T>::getAliveNumber() {pthread_mutex_lock(&mutexPool);int liveNum = this->liveNum;pthread_mutex_unlock(&mutexPool);return liveNum;
}template <typename T>
void* ThreadPool<T>::worker(void* arg) {ThreadPool* pool = static_cast<ThreadPool*>(arg); // static_cast相当于c里面的强制类型转换while(1) {pthread_mutex_lock(&pool->mutexPool); // 访问线程池之前加锁// 当前任务队列是否为空while(pool->taskQ->taskNumber() == 0 && !pool->shutdown) { // 任务队列为空并且线程池没有被关闭// 阻塞工作线程pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);// 判断是不是要销毁线程if(pool->exitNum > 0) {pool->exitNum--;if(pool->liveNum > pool->minNum) {pool->liveNum--;pthread_mutex_unlock(&pool->mutexPool); // 阻塞的时候已经获得锁,已经锁上,如果不解开就会死锁pool->threadExit();}}}// 判断线程池是否被关闭了if(pool->shutdown) {pthread_mutex_unlock(&pool->mutexPool); // 避免死锁pool->threadExit();}// 从任务队列中取出一个任务Task<T> task = pool->taskQ->takeTask();pool->busyNum++;// 因为任务队列可以无限大,所以生产者可以不用唤醒pthread_mutex_unlock(&pool->mutexPool); // 用完之后解锁printf("thread %ld start working...\n", pthread_self());task.function(task.arg);delete task.arg; // task.arg = nullptr;printf("thread %ld end working...\n", pthread_self());pthread_mutex_lock(&pool->mutexPool);pool->busyNum--;pthread_mutex_unlock(&pool->mutexPool);}return NULL;
}template <typename T>
void* ThreadPool<T>::manager(void* arg) {ThreadPool* pool = static_cast<ThreadPool*>(arg);while(!pool->shutdown) {// 每隔三秒钟检测一次sleep(3);// 取出线程池中任务的数量和当前线程的数量pthread_mutex_lock(&pool->mutexPool);int queueSize = pool->taskQ->taskNumber();int liveNumber = pool->liveNum;int busyNumber = pool->busyNum; // 取出忙的线程的数量pthread_mutex_unlock(&pool->mutexPool);// 添加线程// 任务的个数 大于 存活的线程个数 && 存活的线程数 小于 最大线程数if(queueSize > liveNumber && liveNumber < pool->maxNum) {pthread_mutex_lock(&pool->mutexPool);int count = 0;for(int i = 0; i < pool->maxNum && count < NUMBER && liveNumber < pool->maxNum; i++) {if(pool->threadIDs[i] == 0) {pthread_create(&pool->threadIDs[i], NULL, worker, pool);count++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}// 销毁线程// 忙的线程 * 2 小于 存活的线程数 && 存活的线程 大于 最小线程数// 之所以不用对pool->minNum加锁,是因为这个值是固定的,只需要读不需要写if(busyNumber * 2 < liveNumber && liveNumber > pool->minNum) {pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;pthread_mutex_unlock(&pool->mutexPool);// 让工作的线程自杀for(int i = 0; i < NUMBER; i++) {// 唤醒已经被阻塞的工作的线程,因为工作的线程无事可做,肯定是被阻塞在条件变量中pthread_cond_signal(&pool->notEmpty); }}}
}template <typename T>
void ThreadPool<T>::threadExit() {pthread_t tid = pthread_self(); // 获得当前线程的线程ID。for(int i = 0; i < maxNum; i++) {if(threadIDs[i] == tid) { // tid对应的线程要退出了threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);
}

测试

#include "ThreadPool.h"
#include "ThreadPool.cpp"
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>void taskFunc(void* arg) {int num = *(int*)arg;printf("thread %ld is working, number = %d\n", pthread_self(), num);sleep(1);
}int main() {// 创建线程池ThreadPool<int> pool(3, 10);for(int i = 0; i < 100; i++) {int* num = new int(i + 100);pool.addTask(Task<int>(taskFunc, num));}sleep(20); // 等待子线程把任务处理完毕return 0;
}

g++ main.cpp -o main -lpthread

image-20240923173739851


http://www.ppmy.cn/ops/115833.html

相关文章

Qt系列-1.Qt安装

Qt安装 0 简介 1.安装步骤 1.1 下载 进入qt中文网站:https://www.qt.io/zh-cn/ Qt开源社区版本:https://www.qt.io/download-open-source#source 1.2 安装 chmod +x qt-online-installer-linux-x64-4.8.0.run ./qt-online-installer-linux-x64-4.8.0.run 外网不能下载…

etcd三节点,其中一个坏掉了的恢复办法

一、配置etcdctl环境变量 --------------------------------------------------------------------------------------------- #其中证书实际路径和endpoints,以环境情况为准,查询方式 # ps -ef | grep etcd-cafile # ps -ef | grep etcd-servers export ETCDCTL_API3 export…

41. 如何在MyBatis-Plus中实现批量操作?批量插入和更新的最佳实践是什么?

在 MyBatis-Plus 中&#xff0c;实现批量操作&#xff08;如批量插入、批量更新&#xff09;是非常常见的需求。MyBatis-Plus 提供了对批量操作的良好支持&#xff0c;可以通过多种方式实现高效的批量处理。下面详细介绍批量操作的实现方式以及最佳实践。 1. 批量插入 批量插入…

fo-dicom,第一个基于.NET Standard 2.0 开发的DICOM开源库

1. 简介&#xff1a; fo-dicom是一个基于C#开发的库&#xff0c;用于处理DICOM&#xff08;Digital Imaging and Communications in Medicine&#xff09;格式的数据。DICOM是一种用于医学影像和相关信息的标准格式&#xff0c;广泛应用于医学领域。fo-dicom提供了多平台支持&…

日常生活中喝够水,能帮助预防多种慢性病、提高免疫力,还能改善情绪、提高认知能力!

文章目录 引言多喝水的好处为何人体中水分含量这么高?引言 感到口渴时,人体缺水程度已经在 1%~2% 了。建议大家养成随时喝、少量多次的习惯,在口渴前喝水,避免影响健康。 水分的丢失却无时无刻不在发生,即便没有大量出汗,每天通过呼吸、排泄、皮肤蒸发就能排出 1.5~2 升…

Oracle和JSON结合起来,开发者的福利

Oracle Database 23c在简化开发、应对开发发力了&#xff0c;更多拥抱开发者方面的能力&#xff0c;真的是人惊讶&#xff0c;这几天也是参加Oracle官方组织的AI专家培训&#xff0c;受益非浅&#xff0c;在这里给大家分享一下通过 ORDS操作对外暴露Http Restful服务&#xff0…

【多模态大模型】Qwen2-VL基本原理和推理部署实战

文章目录 Qwen2-VL基本原理Qwen-VL简要回顾Qwen2-VL的高级升级统一视觉处理方式原生动态分辨率处理&#xff08;非大图切分方式&#xff09;多模态旋转位置编码 Qwen2-VL推理实现|代码解析单图推理视觉信息预处理找到能被28整除的最合适size最大最小pixel数边界处理 多模态信息…

mysql怎么让字段从1开始自增?

mysql怎么让字段从1开始自增&#xff1f; 要确保一个 AUTO_INCREMENT 字段从1开始自增&#xff0c;你需要在创建表的时候指定 AUTO_INCREMENT 的起始值为1&#xff0c; 或者在表创建之后手动设置 AUTO_INCREMENT 的值。 1.创建表时指定 当你创建表的时候&#xff0c;可以直接…