【Flink源码分析】5. Flink1.19源码分析-异步编程(CompletableFuture)

news/2025/2/12 2:28:34/

5 CompletableFuture

  1. 实现异步编排;
  2. 获取异步任务执行的结果。

CompletableFuture提供了几十种方法,辅助我们的异步任务场景。这些方法包括创建异步任务、异步任务回调、多个任务组合处理等方面。

5.1 supplyAsync 方法

supplyAsync 执行 CompletableFuture 任务,有返回值。

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}
//自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);
}

5.1.1 supplyAsync方法示例

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SupplyAsyncDemo {public static void main(String[] args) throws Exception{/*** 创建一个线程池,该线程池使用提供的固定数量的线程来执行任务,最多5个线程。* 如果所有线程都在工作,那么新提交的任务将在队列中等待,直到有线程变得可用。*/ExecutorService executorService = Executors.newFixedThreadPool(5);/**supplyAsync 异步执行任务,任务有返回值*/CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() ->{System.out.println(Thread.currentThread().getName());return "你好";},executorService);/**supplyAsync 异步执行任务,任务有返回值*/CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() ->{System.out.println(Thread.currentThread().getName());return "你好1";});System.out.println(task1.get());System.out.println(task2.get());}}

5.1.2 执行结果

在这里插入图片描述

5.2 runAsync方法

runAsync 执行 CompletableFuture 任务,没有返回值。

    //使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}//自定义线程,根据supplier构建执行任务public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}

5.2.1 runAsync方法示例

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class RunAsyncDemo {public static void main(String[] args) throws Exception{/*** 创建一个线程池,该线程池使用提供的固定数量的线程来执行任务,最多5个线程。* 如果所有线程都在工作,那么新提交的任务将在队列中等待,直到有线程变得可用。*/ExecutorService executorService = Executors.newFixedThreadPool(5);/**runAsync 异步执行任务,任务没有返回值*/CompletableFuture<Void> task1 = CompletableFuture.runAsync(() ->{System.out.println(Thread.currentThread().getName());},executorService);CompletableFuture<Void> task2 = CompletableFuture.runAsync(() ->{System.out.println(Thread.currentThread().getName());});System.out.println(task1.get());System.out.println(task2.get());}}

5.2.2 执行结果

在这里插入图片描述

5.3 then* 方法说明

  • 调用then*方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池;
  • 调用then*Async执行第二个任务时,则第一个任务使用的是自己传入的线程池,第二个任务使用的是ForkJoin线程池;
  • ThenAcceptThenAcceptAsyncthenApplythenApplyAsyncthenRunthenRunAsync区别都一样;

thenApplythenApplyAsync为例:

  • 前一个任务执行完,thenApply执行的时候,则main函数线程来执行thenApply
  • 前一个任务没有执行完,则执行前一个任务的线程来执行thenApply这个任务;
  • 这样才能保证顺序。

5.3.1 then 方法示例

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;public class ThenDemoOrder {public static void main(String[] args) throws Exception{/*** 创建一个线程池,该线程池使用提供的固定数量的线程来执行任务,最多5个线程。* 如果所有线程都在工作,那么新提交的任务将在队列中等待,直到有线程变得可用。*/ExecutorService executorService = Executors.newFixedThreadPool(5);Supplier<String> task1 = () ->{System.out.println("1."+Thread.currentThread().getName());return "task1";};Function<String,String> task2 = s -> {System.out.println("2."+Thread.currentThread().getName());return "task2";};/** 创建异步任务*/CompletableFuture<String> future = CompletableFuture.supplyAsync(task1,executorService);//Thread.sleep(1000);/** then* 调用会使用前面任务的线程池 */future.thenApply(task2);//Thread.sleep(1000);System.out.println("haha");}
}

5.3.2 执行结果

在这里插入图片描述

5.4 thenApply

  public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);}

第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

有入参,有返回值

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class FutureThenApplyDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(()->{System.out.println("原始CompletableFuture方法任务");return "1.supplyAsync";});CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {return a+ ",2.thenApply";});System.out.println(thenApplyFuture.get());}
}

执行结果:

在这里插入图片描述

5.5 thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {return uniAcceptStage(null, action);}

