CompletableFuture:supplyAsync与runAsync

ops/2024/11/8 20:47:12/

CompletableFuture是Java 8中引入的一个类,用于简化异步编程和并发操作。它提供了一种方便的方式来处理异步任务的结果,以及将多个异步任务组合在一起执行。CompletableFuture支持链式操作,使得异步编程更加直观和灵活。

在引入CompletableFuture之前,Java已经有了Future接口来表示异步计算的结果,但是它的功能相对有限,无法轻松实现复杂的异步操作链。CompletableFuture通过提供更丰富的方法和操作,使得异步编程变得更加便捷。

CompletableFuture实现了Future接口, CompletionStage接口,成为JDK8多任务协同场景下一个有效利器。

提交有返回值的异步任务

package com.neo;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class DemoCompletableFuture {private static  Logger logger = LoggerFactory.getLogger(DemoCompletableFuture.class);public static void main(String[] args) throws Exception {//提交一个CompletableFuture任务CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {long start = System.currentTimeMillis();try {TimeUnit.SECONDS.sleep(5);} catch (Exception e) {e.printStackTrace();}long end = System.currentTimeMillis();logger.info("@@ 打印执行耗时:" +(end - start)  + " ms");return 1;});logger.info("CompletableFuture.supplyAsync 开始" );//通过get方法阻塞获取任务执行结果logger.info("CompletableFuture.supplyAsync 执行结果: {}", task.get());logger.info("CompletableFuture.supplyAsync 结束");}}

输出结果如下,可以看出CompletableFuture的get方法会阻塞主线程工作,直到得到返回值为止。

13:39:32.976 [main] INFO com.neo.DemoCompletableFuture - CompletableFuture.supplyAsync 开始
13:39:37.985 [ForkJoinPool.commonPool-worker-9] INFO com.neo.DemoCompletableFuture - @@ 打印执行耗时:5011 ms
13:39:37.986 [main] INFO com.neo.DemoCompletableFuture - CompletableFuture.supplyAsync 执行结果: 1
13:39:37.990 [main] INFO com.neo.DemoCompletableFuture - CompletableFuture.supplyAsync 结束

对此我们来看看get方法是如何做到阻塞主线程并等待异步线程任务执行完成的。

从下面这段源码我们可以看到get方法的执行步骤:

/*** Waits if necessary for this future to complete, and then* returns its result.** @return the result value* @throws CancellationException if this future was cancelled* @throws ExecutionException if this future completed exceptionally* @throws InterruptedException if the current thread was interrupted* while waiting*/
public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r = result) == null ? waitingGet(true) : r);
}

reportGet函数分析

/*** Reports result using Future.get conventions.*/
private static <T> T reportGet(Object r)throws InterruptedException, ExecutionException {if (r == null) // by convention below, null means interruptedthrow new InterruptedException();if (r instanceof AltResult) {Throwable x, cause;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw (CancellationException)x;if ((x instanceof CompletionException) &&(cause = x.getCause()) != null)x = cause;throw new ExecutionException(x);}@SuppressWarnings("unchecked") T t = (T) r;return t;
}

这是CompletableFuture类中的一个私有静态方法reportGet,用于报告异步任务执行的结果,遵循Future.get的约定。让我们逐步分析这个方法:

参数类型

private static <T> T reportGet(Object r)    
throws InterruptedException, ExecutionException 

这是一个泛型方法,接收一个Object类型的参数r,表示异步任务的结果。

判断结果是否为null

if (r == null)     
throw new InterruptedException(); 

如果结果r为null,按照惯例表示任务被中断,此时抛出InterruptedException。

处理AltResult

if (r instanceof AltResult) {     
// ...
} 

如果结果是AltResult类型,说明异步任务执行过程中发生了异常。进入AltResult的处理逻辑。

获取异常信息并抛出相应异常

Throwable x, cause;
if ((x = ((AltResult)r).ex) == null)return null;
if (x instanceof CancellationException)throw (CancellationException)x;
if ((x instanceof CompletionException) &&(cause = x.getCause()) != null)x = cause;
throw new ExecutionException(x);

如果AltResult中的异常ex为null,说明异步任务被取消,返回null。

如果异常是CancellationException,抛出CancellationException。

如果异常是CompletionException,获取它的原因(cause),如果有原因就将异常替换为原因,最终抛出ExecutionException。

类型转换并返回结果

@SuppressWarnings("unchecked") T t = (T) r;
return t; 

最后,将r强制类型转换为泛型类型T,然后返回。

这个方法主要负责处理异步任务执行结果中可能涉及的异常情况,并根据Future.get的约定进行适当的处理。

waitingGet函数分析

