Java基础-CompletableFuture

embedded/2024/10/20 9:48:48/

CompletableFuture 是 Java 8 中引入的一个实现异步编程类。提供了一组丰富的方法来处理异步操作和多个任务的结果。

执行任务

可以使用CompletableFuture.supplyAsync()或者CompletableFuture.runAsync创建CompletableFuture对象,并执行任务。

supplyAsync

<U> CompletableFuture<U> CompletableFuture.supplyAsync(Supplier<U> supplier)方法用于执行具有返回值的任务,并在任务完成时返回结果。

java">public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {long start=System.currentTimeMillis();CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {try {//等待3sThread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return CompletableFutureTest.class.getName();});System.out.printf("任务启动耗时:%d\n",System.currentTimeMillis()-start);//阻塞等待到任务执行完毕System.out.printf("任务执行结果:%s\n",supplyAsync.get());System.out.printf("任务执行耗时:%d\n",System.currentTimeMillis()-start);}
}
//结果
任务启动耗时:31
任务执行结果:org.example.study.CompletableFutureTest
任务执行耗时:3044

任务会在默认的 ForkJoinPool 中异步执行。CompletableFuture.supplyAsync(Supplier<U> supplier):

java">public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1);private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));return d;}
}

<U> CompletableFuture<U> CompletableFuture.supplyAsync(Supplier<U> supplier,Executor executor)在自定义线程池中异步执行:

java">public class CompletableFutureTest {private static final Executor executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {supplyAsync();}private static void supplyAsync() throws Exception {long start = System.currentTimeMillis();CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {try {//等待3sThread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return CompletableFutureTest.class.getName();}, executor);System.out.printf("任务启动耗时:%d\n", System.currentTimeMillis() - start);//阻塞等待直到任务执行完毕System.out.printf("任务执行结果:%s\n", supplyAsync.get());System.out.printf("任务执行耗时:%d\n", System.currentTimeMillis() - start);//关闭线程executor.shutdown();}
}

runAsync

CompletableFuture<Void> CompletableFuture.runAsync(Runnable runnable) 方法用于执行没有返回值的任务。默认在 ForkJoinPool 中异步执行。

java">public class CompletableFutureTest {private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {runAsync();}private static void runAsync() throws Exception{long start = System.currentTimeMillis();//Runnable接口run方法CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {try {//等待3sThread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(CompletableFutureTest.class.getName());});System.out.printf("任务启动耗时:%d\n", System.currentTimeMillis() - start);//阻塞等待直到任务执行完毕//阻塞等待直到任务执行完毕System.out.printf("任务执行结果:%s\n", runAsync.get());System.out.printf("任务执行耗时:%d\n", System.currentTimeMillis() - start);}
}
//结果
任务启动耗时:29
org.example.study.CompletableFutureTest
任务执行结果:null
任务执行耗时:3041

CompletableFuture<Void> CompletableFuture.runAsync(Runnable runnable,Executor executor) 方法也可以执行在自定义线程池中:

java">public class CompletableFutureTest {private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {supplyAsync();}private static void runAsync() throws Exception{long start = System.currentTimeMillis();//Runnable接口run方法CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {try {//等待3sThread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(CompletableFutureTest.class.getName());},executor);System.out.printf("任务启动耗时:%d\n", System.currentTimeMillis() - start);//阻塞等待直到任务执行完毕System.out.printf("任务执行结果:%s\n", runAsync.get());System.out.printf("任务执行耗时:%d\n", System.currentTimeMillis() - start);executor.shutdown();}
}
//结果
任务启动耗时:30
org.example.study.CompletableFutureTest
任务执行结果:null
任务执行耗时:3044

获取结果

get

T get() 方法是 CompletableFuture 类提供的一种获取任务结果的方式,它会阻塞当前线程,直到任务完成并返回结果。

