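CompletableFuture is a class introduced in Java 8 for asynchronous programming. It provides a rich set of methods for handling asynchronous operations and for combining the results of multiple tasks.
Running tasks
Use CompletableFuture.supplyAsync() or CompletableFuture.runAsync() to create a CompletableFuture and start a task.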
supplyAsync
<U> CompletableFuture<U> CompletableFuture.supplyAsync(Supplier<U> supplier) runs a task that returns a value. The returned future completes with that value when the task finishes.
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            try {
                // Simulate 3 s of work
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return CompletableFutureTest.class.getName();
        });
        System.out.printf("Task startup time: %d\n", System.currentTimeMillis() - start);
        // Block until the task has finished
        System.out.printf("Task result: %s\n", supplyAsync.get());
        System.out.printf("Task total time: %d\n", System.currentTimeMillis() - start);
    }
}
// Output:
// Task startup time: 31
// Task result: org.example.study.CompletableFutureTest
// Task total time: 3044
```
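By default the task runs asynchronously on ForkJoinPool.commonPool(). If the common pool's parallelism is not greater than 1, a new thread is created for each task instead. The source of CompletableFuture.supplyAsync(Supplier<U> supplier):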
```java
// Abridged excerpt from the JDK's CompletableFuture source
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }
}
```
<U> CompletableFuture<U> CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) runs the task asynchronously on a custom thread pool:
```java
import java.util.concurrent.*;

public class CompletableFutureTest {
    // Declared as ExecutorService (not Executor) so that shutdown() is available
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 6000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        supplyAsync();
    }

    private static void supplyAsync() throws Exception {
        long start = System.currentTimeMillis();
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            try {
                // Simulate 3 s of work
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return CompletableFutureTest.class.getName();
        }, executor);
        System.out.printf("Task startup time: %d\n", System.currentTimeMillis() - start);
        // Block until the task has finished
        System.out.printf("Task result: %s\n", supplyAsync.get());
        System.out.printf("Task total time: %d\n", System.currentTimeMillis() - start);
        // Shut down the thread pool so the JVM can exit
        executor.shutdown();
    }
}
```
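To make the difference between the two overloads visible, the following minimal sketch returns the name of the thread that actually ran the task. The class name WhichThreadDemo, the helper methods, and the thread name "demo-pool-worker" are made up for illustration. The name printed for the default pool depends on the JVM and on the common pool's parallelism, so no expected output is given for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WhichThreadDemo {

    // Runs on the default executor and reports the worker thread's name.
    static String threadNameOnDefaultPool() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    // Runs on a custom pool whose threads all carry a fixed, illustrative name.
    static String threadNameOnCustomPool() {
        ExecutorService pool =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "demo-pool-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            // Non-daemon pool threads would otherwise keep the JVM alive
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("default pool: " + threadNameOnDefaultPool());
        System.out.println("custom pool:  " + threadNameOnCustomPool());
    }
}
```

Naming the threads of a custom pool in this way is one simple means of telling, in logs or thread dumps, which executor a given stage ran on.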
runAsync
CompletableFuture<Void> CompletableFuture.runAsync(Runnable runnable) runs a task that returns no value. By default it runs asynchronously on the ForkJoinPool common pool.
```java
import java.util.concurrent.CompletableFuture;

public class CompletableFutureTest {
    public static void main(String[] args) throws Exception {
        runAsync();
    }

    private static void runAsync() throws Exception {
        long start = System.currentTimeMillis();
        // The lambda implements Runnable.run()
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            try {
                // Simulate 3 s of work
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(CompletableFutureTest.class.getName());
        });
        System.out.printf("Task startup time: %d\n", System.currentTimeMillis() - start);
        // Block until the task has finished; get() returns null for a Void future
        System.out.printf("Task result: %s\n", runAsync.get());
        System.out.printf("Task total time: %d\n", System.currentTimeMillis() - start);
    }
}
// Output:
// Task startup time: 29
// org.example.study.CompletableFutureTest
// Task result: null
// Task total time: 3041
```
CompletableFuture<Void> CompletableFuture.runAsync(Runnable runnable, Executor executor) runs the task on a custom thread pool instead:
```java
import java.util.concurrent.*;

public class CompletableFutureTest {
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 6000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        // The original called supplyAsync(), which is not defined in this class
        runAsync();
    }

    private static void runAsync() throws Exception {
        long start = System.currentTimeMillis();
        // The lambda implements Runnable.run()
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            try {
                // Simulate 3 s of work
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(CompletableFutureTest.class.getName());
        }, executor);
        System.out.printf("Task startup time: %d\n", System.currentTimeMillis() - start);
        // Block until the task has finished
        System.out.printf("Task result: %s\n", runAsync.get());
        System.out.printf("Task total time: %d\n", System.currentTimeMillis() - start);
        executor.shutdown();
    }
}
// Output:
// Task startup time: 30
// org.example.study.CompletableFutureTest
// Task result: null
// Task total time: 3044
```
Getting the result
get
T get() is one of the ways CompletableFuture offers to obtain a task's result. It blocks the current thread until the task completes and then returns the result.
```java
import java.util.concurrent.*;

public class CompletableFutureTest {
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 6000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        supplyAsync();
    }

    private static void supplyAsync() throws Exception {
        long start = System.currentTimeMillis();
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            try {
                // Simulate 3 s of work
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return CompletableFutureTest.class.getName();
        }, executor);
        System.out.printf("Task startup time: %d\n", System.currentTimeMillis() - start);
        // Block until the task has finished
        System.out.printf("Task result: %s\n", supplyAsync.get());
        System.out.printf("Task total time: %d\n", System.currentTimeMillis() - start);
        executor.shutdown();
    }
}
// Output:
// Task startup time: 29
// Task result: org.example.study.CompletableFutureTest
// Task total time: 3032
```
T get(long timeout, TimeUnit unit) throws a TimeoutException if the task has not completed within the given timeout.
```java
import java.util.concurrent.*;

public class CompletableFutureTest {
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 6000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        supplyAsync();
    }

    private static void supplyAsync() throws Exception {
        try {
            long start = System.currentTimeMillis();
            CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
                try {
                    // Simulate 3 s of work
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return CompletableFutureTest.class.getName();
            }, executor);
            System.out.printf("Task startup time: %d\n", System.currentTimeMillis() - start);
            // Wait at most 2 s; the task needs 3 s, so this throws TimeoutException
            System.out.printf("Task result: %s\n", supplyAsync.get(2000, TimeUnit.MILLISECONDS));
            System.out.printf("Task total time: %d\n", System.currentTimeMillis() - start);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}
// Output:
// Task startup time: 29
// java.util.concurrent.TimeoutException
//     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
//     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
//     at org.example.study.CompletableFutureTest.supplyAsync(CompletableFutureTest.java:51)
//     at org.example.study.CompletableFutureTest.main(CompletableFutureTest.java:15)
```
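When the task fails, get() throws the failure wrapped in a checked ExecutionException (a CompletionException is unwrapped to its cause first). A CancellationException is rethrown as is, and an interrupted wait results in an InterruptedException: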
```java
// Abridged excerpt from the JDK's CompletableFuture source
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }

    private static <T> T reportGet(Object r)
        throws InterruptedException, ExecutionException {
        if (r == null) // by convention below, null means interrupted
            throw new InterruptedException();
        if (r instanceof AltResult) {
            Throwable x, cause;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if ((x instanceof CompletionException) &&
                (cause = x.getCause()) != null)
                x = cause;
            throw new ExecutionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }
}
```
join
T join() is another way to obtain a task's result. Like get(), it blocks the current thread until the task completes and then returns the result. Unlike get(), it declares no checked exceptions.
```java
import java.util.concurrent.*;

public class CompletableFutureTest {
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 6000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        supplyAsync();
    }

    private static void supplyAsync() throws Exception {
        try {
            long start = System.currentTimeMillis();
            CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
                try {
                    // Simulate 3 s of work
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return CompletableFutureTest.class.getName();
            }, executor);
            System.out.printf("Task startup time: %d\n", System.currentTimeMillis() - start);
            // Block until the task has finished
            System.out.printf("Task result: %s\n", supplyAsync.join());
            System.out.printf("Task total time: %d\n", System.currentTimeMillis() - start);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}
// Output:
// Task startup time: 29
// Task result: org.example.study.CompletableFutureTest
// Task total time: 3031
```
When the task fails, join() throws the failure wrapped in an unchecked CompletionException:
```java
// Abridged excerpt from the JDK's CompletableFuture source
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    public T join() {
        Object r;
        return reportJoin((r = result) == null ? waitingGet(false) : r);
    }

    private static <T> T reportJoin(Object r) {
        if (r instanceof AltResult) {
            Throwable x;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if (x instanceof CompletionException)
                throw (CompletionException)x;
            throw new CompletionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }
}
```
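The difference between the two reporting paths is easier to see when a task actually fails. The following is a minimal sketch: the class GetVsJoinDemo, its helper methods, and the "boom" exception are invented for illustration. Each helper returns the wrapper type it observed together with the type of the underlying cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class GetVsJoinDemo {

    // Hypothetical task that always fails with an IllegalStateException.
    static CompletableFuture<String> failingTask() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // get(): the failure arrives as a checked ExecutionException.
    static String viaGet() {
        try {
            failingTask().get();
            return "no exception";
        } catch (ExecutionException e) {
            return e.getClass().getSimpleName() + " <- "
                    + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "interrupted";
        }
    }

    // join(): the failure arrives as an unchecked CompletionException.
    static String viaJoin() {
        try {
            failingTask().join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getClass().getSimpleName() + " <- "
                    + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println("get():  " + viaGet());  // ExecutionException <- IllegalStateException
        System.out.println("join(): " + viaJoin()); // CompletionException <- IllegalStateException
    }
}
```

In both cases the original exception is available through getCause(). Because join() throws only unchecked exceptions, it tends to be more convenient inside lambdas and stream pipelines, while get() forces the caller to handle failure and interruption explicitly.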
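Asynchronous callbacks
CompletableFuture provides a family of methods that react to the completion of a task, which is how asynchronous callbacks are implemented.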
thenApply
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn): T is the result type of the previous stage, and U is the result type of the new stage. fn transforms the result of the previous stage, and thenApply returns a new CompletableFuture holding the transformed value.
```java
import java.util.concurrent.*;

public class CompletableFutureTest {
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 6000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        supplyAsync();
    }

    private static void supplyAsync() throws Exception {
        try {
            CompletableFuture<String> supplyAsync =
                    CompletableFuture.supplyAsync(CompletableFutureTest.class::getName, executor);
            System.out.println(supplyAsync
                    // Takes a String, returns a char
                    .thenApply((str) -> str.charAt(0))
                    // Takes a char, returns an int
                    .thenApply((chr) -> chr * 1024)
                    // Takes an int, returns a long
                    .thenApply((n) -> (long) n)
                    .get());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}
// Output:
// 113664
```
thenAccept
CompletableFuture<Void> thenAccept(Consumer<? super T> action): T is the result type of the previous stage. action consumes the result of the previous stage, and thenAccept returns a new CompletableFuture<Void>.
```java
import java.util.concurrent.*;

public class CompletableFutureTest {
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 6000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        supplyAsync();
    }

    private static void supplyAsync() throws Exception {
        try {
            CompletableFuture<String> supplyAsync =
                    CompletableFuture.supplyAsync(CompletableFutureTest.class::getName, executor);
            System.out.println(supplyAsync
                    // Passes the String through unchanged
                    .thenApply((str) -> str)
                    // Consumes the String by printing it; the resulting future holds null
                    .thenAccept((str) -> System.out.println(str))
                    .get());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}
// Output:
// org.example.study.CompletableFutureTest
// null
```
thenRun
CompletableFuture<Void> thenRun(Runnable action) runs action once the previous stage has completed. The action neither receives the previous result nor returns a value, so it behaves like a new task that starts after the previous one finishes.
```java
import java.util.concurrent.*;

public class CompletableFutureTest {
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 6000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        supplyAsync();
    }

    private static void supplyAsync() throws Exception {
        try {
            CompletableFuture<String> supplyAsync =
                    CompletableFuture.supplyAsync(CompletableFutureTest.class::getName, executor);
            long start = System.currentTimeMillis();
            CompletableFuture<Void> thenRun = supplyAsync
                    // Passes the String through unchanged
                    .thenApply((str) -> str)
                    // Runs after the previous stage; receives no argument, returns nothing
                    .thenRun(() -> {
                        try {
                            System.out.printf("Callback startup time: %d\n", System.currentTimeMillis() - start);
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    });
            // Block until the callback has finished
            System.out.printf("Callback result: %s\n", thenRun.get());
            System.out.printf("Callback total time: %d\n", System.currentTimeMillis() - start);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}
// Output:
// Callback startup time: 2
// Callback result: null
// Callback total time: 3032
```
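Combining asynchronous tasks
CompletableFuture also provides methods that combine the results of several tasks, which allows more complex asynchronous logic.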
thenCombine
<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn): other is another task. fn takes the result of the current task (type T) and the result of other (type U) and returns a value of type V. thenCombine() waits for both tasks to finish, applies fn to their results, and returns a new CompletableFuture<V> holding the combined value.
```java
import java.util.concurrent.*;

public class CompletableFutureTest {
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 6000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        thenCombine();
    }

    private static void thenCombine() throws Exception {
        try {
            // First task: takes 3 s
            CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return CompletableFutureTest.class.getName();
            }, executor);
            // Second task: completes immediately
            CompletableFuture<String> supplyAsync2 =
                    CompletableFuture.supplyAsync(CompletableFutureTest.class::getName, executor);
            long start = System.currentTimeMillis();
            System.out.printf("thenCombine start: %d\n", System.currentTimeMillis() - start);
            CompletableFuture<String> combine =
                    supplyAsync1.thenCombine(supplyAsync2, (str1, str2) -> str1 + str2);
            System.out.printf("thenCombine elapsed: %d\n", System.currentTimeMillis() - start);
            // Block until both tasks have finished and the results are combined
            System.out.printf("thenCombine result: %s\n", combine.get());
            System.out.printf("thenCombine elapsed: %d\n", System.currentTimeMillis() - start);
        } catch (Exception e) {
            // The original swallowed the exception silently
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}
// Output:
// thenCombine start: 0
// thenCombine elapsed: 24
// thenCombine result: org.example.study.CompletableFutureTestorg.example.study.CompletableFutureTest
// thenCombine elapsed: 3013
```
thenCompose
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn): fn receives the result of the current task (type T) and returns a CompletionStage<U>, typically a CompletableFuture<U>. thenCompose() takes the result of the current task and feeds it into a further asynchronous operation.
```java
import java.util.concurrent.*;

public class CompletableFutureTest {
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 6000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {
        thenCompose();
    }

    private static void thenCompose() {
        try {
            // First task: takes 3 s
            CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return CompletableFutureTest.class.getName();
            }, executor);
            long start = System.currentTimeMillis();
            System.out.printf("thenCompose start: %d\n", System.currentTimeMillis() - start);
            // Second task: starts once the first result is available and takes 1 s
            CompletableFuture<String> compose = supplyAsync1.thenCompose(
                    (str) -> CompletableFuture.supplyAsync(() -> {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        return str.substring(0, 5);
                    }));
            // Block until the composed task has finished
            System.out.printf("thenCompose result: %s\n", compose.get());
            System.out.printf("thenCompose elapsed: %d\n", System.currentTimeMillis() - start);
        } catch (Exception e) {
            // The original swallowed the exception silently
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}
// Output:
// thenCompose start: 0
// thenCompose result: org.e
// thenCompose elapsed: 4020
```
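A common point of confusion is when to use thenCompose rather than thenApply if the callback itself returns a future. The sketch below contrasts the two. The class ComposeVsApplyDemo and the helper lengthAsync are hypothetical names introduced only for this comparison.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeVsApplyDemo {

    // Hypothetical asynchronous step: computes the length of a string in another task.
    static CompletableFuture<Integer> lengthAsync(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    // thenCompose flattens the inner future: the result type is CompletableFuture<Integer>.
    static int viaThenCompose() {
        CompletableFuture<Integer> flat = CompletableFuture
                .supplyAsync(() -> "hello")
                .thenCompose(ComposeVsApplyDemo::lengthAsync);
        return flat.join();
    }

    // thenApply keeps the inner future as the value: the result is a nested future.
    static int viaThenApply() {
        CompletableFuture<CompletableFuture<Integer>> nested = CompletableFuture
                .supplyAsync(() -> "hello")
                .thenApply(ComposeVsApplyDemo::lengthAsync);
        // Two joins are needed to reach the Integer
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenCompose()); // 5
        System.out.println(viaThenApply());   // 5
    }
}
```

Both variants compute the same value, but only thenCompose yields a single future that can be chained further without unwrapping, which is why it is usually the better fit when one asynchronous step depends on the result of another.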