/*** Returns raw result after waiting, or null if interruptible and* interrupted.*/
private Object waitingGet(boolean interruptible) {Signaller q = null;boolean queued = false;int spins = -1;Object r;while ((r = result) == null) {if (spins < 0)spins = (Runtime.getRuntime().availableProcessors() > 1) ?1 << 8 : 0; // Use brief spin-wait on multiprocessorselse if (spins > 0) {if (ThreadLocalRandom.nextSecondarySeed() >= 0)--spins;}else if (q == null)q = new Signaller(interruptible, 0L, 0L);else if (!queued)queued = tryPushStack(q);else if (interruptible && q.interruptControl < 0) {q.thread = null;cleanStack();return null;}else if (q.thread != null && result == null) {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interruptControl = -1;}}}if (q != null) {q.thread = null;if (q.interruptControl < 0) {if (interruptible)r = null; // report interruptionelseThread.currentThread().interrupt();}}postComplete();return r;
}

这是CompletableFuture类中的一个私有方法waitingGet,用于在异步任务完成前等待其结果。让我们逐步分析这个方法:

初始化变量

Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;

这里初始化了一些变量,包括一个Signaller对象q,一个表示是否已经将任务推入栈的标志queued,一个用于自旋等待的计数spins,以及用于存储异步任务结果的变量r

自旋等待任务完成

while ((r = result) == null) {// 自旋等待任务完成
}

在这个循环中,不断检查result是否为null,如果为null,说明任务还未完成,就继续等待。

自旋等待策略

if (spins < 0)spins = (Runtime.getRuntime().availableProcessors() > 1) ?1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {if (ThreadLocalRandom.nextSecondarySeed() >= 0)--spins;
}
  • 如果spins为负值,根据当前系统的处理器数量决定是否使用自旋等待。如果有多个处理器,使用brief spin-wait。
  • 如果spins大于0,且随机数为正,则减少spins,继续自旋等待。

创建和推送Signaller对象

else if (q == null)q = new Signaller(interruptible, 0L, 0L);
else if (!queued)queued = tryPushStack(q);
  • 如果q为null,创建一个Signaller对象。Signaller是用于协调等待的辅助类。
  • 如果q已创建但未推送到栈中,尝试推送到栈中。

处理中断

else if (interruptible && q.interruptControl < 0) {q.thread = null;cleanStack();return null;
}
    • 如果支持中断且q.interruptControl小于0,表示中断发生,清理相关状态并返回null。

使用ManagedBlocker进行等待

 else if (q.thread != null && result == null) {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interruptControl = -1;}
}
  • 如果q.thread不为null,且任务未完成,使用ForkJoinPool.managedBlock进行等待。这是一种协作式的等待方式。

处理中断和结果

if (q != null) {q.thread = null;if (q.interruptControl < 0) {if (interruptible)r = null; // report interruptionelseThread.currentThread().interrupt();}
}
  • 清理Signaller对象的状态。
  • 如果支持中断,根据中断控制状态设置返回值r为null或者中断当前线程。

完成异步任务后的处理

postComplete();

最后,调用postComplete方法,该方法用于处理异步任务完成后的一些后续操作。

返回结果

return r;

返回异步任务的结果。

这个方法主要负责等待异步任务的完成,使用了一些自旋等待、协作式等待和中断处理的策略,确保在任务完成后能够正确返回结果。

提交无返回值的异步任务

通过runAsync提交一个无返回值的异步任务,这里我们为了实现任务执行完成再关闭主线程用了个get阻塞等待任务完成。

package com.neo;
/*** @Author zhangt* @create 2023/11/10*/import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** @author zhangt* @date 2023年11月10日*/
public class DemoCompletableFuture2 {private static  Logger logger = LoggerFactory.getLogger(DemoCompletableFuture2.class);public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> supplyAsync = CompletableFuture.runAsync(() -> {long start = System.currentTimeMillis();logger.info(Thread.currentThread().getName() + "开始执行时间:" + start);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}logger.info(Thread.currentThread().getName() + "结束总执行时间:" + (System.currentTimeMillis() - start));});logger.info("CompletableFuture.supplyAsync 主线程开始运行" );//get阻塞主线程等待任务结束logger.info("get阻塞主线程等待任务结束 :" + supplyAsync.get());logger.info("CompletableFuture.supplyAsync 主线程运行结束");}}

返回结果

15:29:59.922 [ForkJoinPool.commonPool-worker-9] INFO com.neo.DemoCompletableFuture2 - ForkJoinPool.commonPool-worker-9开始执行时间:1699860599920
15:29:59.922 [main] INFO com.neo.DemoCompletableFuture2 - CompletableFuture.supplyAsync 主线程开始运行
15:30:00.935 [ForkJoinPool.commonPool-worker-9] INFO com.neo.DemoCompletableFuture2 - ForkJoinPool.commonPool-worker-9结束总执行时间:1015
15:30:00.935 [main] INFO com.neo.DemoCompletableFuture2 - get阻塞主线程等待任务结束 :null
15:30:00.935 [main] INFO com.neo.DemoCompletableFuture2 - CompletableFuture.supplyAsync 主线程运行结束

区别

CompletableFuture.supplyAsync和CompletableFuture.runAsync都是用于创建异步任务的方法,但它们在任务的类型和返回值处理上有一些区别。

CompletableFuture.supplyAsync

任务类型: 用于执行有返回值的异步任务。任务由Supplier提供,不接收任何参数,返回一个结果。

方法签名

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

示例

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 有返回值的异步任务return "Hello, CompletableFuture!";
});