表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

有入参,没有返回值

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class FutureThenAcceptDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(()->{return "1.supplyAsync";});CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {System.out.println(a);});System.out.println(thenAcceptFuture.get());}
}

执行结果:

在这里插入图片描述

5.6 thenRun

public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);}

执行完第一个任务后,执行第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值。

无入参,无返回值

package com.demo.annn;
import java.util.concurrent.CompletableFuture;
public class FutureThenRunDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(()->{System.out.println("1.supplyAsync");return "1.supplyAsync";});CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {System.out.println("2.thenRun");});System.out.println(thenRunFuture.get());}
}

执行结果:

在这里插入图片描述

5.7 thenCompose

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(null, fn);
}

thenCompose用于链接两个异步操作,其中第二个操作依赖于第一个操作的结果。thenCompose方法接受一个函数作为参数,该函数接受第一个CompletableFuture的结果,并返回一个新的CompletableFuture
thenCompose的主要优势在于,它允许你以流程的方式组合多个异步操作,并将他们链接在一起,从而创建一个异步操作链。这对于构建复杂的异步逻辑非常有用。
用途:组合多个异步操作,创建一个异步操作链。返回新的CompletableFuture可以继续调用下去。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class ThenComposeDemo {public static void main(String[] args) throws Exception {/** 创建一个异步任务 */CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模拟耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "First operation result";});/** 接收future1中异步任务的运行结果 ,内部再次创建异步任务返回新的CompletableFuture*/CompletableFuture<String> future2 = future1.thenCompose(result1 -> {// 这个lambda表达式将使用future1的结果,并返回一个新的CompletableFuturereturn CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 第二个异步操作,也模拟耗时} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Second operation result based on " + result1;});});// 获取最终的异步操作结果String finalResult = future2.join();System.out.println(finalResult); // 输出: Second operation result based on First operation result}
}

执行结果:

在这里插入图片描述

注意:thenComposethenApplythenApplyAsync方法有些相似,但它们之间存在关键差异。thenApplythenApplyAsync接收一个函数,该函数应用于前一个CompletableFuture的结果,并立即返回一个新值(而不是另一个CompletableFuture)。而thenCompose则允许你链接一个返回CompletableFuture的函数,从而可以构建更复杂的异步操作链。

thenCompose任务允许传进去一个CompletableFuture类型的函数。

5.8 thenCombine

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(null, other, fn);
}
  • 它用于组合两个CompletableFuture对象的结果,并应用一个函数来生成一个新的结果。当两个CompletableFuture对象都完成时,thenCombine方法将这两个结果作为参数传递给提供的函数,并返回一个新的CompletableFuture对象,该对象包含函数的结果。
  • thenCombine的主要用途是当你有两个异步操作,并且想要基于这两个操作的结果来执行某些操作时。他允许你以一种声明性的方式来表达这种依赖关系,并且以异步的方式处理结果。

总结:组合两个任务结果,两个任务结果作为参数传递给Combine函数,并返回一个新的CompletableFuture,用途基于两个任务操作结果执行某些操作的时候用。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class ThenCombineDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {// 这里的lambda表达式将接收两个异步操作的结果,并返回一个新的结果return "The sum is: " + (result1 + result2);});// 获取最终的组合结果String finalResult = combinedFuture.join();System.out.println(finalResult); // 输出: The sum is: 30}
}

在这里插入图片描述

注意:

  • thenCombinethenCompose方法的主要区别在于他们处理结果的方式。
  • thenCombine直接应用一个函数来组合两个结果,并返回一个新的CompletableFuture
  • thenCompose则允许你链接一个返回CompletableFuture的函数,以构建更复杂的异步操作链。

5.9 thenAcceptBoth

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {return biAcceptStage(null, other, action);
}

它用于当两个CompletableFuture对象都完成时,执行一个接受这两个结果的操作,但不返回任何结果(即返回void)。thenAcceptBoth允许你执行一个其他操作,例如记录日志、更新UI等,而不关心操作的返回值。

