CompletableFuture如何优雅处理异步任务超时!妙就完了

news/2024/9/22 16:44:50/

文章目录

  • 1. 主要解决哪些业务痛点?
  • 2. 流程分析
  • 3. 上代码
  • 4. 总结一波

1. 主要解决哪些业务痛点?

小强最近一直没打黑神话悟空,闷闷不乐的,我问咋回事,最近有啥烦心事么?

他不爽的跟我说了当他CompletableFuture进行任务编排时,会发现一个问题,当一个子线程去执行任务,如果任务执行时间很长,导致后面的任务一直阻塞,他在想有没有一种办法,让子线程具有等待超时的特性。

小强对编程的热情确实是高,那我们给他一起分析一下!!

其实在CompletableFuture中,提供了CompletableFuture.get(long timeout, TimeUnit unit) 方法,可以设置超时等待时间,但是这个是对于主线程而言的,java8中,子线程是没有办法去设置等待超时时间的

其实通俗来讲就是:

就是调用CompletableFuture.supplyAsync()相关方法时,不能够传入子线程的等待时间,因为在很多时候会遇到使用上的一些拘束:

为了让大家了解更清楚,我们带着这个问题去看一个场景题:

接下来看一个场景:

2. 流程分析

流程大体如下:

  1. 我们的主线程会同时起一个异步线程1和异步线程2,并且异步线程1会执行任务A,异步线程2会执行B。
  2. 任务A和任务B的执行时间时不确定的,可能是1秒或者是20秒。但是我们的异步线程只会等待两秒中,如果没执行完,对于A会执行兜底任务B,对于C会执行兜底任务D。

可以思考下:

那我们如何实现子线程的等待超时,可能一下子会去想到CompletableFuture.get(),但是这个是对于主线程而言的,阻塞的粒度太粗了,那如何把超时等待下放到每一个子线程去独立控制呢?

整体思路:

  1. 异步的超时控制,比如定时3s钟,肯定在3秒钟会检查任务是否执行完,肯定会有一个定时器对象到3秒钟去检查。
  2. 检查完之后,如果任务还没完成,不需要等待,直接返回默认值,走接下来的逻辑。

3. 上代码

代码搞起:

并发引擎工具类如下:

java">public class CompletableFutureTimeoutEngine {static ScheduledThreadPoolExecutor delayer;static ExecutorService executor = Executors.newCachedThreadPool();//定义一个延迟任务器,用来设置执行超时时间后需要执行的任务
static {delayer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);t.setName("CompletableFutureDelayScheduler");return t;}});delayer.setRemoveOnCancelPolicy(true);
}//当超时时间到时,需要抛出超时异常
public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {CompletableFuture<T> result = new CompletableFuture<T>();delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);return result;
}public static <T> CompletableFuture<T> completeOnTimeout(Supplier<T> supplier, long timeout, TimeUnit unit, Function<Throwable, T> function) {return completeOnTimeout(supplier, timeout, unit, function, null);
}/*** @param t                超时默认返回值* @param supplier         任务* @param timeout          超时时间* @param unit             事件单位* @param throwableHandler 异常处理* @param <T>              任务的类型* @return 任务的返回CompletableFuture<T>*/
public static <T> CompletableFuture<T> completeOnTimeout(Supplier<T> supplier, long timeout,TimeUnit unit, Function<Throwable, T> throwableHandler, T t) {return CompletableFuture.supplyAsync(supplier, executor).applyToEither(timeoutAfter(timeout, unit), Function.identity()).exceptionally((throwable) -> {Throwable cause = throwable.getCause();if (cause instanceof TimeoutException) {return t;}return throwableHandler.apply(cause);});
}}

上面主要定义了一个延迟任务器,主要用来执行超时时间后需要执行的任务,在指定的超时时间到达时,会在timeoutAfter方法抛出超时异常,主方法completeOnTimeout用来捕获是否是超过超时时间抛出的超时异常,如果是,则返回默认值,如果不是,执行正常任务的返回。

接下来,我们去调用一下:

java">public class TimeoutTest {private static void sleep(Long time) {try {Thread.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<Integer> completableFutureA = CompletableFutureTimeoutEngine.completeOnTimeout(() -> {sleep(4000L);return 1;}, 2, TimeUnit.SECONDS, (throwable -> {throw new BusinessException("111");}), null);CompletableFuture<Integer> completableFutureB = completableFutureA.thenApply(s -> {if (s == null) {//处理B任务System.out.println("任务A超时,执行B");sleep(1000L);return 1;}return s;});CompletableFuture<Integer> completableFutureC = CompletableFutureTimeoutEngine.completeOnTimeout(() -> {sleep(5000L);return 10;}, 3, TimeUnit.SECONDS, (throwable -> {throw new BusinessException("111");}), null);CompletableFuture<Integer> completableFutureD = completableFutureC.thenApply(s -> {if (s == null) {//处理D任务System.out.println("任务C超时,执行D");sleep(1000L);return 2;}return s;});CompletableFuture<Integer> future = completableFutureB.thenCombine(completableFutureD, Integer::sum);Integer result = future.get(6, TimeUnit.SECONDS);System.out.println("sum = " + result);}}