java">public class CompletableFutureTest {private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {supplyAsync();}private static void supplyAsync() throws Exception {long start = System.currentTimeMillis();CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {try {//等待3sThread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return CompletableFutureTest.class.getName();}, executor);System.out.printf("任务启动耗时:%d\n", System.currentTimeMillis() - start);//阻塞等待直到任务执行完毕System.out.printf("任务执行结果:%s\n", supplyAsync.get());System.out.printf("任务执行耗时:%d\n", System.currentTimeMillis() - start);executor.shutdown();}
}
//结果
任务启动耗时:29
任务执行结果:org.example.study.CompletableFutureTest
任务执行耗时:3032

T get(long timeout, TimeUnit unit)在指定时间timeout内未完成任务,抛TimeoutException

java">public class CompletableFutureTest {private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {supplyAsync();}private static void supplyAsync() throws Exception {try {long start = System.currentTimeMillis();CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {try {//等待3sThread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return CompletableFutureTest.class.getName();}, executor);System.out.printf("任务启动耗时:%d\n", System.currentTimeMillis() - start);//超时抛异常System.out.printf("任务执行结果:%s\n", supplyAsync.get(2000,TimeUnit.MILLISECONDS));System.out.printf("任务执行耗时:%d\n", System.currentTimeMillis() - start);} catch (Exception e) {e.printStackTrace();} finally {executor.shutdown();}}
}
//结果
任务启动耗时:29
Exception in thread "main" java.util.concurrent.TimeoutExceptionat java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)at org.example.study.CompletableFutureTest.supplyAsync(CompletableFutureTest.java:51)at org.example.study.CompletableFutureTest.main(CompletableFutureTest.java:15)

get()发生异常直接抛出:

java">public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r = result) == null ? waitingGet(true) : r);}private static <T> T reportGet(Object r)throws InterruptedException, ExecutionException {if (r == null) // by convention below, null means interruptedthrow new InterruptedException();if (r instanceof AltResult) {Throwable x, cause;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw (CancellationException)x;if ((x instanceof CompletionException) &&(cause = x.getCause()) != null)x = cause;throw new ExecutionException(x);}@SuppressWarnings("unchecked") T t = (T) r;return t;}
}

join

T join() 方法是 CompletableFuture 类提供的一种获取任务结果的方式,它会阻塞当前线程,直到任务完成并返回结果。

java">public class CompletableFutureTest {private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {supplyAsync();}private static void supplyAsync() throws Exception {try {long start = System.currentTimeMillis();CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {try {//等待3sThread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return CompletableFutureTest.class.getName();}, executor);System.out.printf("任务启动耗时:%d\n", System.currentTimeMillis() - start);//阻塞等待直到任务执行完毕System.out.printf("任务执行结果:%s\n", supplyAsync.join());System.out.printf("任务执行耗时:%d\n", System.currentTimeMillis() - start);} catch (Exception e) {e.printStackTrace();} finally {executor.shutdown();}}
}
//结果
任务启动耗时:29
任务执行结果:org.example.study.CompletableFutureTest
任务执行耗时:3031

join()发生异常时包在CompletionException中抛出:

java">public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {public T join() {Object r;return reportJoin((r = result) == null ? waitingGet(false) : r);}private static <T> T reportJoin(Object r) {if (r instanceof AltResult) {Throwable x;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw (CancellationException)x;if (x instanceof CompletionException)throw (CompletionException)x;throw new CompletionException(x);}@SuppressWarnings("unchecked") T t = (T) r;return t;}
}

异步回调 

 CompletableFuture 提供了一系列方法来处理任务的完成事件,实现异步回调

thenApply

CompletableFuture<U> thenApply( Function<? super T,? extends U> fn):输入参数 T是上一阶段的任务结果类型;返回值 U 是新阶段的任务结果类型。 fn方法对上一阶段的任务结果进行转换操作,并返回一个新的 CompletableFuture 对象。