总结:两个CompletableFuture对应的任务都完成时,执行一个接收两个结果的操作,但是没有返回值。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class AcceptBothDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});CompletableFuture<Void> combinedFuture = future1.thenAcceptBoth(future2, (result1, result2) -> {// 当两个future都完成时,执行这里的操作System.out.println("Future 1 result: " + result1);System.out.println("Future 2 result: " + result2);System.out.println("add result: " + result2 + result1);});// 获取最终的组合结果// 主线程等待,防止程序立即退出Thread.sleep(2000);}
}

在这里插入图片描述

5.10 runAfterBoth

    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action) {return biRunStage(null, other, action);}

它用于两个CompletableFuture对象都完成(无论是正常完成还是异常完成)之后执行某个动作。与thenAcceptBoth不同,runAfterBoth不接受任何结果作为参数,并且也不返回任何值(返回void)。它纯粹用于执行一个需要在两个异步操作完成后执行其他操作。

总结:两个CompletableFuture对应的任务都完成时,执行一个一系列的操作,但不接受参数值,也没有返回值。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class RunAfterBothDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});future1.runAfterBoth(future2, () -> {// 当两个future都完成时,执行这里的操作System.out.println("Both futures have completed.");});// 主线程等待,防止程序立即退出Thread.sleep(2000);}
}

在这里插入图片描述

5.11 runAfterEither

    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) {return orRunStage(null, other, action);}

它用于两个CompletableFuture对象中的任意一个完成时执行某个动作。这个方法接受两个参数:另一个CompletableFuture对象和一个Runnable 。当这两个CompletableFuture对象中的任意一个完成时(无论正常完成还是异常完成),Runnable 中的代码将会被执行。

两个异步操作,任何一个完成了就会走runAfterEither定义的逻辑,没有参数,也没有返回值。

runAfterEither与runAfterBoth的主要区别在于,它不需要两个CompletableFuture都完成;只需要其中一个完成就会执行Runnable 。

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class RunAfterEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);System.out.println("1000");} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 10;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);System.out.println("500");} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return 20;});future1.runAfterEither(future2, () -> {// 当两个future中的任意一个完成时,执行这里的操作System.out.println("One of the futures has completed.");});// 主线程等待,防止程序立即退出Thread.sleep(1500);}
}

在这里插入图片描述

5.12 applyToEither

    public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {return orApplyStage(null, other, fn);}

runAfterEither类似,CompletableFuture类也提供了一个applyToEither方法。applyToEither方法也是用于在两个CompletableFuture对象中的任意一个完成时执行一个操作,但与runAfterEither不同的是,applyToEither允许你提供一函数(Function),该函数将应用于已完成CompletableFuture的结果(如果有的话),并返回一个新的CompletableFuture

两个异步操作,任何一个完成了就会走applyToEither定义的逻辑,有返回值。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.function.Function;public class ApplyToEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 2";});// 当任意一个future完成时,应用一个函数并返回一个新的CompletableFutureCompletableFuture<String> resultFuture = future1.applyToEither(future2, Function.identity());// 处理结果resultFuture.thenAccept(result -> {System.out.println("Result: " + result);// 在这里进行后续操作});// 主线程等待,防止程序立即退出resultFuture.join();}
}

在这里插入图片描述

5.12 acceptEither

    public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {return orAcceptStage(null, other, action);}

CompletableFuture类中的acceptEither方法与applyToEither方法类似,但是他们的使用场景和目的有所不同。acceptEither用于在任意一个CompletableFuture完成时执行一个动作,但这个操作是一个Consumer函数,它接受已完成CompletableFuture的结果作为参数,但不返回任何值或新的CompletableFuture

总结: 类似,区别acceptEither接收ConsumerapplyToEither接收 Function

package com.demo.annn;import java.util.concurrent.CompletableFuture;public class AcceptEitherDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {// 模拟另一个耗时操作try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 2";});// 当任意一个future完成时,使用Consumer函数处理结果CompletableFuture<Void> resultFuture = future1.acceptEither(future2, (result) -> {System.out.println("Result: " + result);// 在这里进行基于结果的操作,但不返回新的CompletableFuture});// 主线程等待,防止程序立即退出CompletableFuture.anyOf(future1, future2).join();}
}

在这里插入图片描述

5.13 get()、get(long timeout, TimeUnit unit)、getNow

