【Flink源码分析】5. Flink1.19源码分析-异步编程(CompletableFuture)

embedded/2025/2/11 23:25:05/

5 CompletableFuture

  1. 实现异步编排;
  2. 获取异步任务执行的结果。

CompletableFuture提供了几十种方法,辅助我们的异步任务场景。这些方法包括创建异步任务、异步任务回调、多个任务组合处理等方面。

5.1 supplyAsync 方法

supplyAsync 执行 CompletableFuture 任务,有返回值。

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}
//自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);
}

5.1.1 supplyAsync方法示例

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SupplyAsyncDemo {public static void main(String[] args) throws Exception{/*** 创建一个线程池,该线程池使用提供的固定数量的线程来执行任务,最多5个线程。* 如果所有线程都在工作,那么新提交的任务将在队列中等待,直到有线程变得可用。*/ExecutorService executorService = Executors.newFixedThreadPool(5);/**supplyAsync 异步执行任务,任务有返回值*/CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() ->{System.out.println(Thread.currentThread().getName());return "你好";},executorService);/**supplyAsync 异步执行任务,任务有返回值*/CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() ->{System.out.println(Thread.currentThread().getName());return "你好1";});System.out.println(task1.get());System.out.println(task2.get());}}

5.1.2 执行结果

在这里插入图片描述

5.2 runAsync方法

runAsync 执行 CompletableFuture 任务,没有返回值。

    //使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}//自定义线程,根据supplier构建执行任务public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}

5.2.1 runAsync方法示例

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class RunAsyncDemo {public static void main(String[] args) throws Exception{/*** 创建一个线程池,该线程池使用提供的固定数量的线程来执行任务,最多5个线程。* 如果所有线程都在工作,那么新提交的任务将在队列中等待,直到有线程变得可用。*/ExecutorService executorService = Executors.newFixedThreadPool(5);/**runAsync 异步执行任务,任务没有返回值*/CompletableFuture<Void> task1 = CompletableFuture.runAsync(() ->{System.out.println(Thread.currentThread().getName());},executorService);CompletableFuture<Void> task2 = CompletableFuture.runAsync(() ->{System.out.println(Thread.currentThread().getName());});System.out.println(task1.get());System.out.println(task2.get());}}

5.2.2 执行结果

在这里插入图片描述

5.3 then* 方法说明

  • 调用then*方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池;
  • 调用then*Async执行第二个任务时,则第一个任务使用的是自己传入的线程池,第二个任务使用的是ForkJoin线程池;
  • ThenAcceptThenAcceptAsyncthenApplythenApplyAsyncthenRunthenRunAsync区别都一样;

thenApplythenApplyAsync为例:

  • 前一个任务执行完,thenApply执行的时候,则main函数线程来执行thenApply
  • 前一个任务没有执行完,则执行前一个任务的线程来执行thenApply这个任务;
  • 这样才能保证顺序。

5.3.1 then 方法示例

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;public class ThenDemoOrder {public static void main(String[] args) throws Exception{/*** 创建一个线程池,该线程池使用提供的固定数量的线程来执行任务,最多5个线程。* 如果所有线程都在工作,那么新提交的任务将在队列中等待,直到有线程变得可用。*/ExecutorService executorService = Executors.newFixedThreadPool(5);Supplier<String> task1 = () ->{System.out.println("1."+Thread.currentThread().getName());return "task1";};Function<String,String> task2 = s -> {System.out.println("2."+Thread.currentThread().getName());return "task2";};/** 创建异步任务*/CompletableFuture<String> future = CompletableFuture.supplyAsync(task1,executorService);//Thread.sleep(1000);/** then* 调用会使用前面任务的线程池 */future.thenApply(task2);//Thread.sleep(1000);System.out.println("haha");}
}

5.3.2 执行结果

在这里插入图片描述

5.4 thenApply

  public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);}

第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

有入参,有返回值

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class FutureThenApplyDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(()->{System.out.println("原始CompletableFuture方法任务");return "1.supplyAsync";});CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {return a+ ",2.thenApply";});System.out.println(thenApplyFuture.get());}
}

执行结果:

在这里插入图片描述

