并发编程 之 线程池实战配置建议(二)

news/2025/1/6 7:23:17/

本篇围绕线程池在不同场景下的使用,各种参数调优&配置,包括线程池的关闭等;

将结合场景、源码和示例代码来说明其中关键。

目录

1.  创建线程池

      1.1  corePoolSize

      1.2  maximumPoolSize

      1.3  workQueue

        1.3.1 ArrayBlockingQueue

        1.3.2 LinkedBlockingQueue

        1.3.3 SynchronousQueue

      1.4 TimeUnit和keepAliveTime

      1.5  ThreadFactory

      1.6 RejectedExecutionHandler

2. 使用线程池

         2.1 execute()提交任务

         2.2 submit(Runnable) 无返回值

         2.3 submit(Runnable task, T result) 有"返回值"

         2.4 submit(Callable) 真有返回值

3.关闭线程池

       3.1 关闭方法对比

       3.2 何时关闭线程池呢? 

4. 合理配置线程池

       4.1 阻塞队列

       4.2 饱和策略

       4.3 线程数


1.  创建线程池

        之前分析过使用Executors创建线程池的弊端了,它们正是通过new ThreadPoolExecutor(...)

的方式来创建的。其实我们只要通过合理配置该构造函数的参数,就能得到一个稳定可控的池了。

线程池构造函数

        以上就是线程池的全参构造函数,可以看到前面两个 if() 都是参数合法性校验。

      1.1  corePoolSize

         定义了线程池最少要保持的线程数,即使线程池中没有任务,核心线程也会一直存活,直到线程池被关闭;

        在创建线程池后,可以通过调用  prestartAllCoreThreads()方法提前创建线程等待任务;

        也可以通过设置  allowCoreThreadTimeOut(true)来控制核心线程数是否受超时配置的影响。 

java">    public void allowCoreThreadTimeOut(boolean value) {if (value && keepAliveTime <= 0)throw new IllegalArgumentException("Core threads must have nonzero keep alive times");if (value != allowCoreThreadTimeOut) {allowCoreThreadTimeOut = value;if (value)interruptIdleWorkers();}}

      1.2  maximumPoolSize

        允许的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数时新建线程;

        如果使用了无界队列的话这个参数就失效了,这点要特别注意。

      1.3  workQueue

         图中比较多我们只需要关注 java.util.concurrent 包下的几个就可以了。

          1.3.1 ArrayBlockingQueue

             基于数组结构四号线的有界阻塞队列,按照FIFO原则进行排序;内部通过ReentrantLock实现线程安全,可以通过构造方法参数fair 实现公平锁。

        1.3.2 LinkedBlockingQueue

        基于链表结构实现的阻塞队列,按照FIFO原则排序,当指定容量时就是有界,反之则无界由于其入队和出队分别两把锁,所以吞吐量通常高于ArrayBlockingQueue。Executors.newFixedThreadPool()就使用了此队列。

        1.3.3 SynchronousQueue

        不存储任何元素的阻塞队列,适合用于两个线程之间的协作,通常用于生产者和消费者之间直接的交换;Executors.newCachedThreadPool()就使用了此队列。

        其他队列不在此说了,有机会分享下并发类容器。

      1.4 TimeUnitkeepAliveTime

   TimeUnit是个枚举类,表示超时的时间单位,例如毫秒、秒、分、小时、天等;

         keepAliveTime标示空闲线程的最大存活时间,这两个参数是一起的。如果不设置allowCoreThreadTimeOut(true)的话只对非核心线程有作用。

        如果任务很多,且每个任务执行时间较短,执行频率也很高,可以适当调大时间,提高线程利用率。相反,如果任务不多,且执行频率很低,适当调小可回收空闲线程节省资源。

  1.5  ThreadFactory

    线程工厂,给当前线程池创建线程的工厂类;

   我们可以使用google的工具类自定义线程名称、是否守护线程等,代码及控制台打印如下:

java">package org.springblade.test;import com.google.common.util.concurrent.ThreadFactoryBuilder;import java.util.concurrent.*;/*** @Auther: liuzujie* @Date: 2025/1/1 21:45* @Desc: 测试类*/
public class ThreadPoolTest {//自定义线程工厂private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-thread-%d")  // 为线程设置自定义名称.setDaemon(true)                // 是否守护线程.setPriority(Thread.NORM_PRIORITY)  // 线程优先级.build();private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),threadFactory,                 // 使用自定义的线程工厂new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {executor.execute(() -> System.out.println("Task executed by: " + Thread.currentThread().getName()));executor.execute(() -> System.out.println("Task executed by: " + Thread.currentThread().getName()));executor.execute(() -> System.out.println("Task executed by: " + Thread.currentThread().getName()));executor.shutdown();}}

         1.6 RejectedExecutionHandler

                指定饱和策略,当队列和线程池都满了,就必须采用一种策略来处理新提交的任务。同样只需要关注 java.util.concurrent 包下的四个实现类就可以;

                如默认策略无法满足,也可以根据场景需求通过 implements RejectedExecutionHandler自定义策略。如将任务信息记录到日志,或者持久化到某存储,然后做重试、人工介入等处理。

                     1)AbortPolicy :直接抛出异常         

                     2)  DiscardPolicy: 不做任何处理,丢弃掉

                     3)  CallerRunsPolicy:  用调用者所在的线程来运行该任务,如果线程池已关闭则丢弃

                     4)DiscardOldestPolicy:丢弃队列中最近的一个任务,并执行当前任务。

java">    /*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}

2. 使用线程池

        有两种方法可以向线程池中提交任务,execute() 和 submit();

        execute只能接受Runnable,而submit却可以接受Runnable和Callable两种,下图是源码。

         2.1 execute()提交任务

            用于不需要返回值的场景,无法判断任务的状态(提交成功的无法获知执行结果)。

            为减小篇幅,后面的示例只提供main方法里的demo。

java">package org.springblade.test;import com.google.common.util.concurrent.ThreadFactoryBuilder;import java.util.concurrent.*;/*** @Auther: liuzujie* @Date: 2025/1/1 21:45* @Desc: 测试类*/
public class ThreadPoolTest {//自定义线程工厂private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-thread-%d")  // 为线程设置自定义名称.setDaemon(true)                // 是否守护线程.setPriority(Thread.NORM_PRIORITY)  // 线程优先级.build();private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),threadFactory,                 // 使用自定义的线程工厂new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {executor.execute(new Runnable() {@Overridepublic void run() {System.out.println("TODO.....");}});executor.shutdown();}}

         2.2 submit(Runnable) 无返回值

            由于Runnable没有返回值,所以Future除了没有返回值,其他的接口还是都可以用的,比如get() 阻塞获取结果、中断任务等,看下所有Api吧;

方法描述
get()获取结果(会阻塞等待)
get(long timeout, TimeUnit unit)在指定的时间内获取结果,如果超时,会抛异常并退出等待状态
cancel(boolean mayInterruptIfRunning)

参数为true时,尝试中断任务的执行,false表示不中断;

中断成功返回true,反之false。

isCancelled()判断任务是否已取消
isDone()判断任务是否已完成
java">public static void main(String[] args) {Future<?> future = executor.submit(new Runnable() {@Overridepublic void run() {try {Thread.sleep(500L);System.out.println("Runnable Task Completed!");} catch (InterruptedException e) {e.printStackTrace();}}});try {System.err.println("阻塞,直到任务完成:" + future.get()); //} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}executor.shutdown();}
执行结果

         2.3 submit(Runnable task, T result) 有"返回值"

