CompletableFuture的allOf一定不要乱用!血泪史复盘

server/2024/9/23 7:41:52/

文章目录

  • 1. 到底遇到了什么问题?
  • 2. CountDownLatch搞起?
  • 3. allOf里面的坑
  • 4. 优化建议:

1. 到底遇到了什么问题?

最近看到组里面的同学遇到了这样的业务场景:

主线程需要异步并发调用多个接口,并且主线程需要等待线程全部执行完成之后,返回执行结果,业务流程如下:

这里面有一个注意点:一旦异步线程有一个失败,我主线程就不等待了,这种需求很常见,比如多线程拼装对象数据等等。那我们如何解决呢?

一定要坚持看完!细节往往决定成败,写代码也一样!!收获多多
在这里插入图片描述

2. CountDownLatch搞起?

此时有可能有可能第一时间会想到CountDownLatch,但是CountDownLatch本质内部是采用计数器,主线程一直调用await()阻塞等待,需要等待所有子线程执行完之后(不管成功或者失败)主线程才会往下执行,且主线程和子线程无法传递异常,并且需要注意异常的抛出。

这块可以参考之前写的CountDownLatch文章:

关于CountDownLatch的底层源码和闭坑指南,只看这一篇就够了!!

这里面有常见的坑一栏,可以重点看下,保你必有收获,所以CountDownLatch并不是最优方案。

这块很多同学会选用CompletableFuture来实现,因为CompletableFuture底层有丰富的任务编排和链式调用,并且此时肯定会有很多同学说CompletableFuture.allOf可以轻松实现。

此时如果你不自己去试试的话,八股文一背,此时你就被带偏了!!我们可以去看一下,在接下来会给出一个allOf的使用场景,把这个问题重现一下:

3. allOf里面的坑

场景:当有一批任务交给线程池执行,我们需要获取所有线程的返回结果。

首先定义一个并发执行器类:

java">public class CompletableFutureEngine {private final static ExecutorService executorService = Executors.newFixedThreadPool(4);/*** 创建并行任务并执行** @param list            数据源* @param function        API调用逻辑* @param exceptionHandle 异常处理逻辑* @return 处理结果列表*/public static <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> function, Consumer<Throwable> exceptionHandle) {List<CompletableFuture<T>> completableFutures = list.stream().map(s -> CompletableFuture.supplyAsync(() -> function.apply(s))).collect(Collectors.toList());List<T> results = new ArrayList<>();try {CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();for (CompletableFuture<T> completableFuture : completableFutures) {results.add(completableFuture.get());}} catch (Exception e) {if (e instanceof CompletionException) {if (e.getCause() != null) {exceptionHandle.accept(e.getCause());}}}return results;}}

调用方法:

java">public class EngineDemo {private static void sleep(long sleepTime) {try {Thread.sleep(sleepTime);} catch (InterruptedException e) {e.printStackTrace();}}private static void currentDate(String str) {// 创建一个Date对象,它包含了当前时间Date now = new Date();// 创建一个SimpleDateFormat对象,用于指定输出格式SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 使用format方法将Date对象格式化为字符串String currentTime = dateFormat.format(now);// 打印当前时间System.out.println(str + "是: " + currentTime);}public static void main(String[] args) {currentDate("执行前");List<Integer> numList = CompletableFutureEngine.parallelFutureJoin(Arrays.asList(1, 3, 5),num -> {sleep(num * 1000);if (num == 1) {throw new BusinessException("心别太大");}return num;}, e -> {if (e instanceof BusinessException) {System.out.println("BusinessException =" + e.getMessage());} else {System.out.println("Exception entrance");}});System.out.println(numList);currentDate("执行后");}
}

此时,你会发现,程序的执行时间为5秒钟,感觉似乎和想的不一样,因为在代码中,是根据num决定的休眠时间,因此在第一印象中,应该是第一秒执行完就会抛出异常。尴尬了,现在从日志看程序的执行时间是5秒,那我们看下原因:走起,那我们debug下代码:

你会发现当程序抛出异常的时候,发现传进来的三个CompletableFuture,不管是成功还是失败都执行完了。

这也就说明了当抛出异常后,allOf并不会及时感知异常,而是等所有任务都执行完之后才往下继续运行,那此处会有两种情况:

  1. 使用完allOf之后,还要去做流程编排,不去直接get的话,这种方式没有问题。
  2. 需要汇聚所有的子线程执行结果返回给主线程,这种allOf可能效率会低,因为需要等待所有的子线程执行完才会去返回最终的结果,那如果遇到主要一个子线程失败,执行就失败,就会导致执行时间短的失败了,但是有个任务执行时间很长,返回时间也会变长,从而导致主线程等待的时间变长.

那针对于第二种情况应该如何优化呢!!!继续往下看!!!

4. 优化建议:

可以用CompletableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()) 来代替 allOf。这个方法只要有一个子线程出现异常,主线程就会感知到异常,不用等待其他线程执行完。

优化后的并发执行器如下:

java">public class CompletableFutureEngine2 {/*** 创建并行任务并执行** @param list            数据源* @param function        API调用逻辑* @return 处理结果列表*/public static <S, T> List<T> parallelFutureJoin(Collection<S> list, Function<S, T> function, Consumer<Throwable> exceptionHandle) {List<CompletableFuture<T>> completableFutures = list.stream().map(s -> CompletableFuture.supplyAsync(() -> function.apply(s))).collect(Collectors.toList());List<T> results = null;try {results = completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());} catch (Exception e) {if (e instanceof CompletionException) {if (e.getCause() != null) {exceptionHandle.accept(e.getCause());}}}return results;}
}

调用demo如下:

java">public class EngineDemo {private static void sleep(long sleepTime) {try {Thread.sleep(sleepTime);} catch (InterruptedException e) {e.printStackTrace();}}private static void currentDate(String str) {// 创建一个Date对象,它包含了当前时间Date now = new Date();// 创建一个SimpleDateFormat对象,用于指定输出格式SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 使用format方法将Date对象格式化为字符串String currentTime = dateFormat.format(now);// 打印当前时间System.out.println(str + "是: " + currentTime);}public static void main(String[] args) {currentDate("执行前");List<Integer> numList = CompletableFutureEngine2.parallelFutureJoin(Arrays.asList(1, 3, 5),num -> {sleep(num * 1000);if (num == 1) {throw new BusinessException("心别太大");}return num;}, e -> {if (e instanceof BusinessException) {System.out.println("BusinessException =" + e.getMessage());} else {System.out.println("Exception entrance");}});System.out.println(numList);currentDate("执行后");}
}

运行完之后,我们再去看下执行日志

你会发现,因为子线程的睡眠时间为传进来的num值,当num=1时,触发告警,因此主线程在等待一秒中就会感知到异常,郑如日志打印的时间间隔为1秒钟。

此时再去想想:那此时抛出异常时,num为3,5执行状态时怎么样的,再去debug一下触发异常这块代码:

你会发现当num=1抛出异常的时候,其他两个线程的执行状态还是未完成,这就是和allOf这个方法最大的区别。

是否感觉只有自己试验过才会有更大的收获,今天就先分享到这,后面干货多多!!

感觉对您有所启发的话,记得帮忙点赞,收藏加关注,并且分享给需要的小伙伴!!加油!!


http://www.ppmy.cn/server/120113.html

相关文章

主流卷积神经网络CNN总结

ResNet&#xff08;2015&#xff09;残差神经网络 残差结构 ResNet50具体卷积结构图 ResNeXt&#xff08;2016&#xff09;加入了分组卷积的思想&#xff0c;将原ResNet网络中的block替换成由group分组的block&#xff0c;两者得到的feature map一致&#xff0c;只是参数量更少…

Python语言基础教程(下)4.0

✨博客主页&#xff1a; https://blog.csdn.net/m0_63815035?typeblog &#x1f497;《博客内容》&#xff1a;.NET、Java.测试开发、Python、Android、Go、Node、Android前端小程序等相关领域知识 &#x1f4e2;博客专栏&#xff1a; https://blog.csdn.net/m0_63815035/cat…

果蔬识别系统性能优化之路(五)

目录 前情提要剩下问题 解决方案新建storeFeature表实现ivf的动态增删改查 结语 前情提要 果蔬识别系统性能优化之路&#xff08;四&#xff09; 剩下问题 新建store_feature表&#xff0c;关联storeCode和featureId表&#xff0c;对数据库进行规范化&#xff0c;创建一个新…

Apache Airflow如何使用

Apache Airflow 是一个用于编排和调度任务的开源平台。它适用于创建、调度和监控数据工作流。以下是使用 Airflow 的基本步骤&#xff1a; 1. 安装 Apache Airflow 你可以通过以下命令来安装 Airflow&#xff1a; pip install apache-airflow建议使用虚拟环境来管理 Airflow…

ubuntu 执行定时任务crontab -e 无法输入的问题

界面显示 GNU nano 4.8 /tmp/crontab.l0A1HJ/crontab # Edit this file to introduce tasks to be run by cron. # # Each task to run has to be defined t…

一个基于 laravel 和 amis 开发的后台框架, 友好的组件使用体验,可轻松实现复杂页面(附源码)

前言 随着互联网应用的发展&#xff0c;后台管理系统的复杂度不断增加&#xff0c;对于开发者而言&#xff0c;既要系统的功能完备&#xff0c;又要追求开发效率的提升。然而&#xff0c;传统的开发方式往往会导致大量的重复劳动&#xff0c;尤其是在构建复杂的管理页面时。有…

【乐企】基础版接口代码实现

本文主要是基础版接口声明的实现,具体接口声明见基础版接口声明。具体请求工具类见接口请求工具类 代码如下: 1、服务编码枚举 /*** User: yanjun.hou* Date: 2024/8/30 14:45* Description:乐企服务编码枚举

python中Web开发框架的使用

Python 的 Web 开发框架种类繁多&#xff0c;常见的有 Django 和 Flask 这两个框架。它们各有优点&#xff0c;适合不同类型的 Web 应用开发需求。下面&#xff0c;我将详细介绍这两大主流框架的使用方法&#xff0c;让你快速上手 Python 的 Web 开发。 1. Django Django 是一…