这个demo的实行顺序就是 completableFutureA -> completableFutureB 和 completableFutureC -> completableFutureD 分别并行执行,future最后拿到指定的结果和。

去执行一下看看:

你会发现completableFutureA的超时等待时间是2秒,正常的任务执行是4s,因此返回超时默认值null,用于和completableFutureB做处理,completableFutureB判断传进来是null,返回值为1。

同样的,completableFutureC的超时等待时间是3秒,正常的任务执行是5s,因此返回超时默认值null,用于和completableFutureD做处理,completableFutureD判断传进来是null,返回值为2。

future对completableFutureB和completableFutureD做聚合,值为3。

4. 总结一波

上面的代码就实现了completableFuture子线程具有超时的特性。是不是看完之后恍然大悟,其实很多中间件都是基于java基础做的。

但是这种方式在并发不高的情况下,可以让作为工具类控制并发流转,但是在并发很高的情况下,定时器机制可能是一个瓶颈,需要换成时间轮或者在业务里面处理超时,具体看自己的业务场景。

感觉对您有所启发的话,记得帮忙点赞,收藏加关注,并且分享给需要的小伙伴!!加油!!


http://www.ppmy.cn/news/1528919.html

相关文章

从IPC摄像机读取视频帧解码并转化为YUV数据到转化为Bitmap

前言 本文主要介绍根据IPC的RTSP视频流地址,连接摄像机,并持续读取相机视频流,进一步进行播放实时画面,或者处理视频帧,将每一帧数据转化为安卓相机同格式数据,并保存为bitmap。 示例 val rtspClientListener = object: RtspClient.RtspClientListener {override fun …

python爬虫初体验(二)

在Python中&#xff0c;每个模块都有一个内置的变量 name&#xff0c;用于表示当前模块的名称。当一个Python文件被执行时&#xff0c;Python解释器会首先将该文件作为一个模块导入&#xff0c;并执行其中的代码。此时&#xff0c;__name__的值为模块的名称。 作用 模块可被导…

CSS概览

概述 是什么 cascading style css 层叠样式表 由W3C制定的网页元素定义规则 为什么 美化 怎么办 设置样式 布局 css 引入 内部样式表 在head标签内部使用style标签 <html><head><style>.id{width: 400px;height: 400px;border: 1px solid black;ma…

面试金典题2.1

编写代码&#xff0c;移除未排序链表中的重复节点。保留最开始出现的节点。 示例1: 输入&#xff1a;[1, 2, 3, 3, 2, 1]输出&#xff1a;[1, 2, 3]示例2: 输入&#xff1a;[1, 1, 1, 1, 2]输出&#xff1a;[1, 2]提示&#xff1a; 链表长度在[0, 20000]范围内。链表元素在[0…

jQuery css() 方法

jQuery css() 方法 引言 在网页设计和开发中&#xff0c;样式是至关重要的&#xff0c;它决定了网页的视觉效果和用户体验。jQuery&#xff0c;作为一个广泛使用的JavaScript库&#xff0c;提供了强大的DOM操作能力&#xff0c;其中css()方法便是用于操作和获取元素样式的关键…

Qt优秀开源项目之二十三:QSimpleUpdater

QSimpleUpdater是开源的自动升级模块&#xff0c;用于检测、下载和安装更新。 github地址&#xff1a;https://github.com/alex-spataru/QSimpleUpdater QSimpleUpdater目前Star不多&#xff08;911个&#xff09;&#xff0c;但已在很多开源项目看到其身影&#xff0c;比如Not…

店铺所有商品API接口解析,用JSON格式的示例

以下是一个店铺所有商品接口数据的 JSON 格式示例&#xff1a; { "status": "success", "message": "获取商品列表成功", "data": [ { "product_id": "123456", "name": "商品名称1&qu…

BMC 虚拟i2c访问PCA9545(switch芯片)后面的设备,为什么找不到PCA9545?

1.说明 1.1 背景 无意中看到PCA9545(switch芯片)后面有设备&#xff0c;但是PCA9545设备本身是连接到物理设备i2c上的&#xff0c;然而扫描该物理i2c bus&#xff0c;却找不到该设备。此篇文章主要找一下该原因的。 1.2 参考代码 当前使用的是ast2600芯片&#xff0c;可参考…