5.5 thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);}

表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

有入参,没有返回值

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class FutureThenAcceptDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(()->{return "1.supplyAsync";});CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {System.out.println(a);});System.out.println(thenAcceptFuture.get());}
}

执行结果:

在这里插入图片描述

5.6 thenRun

public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);}

执行完第一个任务后,执行第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值。

无入参,无返回值

package com.demo.annn;
import java.util.concurrent.CompletableFuture;
public class FutureThenRunDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(()->{System.out.println("1.supplyAsync");return "1.supplyAsync";});CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {System.out.println("2.thenRun");});System.out.println(thenRunFuture.get());}
}

执行结果:

在这里插入图片描述

5.7 thenCompose

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(null, fn);
}

thenCompose用于链接两个异步操作,其中第二个操作依赖于第一个操作的结果。thenCompose方法接受一个函数作为参数,该函数接受第一个CompletableFuture的结果,并返回一个新的CompletableFuture
thenCompose的主要优势在于,它允许你以流程的方式组合多个异步操作,并将他们链接在一起,从而创建一个异步操作链。这对于构建复杂的异步逻辑非常有用。
用途:组合多个异步操作,创建一个异步操作链。返回新的CompletableFuture可以继续调用下去。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class ThenComposeDemo {public static void main(String[] args) throws Exception {/** 创建一个异步任务 */CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模拟耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "First operation result";});/** 接收future1中异步任务的运行结果 ,内部再次创建异步任务返回新的CompletableFuture*/CompletableFuture<String> future2 = future1.thenCompose(result1 -> {// 这个lambda表达式将使用future1的结果,并返回一个新的CompletableFuturereturn CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 第二个异步操作,也模拟耗时} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Second operation result based on " + result1;});});// 获取最终的异步操作结果String finalResult = future2.join();System.out.println(finalResult); // 输出: Second operation result based on First operation result}
}

执行结果:

在这里插入图片描述

注意:thenComposethenApplythenApplyAsync方法有些相似,但它们之间存在关键差异。thenApplythenApplyAsync接收一个函数,该函数应用于前一个CompletableFuture的结果,并立即返回一个新值(而不是另一个CompletableFuture)。而thenCompose则允许你链接一个返回CompletableFuture的函数,从而可以构建更复杂的异步操作链。

thenCompose任务允许传进去一个CompletableFuture类型的函数。

5.8 thenCombine

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(null, other, fn);
}
  • 它用于组合两个CompletableFuture对象的结果,并应用一个函数来生成一个新的结果。当两个CompletableFuture对象都完成时,thenCombine方法将这两个结果作为参数传递给提供的函数,并返回一个新的CompletableFuture对象,该对象包含函数的结果。
  • thenCombine的主要用途是当你有两个异步操作,并且想要基于这两个操作的结果来执行某些操作时。他允许你以一种声明性的方式来表达这种依赖关系,并且以异步的方式处理结果。

总结:组合两个任务结果,两个任务结果作为参数传递给Combine函数,并返回一个新的CompletableFuture,用途基于两个任务操作结果执行某些操作的时候用。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class ThenCombineDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {// 这里的lambda表达式将接收两个异步操作的结果,并返回一个新的结果return "The sum is: " + (result1 + result2);});// 获取最终的组合结果String finalResult = combinedFuture.join();System.out.println(finalResult); // 输出: The sum is: 30}
}

在这里插入图片描述

注意:

  • thenCombinethenCompose方法的主要区别在于他们处理结果的方式。
  • thenCombine直接应用一个函数来组合两个结果,并返回一个新的CompletableFuture
  • thenCompose则允许你链接一个返回CompletableFuture的函数,以构建更复杂的异步操作链。

5.9 thenAcceptBoth

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {return biAcceptStage(null, other, action);
}

它用于当两个CompletableFuture对象都完成时,执行一个接受这两个结果的操作,但不返回任何结果(即返回void)。thenAcceptBoth允许你执行一个其他操作,例如记录日志、更新UI等,而不关心操作的返回值。