java">public class CompletableFutureTest {private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {supplyAsync();}private static void supplyAsync() throws Exception {try {CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(CompletableFutureTest.class::getName, executor);System.out.println(supplyAsync.//参数为string返回一个charthenApply((str) -> str.charAt(0)).//参数是char返回一个intthenApply((chr)->chr*1024).//参数是int返回一个longthenApply((n)->(long)n).get());} catch (Exception e) {e.printStackTrace();} finally {executor.shutdown();}}
}
//结果
113664

thenAccept

CompletableFuture<Void> thenAccept(Consumer<? super T> action):输入参数 T是上一阶段的任务结果类型。action方法对上一阶段的任务结果进行消费,并返回一个新的 CompletableFuture<Void> 对象。

java">public class CompletableFutureTest {private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {supplyAsync();}private static void supplyAsync() throws Exception {try {CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(CompletableFutureTest.class::getName, executor);System.out.println(supplyAsync.//参数为string返回一个charthenApply((str) -> str).//打印结果thenAccept((str)-> System.out.println(str)).get());} catch (Exception e) {e.printStackTrace();} finally {executor.shutdown();}}
}
//结果
org.example.study.CompletableFutureTest
null

thenRun

CompletableFuture<Void> thenRun(Runnable action)相当于开启一个新的任务

java">public class CompletableFutureTest {private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {supplyAsync();}private static void supplyAsync() throws Exception {try {CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(CompletableFutureTest.class::getName, executor);long start = System.currentTimeMillis();CompletableFuture<Void> thenRun = supplyAsync.//参数为string返回一个charthenApply((str) -> str).//参数是char返回一个intthenRun(() -> {try {System.out.printf("回调任务启动耗时:%d\n", System.currentTimeMillis() - start);Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}});//阻塞等待直到任务执行完毕System.out.printf("回调任务执行结果:%s\n", thenRun.get());System.out.printf("回调任务执行耗时:%d\n", System.currentTimeMillis() - start);} catch (Exception e) {e.printStackTrace();} finally {executor.shutdown();}}
}
//结果
回调任务启动耗时:2
回调任务执行结果:null
回调任务执行耗时:3032

异步组合回调

CompletableFuture 还提供了一些方法来组合多个任务的结果,实现更复杂的异步处理逻辑

thenCombine

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) other是另一个任务,函数 fn入参类型T是当前任务返回类型,U是任务other的返回类型,fn返回类型为V的新的CompletableFuture 。thenCombine()方法将两个任务的结果进行组合,对他们的结果进行处理并返回新的 CompletableFuture 对象。

java">public class CompletableFutureTest {private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {thenCombine();}private static void thenCombine() throws Exception {try {CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return CompletableFutureTest.class.getName();}, executor);CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(CompletableFutureTest.class::getName, executor);long start = System.currentTimeMillis();System.out.printf("任务执行thenCombine启动:%d\n", System.currentTimeMillis() - start);CompletableFuture<String> combine = supplyAsync1.thenCombine(supplyAsync2, (str1, str2) -> str1 + str2);System.out.printf("任务执行thenCombine耗时:%d\n", System.currentTimeMillis() - start);System.out.printf("任务执行thenCombine结果:%s\n", combine.get());System.out.printf("任务执行thenCombine耗时:%d\n", System.currentTimeMillis() - start);}catch (Exception e){}finally {executor.shutdown();}}
}
//结果
任务执行thenCombine启动:0
任务执行thenCombine耗时:24
任务执行thenCombine结果:org.example.study.CompletableFutureTestorg.example.study.CompletableFutureTest
任务执行thenCombine耗时:3013

thenCompose

<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) fn入参T当前任务的执行结果,fn执行完返回一个CompletableFuture<U>对象。thenCompose方法拿到当前任务的结果之后,再对其结果进行异步操作。