public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r = result) == null ? waitingGet(true) : r);
}public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {Object r;long nanos = unit.toNanos(timeout);return reportGet((r = result) == null ? timedGet(nanos) : r);
}public T getNow(T valueIfAbsent) {Object r;return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}
  • get 阻塞等待结果
  • get(long timeout, TimeUnit unit)超时等待结果
  • getNow立刻获取结果,如果任务没有执行完则返回 valueIfAbsent
package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class GetDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {//模拟一个耗时操作try {TimeUnit.SECONDS.sleep(2);}catch (InterruptedException e){Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "5";});//等待异步计算完成,并获取结果String result = future.get();System.out.println("异步计算的结果是:"+result);String result2 = future.get(3,TimeUnit.SECONDS);System.out.println("异步计算的结果2是:"+result2);Thread.sleep(4000);String result3 = future.getNow("valueIfAbsent");System.out.println("异步计算的结果3是:"+result3);}}

在这里插入图片描述

5.14 exceptionally

    public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {return uniExceptionallyStage(fn);}

用于处理异步任务执行过程中出现的异常。当CompletableFuture计算的异步任务抛出异常时,你可以使用exceptionally方法来提供一个函数,该函数会接收异常作为参数,并返回一个新的结果或抛出一个新的异常。
exceptionally方法非常有用,因为它允许你在异步任务失败时执行一些特定的逻辑,而不是让异常直接传播到调用者。这样,你可以控制异常的处理方式,比如记录日志、返回默认值或抛出更具体的异常。

总结:抛出异常的时候exceptionally定义一个函数接收异常,好处是异步任务失败的时候可以做一些特定逻辑,而不是直接返回给调用者。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class ExceptionallyDemo {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("异步任务失败");});CompletableFuture<String> exceptionalFuture = future.exceptionally(throwable -> {// 处理异常,返回一个新的结果或抛出新的异常System.err.println("捕获到异常: " + throwable.getMessage());return "回退结果";});try {String result = exceptionalFuture.get();System.out.println("最终结果是: " + result);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
}

在这里插入图片描述

5.15 completeExceptionally

    public boolean completeExceptionally(Throwable ex) {if (ex == null) throw new NullPointerException();boolean triggered = internalComplete(new AltResult(ex));postComplete();return triggered;}

CompletableFuturecompleteExceptionally方法用于在异步计算完成之前,以异常的方式标记这个CompletableFuture为已完成状态。这意味着任何等待这个CompletableFuture完成的操作(例如通过get()方法)将会立即抛出这个异常。

总结:程序执行结束的时候,通过completeExceptionally来标记任务为已完成的状态并且抛出异常。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CmpleteExceptionallyDemo {public static void main(String[] args) {CompletableFuture<String> future = new CompletableFuture<>();// 模拟异步操作失败,使用 completeExceptionally 来标记 future 为完成状态并附带异常future.completeExceptionally(new RuntimeException("异步操作失败"));try {// 尝试获取 future 的结果,将会抛出 ExecutionException,因为 future 是以异常完成的String result = future.get();System.out.println(result);} catch (InterruptedException | ExecutionException e) {// 打印出原始异常信息e.printStackTrace();}}
}

在这里插入图片描述

5.16 whenComplete

    public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {return uniWhenCompleteStage(null, action);}

用于在异步操作完成后执行一个回调函数。无论异步操作时正常完成还是抛出异常,whenComplete都会执行其提供的回调函数。这个方法非常有用,它允许你在异步任务结束时进行清理工作、记录日志、处理结果或异常,而不会阻塞主线程。

总结:无论任务是否抛出异常都会调用whenComplete回调函数,Flink内部经常用来关闭资源等操作。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class WhenCompleteDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result of the async operation";});future.whenComplete((result, throwable) -> {if (throwable != null) {// 处理异步操作抛出的异常System.err.println("Async operation failed: " + throwable.getMessage());} else {// 处理异步操作的结果System.out.println("Async operation completed successfully with result: " + result);}});// 主线程等待异步任务完成future.get();}
}

在这里插入图片描述

5.17 allOf

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);}