CompletableFuture.runAsync

任务类型: 用于执行没有返回值的异步任务。任务由Runnable提供,不返回任何结果。

方法签名

public static CompletableFuture<Void> runAsync(Runnable runnable)

示例

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// 没有返回值的异步任务System.out.println("Running async task");
});

区别总结

CompletableFuture.supplyAsync用于执行有返回值的异步任务,接收一个Supplier,返回一个CompletableFuture对象,可获取异步任务的结果。

CompletableFuture.runAsync用于执行没有返回值的异步任务,接收一个Runnable,返回一个CompletableFuture对象,表示异步任务执行完毕。

这两个方法都允许通过传递Executor来指定异步任务的执行线程。例如,可以使用

CompletableFuture.supplyAsync(supplier, executor)

CompletableFuture.runAsync(runnable, executor)

来指定特定的线程池。


http://www.ppmy.cn/ops/132027.html

相关文章

C++ vector

1.vector的介绍 vector的文档介绍 2.vector的使用 vector学习时一定要学会查看文档&#xff1a;vector文档介绍&#xff0c;vector在实际中非常的重要。 在实际中 我们熟悉常见的接口就可以&#xff0c;下面列出了哪些接口是要重点掌握的。 2.1vector的定义 2.2vector ite…

C++入门基础知识140—【关于C++ 类构造函数 析构函数】

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///C爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于C 类构造函数 & 析构函数的相关内容…

基于百度飞桨paddle的paddlepaddle2.4.2等系列项目的运行

PPASR 必看&#xff01;&#xff01;&#xff01; PaddleSpeech develop --> PaddlePaddle 2.5.0/2.5.1 PaddleSpeech < 1.4.1 --> PaddlePaddle < 2.4.2 1.创建虚拟环境 conda create --name test python3.10 2.激活环境&#xff0c;安装ppasr的paddlepaddl…

基于OpenCV的相机捕捉视频进行人脸检测--米尔NXP i.MX93开发板

本篇测评由优秀测评者“eefocus_3914144”提供。 本文将介绍基于米尔电子MYD-LMX93开发板&#xff08;米尔基于NXP i.MX93开发板&#xff09;的基于OpenCV的人脸检测方案测试。 OpenCV提供了一个非常简单的接口&#xff0c;用于相机捕捉一个视频(我用的电脑内置摄像头) 1、安…

【WebRTC】视频采集模块流程的简单分析

目录 1.从摄像头获取视频帧&#xff08;CaptureInputPin::Receive()&#xff09;2.处理摄像头获取的帧&#xff08;CaptureSinkFilter::ProcessCaptureFrame())3.处理Windows层帧信息并发送到帧处理器&#xff08;VideoCaptureImpl::IncomingFrame())4.帧处理器&#xff08;Vid…

【系统架构设计师】2021年真题论文: 论面向方面的编程技术及其应用(AOP)(包括解题思路和素材)

更多内容请见: 备考系统架构设计师-专栏介绍和目录 文章目录 真题题目(2021年 试题1)解题思路面向方面的编程(AOP)技术概念和原理AOP的核心机制AOP 在软件系统开发中的应用场景AOP 应用案例分析论文素材参考真题题目(2021年 试题1) 面向过程编程是一种自顶向下的编程方…

2024MoonBit全球编程创新挑战赛参赛作品“飞翔的小鸟”技术开发指南

本文转载自 CSDN&#xff1a;https://blog.csdn.net/m0_61243965/article/details/143510089作者&#xff1a;言程序plus 实战开发基于moonbit和wasm4的飞翔的小鸟游戏 游戏中&#xff0c;玩家需要通过上下左右按键控制Bird&#xff0c;在不断移动的障碍pipe之间穿梭&#xf…

网络技术---网络通信概述

网络通信概述 网络通信概述一、网络通信的层次二、OSI七层网络模型三、TCP/IP五层模型1. 物理层1.1 物理层作用1.2 物理层概述1.3 信号调制1.4 信道复用技术 2. 数据链路层2.1 数据链路层作用2.2 点对点数据链路层三个基本问题2.3 滑动窗口协议2.4 以太网 3. 网络层3.1 概述3.2…