java">public class CompletableFutureTest {private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 6000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {thenCompose();}private static void thenCompose(){try {CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return CompletableFutureTest.class.getName();}, executor);long start = System.currentTimeMillis();System.out.printf("任务执行thenCompose启动:%d\n", System.currentTimeMillis() - start);CompletableFuture<String> compose = supplyAsync1.thenCompose((str) -> CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return str.substring(0,5);}));//阻塞等到任务执行完毕System.out.printf("任务执行thenCompose结果:%s\n", compose.get());System.out.printf("任务执行thenCompose耗时:%d\n", System.currentTimeMillis() - start);}catch (Exception e){}finally {executor.shutdown();}}
}
//结果
任务执行thenCompose启动:0
任务执行thenCompose结果:org.e
任务执行thenCompose耗时:4020


http://www.ppmy.cn/embedded/128961.html

相关文章

MATLAB图片拼接配准系统

应用背景 图像配准现在已成为数字图像处理的研究热点&#xff0c;方法繁多&#xff0c;站在时代的前沿。图像配准多采用基于图像特征点的方法&#xff0c;这种方法易于用计算机处理并且容易实现人机交互&#xff0c;其重点在于如何提取图像上的有效特征点。 对图像拼接技术的…

促进绿色可持续发展 能源环保管理重中之重

在全球经济环境发展、资源逐渐减少的背景下&#xff0c;环境保护已成为全球共识&#xff0c;而工业作为经济发展的重要支柱&#xff0c;其环保监测的实现至关重要。以下是对工业重点环保监测实现方式的详细探讨&#xff1a; 18721098782 WPP 一、构建国家级环境监测网络 …

【优选算法】——双指针(下篇)!

&#x1f308;个人主页&#xff1a;秋风起&#xff0c;再归来~ &#x1f525;系列专栏&#xff1a;C刷题算法总结 &#x1f516;克心守己&#xff0c;律己则安 目录 1、有效三角形的个数 2、查找总价值为目标值的两个商品 3、三数之和 4、四数之和 5、完结散花 1、有…

tensorflow + pygame 手写数字识别的小游戏

起因&#xff0c; 目的: 很久之前&#xff0c;一个客户的作业&#xff0c;我帮忙写的。 今天删项目&#xff0c;觉得比较简洁&#xff0c;发出来给大家看看。 效果图: 1. 训练模型的代码 import sys import tensorflow as tf# Use MNIST handwriting dataset mnist tf.kera…

uiautomatorviewer安卓9以上正常使用及问题处理

一、安卓9以上使用uiautomatorviewer问题现象 打开Unexpected error while obtaining UI hierarchy 问题详情 Unexpected error while obtaining UI hierarchy java.lang.reflect.InvocationTargetException 二、问题处理 需要的是替换对应D:\software\android-sdk-windows…

PicoQuant GmbH公司Dr. Christian Oelsner到访东隆科技

昨日&#xff0c;德国PicoQuant公司的光谱和显微应用和市场专家Dr.Christian Oelsner莅临武汉东隆科技有限公司。会议上Dr. Christian Oelsner就荧光寿命光谱和显微技术的最新研究和应用进行了深入的交流与探讨。此次访问不仅加强了两家公司在高科技领域的合作关系&#xff0c;…

HDLBits参考答案合集

关注 望森FPGA 查看更多FPGA资讯 这是望森的第 26 期分享 作者 | 望森 来源 | 望森FPGA 本节内容是HDLBits参考答案合集的索引。 恭喜HDLBits合集完结~ 敬请关注新的专栏内容“FPGA理论基础” 前言 FPGA新手必用&#xff0c;Verilog HDL编程学习网站推荐 —— HDLBits_veri…

如何在Matlab界面中添加文件选择器?

在Matlab中&#xff0c;为用户提供交互式文件选择功能是非常重要的&#xff0c;尤其是当你需要让用户从文件系统中选择文件进行进一步处理时。Matlab提供了uigetfile函数&#xff0c;允许用户通过图形界面选择文件。以下是如何在Matlab界面中添加文件选择器的详细指南&#xff…