【Java高并发】CompletableFuture基础(1):创建不同线程的子任务、子任务链式调用与异常处理

news/2025/2/1 6:22:19/

文章目录

    • 1. 三种实现接口
    • 2. 链式调用:保证链的顺序性与异步性
    • 3. CompletableFuture创建CompletionStage子任务
    • 4. 处理异常
      • a. 创建回调钩子
      • b. 调用handle()方法统一处理异常和结果
    • 5. 如何选择线程池:不同的业务选择不同的线程池

CompletableFuture是JDK 1.8引入的实现类,该类实现了Future和CompletionStage两个接口。该类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。

CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会进入另一个阶段。一个阶段可以理解为一个子任务,每一个子任务会包装一个Java函数式接口实例,表示该子任务所要执行的操作。

 

1. 三种实现接口

每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。

这三个常用的函数式接口的特点如下:

被包装接口功能描述
FunctionFunction接口的特点是:有输入、有输出。包装了Function实例的CompletionStage子任务需要一个输入参数,并会产生一个输出结果到下一步
RunnableRunnable接口的特点是:无输入、无输出。包装了Runnable实例的CompletionStage子任务既不需要任何输入参数,又不会产生任何输出。
ConsumerConsumer接口的特点是:有输入、无输出。包装了Consumer实例的CompletionStage子任务需要一个输入参数,但不会产生任何输出。

 

2. 链式调用:保证链的顺序性与异步性

多个CompletionStage构成了一条任务流水线,一个环节执行完成了可以将结果移交给下一个环节(子任务)​。多个CompletionStage子任务之间可以使用链式调用。

下面是一个顺序调用的例子:

使用 CompletionStage 及其方法构建了一个异步任务链,thenApply 用于对前一个阶段的结果进行计算并传递结果,thenAccept 用于消费前一个阶段的结果并执行操作,thenRun 用于执行无输入输出的操作。

java">oneStage//被thenApply包装CompletionStage子任务,由输入输出.thenApply(x -> square(x))  //消耗上游输出,但是没有输出.thenAccept(y -> System.out.println(y)) //不消耗上一个子任务的输出又不产生结果.thenRun(() -> System.out.println()) 

这种链式操作可以方便地将多个异步操作连接起来,同时保证了操作的顺序性和异步性,提高了代码的可维护性和并发性能。

 

接下来是一个异步调用的例子:

在这个例子中,task2 和 task3 都依赖于 task1 完成后执行,但它们可能并行执行,也就是说,task2 和 task3 的执行顺序是不确定的,它们不一定会按照 thenRunAsync 的顺序执行。

java">CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {System.out.println("Task 1");
});CompletableFuture<Void> task2 = task1.thenRunAsync(() -> {System.out.println("Task 2");
});CompletableFuture<Void> task3 = task1.thenRunAsync(() -> {System.out.println("Task 3");
});

 

3. CompletableFuture创建CompletionStage子任务

CompletableFuture定义了一组方法用于创建CompletionStage子任务(或者阶段性任务)​,基础的方法如下:

java">//子任务包装一个Runnable实例,并调用ForkJoinPool.commonPool()线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable)//子任务包装一个Runnable实例,并调用指定的executor线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)//子任务包装一个Supplier实例,并调用ForkJoinPool.commonPool()线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//子任务包装一个Supplier实例,并使用指定的executor线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

其中主要注意的信息是:

  1. Supplier 表示一个无参数但有返回值的函数,Runnable表示无惨无返回值的函数
  2. 在使用CompletableFuture创建CompletionStage子任务时,如果没有指定Executor线程池,在默认情况下CompletionStage会使用公共的ForkJoinPool线程池。
  3. 它们都会交给线程池执行,get方法会堵塞主线程等待执行结果。

给一个例子:

java">//无返回值异步调用  
@Test  
public void runAsyncDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("run end ...");  });  //等待异步任务执行完成,最多等待2秒  future.get(2, TimeUnit.SECONDS);  
}  //有返回值异步调用  
@Test  
public void supplyAsyncDemo() throws Exception {  CompletableFuture<Long> future = CompletableFuture.supplyAsync(() ->  {  long start = System.currentTimeMillis();  sleepSeconds(1);//模拟执行1秒  Print.tco("run end ...");  return System.currentTimeMillis() - start;  });  //等待异步任务执行完成,现时等待2秒  long time = future.get(2, TimeUnit.SECONDS);  Print.tco("异步执行耗时(秒) = " + time / 1000);  
}

 

4. 处理异常

a. 创建回调钩子

可以为CompletionStage子任务设置特定的回调钩子,当计算结果完成或者抛出异常的时候,执行这些特定的回调钩子。

java">//设置子任务完成时的回调钩子
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)//设置子任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)//设置子任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor)//设置异常处理的回调钩子
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
java">@Test  
public void whenCompleteDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("抛出异常!");  throw new RuntimeException("发生异常");  //Print.tco("run end ...");  });  //设置执行完成后的回调钩子  future.whenComplete(new BiConsumer<Void, Throwable>() {  @Override  public void accept(Void t, Throwable action) {  Print.tco("执行完成!");  }  });  //设置发生异常后的回调钩子  future.exceptionally(new Function<Throwable, Void>() {  @Override  public Void apply(Throwable t) {  Print.tco("执行失败!" + t.getMessage());  return null;  }  });  future.get();  
}[ForkJoinPool.commonPool-worker-9]:抛出异常!
[main]:执行完成!
[ForkJoinPool.commonPool-worker-9]:执行失败!java.lang.RuntimeException: 发生异常
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 发生异常