            虽然 Runnable 本身并没有返回值,但该Api 方法的允许你为 Runnable 任务指定一个额外的“结果”(T result)。这里的“返回值”并不是 Runnable 任务本身的执行结果,而是 submit 方法的一个附加功能,上代码

java">public static void main(String[] args) {Future<String> future = executor.submit(new Runnable() {@Overridepublic void run() {try {Thread.sleep(500L);System.out.println("Runnable Task Completed!");} catch (InterruptedException e) {e.printStackTrace();}}}, "Task Result");try {System.out.println("submit的返回结果:" + future.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}executor.shutdown();}

       2.4 submit(Callable) 真有返回值

            Callable不仅有返回值,还可以抛出异常

java">	public static void main(String[] args) {Future<String> future = executor.submit(new Callable<String>() {@Overridepublic String call() throws Exception { //可以抛异常Thread.sleep(500L);return "Callable Task Completed!"; //返回值}});try {System.out.println("正儿八经有返回值:" + future.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}executor.shutdown();}

3.关闭线程池

       3.1 关闭方法对比

     shutdown()方法

  • 作用:会把正在执行以及阻塞队列中等待的任务执行完,在此期间拒绝接受新任务并执行饱和策略。
  • 使用场景:希望优雅地结束当下池中所有任务,常用于服务停止、应用关闭等。
  • 原理:线程池状态设置成SHUTDOWN,然后interrupt()中断所有正在等待任务的线程。

     shutdownNow() 方法

  • 作用:立即停止线程池中所有的活动任务,并尝试停止所有正在等待执行的任务,并返回等待任务的列表。
  • 使用场景:如果你希望尽快停止线程池的所有活动,可以使用该方法,特别是当你希望强制停止正在执行的任务时。
  • 原理:线程池设置成STOP,然后尝试关闭所有正在执行或暂停的任务线程。

 我们可以使用isShutdown()、isTerminating()、isTerminated()来监控线程池状态。

    3.2 何时关闭线程池呢? 

           在项目编码中,经常看到execute()后紧接着就调用shutdown()甚至shutdownNow(),结合代码前后逻辑,经常发现这样是有问题的(比如shutdownNow()很可能导致任务丢失等风险)。

        关闭线程池的时机通常取决于应用程序的生命周期和线程池的使用场景,此处大概总结几种建议是否关闭线程池的场景:

        1、应用程序要退出,进行资源清理,这时是要关闭线程池释放资源的;

        2、如果线程池只在一天内某个点集中处理一些任务,那么考虑闲置时关闭以节省资源;

        3、如果源源不断的向线程池中丢任务,一直处于工作状态,那是不建议每丢完一次就关闭一次的,因为线程池的销毁和创建也是一笔可观的损耗,都涉及到全局锁和自旋锁等操作;

4. 合理配置线程池

        总概:

        1)按任务性质来看的话,一般有CPU密集型、IO密集型、混合型。通常性质不同的任务建议用不同的线程池分开处理;

             如果混合型任务可以拆成CPU和IO密集型两个任务,且两个任务耗时相差不大,也可以分解后执行以提高吞吐量。

        2)如果任务有优先级的话,则考虑使用优先级队列PriorityBlockingQueue,需要注意的是如果一直有优先级高的任务入队,那么优先级低的任务可能永远都不会执行;

        3)根据任务的执行时间长短,也可以考虑用优先级队列优先处理时间短的;

             也可以按耗时长短使用不同的线程池,分而治之(XXL-JOB中就使用了快慢线程池);

       4.1 阻塞队列

          强烈建议使用有界队列,能有效地提高系统可控性和稳定性,避免任务堆积造成的OOM等

       4.2 饱和策略

          一定要根据业务场景需要来选择饱和策略。要求较高不允许丢任务时自定义饱和策略,写入消息队列或数据库,进行自动重试或人工介入是不错的选择。

       4.3 线程数

          A)  如果是CPU密集型应尽可能配置较小的线程数,因为过多的线程上下文切换对性能的损耗不容小觑,如配置成CPU数+1个线程。

         B) 如果是IO密集型,则通常不会过多占用CPU,都在IO操作阻塞,通常可以设置为 2 到 3 倍的 CPU 核心数。

        C)混合型的话需要根据负载的比例来配置线程池

        D)  如果任务以来数据库或者外部调用等,则可适当增加线程数,因为这类操作等待越长则CPU闲置越长。