总结:两个CompletableFuture对应的任务都完成时,执行一个接收两个结果的操作,但是没有返回值。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class AcceptBothDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});CompletableFuture<Void> combinedFuture = future1.thenAcceptBoth(future2, (result1, result2) -> {// 当两个future都完成时,执行这里的操作System.out.println("Future 1 result: " + result1);System.out.println("Future 2 result: " + result2);System.out.println("add result: " + result2 + result1);});// 获取最终的组合结果// 主线程等待,防止程序立即退出Thread.sleep(2000);}
}

在这里插入图片描述

5.10 runAfterBoth

    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action) {return biRunStage(null, other, action);}

它用于两个CompletableFuture对象都完成(无论是正常完成还是异常完成)之后执行某个动作。与thenAcceptBoth不同,runAfterBoth不接受任何结果作为参数,并且也不返回任何值(返回void)。它纯粹用于执行一个需要在两个异步操作完成后执行其他操作。

总结:两个CompletableFuture对应的任务都完成时,执行一个一系列的操作,但不接受参数值,也没有返回值。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class RunAfterBothDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});future1.runAfterBoth(future2, () -> {// 当两个future都完成时,执行这里的操作System.out.println("Both futures have completed.");});// 主线程等待,防止程序立即退出Thread.sleep(2000);}
}

在这里插入图片描述

5.11 runAfterEither

    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) {return orRunStage(null, other, action);}

它用于两个CompletableFuture对象中的任意一个完成时执行某个动作。这个方法接受两个参数:另一个CompletableFuture对象和一个Runnable 。当这两个CompletableFuture对象中的任意一个完成时(无论正常完成还是异常完成),Runnable 中的代码将会被执行。

两个异步操作,任何一个完成了就会走runAfterEither定义的逻辑,没有参数,也没有返回值。

runAfterEither与runAfterBoth的主要区别在于,它不需要两个CompletableFuture都完成;只需要其中一个完成就会执行Runnable 。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class RunAfterEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);System.out.println("1000");} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);System.out.println("500");} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});future1.runAfterEither(future2, () -> {// 当两个future中的任意一个完成时,执行这里的操作System.out.println("One of the futures has completed.");});// 主线程等待,防止程序立即退出Thread.sleep(1500);}
}

在这里插入图片描述

5.12 applyToEither

    public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {return orApplyStage(null, other, fn);}

runAfterEither类似,CompletableFuture类也提供了一个applyToEither方法。applyToEither方法也是用于在两个CompletableFuture对象中的任意一个完成时执行一个操作,但与runAfterEither不同的是,applyToEither允许你提供一函数(Function),该函数将应用于已完成CompletableFuture的结果(如果有的话),并返回一个新的CompletableFuture

两个异步操作,任何一个完成了就会走applyToEither定义的逻辑,有返回值。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.function.Function;public class ApplyToEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 2";});// 当任意一个future完成时,应用一个函数并返回一个新的CompletableFutureCompletableFuture<String> resultFuture = future1.applyToEither(future2, Function.identity());// 处理结果resultFuture.thenAccept(result -> {System.out.println("Result: " + result);// 在这里进行后续操作});// 主线程等待,防止程序立即退出resultFuture.join();}
}

在这里插入图片描述

5.12 acceptEither

    public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {return orAcceptStage(null, other, action);}

CompletableFuture类中的acceptEither方法与applyToEither方法类似,但是他们的使用场景和目的有所不同。acceptEither用于在任意一个CompletableFuture完成时执行一个动作,但这个操作是一个Consumer函数,它接受已完成CompletableFuture的结果作为参数,但不返回任何值或新的CompletableFuture

总结: 类似,区别acceptEither接收ConsumerapplyToEither接收 Function

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class AcceptEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 2";});// 当任意一个future完成时,使用Consumer函数处理结果CompletableFuture<Void> resultFuture = future1.acceptEither(future2, (result) -> {System.out.println("Result: " + result);// 在这里进行基于结果的操作,但不返回新的CompletableFuture});// 主线程等待,防止程序立即退出CompletableFuture.anyOf(future1, future2).join();}
}

在这里插入图片描述

5.13 get()、get(long timeout, TimeUnit unit)、getNow