所有任务都执行后才执行下一个任务。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class AllOfFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {try {Thread.sleep(1000); // 模拟耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}System.out.println("Future 1 completed");});CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {try {Thread.sleep(2000); // 模拟另一个耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}System.out.println("Future 2 completed");});CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);// 阻塞直到所有future都完成allFutures.get();System.out.println("All futures completed");}
}

在这里插入图片描述

5.18 anyOf

    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {return orTree(cfs, 0, cfs.length - 1);}

任意一个任务执行后就可以执行下一个任务了。

package com.demo.annn;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class AnyOfFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000); // 模拟耗时操作System.out.println(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000); // 模拟另一个耗时操作System.out.println(2000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(e);}return "Result from future 2";});CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1.toCompletableFuture(), future2.toCompletableFuture());// 阻塞直到任意一个future完成Object result = anyFuture.get();System.out.println("A future completed with result: " + result);}
}

在这里插入图片描述


http://www.ppmy.cn/news/1571299.html

相关文章

【Qt Creator】Qt Creator编辑器打开QT项目后,项目栏的文件全部呈现灰色的原因分析

目录 1、现象描述2、原因分析3、解决方法 1、现象描述 在学习QT过程中&#xff0c;常常会从网络&#xff08;如GitHub&#xff09;上下载QT项目进行学习或借鉴使用&#xff0c;但是使用Qt Creator编辑器打开项目后&#xff0c;往往会出现项目栏的文件全部呈现灰色的问题&#x…

【Uniapp-Vue3】UniCloud云数据库获取指定字段的数据

使用where方法可以获取指定的字段&#xff1a; let db uniCloud.database(); db.collection("数据表").where({字段名1:数据, 字段名2:数据}).get({getOne:true}) 如果我们不在get中添加{getOne:true}&#xff0c;在只获取到一个数据res.result.data将会是一个数组&…

模型压缩中的四大核心技术 —— 量化、剪枝、知识蒸馏和二值化

一、量化 (Quantization) 量化的目标在于将原始以 32 位浮点数表示的模型参数和中间激活,转换为低精度(如 FP16、INT8、甚至更低位宽)的数值表示,从而在减少模型存储占用和内存带宽的同时,加速推理运算,特别适用于移动、嵌入式和边缘计算场景。 1.1 概念与目标 基本思想…

在Linux上部署Jenkins的详细指南

引言 在当今快速迭代的软件开发环境中&#xff0c;持续集成和持续交付&#xff08;CI/CD&#xff09;变得越来越重要。Jenkins作为一个开源自动化服务器&#xff0c;能够帮助开发者更高效地进行代码集成、测试和部署。本文将详细介绍如何在Linux系统上安装和配置Jenkins。 准…

无人机避障——基于ESDF地图的JPS算法前端路径规划

原来是用栅格地图的方式&#xff0c;0表示可通行区域&#xff0c;1表示不可通行区域&#xff0c;然后采用JPS算法做路径规划&#xff0c;从起点到终点规划出一条路径。但是目前我需要做的是将栅格地图更换为ESDF地图&#xff0c;那么JPS算法计算代价的部分是否需要进行变化。 …

2.3-2.9学习周报

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 摘要Abstract一、相关概念1.文本提取(DLE)2.以样本为中心的情境学习&#xff08;SAIL&#xff09;2.1问题公式化2.2文档级文本相似性2.3实体级文本相似性2.4布局相似…

基于 Nginx 的 CDN 基础实现

概览 本文是对基于Nginx的CDN网络的学习笔记&#xff0c;阅读的代码为&#xff1a;https://github.com/leandromoreira/cdn-up-and-running 其中&#xff0c;先确定CDN中的一些基础概念&#xff1a; Balancer&#xff1a;负载均衡&#xff0c;即请求数据的流量最开始打到Bal…

Windows 系统下使用 Ollama 离线部署 DeepSeek - R1 模型指南

引言 随着人工智能技术的飞速发展&#xff0c;各类大语言模型层出不穷。DeepSeek - R1 凭借其出色的语言理解和生成能力&#xff0c;受到了广泛关注。而 Ollama 作为一款便捷的模型管理和部署工具&#xff0c;能够帮助我们轻松地在本地环境中部署和使用模型。本文将详细介绍如…