linux高性能服务器-线程池实现

ops/2024/10/21 6:01:11/

文章目录

        • 定义
        • 应用场景
        • 任务类型
        • 线程数量
        • 数据结构设计:
          • 任务设计:
          • 队列设计:
          • 线程池设计
        • 接口设计

定义

线程池属于生产消费模型,管理维持固定数量线程的池式结构,避免线程频繁的创建和销毁

应用场景

当一类任务耗时,严重影响当前线程处理其他任务,异步执行

任务类型

耗时任务:

  • CPU密集型
  • IO密集型 ( 网络IO 磁盘IO)
线程数量

n * proc

数据结构设计:
任务设计:
typedef struct task_s {void * next;handler_pt func;void * arg;
} task_t;

生产者线程: 发布任务
消费者线程: 取出任务,执行任务
数据结构为链表

队列设计:

typedef struct task_queue_s {void * head;void **tail;int block; // 是否阻塞 spinlock_t lock; // 自选锁pthread_muxtex_t mutex;pthread_cond_t cond;
}task_queue_t;

队列: 存储任务,调度线程池 ,双端开口,先进先出,在多线程中执行,需要加锁
功能: 调取线程池中的消费者线程, 如果此时队列为空,谁(线程)来取任务,谁阻塞休眠
当允许一个进程进入临界资源(互斥状态)。
自旋锁: 其他线程空转cpu,一直等待进入临界资源
互斥锁:切换cpu, 让出执行权, 线程阻塞住,操作系统调用其他的线程

某个线程持有临界资源的时间 < 线程切换的时间 , 自旋锁 ,时间复杂度为0(1)
生产者新增任务,消费者取出任务 ,0(1),均为移动指针完成(尾插法,头插法)
故使用自旋锁 spinlock_t lock

线程池设计
struct thredpool_t {atomic_int quit;  // 原子变量int thrd_count;pthread_t * threads;task_queue_t task_queue;
};

原子操作:一个线程在执行过程中,其他线程不能执行这个线程的内部操作,只能看到线程执行前或者执行后
应用场景: 某一个基础类型给的变量

