本篇围绕线程池在不同场景下的使用,各种参数调优&配置,包括线程池的关闭等;
将结合场景、源码和示例代码来说明其中关键。
目录
1. 创建线程池
1.1 corePoolSize
1.2 maximumPoolSize
1.3 workQueue
1.3.1 ArrayBlockingQueue
1.3.2 LinkedBlockingQueue
1.3.3 SynchronousQueue
1.4 TimeUnit和keepAliveTime
1.5 ThreadFactory
1.6 RejectedExecutionHandler
2. 使用线程池
2.1 execute()提交任务
2.2 submit(Runnable) 无返回值
2.3 submit(Runnable task, T result) 有"返回值"
2.4 submit(Callable) 真有返回值
3.关闭线程池
3.1 关闭方法对比
3.2 何时关闭线程池呢?
4. 合理配置线程池
4.1 阻塞队列
4.2 饱和策略
4.3 线程数
1. 创建线程池
之前分析过使用Executors创建线程池的弊端了,它们正是通过new ThreadPoolExecutor(...)
的方式来创建的。其实我们只要通过合理配置该构造函数的参数,就能得到一个稳定可控的池了。
以上就是线程池的全参构造函数,可以看到前面两个 if() 都是参数合法性校验。
1.1 corePoolSize
定义了线程池中最少要保持的线程数,即使线程池中没有任务,核心线程也会一直存活,直到线程池被关闭;
在创建线程池后,可以通过调用 prestartAllCoreThreads()方法提前创建线程等待任务;
也可以通过设置 allowCoreThreadTimeOut(true)来控制核心线程数是否受超时配置的影响。
java"> public void allowCoreThreadTimeOut(boolean value) {if (value && keepAliveTime <= 0)throw new IllegalArgumentException("Core threads must have nonzero keep alive times");if (value != allowCoreThreadTimeOut) {allowCoreThreadTimeOut = value;if (value)interruptIdleWorkers();}}
1.2 maximumPoolSize
允许的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数时新建线程;
如果使用了无界队列的话这个参数就失效了,这点要特别注意。
1.3 workQueue
图中比较多我们只需要关注 java.util.concurrent 包下的几个就可以了。
1.3.1 ArrayBlockingQueue
基于数组结构四号线的有界阻塞队列,按照FIFO原则进行排序;内部通过ReentrantLock实现线程安全,可以通过构造方法参数fair 实现公平锁。
1.3.2 LinkedBlockingQueue
基于链表结构实现的阻塞队列,按照FIFO原则排序,当指定容量时就是有界,反之则无界;由于其入队和出队分别两把锁,所以吞吐量通常高于ArrayBlockingQueue。Executors.newFixedThreadPool()就使用了此队列。
1.3.3 SynchronousQueue
不存储任何元素的阻塞队列,适合用于两个线程之间的协作,通常用于生产者和消费者之间直接的交换;Executors.newCachedThreadPool()就使用了此队列。
其他队列不在此说了,有机会分享下并发类容器。
1.4 TimeUnit和
keepAliveTime
TimeUnit是个枚举类,表示超时的时间单位,例如毫秒、秒、分、小时、天等;
keepAliveTime标示空闲线程的最大存活时间,这两个参数是一起的。如果不设置allowCoreThreadTimeOut(true)的话只对非核心线程有作用。
如果任务很多,且每个任务执行时间较短,执行频率也很高,可以适当调大时间,提高线程利用率。相反,如果任务不多,且执行频率很低,适当调小可回收空闲线程节省资源。
1.5 ThreadFactory
线程工厂,给当前线程池创建线程的工厂类;
我们可以使用google的工具类自定义线程名称、是否守护线程等,代码及控制台打印如下:
java">package org.springblade.test;import com.google.common.util.concurrent.ThreadFactoryBuilder;import java.util.concurrent.*;/*** @Auther: liuzujie* @Date: 2025/1/1 21:45* @Desc: 测试类*/
public class ThreadPoolTest {//自定义线程工厂private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-thread-%d") // 为线程设置自定义名称.setDaemon(true) // 是否守护线程.setPriority(Thread.NORM_PRIORITY) // 线程优先级.build();private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),threadFactory, // 使用自定义的线程工厂new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {executor.execute(() -> System.out.println("Task executed by: " + Thread.currentThread().getName()));executor.execute(() -> System.out.println("Task executed by: " + Thread.currentThread().getName()));executor.execute(() -> System.out.println("Task executed by: " + Thread.currentThread().getName()));executor.shutdown();}}
1.6 RejectedExecutionHandler
指定饱和策略,当队列和线程池都满了,就必须采用一种策略来处理新提交的任务。同样只需要关注 java.util.concurrent 包下的四个实现类就可以;
如默认策略无法满足,也可以根据场景需求通过 implements RejectedExecutionHandler自定义策略。如将任务信息记录到日志,或者持久化到某存储,然后做重试、人工介入等处理。
1)AbortPolicy :直接抛出异常
2) DiscardPolicy: 不做任何处理,丢弃掉
3) CallerRunsPolicy: 用调用者所在的线程来运行该任务,如果线程池已关闭则丢弃
4)DiscardOldestPolicy:丢弃队列中最近的一个任务,并执行当前任务。
java"> /*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}
2. 使用线程池
有两种方法可以向线程池中提交任务,execute() 和 submit();
execute只能接受Runnable,而submit却可以接受Runnable和Callable两种,下图是源码。
2.1 execute()提交任务
用于不需要返回值的场景,无法判断任务的状态(提交成功的无法获知执行结果)。
为减小篇幅,后面的示例只提供main方法里的demo。
java">package org.springblade.test;import com.google.common.util.concurrent.ThreadFactoryBuilder;import java.util.concurrent.*;/*** @Auther: liuzujie* @Date: 2025/1/1 21:45* @Desc: 测试类*/
public class ThreadPoolTest {//自定义线程工厂private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-thread-%d") // 为线程设置自定义名称.setDaemon(true) // 是否守护线程.setPriority(Thread.NORM_PRIORITY) // 线程优先级.build();private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),threadFactory, // 使用自定义的线程工厂new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {executor.execute(new Runnable() {@Overridepublic void run() {System.out.println("TODO.....");}});executor.shutdown();}}
2.2 submit(Runnable) 无返回值
由于Runnable没有返回值,所以Future除了没有返回值,其他的接口还是都可以用的,比如get() 阻塞获取结果、中断任务等,看下所有Api吧;
方法 | 描述 |
---|---|
get() | 获取结果(会阻塞等待) |
get(long timeout, TimeUnit unit) | 在指定的时间内获取结果,如果超时,会抛异常并退出等待状态 |
cancel(boolean mayInterruptIfRunning) | 参数为true时,尝试中断任务的执行,false表示不中断; 中断成功返回true,反之false。 |
isCancelled() | 判断任务是否已取消 |
isDone() | 判断任务是否已完成 |
java">public static void main(String[] args) {Future<?> future = executor.submit(new Runnable() {@Overridepublic void run() {try {Thread.sleep(500L);System.out.println("Runnable Task Completed!");} catch (InterruptedException e) {e.printStackTrace();}}});try {System.err.println("阻塞,直到任务完成:" + future.get()); //} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}executor.shutdown();}
2.3 submit(Runnable task, T result) 有"返回值"
虽然 Runnable
本身并没有返回值,但该Api 方法的允许你为 Runnable
任务指定一个额外的“结果”(T result
)。这里的“返回值”并不是 Runnable
任务本身的执行结果,而是 submit
方法的一个附加功能,上代码
java">public static void main(String[] args) {Future<String> future = executor.submit(new Runnable() {@Overridepublic void run() {try {Thread.sleep(500L);System.out.println("Runnable Task Completed!");} catch (InterruptedException e) {e.printStackTrace();}}}, "Task Result");try {System.out.println("submit的返回结果:" + future.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}executor.shutdown();}
2.4 submit(Callable) 真有返回值
Callable不仅有返回值,还可以抛出异常
java"> public static void main(String[] args) {Future<String> future = executor.submit(new Callable<String>() {@Overridepublic String call() throws Exception { //可以抛异常Thread.sleep(500L);return "Callable Task Completed!"; //返回值}});try {System.out.println("正儿八经有返回值:" + future.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}executor.shutdown();}
3.关闭线程池
3.1 关闭方法对比
shutdown()方法
- 作用:会把正在执行以及阻塞队列中等待的任务执行完,在此期间拒绝接受新任务并执行饱和策略。
- 使用场景:希望优雅地结束当下池中所有任务,常用于服务停止、应用关闭等。
- 原理:将线程池状态设置成SHUTDOWN,然后interrupt()中断所有正在等待任务的线程。
shutdownNow()
方法
- 作用:立即停止线程池中所有的活动任务,并尝试停止所有正在等待执行的任务,并返回等待任务的列表。
- 使用场景:如果你希望尽快停止线程池的所有活动,可以使用该方法,特别是当你希望强制停止正在执行的任务时。
- 原理:将线程池设置成STOP,然后尝试关闭所有正在执行或暂停的任务线程。
我们可以使用
isShutdown()、
isTerminating()、isTerminated()来监控线程池状态。
3.2 何时关闭线程池呢?
在项目编码中,经常看到execute()后紧接着就调用shutdown()甚至shutdownNow(),结合代码前后逻辑,经常发现这样是有问题的(比如shutdownNow()很可能导致任务丢失等风险)。
关闭线程池的时机通常取决于应用程序的生命周期和线程池的使用场景,此处大概总结几种建议是否关闭线程池的场景:
1、应用程序要退出,进行资源清理,这时是要关闭线程池释放资源的;
2、如果线程池只在一天内某个点集中处理一些任务,那么考虑闲置时关闭以节省资源;
3、如果源源不断的向线程池中丢任务,一直处于工作状态,那是不建议每丢完一次就关闭一次的,因为线程池的销毁和创建也是一笔可观的损耗,都涉及到全局锁和自旋锁等操作;
4. 合理配置线程池
总概:
1)按任务性质来看的话,一般有CPU密集型、IO密集型、混合型。通常性质不同的任务建议用不同的线程池分开处理;
如果混合型任务可以拆成CPU和IO密集型两个任务,且两个任务耗时相差不大,也可以分解后执行以提高吞吐量。
2)如果任务有优先级的话,则考虑使用优先级队列PriorityBlockingQueue,需要注意的是如果一直有优先级高的任务入队,那么优先级低的任务可能永远都不会执行;
3)根据任务的执行时间长短,也可以考虑用优先级队列优先处理时间短的;
也可以按耗时长短使用不同的线程池,分而治之(XXL-JOB中就使用了快慢线程池);
4.1 阻塞队列
强烈建议使用有界队列,能有效地提高系统可控性和稳定性,避免任务堆积造成的OOM等
4.2 饱和策略
一定要根据业务场景需要来选择饱和策略。要求较高不允许丢任务时自定义饱和策略,写入消息队列或数据库,进行自动重试或人工介入是不错的选择。
4.3 线程数
A) 如果是CPU密集型应尽可能配置较小的线程数,因为过多的线程上下文切换对性能的损耗不容小觑,如配置成CPU数+1个线程。
B) 如果是IO密集型,则通常不会过多占用CPU,都在IO操作阻塞,通常可以设置为 2 到 3 倍的 CPU 核心数。
C)混合型的话需要根据负载的比例来配置线程池。
D) 如果任务以来数据库或者外部调用等,则可适当增加线程数,因为这类操作等待越长则CPU闲置越长。
核心线程数和最大线程数的配置没有银弹,想获得高性能和稳定性的最优线程数,就必须在实际场景下进行阶梯式测试,才能稳如老狗啊 。