简介
CompletableFuture是Java 8中引入的一个类,它实现了CompletionStage接口,是Future接口的一个增强版本。它提供了一种灵活、可组合的方式来实现异步计算,同时也提供了异常处理、取消、超时等特性。以下是对CompletableFuture的详细介绍:
一、主要特性
异步执行:
- CompletableFuture允许任务在后台线程中异步执行,不会阻塞主线程,提高了应用程序的响应性和性能。
- 可以使用CompletableFuture.runAsync()或CompletableFuture.supplyAsync()方法来启动异步任务。其中,runAsync()方法用于执行没有返回值的任务,而supplyAsync()方法用于执行有返回值的任务。
链式操作:
- CompletableFuture支持链式操作,可以使用一系列的方法来组合和转换异步任务的结果。
- 这些方法包括thenApply()、thenAccept()、thenCompose()、thenCombine()等,它们允许以流畅的方式处理异步任务的结果。
异常处理:
- CompletableFuture提供了丰富的异常处理方法,可以处理任务执行过程中可能发生的异常,并实现灵活的错误处理和回退机制。
- 这些方法包括exceptionally()、handle()和whenComplete()等。
多任务组合:
- CompletableFuture支持多个任务的并发执行和结果组合。
- 可以使用allOf()、anyOf()、thenCombine()等方法来组合多个异步任务的结果。其中,allOf()方法等待所有任务完成后才执行后续操作,anyOf()方法只要有一个任务完成就执行后续操作。
超时和取消:
- CompletableFuture提供了超时和取消异步任务的机制。
- 可以使用completeOnTimeout()方法设置任务在超时后自动完成,或者使用cancel()方法取消任务的执行。
并发控制:
- CompletableFuture支持对异步任务进行并发控制。
- 例如,可以使用thenApplyAsync()方法指定任务在特定的Executor上执行。
二、基本用法
创建CompletableFuture对象:
- 可以使用CompletableFuture.runAsync()或CompletableFuture.supplyAsync()方法来创建一个新的CompletableFuture对象。
- 还可以使用new CompletableFuture()构造函数来创建一个新的CompletableFuture对象,但此时CompletableFuture还没有任何结果,需要手动调用complete()方法来设置其结果。
获取CompletableFuture任务的结果:
- 可以使用join()方法或get()方法来获取CompletableFuture任务的结果。
- join()方法会阻塞当前线程,直到任务完成并返回结果,且不会抛出InterruptedException和ExecutionException异常,而是将异常包装在CompletionException中抛出。
- get()方法也会阻塞当前线程,直到任务完成并返回结果,但会抛出InterruptedException和ExecutionException异常,需要进行异常处理。
处理任务的完成事件:
- CompletableFuture提供了一系列方法来处理任务的完成事件,实现异步回调。
- 这些方法包括thenApply()、thenAccept()、thenRun()等。
三、示例代码
以下是一些使用CompletableFuture的示例代码:
java">// 异步执行一个任务并返回结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "Hello CompletableFuture";
}); // 阻塞等待任务执行完成并获取结果
String result = future.get();
System.out.println(result); // 异步执行多个任务并合并结果
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "CompletableFuture");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Java"); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
combinedFuture.get(); String combinedResult = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); System.out.println(combinedResult); // 异步执行一个任务并处理异常
CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Something went wrong");
}).exceptionally(e -> { return "default value";
}); String exceptionResult = exceptionFuture.get();
System.out.println(exceptionResult);
综上所述,CompletableFuture是Java 8中引入的一个强大的异步编程工具类,它提供了丰富的功能和灵活的用法,可以方便地处理异步操作和多个任务的结果。通过合理使用CompletableFuture,可以编写出高效、可维护的异步代码。
实战流程
一、创建异步任务
1.supplyAsync
supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法
// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
测试代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {System.out.println("do something....");return "result";});//等待任务执行完成System.out.println("结果->" + cf.get());
}public static void main(String[] args) throws ExecutionException, InterruptedException {// 自定义线程池ExecutorService executorService = Executors.newSingleThreadExecutor();CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {System.out.println("do something....");return "result";}, executorService);//等待子任务执行完成System.out.println("结果->" + cf.get());
}
2.runAsync
runAsync是创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法
// 不带返回值的异步请求,默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)// 不带返回值的异步请求,可以自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
测试代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {System.out.println("do something....");});//等待任务执行完成System.out.println("结果->" + cf.get());
}public static void main(String[] args) throws ExecutionException, InterruptedException {// 自定义线程池ExecutorService executorService = Executors.newSingleThreadExecutor();CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {System.out.println("do something....");}, executorService);//等待任务执行完成System.out.println("结果->" + cf.get());
}
3.获取任务结果的方法
// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException // 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex)
二、异步回调处理
1.thenApply和thenApplyAsync
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值。
测试代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {System.out.println(Thread.currentThread() + " cf2 do something....");result += 2;return result;});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());
}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Integer> cf2 = cf1.thenApply((result) -> {System.out.println(Thread.currentThread() + " cf2 do something....");result += 2;return result;});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());
}
thenApply和thenApplyAsync区别在于,使用thenApply方法时子任务与父任务使用的是同一个线程,而thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
2.thenAccept和thenAcceptAsync
thenAccep表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值。
测试代码
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Void> cf2 = cf1.thenAccept((result) -> {System.out.println(Thread.currentThread() + " cf2 do something....");});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());
}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Void> cf2 = cf1.thenAcceptAsync((result) -> {System.out.println(Thread.currentThread() + " cf2 do something....");});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());
}
thenAccep和thenAccepAsync区别在于,使用thenAccep方法时子任务与父任务使用的是同一个线程,而thenAccepAsync在子任务中可能是另起一个线程执行任务,并且thenAccepAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
3.thenRun和thenRunAsync
thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。
测试代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Void> cf2 = cf1.thenRun(() -> {System.out.println(Thread.currentThread() + " cf2 do something....");});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());
}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {System.out.println(Thread.currentThread() + " cf2 do something....");});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());
}
thenRun和thenRunAsync区别在于,使用thenRun方法时子任务与父任务使用的是同一个线程,而thenRunAsync在子任务中可能是另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
4.whenComplete和whenCompleteAsync
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。
测试代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");int a = 1/0;return 1;});CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {System.out.println("上个任务结果:" + result);System.out.println("上个任务抛出异常:" + e);System.out.println(Thread.currentThread() + " cf2 do something....");});// //等待任务1执行完成
// System.out.println("cf1结果->" + cf1.get());
// //等待任务2执行完成System.out.println("cf2结果->" + cf2.get());}
whenCompleteAsync和whenComplete区别也是whenCompleteAsync可能会另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
5.handle和handleAsync
跟whenComplete基本一致,区别在于handle的回调方法有返回值。
测试代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");// int a = 1/0;return 1;});CompletableFuture<Integer> cf2 = cf1.handle((result, e) -> {System.out.println(Thread.currentThread() + " cf2 do something....");System.out.println("上个任务结果:" + result);System.out.println("上个任务抛出异常:" + e);return result+2;});//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());
}
三、多任务组合处理
1.thenCombine、thenAcceptBoth 和runAfterBoth
这三个方法都是将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务。
区别:thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
测试代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf2 do something....");return 2;});CompletableFuture<Integer> cf3 = cf1.thenCombine(cf2, (a, b) -> {System.out.println(Thread.currentThread() + " cf3 do something....");return a + b;});System.out.println("cf3结果->" + cf3.get());
}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf2 do something....");return 2;});CompletableFuture<Void> cf3 = cf1.thenAcceptBoth(cf2, (a, b) -> {System.out.println(Thread.currentThread() + " cf3 do something....");System.out.println(a + b);});System.out.println("cf3结果->" + cf3.get());
}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf2 do something....");return 2;});CompletableFuture<Void> cf3 = cf1.runAfterBoth(cf2, () -> {System.out.println(Thread.currentThread() + " cf3 do something....");});System.out.println("cf3结果->" + cf3.get());
}
2.applyToEither、acceptEither和runAfterEither
这三个方法和上面一样也是将两个CompletableFuture组合起来处理,当有一个任务正常完成时,就会进行下阶段任务。
区别:applyToEither会将已经完成任务的执行结果作为所提供函数的参数,且该方法有返回值;acceptEither同样将已经完成任务的执行结果作为方法入参,但是无返回值;runAfterEither没有入参,也没有返回值。
测试代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf1 do something....");Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "cf1 任务完成";});CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf2 do something....");Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "cf2 任务完成";});CompletableFuture<String> cf3 = cf1.applyToEither(cf2, (result) -> {System.out.println("接收到" + result);System.out.println(Thread.currentThread() + " cf3 do something....");return "cf3 任务完成";});System.out.println("cf3结果->" + cf3.get());
}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf1 do something....");Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "cf1 任务完成";});CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf2 do something....");Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "cf2 任务完成";});CompletableFuture<Void> cf3 = cf1.acceptEither(cf2, (result) -> {System.out.println("接收到" + result);System.out.println(Thread.currentThread() + " cf3 do something....");});System.out.println("cf3结果->" + cf3.get());
}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf1 do something....");Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf1 任务完成");return "cf1 任务完成";});CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf2 do something....");Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf2 任务完成");return "cf2 任务完成";});CompletableFuture<Void> cf3 = cf1.runAfterEither(cf2, () -> {System.out.println(Thread.currentThread() + " cf3 do something....");System.out.println("cf3 任务完成");});System.out.println("cf3结果->" + cf3.get());
}
2.allOf / anyOf
allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。
测试代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf1 do something....");Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf1 任务完成");return "cf1 任务完成";});CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf2 do something....");int a = 1/0;Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf2 任务完成");return "cf2 任务完成";});CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf2 do something....");Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf3 任务完成");return "cf3 任务完成";});CompletableFuture<Void> cfAll = CompletableFuture.allOf(cf1, cf2, cf3);System.out.println("cfAll结果->" + cfAll.get());
}public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf1 do something....");Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf1 任务完成");return "cf1 任务完成";});CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf2 do something....");Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf2 任务完成");return "cf2 任务完成";});CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf2 do something....");Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf3 任务完成");return "cf3 任务完成";});CompletableFuture<Object> cfAll = CompletableFuture.anyOf(cf1, cf2, cf3);System.out.println("cfAll结果->" + cfAll.get());
}