有如下几个注意点:

  1. 调用cancel()方法取消CompletableFuture时,任务被视为异常完成,completeExceptionally()方法所设置的异常回调钩子也会被执行。
  2. 如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:
    • 在调用get()时,如果遇到内部异常,get()方法就会抛出ExecutionException(执行异常)​。
    • 在调用join()和getNow(T)启动任务时,如果遇到内部异常,join()和getNow(T)方法就会抛出CompletionException。

 

b. 调用handle()方法统一处理异常和结果

java">//在执行任务的同一个线程中处理异常和结果
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);//可能不在执行任务的**同一个线程**中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);//在指定线程池executor中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
java">@Test  
public void handleDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("抛出异常!");  throw new RuntimeException("发生异常");  //Print.tco("run end ...");  });  //设置执行完成后的回调钩子  future.handle(new BiFunction<Void, Throwable, Void>() {  @Override  public Void apply(Void input, Throwable throwable) {  if (throwable == null) {  Print.tcfo("没有发生异常!");  } else {  Print.tcfo("sorry,发生了异常!");  }  return null;  }  });  future.get();  
}//
//[ForkJoinPool.commonPool-worker-1]:抛出异常! 
//[ForkJoinPool.commonPool-worker-1|CompletableFutureDemo$3.apply]: sorry,发生了异常!

 

5. 如何选择线程池:不同的业务选择不同的线程池

默认情况下,通过静态方法runAsync()、supplyAsync()创建的CompletableFuture任务会使用公共的ForkJoinPool线程池,默认的线程数是CPU的核数。当然,它的线程数可以通过以下JVM参数设置

java">     option:-Djava.util.concurrent.ForkJoinPool.common.parallelism

 

如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中的所有线程都阻塞在IO操作上,造成线程饥饿,进而影响整个系统的性能。所以,强烈建议大家根据不同的业务类型创建不同的线程池,以避免互相干扰。


http://www.ppmy.cn/news/1568359.html

相关文章

使用CSS实现一个加载的进度条

文章目录 使用CSS实现一个加载的进度条一、引言二、步骤一&#xff1a;HTML结构与CSS基础样式1、HTML结构2、CSS基础样式 三、步骤二&#xff1a;添加动画效果1、使用CSS动画2、结合JavaScript控制动画 四、使用示例五、总结 使用CSS实现一个加载的进度条 一、引言 在现代网页…

力扣017_最小覆盖字串题解----C++

题目描述 我们可以用滑动窗口的思想解决这个问题。在滑动窗口类型的问题中都会有两个指针&#xff0c;一个用于「延伸」现有窗口的 r 指针&#xff0c;和一个用于「收缩」窗口的 l 指针。在任意时刻&#xff0c;只有一个指针运动&#xff0c;而另一个保持静止。我们在 s 上滑动…

Vue.js 比较 Composition API 和 Options API

Vue.js 比较 Composition API 和 Options API 今天我们来聊聊 Vue.js 中的两种编写组件的方式&#xff1a;Options API 和 Composition API。如果你对这两者的区别感到困惑&#xff0c;或者不知道在什么情况下选择哪种方式&#xff0c;那么这篇文章将为你解答。 Options API …

JavaScript - Web APIs(上)

Web API 介绍 严格意义上讲&#xff0c;我们在 JavaScript 阶段学习的知识绝大部分属于 ECMAScript 的知识体系&#xff0c;ECMAScript 简称 ES 它提供了一套语言标准规范&#xff0c;如变量、数据类型、表达式、语句、函数等语法规则都是由 ECMAScript 规定的。浏览器将 ECM…

linux环境变量配置文件区别 /etc/profile和~/.bash_profile

在 Linux 系统中&#xff0c;环境变量可以定义用户会话的行为&#xff0c;而这些变量的加载和配置通常涉及多个文件&#xff0c;如 ~/.bash_profile 和 /etc/profile。这些文件的作用和加载时机各有不同。以下是对它们的详细区别和用途的说明&#xff1a; 文章目录 1. 环境变量…

B站吴恩达机器学习笔记

机器学习视频地址&#xff1a; 4.5 线性回归中的梯度下降_哔哩哔哩_bilibili 损失函数学习地址&#xff1a; 损失函数选择 选凸函数的话&#xff0c;会收敛到全局最小值。证明凸函数用Hessian矩阵。凸函数定义&#xff1a;两点连线比线上所有点都大。 batch理解&#xff1…

【面试】【详解】计算机网络(TCP 三次握手,四次挥手)

一、计算机网络详解 &#xff08;一&#xff09;计算机网络概述 定义&#xff1a;计算机网络是通过传输介质将多台计算机连接起来&#xff0c;以实现数据通信和资源共享的系统。 功能&#xff1a; (1) 数据通信&#xff1a;实现不同设备之间的数据传输。 (2) 资源共享&#…

电子电气架构 --- 在智能座舱基础上定义人机交互

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 简单&#xff0c;单纯&#xff0c;喜欢独处&#xff0c;独来独往&#xff0c;不易合同频过着接地气的生活…