在Linux系统中玩线程,使用pthread,这篇博客记录如何创建线程和使用线程和线程的同步与互斥。
还有一份nginx线程池的代码供大家阅读学习!
目录
一、简介
什么是线程
线程的优点、缺点
线程的应用场合
二、线程的使用
1. 创建线程 - pthread_create
2. 线程的终止 - pthread_exit
3. 等待指定线程结束 - pthread_join
4. 父线程与子线程同时执行 - pthread_detach
5. 获取线程id - pthread_self
6. 比较线程id - pthread_equal
7. 杀死线程 - pthread_cancel
8. 使用线程程序的编译
9. 例
例1
例2
三、线程 同步 与 互斥
1. 信号量
1). 什么是信号量
2). 信号量的初始化 - sem_init
3). 信号量的P操作 - sem_wait
4). 信号量的V操作
5). 信号量的删除 - sem_destroy
6). 例
7). 练习
2. 互斥量
1). 什么是互斥量
2). 互斥量的初始化 - pthread_mutex_init
3). 互斥量初始化第二个参数用法 pthread_mutexattr_t *attr
4). 互斥量的获取 - pthread_mutex_lock
5). 互斥量的释放
6). 互斥量的删除 - pthread_mutex_destroy
7). 例一
8). 例二
四、线程的 条件变量
1. 条件变量初始化 - pthread_cond_init
2. 唤醒一个等待线程 - pthread_cond_signal
3. 唤醒所有等待该条件变量的线程 - pthread_cond_broadcast
4. 等待条件变量 | 超时被唤醒 - pthread_cond_timedwait
5. 等待条件变量被唤醒 - pthread_cond_wait
6. 释放/销毁条件变量 - pthread_cond_destroy
7. 例
五、线程池
thread.h
thread_cond.c
thread_mutex.c
thread_pool.h
thread_pool.c
测试:main.c
Makefile
六、总结
一、简介
简单说一下概念。
进程是包含线程的,一个进程可以有多个线程,多个线程(或一个)组合成一个进程。
线程是CPU调度和分派的基本单位,进程是分配资源的基本单位。
一个应用程序就是一个进程,例如运行一个QQ,运行一个微信等等,都是一个进程。
在QQ里面,我一边接收文件,一遍与别人聊天,这就是两个线程在同时运行!
为什么使用线程
- 使用fork创建进程以执行新的任务,该方式的代价很高;
- 多个进程间不会直接共享内存;
- 线程是进程的基本执行单元,一个进程的所有任务都在线程中执行,进程要想执行任务,必须得有线程,进程至少要有一条线程,程序启动会默认开启一条线程,这条线程被称为主线程或 UI 线程。
什么是线程
线程,是进程内部的一个控制序列。
即使不使用线程,进程内部也有一个执行线程。
注意:单核处理器上,同一个时刻只能运行一个线程。
但是对于用户而言,感觉如同同时执行了多个线程一样(各线程在单核CPU上切换,在一段时间内,同时执行了多个线程);
线程的优点、缺点
优点: 创建线程比创建进程,开销要小。
缺点:
1)多线程编程,需特别小心,很容易发生错误;
2)多线程调试很困难;
3)把一个任务划分为两部分,用两个线程在单处理器上运行时,不一定更快;
除非能确定这两个部分能同时执行、且运行在多处理器上。
线程的应用场合
1) 需要让用户感觉在同时做多件事情时, 比如,处理文档的进程,一个线程处理用户编辑,一个线程同时统计用户的字数;
2) 当一个应用程序,需要同时处理输入、计算、输出时,可开3个线程,分别处理输入、计算、输出;让用户感觉不到等待。
3) 高并发编程。
为什么使用多线程
1. 避免阻塞
大家知道,单个进程只有一个主线程,当主线程阻塞的时候,整个进程也就阻塞了,无法再去做其它的一些功能了。
2. 避免CPU空转
应用程序经常会涉及到RPC,数据库访问,磁盘IO等操作,这些操作的速度比CPU慢很多,而在等待这些响应时,CPU却不能去处理新的请求,导致这种单线程的应用程序性能很差。
3. 提升效率
一个进程要独立拥有4GB的虚拟地址空间,而多个线程可以共享同一地址空间,线程的切换比进程的切换要快得多。
二、线程的使用
1. 创建线程 - pthread_create
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
描述:创建一个新的线程;
参数
thread
指向新线程的标识符;
attr
用来设置新线程的属性;一般取默认属性,该参数取NULL即可;
start_routine
该线程的处理函数,该函数的返回值类型和参数类型都是 void *;
arg
线程处理函数,参数三(start_routine)的参数;
返回值
成功:返回0;
失败:返回非0错误编号,并且 *thread 的内容没有定义;
注意:使用fork创建进程后,进程马上就启动,是和父进程同时执行fork后的代码。
使用pthread_create创建线程后,新线程马上就启动,即执行对应的线程处理函数。
例:
// 线程执行函数
void *my_thread_handle(void *arg) {// ***
}pthread_t mythread;
int arg = 100;
int ret;// 创建线程
ret = pthread_create(&mythread, 0, my_thread_handle, &arg);
if (0 != ret) {printf("create thread failed!\n");exit(1);
}
2. 线程的终止 - pthread_exit
#include <pthread.h>
void pthread_exit(void *retval);
描述:在线程执行函数内部调用该函数,终止线程;
参数
retval
线程终止后给pthread_join函数返回的一个值,不能返回局部变量;
返回值
此函数不返回给调用者;
3. 等待指定线程结束 - pthread_join
#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);
描述:父线程等待子线程结束;
参数
thread
指向线程的标识符;
retval
指向该线程函数的返回值,线程函数的返回值类型为void*,所以该参数的类型为void**;
返回值
成功:返回0;
失败:返回一个非0错误编号;
例:
void *thread_return;int ret = pthread_join(mythread, &thread_return);
if (0 != ret) {printf("pthread_join failed!\n");exit(2);
} printf("wait thread end, return value is %d\n", *((int*)thread_return));
4. 父线程与子线程同时执行 - pthread_detach
#include <pthread.h>
int pthread_detach(pthread_t thread);
描述:不等待子线程结束,父线程继续往下执行;如果父线程结束了,子线程也会结束;
参数
thread
指向线程的标识符;
返回值
成功:返回0;
失败:返回一个非0错误编号;
例:
int ret = pthread_detach(mythread);
if (0 != ret) {printf("pthread_detach failed!\n");exit(2);
}
5. 获取线程id - pthread_self
#include <pthread.h>
pthread_t pthread_self(void);
描述:获取调用线程的ID;在父线程调用则返回父线程id,在子线程调用则返回子线程id;
返回值
这个函数总是成功的,返回调用线程的ID;
6. 比较线程id - pthread_equal
#include <pthread.h>
int pthread_equal(pthread_t t1, pthread_t t2);
描述:比较两个线程id是否相等;
参数
t1
需要比较的第一个线程id;
t2
需要比较的第二个线程id;
返回值
相等:返回非0值;
不相等:返回0;
7. 杀死线程 - pthread_cancel
#include <pthread.h>
int pthread_cancel(pthread_t thread);
描述:向线程发送取消请求;强制让子线程停止执行;
参数
thread
指向线程的标识符;
返回值
成功:返回0;
失败:返回一个非0错误编号;
例:
int ret = pthread_cancel(mythread);
if (0 != ret) {printf("pthread_cancel failed!\n");exit(2);
}
8. 使用线程程序的编译
1). 编译时,加上 -D_REENTRANT
功能:告诉编译器,编译时需要可重入功能。
即使得,在编译时,编译部分函数的可重入版本。
注:在单线程程序中,整个程序都是顺序执行的,一个函数在同一时刻只能被一个函数调用,但在多线程中,由于并发性,一个函数可能同时被多个函数调用,此时这个函数就成了临界资源,很容易造成调用函数处理结果的相互影响,如果一个函数在多线程并发的环境中每次被调用产生的结果是不确定的,我们就说这个函数是"不可重入的"/"线程不安全"的。
2). 编译时,指定线程库,加上 -pthread
功能:使用系统默认的NPTL线程库,
即在默认路径中寻找库文件libpthread.so
默认路径为/usr/lib和/usr/local/lib
3). 总结,一般使用如下形式即可:
gcc pthread2.cpp -D_REENTRANT -pthread -o a1
9. 例
例1
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>int my_global;void *my_thread_handle(void *arg) {int val;val = *((int*)arg);printf("new thread begin, arg = %d\n", val);my_global += val;sleep(3);// 线程结束pthread_exit(&my_global);// 这句代码不会执行printf("new thread end!\n");
}int main(void) {pthread_t mythread;int arg;int ret;void *thread_return;arg = 100;my_global = 1000;printf("my_global = %d\n", my_global);printf("ready create thread ...\n");// 创建线程ret = pthread_create(&mythread, 0, my_thread_handle, &arg);if (0 != ret) {printf("create thread failed!\n");exit(1);}printf("wait thread finished...\n");// 等待子线程结束ret = pthread_join(mythread, &thread_return);if (0 != ret) {printf("pthread_join failed!\n");exit(2);} printf("wait thread end, return value is %d\n", *((int*)thread_return));printf("my_global = %d\n", my_global);printf("create thread finished!\n");return 0;
}
例2
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>void *my_thread_handle(void *arg) {for (int i = 0; i < 10; i++) {printf("i = %d\n", i);sleep(1);}
}int main(void) {pthread_t mythread;int ret;void *thread_return;// 创建线程ret = pthread_create(&mythread, 0, my_thread_handle, 0);if (0 != ret) {printf("create thread failed!\n");exit(1);}ret = pthread_detach(mythread);if (0 != ret) {printf("pthread_detach failed!\n");exit(2);} sleep(3);printf("thread end.\n");return 0;
}
因为父线程结束了,所以,即使子线程还没结束,也得强制结束子线程,然后结束程序!
三、线程 同步 与 互斥
1)线程的互斥 - 指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
2)线程的同步 - 指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。
1. 问题
同一个进程内的各个线程,共享该进程内的全局变量,如果多个线程同时对某个全局变量进行访问时,就可能导致竞态。
解决办法: 对临界区使用信号量、或互斥量。
2. 信号量和互斥量的选择。
对于同步和互斥,使用信号量或互斥量都可以实现。
使用时,选择更符合语义的手段:
如果要求最多只允许一个线程进入临界区,则使用互斥量
如果要求多个线程之间的执行顺序满足某个约束,则使用信号量
1. 信号量
1). 什么是信号量
此时所指的“信号量”是指用于同一个进程内多个线程之间的信号量。
即POSIX信号量,而不是System V信号量(用于进程之间的同步)
用于线程的信号量的原理,与用于进程之间的信号量的原理相同。
都有P操作、V操作。
信号量的表示:sem_t 类型
2). 信号量的初始化 - sem_init
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
描述:对信号量进行初始化;
参数
sem
指向被初始化的信号量;
pshared
0:表示该信号量是该进程内使用的“局部信号量”, 不再被其它进程共享。
非0:该信号量可被其他进程共享,Linux不支持这种信号量
value
信号量的初始值,>= 0;
返回值
成功:返回0;
失败:返回-1,并设置错误标志errno;
例:
sem_t sem; // 信号量标识符,全局变量// 初始化信号量
int ret = sem_init(&sem, 0 , 0); // 信号量值为1,第三个参数控制
if (0 != ret) {printf("sem_init failed!\n");exit(2);
}
3). 信号量的P操作 - sem_wait
#include <semaphore.h>
int sem_wait(sem_t *sem);
描述:执行P操作,锁定一个信号量;
参数
sem
信号量标识符;
返回值
成功:返回0;
失败:返回-1,并设置错误标志errno;
例:
// P操作
if (sem_wait(&sem) != 0) {printf("sem_wait failed!\n");exit(1);
}
4). 信号量的V操作
#include <semaphore.h>
int sem_post(sem_t *sem);
描述:执行V操作,解锁信号量;
参数
sem
信号量标识符;
返回值
成功:返回0;
失败:返回-1,并设置错误标志errno;
例:
// V操作
if (sem_post(&sem) != 0) {printf("sem_post failed!\n");exit(1);
}
5). 信号量的删除 - sem_destroy
#include <semaphore.h>
int sem_destroy(sem_t *sem);
描述:销毁信号量;
参数
sem
信号量标识符;
返回值
成功:返回0;
失败:返回-1,并设置错误标志errno;
例:
// 删除信号量
int ret = sem_destroy(&sem);
if (0 != ret) {printf("sem_destroy failed!\n");exit(1);
}
6). 例
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>#define BUFF_SIZE 1024char buff[BUFF_SIZE] = { '\0' };
sem_t sem; // 信号量标识符static void *str_thread_handle(void *arg) {while(1) {// P操作if (sem_wait(&sem) != 0) {printf("sem_wait failed!\n");exit(1);}printf("string is : %s len = %ld\n", buff, strlen(buff));if (strncmp(buff, "end", 3) == 0) {break;}}pthread_exit(NULL);
}int main(int argc, char **argv) {int ret;pthread_t str_thread;void *thread_return;// 初始化信号量ret = sem_init(&sem, 0 , 0); // 信号量值为1,第三个参数控制if (0 != ret) {printf("sem_init failed!\n");exit(2);}// 创建线程ret = pthread_create(&str_thread, 0, str_thread_handle, NULL);if (0 != ret) {printf("pthread_create failed!\n");exit(3);}while (1) {// 输入一行字符串fgets(buff, sizeof(buff), stdin);// V操作if (sem_post(&sem) != 0) {printf("sem_post failed!\n");exit(4);}if (strncmp(buff, "end", 3) == 0) {break;}}// 父线程等待子线程ret = pthread_join(str_thread, &thread_return);if(0 != ret) {printf("pthread_join failed!\n");exit(5);}// 删除信号量ret = sem_destroy(&sem);if (0 != ret) {printf("sem_destroy failed!\n");exit(6);}return 0;
}
7). 练习
创建2个线程(共有主线程、线程1、线程2共3个线程);
主线程阻塞式等待用户输入字符串;
主线程每接收到一个字符串之后, 线程1就马上对该字符串进行处理;
线程1的处理逻辑为:统计该字符串的个数,并记录当时的时间;
线程1把该字符串处理完后,线程2马上就把处理结果写入文件result.txt;
直到用户输入exit.
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <time.h>#define BUFF_SIZE 1024char buff[BUFF_SIZE] = { '\0' };
sem_t sem; // 信号量标识符
int count = 0;
char time_[BUFF_SIZE] = { '\0' };static void *str_thread_handle1(void *arg) {while(1) {// P操作if (sem_wait(&sem) != 0) {printf("sem_wait failed!\n");exit(1);}count = strlen(buff);time_t timep; time (&timep); sprintf(time_, "%s", asctime( gmtime(&timep) ));// V操作if (sem_post(&sem) != 0) {printf("sem_post failed!\n");exit(4);}if (strncmp(buff, "exit", 4) == 0) {break;}}pthread_exit(NULL);
}static void *str_thread_handle2(void *arg) {FILE *fp = 0; //定义文件指针fp// 以只读的方式打开文件 if ( (fp = fopen("./result.txt","a")) == 0 ) {printf("文件打开失败\n");return (void*)-1;}while(1) {// P操作if (sem_wait(&sem) != 0) {printf("sem_wait failed!\n");exit(1);}char str[BUFF_SIZE] = { '\0' };sprintf(str, "内容:%s大小:%d \t 时间:%s\n", buff, count, time_);if (strncmp(buff, "exit", 4) == 0) {break;} // 写入文件fwrite(str, strlen(str), 1, fp);}fclose(fp);pthread_exit(NULL);
}int main(int argc, char **argv) {int ret;pthread_t str_thread1;pthread_t str_thread2;void *thread_return;// 初始化信号量ret = sem_init(&sem, 0 , 0); // 信号量值为1,第三个参数控制if (0 != ret) {printf("sem_init failed!\n");exit(2);}// 创建线程1ret = pthread_create(&str_thread1, 0, str_thread_handle1, NULL);if (0 != ret) {printf("pthread_create failed!\n");exit(3);}// 创建线程2ret = pthread_create(&str_thread2, 0, str_thread_handle2, NULL);if (0 != ret) {printf("pthread_create failed!\n");exit(3);}while (1) {// 输入一行字符串fgets(buff, sizeof(buff), stdin);// V操作if (sem_post(&sem) != 0) {printf("sem_post failed!\n");exit(4);}if (strncmp(buff, "exit", 4) == 0) {break;}}// 父线程等待子线程ret = pthread_join(str_thread1, &thread_return);if(0 != ret) {printf("pthread_join failed!\n");exit(5);}ret = pthread_join(str_thread2, &thread_return);if(0 != ret) {printf("pthread_join failed!\n");exit(5);}// 删除信号量ret = sem_destroy(&sem);if (0 != ret) {printf("sem_destroy failed!\n");exit(6);}return 0;
}
2. 互斥量
1). 什么是互斥量
效果上等同于初值为1的信号量
互斥量的使用:类型为 pthread_mutex_t
2). 互斥量的初始化 - pthread_mutex_init
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
描述:初始化互斥对象;
参数
mutex
指向被初始化的互斥量;
attr
指向互斥量的属性,一般取默认值(当一个线程已获取互斥量后,该线程再次获取该信号量,将导致死锁!);
返回值
成功:返回0;
失败:返回非0错误码;
例:
pthread_mutex_t lock; // 定义互斥锁,全局变量// 初始化互斥锁
int ret = pthread_mutex_init(&lock, 0);
if (0 != ret) {printf("pthread_mutex_ini failed!\n");exit(1);
}
3). 互斥量初始化第二个参数用法 pthread_mutexattr_t *attr
将attr值设置为:PTHREAD_MUTEX_ERRORCHECK
即如果使用给互斥锁上锁两次,第二次上锁时就会报错!
例:可以先看完下面的用法之后再返回来看这里的代码
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>pthread_mutex_t lock; // 定义互斥锁static void *str_thread_handle(void *arg) {// do something...// 退出子线程pthread_exit(NULL);
}int main(int argc, char **argv) {int ret; pthread_t tid; // the thread identifier void *thread_return;pthread_mutexattr_t attr;// 初始化属性ret = pthread_mutexattr_init(&attr);if (ret != 0) {fprintf(stderr, "pthread_mutexattr_init() failed, reason: %s\n",strerror(errno));exit(1);}// 设置如果给互斥锁上锁两次,就会报错!ret = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);if (ret != 0) {fprintf(stderr, "pthread_mutexattr_settype(PTHREAD_MUTEX_ERRORCHECK) failed, reason: %s\n",strerror(errno));exit(1);}// 初始化互斥锁ret = pthread_mutex_init(&lock, &attr);if (0 != ret) {printf("pthread_mutex_ini failed!\n");exit(1);} else {// 销毁属性err = pthread_mutexattr_destroy(&attr);if (err != 0) {fprintf(stderr,"pthread_mutexattr_destroy() failed, reason: %s\n",strerror(errno));}} // 创建线程ret = pthread_create(&tid, 0, str_thread_handle, 0);if (0 != ret) {printf("pthread_create failed!\n");exit(2);}// 第一次上锁ret = pthread_mutex_lock(&lock);if (0 != ret) {printf("pthread_mutex_lock failed!\n");exit(1);}printf("-----------------------------------start two lock.\n");// 没有解锁,进行第二次上锁ret = pthread_mutex_lock(&lock); // 这里会返回错误if (0 != ret) {printf("pthread_mutex_lock failed!\n");exit(1);}// 等待子线程ret = pthread_join(tid, &thread_return);if (0 != ret) {printf("pthread_join failed!\n");exit(3);}// 销毁互斥锁ret = pthread_mutex_destroy(&lock);if (0 != ret) {printf("pthread_mutex_destroy failed!\n");exit(4);}return 0;
}
4). 互斥量的获取 - pthread_mutex_lock
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
描述:锁定互斥量;
参数
mutex
定义的全局互斥量;
返回值
成功:返回0;
失败:返回非0错误码;
例:
// 上锁
int ret = pthread_mutex_lock(&lock);
if (0 != ret) {printf("pthread_mutex_lock failed!\n");exit(1);
}
5). 互斥量的释放
#include <pthread.h>
int pthread_mutex_unlock(pthread_mutex_t *mutex);
描述:解锁互斥锁;
参数
mutex
定义的全局互斥量;
返回值
成功:返回0;
失败:返回非0错误码;
例:
// 解锁
int ret = pthread_mutex_unlock(&lock);
if (0 != ret) {printf("pthread_mutex_unlock failed!\n");exit(1);
}
6). 互斥量的删除 - pthread_mutex_destroy
#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);
描述:销毁互斥对象;
参数
mutex
定义的全局互斥量;
返回值
成功:返回0;
失败:返回非0错误码;
例:
// 销毁互斥锁
int ret = pthread_mutex_destroy(&lock);
if (0 != ret) {printf("pthread_mutex_destroy failed!\n");exit(1);
}
7). 例一
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>int global_value = 0;
pthread_mutex_t lock; // 定义互斥锁static void *str_thread_handle(void *arg) {int i = 0;for (i = 0; i < 10; i++) {// 上锁pthread_mutex_lock(&lock);sleep(1);global_value++;printf("【Child pthread】 :global_value = %d\n", global_value); // 解锁pthread_mutex_unlock(&lock); sleep(1);}// 退出子线程pthread_exit(NULL);
}int main(int argc, char **argv) {int ret; pthread_t tid; // the thread identifier void *thread_return;int i;// 初始化互斥锁ret = pthread_mutex_init(&lock, 0);if (0 != ret) {printf("pthread_mutex_ini failed!\n");exit(1);}ret = pthread_create(&tid, 0, str_thread_handle, 0);if (0 != ret) {printf("pthread_create failed!\n");exit(2);}for (i = 0; i < 10; i++) {// 上锁ret = pthread_mutex_lock(&lock);if (0 != ret) {printf("pthread_mutex_lock failed!\n");exit(1);}sleep(1);global_value++;printf("【Father pthread】 :global_value = %d\n", global_value);// 解锁ret = pthread_mutex_unlock(&lock); if (0 != ret) {printf("pthread_mutex_unlock failed!\n");exit(1);} sleep(1);}// 等待子线程ret = pthread_join(tid, &thread_return);if (0 != ret) {printf("pthread_join failed!\n");exit(3);}// 销毁互斥锁ret = pthread_mutex_destroy(&lock);if (0 != ret) {printf("pthread_mutex_destroy failed!\n");exit(4);}return 0;
}
8). 例二
主线程输入字符串,子线程统计字符串个数
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <errno.h>void *thread_function(void *arg);
pthread_mutex_t work_mutex; // 定义互斥锁#define WORK_SIZE 1024
char work_area[WORK_SIZE] = { '\0' };int main(int argc, char **argv) {int res;pthread_t a_thread;void *thread_result;// 初始化互斥锁res = pthread_mutex_init(&work_mutex, NULL);if (0 != res) {perror("Mutex initialization failed!\n");exit(EXIT_FAILURE);}// 创建线程res = pthread_create(&a_thread, NULL, thread_function, NULL);if (0 != res) {perror("Thread creation failed!\n");exit(EXIT_FAILURE);}printf("Input some text. Enter 'end' to finish!\n");while (1) {// 上锁pthread_mutex_lock(&work_mutex);fgets(work_area, WORK_SIZE, stdin); // 输入一行字符串if (strncmp("end", work_area, 3) == 0) {break;}// 解锁pthread_mutex_unlock(&work_mutex);sleep(1);}// break 后需要解锁pthread_mutex_unlock(&work_mutex);printf("\nWaiting for thread to finish...\n");// 等待子线程res = pthread_join(a_thread, &thread_result);if (0 != res) {perror("Thread join failed!\n");exit(EXIT_FAILURE);}printf("Thread joined!\n");// 销毁互斥锁res = pthread_mutex_destroy(&work_mutex);if (0 != res) {perror("pthread_mutex_destroy failed!\n");exit(EXIT_FAILURE);}exit(EXIT_SUCCESS);
}void *thread_function(void *arg) {sleep(1);while (1) {// 上锁pthread_mutex_lock(&work_mutex);if (strncmp("end", work_area, 3) == 0) {break;}printf("Uou input %ld characters\n", strlen(work_area) - 1);// 解锁pthread_mutex_unlock(&work_mutex);sleep(1);}// break 后需要解锁pthread_mutex_unlock(&work_mutex);pthread_exit(0);
}
四、线程的 条件变量
与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。
条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。
条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。
安装文档手册命令:apt-get install manpages-posix-dev
1. 条件变量初始化 - pthread_cond_init
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
描述:初始化条件变量;
参数
cond
条件变量指针;
attr
条件变量高级属性,设置为NULL即可;
返回值
成功:返回0;
失败:返回一个错误数字;
例:
pthread_cond_t cond; // 条件变量,全局int ret = pthread_cond_init(&cond, NULL);
if (0 != ret) {printf("pthread_cond_init failed!\n");exit(1);
}
2. 唤醒一个等待线程 - pthread_cond_signal
#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
描述:发出条件信号;通知条件变量,唤醒一个等待的线程;
参数
cond
条件变量指针;
返回值
成功:返回0;
失败:返回一个错误数字;
例:
int ret = pthread_cond_signal(&cond);
if (0 != ret) {printf("pthread_cond_signal failed!\n");exit(1);
}
3. 唤醒所有等待该条件变量的线程 - pthread_cond_broadcast
#include <pthread.h>
int pthread_cond_broadcast(pthread_cond_t *cond);
描述:广播条件变量;即所有等待的线程都可以收到;
参数
cond
条件变量指针;
返回值
成功:返回0;
失败:返回一个错误数字;
例:
int ret = pthread_cond_broadcast(&cond);
if (0 != ret) {printf("pthread_cond_broadcast failed!\n");exit(1);
}
4. 等待条件变量 | 超时被唤醒 - pthread_cond_timedwait
#include <pthread.h>
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
描述:等待条件变量被唤醒;或者绝对时间abstime到了唤醒该线程;
参数
cond
条件变量指针;
mutex
互斥量指针;
abstime
等待被唤醒的绝对超时时间;
struct timespec {
time_t tv_sec; // seconds
long tv_nsec; // and nanoseconds
};
返回值
成功:返回0;
失败:返回一个错误数字;
5. 等待条件变量被唤醒 - pthread_cond_wait
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
描述:等待条件变量cond被唤醒(由一个信号或这广播唤醒);
参数
cond
条件变量指针;
mutex
互斥量指针;
返回值
成功:返回0;
失败:返回一个错误数字;
例:
pthread_mutex_t mutex; // 互斥锁,全局变量
pthread_cond_t cond; // 条件变量,全局变量int ret = pthread_cond_wait(&cond, &mutex);
if (0 != ret) {printf("pthread_cond_wait failed!\n");exit(1);
}
为什么需要传入互斥量指针呢?
pthread_cond_wait 执行过程:
1). 挂起当前线程;
2). 使用互斥量进行解锁;
3). 当有信号通知后;
4). 唤醒线程;
5). 使用互斥量进行上锁;
6). 才会往下执行其他代码;
由此可知,有多个线程执行等待时,是可以同时等待的,但如果只有一个信号过来,也就只能有一个线程被唤醒;注意,即使使用pthread_cond_broadcast 广播发送信号去唤醒全部线程,线程的执行也只能一个一个的去执行,因为有互斥量的缘故!
6. 释放/销毁条件变量 - pthread_cond_destroy
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
描述:销毁条件变量;
参数
cond
条件变量指针;
返回值
成功:返回0;
失败:返回一个错误数字;
例:
int ret = pthread_cond_destroy(&cond);
if (0 != ret) {printf("pthread_cond_destroy failed!\n");exit(1);
}
7. 例
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>pthread_mutex_t mutex; // 互斥锁
pthread_cond_t cond; // 条件变量void *thread1(void *arg) {while (1) {printf("thread1 is running\n");// 上锁pthread_mutex_lock(&mutex);// 条件变量,等待信号触发往下执行pthread_cond_wait(&cond, &mutex);printf("thread1 applied the condition\n\n");// 解锁pthread_mutex_unlock(&mutex);sleep(4);}
}void *thread2(void *arg) {while (1) {printf("thread2 is running\n");// 上锁pthread_mutex_lock(&mutex);// 条件变量,等待信号触发往下执行pthread_cond_wait(&cond, &mutex);printf("thread2 applied the condition\n\n");// 解锁pthread_mutex_unlock(&mutex);sleep(2);}
}int main(int argc, char **argv) {pthread_t thid1, thid2; // 线程变量printf("condition variable study!\n");// 互斥锁初始化pthread_mutex_init(&mutex, NULL);// 条件变量初始化pthread_cond_init(&cond, NULL);// 创建线程pthread_create(&thid1, NULL, thread1, NULL);pthread_create(&thid2, NULL, thread2, NULL);int i = 0;do {pthread_cond_signal(&cond);sleep(1);} while(i++ < 10);int ret = pthread_cond_destroy(&cond);if (0 != ret) {printf("pthread_cond_destroy failed!\n");exit(1);}return 0;
}
五、线程池
以下代码时NginX的线程池代码;
写的还是挺不错的,B格挺高的!
有兴趣的可以阅读一下!
另外,可以直接复制到自己的项目中去使用!
thread.h
#ifndef _DEMO_THREAD_H_INCLUDED_
#define _DEMO_THREAD_H_INCLUDED_#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/types.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>typedef intptr_t int_t;
typedef uintptr_t uint_t;#define OK 0
#define ERROR -1int thread_mutex_create(pthread_mutex_t *mtx);
int thread_mutex_destroy(pthread_mutex_t *mtx);
int thread_mutex_lock(pthread_mutex_t *mtx);
int thread_mutex_unlock(pthread_mutex_t *mtx);int thread_cond_create(pthread_cond_t *cond);
int thread_cond_destroy(pthread_cond_t *cond);
int thread_cond_signal(pthread_cond_t *cond);
int thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx);#endif /* _DEMO_THREAD_H_INCLUDED_ */
thread_cond.c
#include "thread.h"int
thread_cond_create(pthread_cond_t *cond)
{int err;err = pthread_cond_init(cond, NULL);if (err == 0) {return OK;}fprintf(stderr, "pthread_cond_init() failed, reason: %s\n",strerror(errno));return ERROR;
}int
thread_cond_destroy(pthread_cond_t *cond)
{int err;err = pthread_cond_destroy(cond);if (err == 0) {return OK;}fprintf(stderr, "pthread_cond_destroy() failed, reason: %s\n",strerror(errno));return ERROR;
}int
thread_cond_signal(pthread_cond_t *cond)
{int err;err = pthread_cond_signal(cond);if (err == 0) {return OK;}fprintf(stderr, "pthread_cond_signal() failed, reason: %s\n",strerror(errno));return ERROR;
}int
thread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mtx)
{int err;err = pthread_cond_wait(cond, mtx);if (err == 0) {return OK;}fprintf(stderr, "pthread_cond_wait() failed, reason: %s\n",strerror(errno));return ERROR;
}
thread_mutex.c
#include "thread.h"int
thread_mutex_create(pthread_mutex_t *mtx)
{int err;pthread_mutexattr_t attr;err = pthread_mutexattr_init(&attr);if (err != 0) {fprintf(stderr, "pthread_mutexattr_init() failed, reason: %s\n",strerror(errno));return ERROR;}err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);if (err != 0) {fprintf(stderr, "pthread_mutexattr_settype(PTHREAD_MUTEX_ERRORCHECK) failed, reason: %s\n",strerror(errno));return ERROR;}err = pthread_mutex_init(mtx, &attr);if (err != 0) {fprintf(stderr,"pthread_mutex_init() failed, reason: %s\n",strerror(errno));return ERROR;}err = pthread_mutexattr_destroy(&attr);if (err != 0) {fprintf(stderr,"pthread_mutexattr_destroy() failed, reason: %s\n",strerror(errno));}return OK;
}int
thread_mutex_destroy(pthread_mutex_t *mtx)
{int err;err = pthread_mutex_destroy(mtx);if (err != 0) {fprintf(stderr,"pthread_mutex_destroy() failed, reason: %s\n",strerror(errno));return ERROR;}return OK;
}int
thread_mutex_lock(pthread_mutex_t *mtx)
{int err;err = pthread_mutex_lock(mtx);if (err == 0) {return OK;}fprintf(stderr,"pthread_mutex_lock() failed, reason: %s\n",strerror(errno));return ERROR;
}int
thread_mutex_unlock(pthread_mutex_t *mtx)
{int err;err = pthread_mutex_unlock(mtx);#if 0ngx_time_update();
#endifif (err == 0) {return OK;}fprintf(stderr,"pthread_mutex_unlock() failed, reason: %s\n",strerror(errno));return ERROR;
}
thread_pool.h
#ifndef _THREAD_POOL_H_INCLUDED_
#define _THREAD_POOL_H_INCLUDED_#include "thread.h"#define DEFAULT_THREADS_NUM 4 // 线程池中线程个数
#define DEFAULT_QUEUE_NUM 65535 // 任务队列最大值typedef unsigned long atomic_uint_t;
typedef struct thread_task_s thread_task_t; // 任务
typedef struct thread_pool_s thread_pool_t; // 线程池struct thread_task_s {thread_task_t *next;uint_t id;void *ctx; // 参数地址的位置void (*handler)(void *data);
};typedef struct {thread_task_t *first;thread_task_t **last;
} thread_pool_queue_t;#define thread_pool_queue_init(q) \(q)->first = NULL; \(q)->last = &(q)->firststruct thread_pool_s {pthread_mutex_t mtx; // 互斥量,与条件变量一起使用thread_pool_queue_t queue; // 任务队列int_t waiting; // 任务队列等待的个数pthread_cond_t cond; // 条件变量,与互斥量一同使用char *name; // 名字,可不指定uint_t threads; // 线程池中的线程个数 int_t max_queue; // 任务队列最大值
};thread_task_t *thread_task_alloc(size_t size);
void thread_task_free(thread_task_t *task);
int_t thread_task_post(thread_pool_t *tp, thread_task_t *task);
thread_pool_t* thread_pool_init();
void thread_pool_destroy(thread_pool_t *tp);#endif /* _THREAD_POOL_H_INCLUDED_ */
thread_pool.c
#include "thread_pool.h"static void thread_pool_exit_handler(void *data);
static void *thread_pool_cycle(void *data);
static int_t thread_pool_init_default(thread_pool_t *tpp, char *name);static uint_t thread_pool_task_id;static int debug = 0;thread_pool_t* thread_pool_init()
{int err;pthread_t tid;uint_t n;pthread_attr_t attr;thread_pool_t *tp=NULL;tp = calloc(1,sizeof(thread_pool_t));if(tp == NULL){fprintf(stderr, "thread_pool_init: calloc failed!\n");}thread_pool_init_default(tp, NULL);thread_pool_queue_init(&tp->queue);// 初始化互斥量if (thread_mutex_create(&tp->mtx) != OK) {free(tp);return NULL;}// 初始化条件变量if (thread_cond_create(&tp->cond) != OK) {(void) thread_mutex_destroy(&tp->mtx);free(tp);return NULL;}err = pthread_attr_init(&attr);if (err) {fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno));free(tp);return NULL;}// PTHREAD_CREATE_DETACHED:创建的子线程与主线程分离,即主线程不等待子线程结束,各自运行各自的err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);if (err) {fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno));free(tp);return NULL;}for (n = 0; n < tp->threads; n++) {err = pthread_create(&tid, &attr, thread_pool_cycle, tp);if (err) {fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno));free(tp);return NULL;}}(void) pthread_attr_destroy(&attr);return tp;
}void thread_pool_destroy(thread_pool_t *tp)
{uint_t n;thread_task_t task;volatile uint_t lock;memset(&task,'\0', sizeof(thread_task_t));task.handler = thread_pool_exit_handler;task.ctx = (void *) &lock;for (n = 0; n < tp->threads; n++) {lock = 1;if (thread_task_post(tp, &task) != OK) {return;}while (lock) {sched_yield();}//task.event.active = 0;}(void) thread_cond_destroy(&tp->cond);(void) thread_mutex_destroy(&tp->mtx);free(tp);
}static void
thread_pool_exit_handler(void *data)
{uint_t *lock = data;*lock = 0;pthread_exit(0);
}thread_task_t *
thread_task_alloc(size_t size)
{thread_task_t *task;// 分配的内存 加上 线程运行时传入的参数的位置大小task = calloc(1,sizeof(thread_task_t) + size);if (task == NULL) {return NULL;}task->ctx = task + 1; // task + 1:就指向了参数的起始位置return task;
}void thread_task_free(thread_task_t *task) {if (task) {free(task);task = NULL;}
}int_t
thread_task_post(thread_pool_t *tp, thread_task_t *task)
{if (thread_mutex_lock(&tp->mtx) != OK) {return ERROR;}if (tp->waiting >= tp->max_queue) {(void) thread_mutex_unlock(&tp->mtx);fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n",tp->name, tp->waiting);return ERROR;}//task->event.active = 1;task->id = thread_pool_task_id++;task->next = NULL;if (thread_cond_signal(&tp->cond) != OK) {(void) thread_mutex_unlock(&tp->mtx);return ERROR;}*tp->queue.last = task; // 链接到尾部tp->queue.last = &task->next; // 指向尾部tp->waiting++;(void) thread_mutex_unlock(&tp->mtx);if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n",task->id, tp->name);return OK;
}static void *
thread_pool_cycle(void *data)
{thread_pool_t *tp = data;int err;thread_task_t *task;if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name);for ( ;; ) {if (thread_mutex_lock(&tp->mtx) != OK) {return NULL;}tp->waiting--;while (tp->queue.first == NULL) {if (thread_cond_wait(&tp->cond, &tp->mtx)!= OK){(void) thread_mutex_unlock(&tp->mtx);return NULL;}}task = tp->queue.first;tp->queue.first = task->next;if (tp->queue.first == NULL) {tp->queue.last = &tp->queue.first;}if (thread_mutex_unlock(&tp->mtx) != OK) {return NULL;}if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n",task->id, tp->name);task->handler(task->ctx);if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name);task->next = NULL;thread_task_free(task);//notify }
}static int_t
thread_pool_init_default(thread_pool_t *tpp, char *name)
{if(tpp){tpp->threads = DEFAULT_THREADS_NUM;tpp->max_queue = DEFAULT_QUEUE_NUM;tpp->name = strdup(name?name:"default");if(debug)fprintf(stderr,"thread_pool_init, name: %s ,threads: %lu max_queue: %ld\n",tpp->name, tpp->threads, tpp->max_queue);return OK;}return ERROR;
}
测试:main.c
#include "thread_pool.h"
#include <unistd.h>// 线程参数
struct test{int arg1;int arg2;
};void task_handler1(void* data){static int index = 0;printf("Hello, this is 1th test.index=%d\r\n", index++);}void task_handler2(void* data){static int index = 0;printf("Hello, this is 2th test.index=%d\r\n", index++);}void task_handler3(void* data){static int index = 0;struct test *t = (struct test *) data;printf("Hello, this is 3th test.index=%d\r\n", index++);printf("arg1: %d, arg2: %d\n", t->arg1, t->arg2);}int
main(int argc, char **argv)
{thread_pool_t* tp = NULL;int i = 0;tp = thread_pool_init(); //sleep(1);thread_task_t * test1 = thread_task_alloc(0);thread_task_t * test2 = thread_task_alloc(0);thread_task_t * test3 = thread_task_alloc(sizeof(struct test));test1->handler = task_handler1;test2->handler = task_handler2;test3->handler = task_handler3;((struct test*)test3->ctx)->arg1 = 666;((struct test*)test3->ctx)->arg2 = 888;//for(i=0; i<10;i++){ thread_task_post(tp, test1);thread_task_post(tp, test2);thread_task_post(tp, test3);//}sleep(5);thread_pool_destroy(tp);
}
Makefile
CXX := gcc
TARGET = threadpool
RMRF := rm -rf
SRC = $(wildcard *.cpp *.c) # 获取当前路径下的所有.cpp 和 .c 文件
OBJ = $(patsubst %.cpp %.c, %.o, $(SRC)) # 把$(SRC)中符合.cpp 和 .c 的所有文件转换成.o文件CXXFLAGS = -c -WallLIBS = -lpthread$(TARGET): $(OBJ)$(CXX) $^ $(LIBS) -o $@%.o: %cpp %c$(CXX) $(CXXFLAGS) $< -o $@.PHONY: clean
clean:$(RMRF) *.o
cleanall:$(RMRF) *.o $(TARGET)
六、总结
线程的操作已经完毕;个人觉得互斥量是线程经常使用到的,所以得更加专注的去学习那一块知识点!
另外,线程池这部分代码,效率还是挺高的,所以也得去学习一下大佬的写法!