接口设计
static task_queue_t * __taksqueue_create() {task_queue_t  * queue = malloc(sizeof(*queue));int ret;ret = pthrad_mutex_init(&mutex);if(ret == 0) {ret = pthread_cond_init(&cond);if(ret == 0) {queue->head = NULL;queue->tail = &(queue->head);queue->block = 1;return queue;}pthread_cond_destory(&queue);}pthread_muext_destory(&queue->mutex);return NULL;
}static void __add_task(task_queue_t  * queue, void * task) {void **link = (void **)task; // malloc*link = NULL; // task->next = NULL;spinlock_lock(&queue->lock);*queue->tail = link; // 末尾添加新的节点queue->tail = link // tail 指向新的尾节点spinlock_unlock(&qeuue->lock);pthread_cond_signal(&queue->cond); // 有任务,唤醒休眠的线程
}static task_t * __pop__task(task_queue_t * queue) {spinlock_lock(&queue->lock);if(queue->head) {spinlock_unclock(&queue->lock);return NULL;}task_t *task;task = queue->head;queue->head = task->next;if(queue-head == NULL) {queue->tail = &queue->head; // &NULL}spinlock_unlock(&queue->lock);return task;
} static void * __get_task (task_queue_t *queue) {task_t *task;while((task = __pop_task(queue))== NULL) {pthread_mutex_lock(&queue->lock);if(queue->block == 0) {rerurn NULL;}// pthread_cond_wait 执行过程:// 1. 先unlock(&mutex)// 2. cond 休眠// 3, 生产者 发送signal// 4. cond 唤醒// 5. 既然上clock(&mutex)pthead_cond_wait(&queue->cond,&queue->mutex);  //休眠pthread_mutex_unlock(&queue->mutex);} return task;
}static void __taskqueue_destroy(task_queue_t * queue) {task_t *task;while((task) = _pop_task(queue)) {free(ptr:task);}spinlock_destroy(&queue->lock);pthread_cond_destory(&queue->cond);pthread_mutex_destory(&queue->mutex);free(ptr:queue);
}// 消费者线程 ,取出任务,执行任务
static void * __thrdpoll_worker(void *arg) {thrdpool_t *pool = (thrdpool *)arg;task_t *task;void *ctx;while(atomic_load(&pool->quit) == 0) {  //原子读task = (task *) __get_task(poll->task_queue);if(!task) {break;}handler_pt func = task->func;ctx = taks->arg;free(task);func(ctx);}return NULL;
}
// 设置队列为非阻塞,并唤醒所有的线程
static void __nonblock(task_queue_t *queue) {pthread->mutex_lock =&queue->lock;queue->block = 0;pthread_mutex_unclock(&queue->mutex);pthread_cond_broadcast(queue->cond); 
}
// 创建线程,回滚式创建对象
static int __threads_create(thrdpool * pool, size_t thrd_count) {pthread_attr_t attr;int ret;ret = pthread_attr_init(&attr); //初始化线程参数if (ret == 0) {pool_>threads = (pthread_t *)malloc(sizeof(pthread_t) * thrd_count);if(pool_threads) {int i = 0;for(;i < thrd_count; i++) {if(pthread_create(&pool->threads[i),&attr,start_routine(),NULL);break; // 创建线程失败,返回}pool->thrd_count  = i; pthread_attr_destory(&attr);if( i == thrd_count)reurn 0;__threads_terminante(pool); // 如果创建的线程数量不等于thrd_count,把创建的线程全部销毁free(pool->threads); // 释放堆空间}}return ret;
}// 创建线程池
static thrdpool_t *  thrdpool_create(int thrd_count) {thrdpool_t * pool;poll = (thrdpool_t *)malloc(sizeof(poll);if(!pool) return NULL;task_queue_t  *queue = __taskqueue_create();if(queue) {pool->task_queue = queue;atomic_init(&pool->quit, 0);if(__threads_create(pool,thrd_count) == 0 ) {return pool;}__taskqueue_destory(pool->taks_queue);}free(pool);return NULL;
}static void __threads_terminate(thrdpool_t * pool) {atomic_store(&queue->quit,1); //原子写__nonblock(pool->task_queue); // 设置非阻塞队列,唤醒所有的线程int i;for(i=0; i<pool->thrd_count;i++) {pthread_join(pool->thread[i],NULL);}
}// 生产者创建任务
static int thrdpool_post(thrdpool_t  * pool, handler_pt func, void *arg) {if (atomic_load(&pool->quit) == 1 ) {  //判断线程池是否标记退出return -1;}task * task = (task_t *)malloc(sizeof(task_t));if(!task)  return -1;task->func = func; // 初始化task->arg = arg;__add_task(pool->task_queue,task); // 添加任务return 0;
}//等待所有线程结束,释放资源
static thrdpool_wait(thrdpool_t *pool) {int i;for(i=0; i<poll->thrd_count;i++) {pthread_join(pool->thread[i],NULL);}__taskqueue_destory(pool->taks_queue);free(pool->threads);free(pool);
}

http://www.ppmy.cn/ops/37618.html

相关文章

【C++随记4】C++二进制位操作运算符

在C中&#xff0c;二进制位操作运算符允许你直接对整数类型的变量的位进行操作。这些运算符包括&#xff1a; 按位与&#xff08;Bitwise AND&#xff09;: & 按位或&#xff08;Bitwise OR&#xff09;: | 按位异或&#xff08;Bitwise XOR&#xff09;: ^ 按位取反&…

docker 方式 elasticsearch 8.13 简单例子

安装 docker 虚拟机安装 elastic search 安装本地 # 创建 elastic 的网络 docker network create elastic # 用镜像的方式创建并启动容器 docker run -d --name es --net elastic -p 9200:9200 -p 9300:9300 -e "discovery.typesingle-node" -e "xpack.secur…

【零基础】system generator①设置卡解析

1.在matlab中我们输入的是双精度浮点型数据&#xff0c;经过gateway后变成定点型。十六位十四个小数位&#xff0c;整个数据有十六位&#xff0c;其中十四位给了小数 2.fixed-point定点型&#xff1b;signed有符号&#xff1b;2’s comp补码 3.量化误差 truncate&#xff0c;舍…

【BUUCTF】Crypto_RSA(铜锁/openssl使用系列)

【BUUCTF】Crypto_RSA&#xff08;铜锁/openssl使用系列&#xff09; 1、题目 在一次RSA密钥对生成中&#xff0c;假设p473398607161&#xff0c;q4511491&#xff0c;e17 求解出d作为flga提交 2、解析 RSA加密过程&#xff1a; 1&#xff09;选择素数&#xff1a;选择两个不…

24上软考5月新通知:今年可能是最简单的一次

备考24上软考的小伙伴注意啦&#xff01;软考新变动&#xff0c;今年软考高级科目通过率&#xff0c;可能会大增!!! 根据辽宁省软考办最新通知&#xff1a; 一、考试时间&#xff1a;5月25日-26日 解读&#xff1a;早前官方公告2024上半年软考考试时间为5月25日-28日&#xff…

八股Day1 集合

1.List Map Set的区别 2.说下集合继承关系 3.Iterator是什么&#xff1f;基本方法&#xff1f; 4.哪些集合线程不安全&#xff1f;怎么解决的&#xff1f; 5.ArrlyList和Vector的区别 6.ArrlyList和LinedList的区别 7.说下ArrayList的扩容机制 8.Set接口中 Comparator和Compara…

用HAL库改写江科大的stm32入门例子_9-1 串口发送

设置串口&#xff1a; 选项说明&#xff1a; 写一个串口发送函数&#xff1a; // serial send string function void serial_send_string(char *str) {HAL_UART_Transmit(&huart1, (uint8_t *)str, strlen(str), 1000); } main函数中调用发送信息&#xff1a; uint8_t dat…

ROS服务器通信

目录 一、角色 二、流程 注意 三、例子描述 四、srv文件 编译配置文件 vscode配置 五、Server.cpp编写例子 编写CMakeList 六、观察server的效果 七、Client编写例子 编写CMakeList 八、观察Client的结果 九、Client优化&#xff08;动态输入&#xff09; 了解argc…