public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r = result) == null ? waitingGet(true) : r);
}public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {Object r;long nanos = unit.toNanos(timeout);return reportGet((r = result) == null ? timedGet(nanos) : r);
}public T getNow(T valueIfAbsent) {Object r;return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}
  • get 阻塞等待结果
  • get(long timeout, TimeUnit unit)超时等待结果
  • getNow立刻获取结果,如果任务没有执行完则返回 valueIfAbsent
package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class GetDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {//模拟一个耗时操作try {TimeUnit.SECONDS.sleep(2);}catch (InterruptedException e){Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "5";});//等待异步计算完成,并获取结果String result = future.get();System.out.println("异步计算的结果是:"+result);String result2 = future.get(3,TimeUnit.SECONDS);System.out.println("异步计算的结果2是:"+result2);Thread.sleep(4000);String result3 = future.getNow("valueIfAbsent");System.out.println("异步计算的结果3是:"+result3);}}

在这里插入图片描述

5.14 exceptionally

    public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {return uniExceptionallyStage(fn);}

用于处理异步任务执行过程中出现的异常。当CompletableFuture计算的异步任务抛出异常时,你可以使用exceptionally方法来提供一个函数,该函数会接收异常作为参数,并返回一个新的结果或抛出一个新的异常。
exceptionally方法非常有用,因为它允许你在异步任务失败时执行一些特定的逻辑,而不是让异常直接传播到调用者。这样,你可以控制异常的处理方式,比如记录日志、返回默认值或抛出更具体的异常。

总结:抛出异常的时候exceptionally定义一个函数接收异常,好处是异步任务失败的时候可以做一些特定逻辑,而不是直接返回给调用者。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class ExceptionallyDemo {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("异步任务失败");});CompletableFuture<String> exceptionalFuture = future.exceptionally(throwable -> {// 处理异常,返回一个新的结果或抛出新的异常System.err.println("捕获到异常: " + throwable.getMessage());return "回退结果";});try {String result = exceptionalFuture.get();System.out.println("最终结果是: " + result);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
}

在这里插入图片描述

5.15 completeExceptionally

    public boolean completeExceptionally(Throwable ex) {if (ex == null) throw new NullPointerException();boolean triggered = internalComplete(new AltResult(ex));postComplete();return triggered;}

CompletableFuturecompleteExceptionally方法用于在异步计算完成之前,以异常的方式标记这个CompletableFuture为已完成状态。这意味着任何等待这个CompletableFuture完成的操作(例如通过get()方法)将会立即抛出这个异常。

总结:程序执行结束的时候,通过completeExceptionally来标记任务为已完成的状态并且抛出异常。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CmpleteExceptionallyDemo {public static void main(String[] args) {CompletableFuture<String> future = new CompletableFuture<>();// 模拟异步操作失败,使用 completeExceptionally 来标记 future 为完成状态并附带异常future.completeExceptionally(new RuntimeException("异步操作失败"));try {// 尝试获取 future 的结果,将会抛出 ExecutionException,因为 future 是以异常完成的String result = future.get();System.out.println(result);} catch (InterruptedException | ExecutionException e) {// 打印出原始异常信息e.printStackTrace();}}
}

在这里插入图片描述

5.16 whenComplete

    public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action);}

用于在异步操作完成后执行一个回调函数。无论异步操作时正常完成还是抛出异常,whenComplete都会执行其提供的回调函数。这个方法非常有用,它允许你在异步任务结束时进行清理工作、记录日志、处理结果或异常,而不会阻塞主线程。

总结:无论任务是否抛出异常都会调用whenComplete回调函数,Flink内部经常用来关闭资源等操作。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class WhenCompleteDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result of the async operation";});future.whenComplete((result, throwable) -> {if (throwable != null) {// 处理异步操作抛出的异常System.err.println("Async operation failed: " + throwable.getMessage());} else {// 处理异步操作的结果System.out.println("Async operation completed successfully with result: " + result);}});// 主线程等待异步任务完成future.get();}
}

在这里插入图片描述

5.17 allOf

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);}