核心线程数和最大线程数的配置没有银弹,想获得高性能和稳定性的最优线程数,就必须在实际场景下进行阶梯式测试,才能稳如老狗啊 。

        


http://www.ppmy.cn/news/1560593.html

相关文章

SSH 连接远程仓库并推送本地项目

Git 是一个强大的版本控制工具&#xff0c;能够帮助开发者管理项目的代码版本&#xff0c;并便于与他人协作。为了高效地与远程仓库&#xff08;如 GitHub、GitLab 等&#xff09;进行交互&#xff0c;常见的做法是使用 SSH 协议进行连接。在本篇文章中&#xff0c;我们将从零开…

概率论与数理统计

概率论占比更多&#xff0c;三分之二左右 数理统计会少一些 事件之间的概率 ab互斥&#xff0c;不是ab独立 古典概型吃高中基础&#xff0c;考的不会很多 条件概率公式&#xff0c;要记 公式不要全记&#xff0c;很多有名称的公式是通过基础公式转换而来的 目的在于解决一…

[人工智能] 结合最新技术:Transformer、CLIP与边缘计算在提高人脸识别准确率中的应用

随着人工智能的快速发展&#xff0c;特别是深度学习和自然语言处理领域的革命性技术&#xff0c;越来越多的前沿技术被应用于人脸识别中。Transformer架构、CLIP模型以及边缘计算的结合&#xff0c;正成为提升人脸识别准确率和应用效能的关键技术路径。特别是在多样化场景下&am…

node.js之---子线程(child_process)模块

为什么需要子线程&#xff08;child_process&#xff09;模块 Worker Threads 的基本概念 如何使用 Worker Threads Worker Threads 的性能 Worker 线程的优势和限制 进阶用法&#xff1a;共享内存 为什么需要子线程&#xff08;child_process&#xff09;模块 在 Node.js…

条款35:考虑虚函数以外的其它选择(Consider alternatives to virtual functions)

条款35&#xff1a;考虑虚函数以外的其它选择 1.1 提出问题 假设正在制作一款游戏&#xff0c;正在为游戏中的角色设计一个层次结构。 class GameCharacter { public:virtual int healthValue() const; //返回角色的生命值;派生类可以重新定义它... // };1.2 解决办法 让我…

【狂热算法篇】解锁数据潜能:探秘前沿 LIS 算法

嘿&#xff0c;各位编程爱好者们&#xff01;今天带来的 LIS 算法简直太赞啦 无论你是刚入门的小白&#xff0c;还是经验丰富的大神&#xff0c;都能从这里找到算法的奇妙之处哦&#xff01;这里不仅有清晰易懂的 C 代码实现&#xff0c;还有超详细的算法讲解&#xff0c;让你轻…

redis7基础篇2 redis的主从模式1

目录 一 主从模式 1.1 主从复制的作用 1.2 配置常用命令 1.3 主从复制常见问题 1.4 主从复制的缺点 1.5 redis主从复制原理 二 redis主从复制的搭建流程 2.1 注意事项 2.2 redis的主从复制架构图 2.3 以6379.conf配置文件配置为例 2.4 以6380.conf配置文件配置为例 …

数论问题22

题、证明&#xff0c;方程x-yz1具有无限多个正整数解(x&#xff0c;y&#xff0c;z)&#xff0c;其中x&#xff0c;y&#xff0c;z两两不同&#xff0c;且任意两个之积都被第三个整除。 分析与证明:假设方程的正整数解两两不同&#xff0c;并且任意两个之积都被第三个整除。即 …