所有任务都执行后才执行下一个任务。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class AllOfFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {try {Thread.sleep(1000); // 模拟耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}System.out.println("Future 1 completed");});CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {try {Thread.sleep(2000); // 模拟另一个耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}System.out.println("Future 2 completed");});CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);// 阻塞直到所有future都完成allFutures.get();System.out.println("All futures completed");}
}

在这里插入图片描述

5.18 anyOf

    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {return orTree(cfs, 0, cfs.length - 1);}

任意一个任务执行后就可以执行下一个任务了。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class AnyOfFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000); // 模拟耗时操作System.out.println(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000); // 模拟另一个耗时操作System.out.println(2000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 2";});CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1.toCompletableFuture(), future2.toCompletableFuture());// 阻塞直到任意一个future完成Object result = anyFuture.get();System.out.println("A future completed with result: " + result);}
}

在这里插入图片描述


http://www.ppmy.cn/embedded/161452.html

相关文章

< 自用文儿 > Linux / Unix 的 VI 编辑器 快捷命令集 看到安装包叫 vim

vi 编辑器 在我学习 Unix/Linux时&#xff0c;编辑器有&#xff1a; sed, awk, 还有这个 vi。 前两命令要对 “正则表达式” 熟悉&#xff0c;配合着使用&#xff0c;效率攻倍。 但有大部分时间直接编辑文件会更加方便&#xff0c;我推荐使用 vi&#xff0c;所有操作都有快捷键…

Axure PR 9 中继器 01 创建数据表

大家好&#xff0c;我是大明同学。 中继器在Axure中一直是个比较重要也比较难的元件&#xff0c;我大概会分几期来学习。 这期内容&#xff0c;我们来了解一下怎么用Axure中继器创建图表。 预览地址&#xff1a;https://qsyz49.axshare.com 创建数据表 1.打开一个RP 文件 2…

高性能 :OpenAI Triton Open-source GPU programming Language LINUX 环境配置

目录 配置triton环境cudabuild-essential带有pip的python环境直接安装pipanaconda 安装 triton 环境pip install tritonpip install torch 运行test示例vector-add.pylaunch.json 配置triton环境 cuda wget http://developer.download.nvidia.com/compute/cuda/11.0.2/local_…

Sentinel——Spring Boot 应用接入 Sentinel 后内存开销增长计算方式

接入 Sentinel 对 Spring Boot 应用的内存消耗影响主要取决于 规则数量、资源数量、监控粒度、并发量 等因素。 1. 核心内存消耗来源 (1) Sentinel 核心库 默认依赖&#xff1a;Sentinel Core 本身占用较小&#xff0c;通常在 10~50MB&#xff08;取决于资源数量和规则复杂度…

Visual Studio踩过的坑

统计Unity项目代码行数 编辑-查找和替换-在文件中查找 查找内容输入 b*[^:b#/].*$ 勾选“使用正则表达式” 文件类型留空 也有网友做了指定&#xff0c;供参考 !*\bin\*;!*\obj\*;!*\.*\*!*.meta;!*.prefab;!*.unity 打开Unity的项目 注意&#xff1a;只是看&#xff0…

如何保证系统上线不出现bug?

如何保证系统上线不出现bug&#xff1f;这个问题看起来挺常见的&#xff0c;但实际解决起来可能比较复杂。首先&#xff0c;我需要考虑用户的具体背景。可能是一个项目经理或者开发团队的成员&#xff0c;他们可能刚经历过一次上线失败&#xff0c;导致出现了很多bug&#xff0…

python实现比对两个json串的方法

记录瞬间 前段时间为了解决一些实际问题&#xff0c;引出了要对json字符串进行比对的需求。 觉得有意义&#xff0c;作以简单记录。 # 比对数据 def compare_data(set_key, src_data, dst_data, noise_data, num):if isinstance(src_data, dict) and isinstance(dst_data, d…

解释和对比“application/octet-stream“与“application/x-protobuf“

介绍 在现代 Web 和分布式系统的开发中&#xff0c;数据的传输和交换格式扮演着关键角色。为了确保数据在不同系统之间的传输过程中保持一致性&#xff0c;MIME 类型&#xff08;Multipurpose Internet Mail Extensions&#xff09;被广泛应用于描述数据的格式和内容类型。在 …