文章目录
- FutureTask分析
- FutureTask介绍
- FutureTask应用
- FutureTask源码分析
- CompletableFuture
- CompletableFuture介绍
- 场景应用
- 源码分析
FutureTask分析
FutureTask介绍
FutureTask 是 Java 并发包 (java.util.concurrent) 中的一个 可取消的异步计算任务,它实现了 Runnable 和 Future 接口,可以用于 异步任务执行 和 获取结果。
FutureTask应用
java"> Callable<Integer> callable = () -> {System.out.println("任务开始执行...");Thread.sleep(2000);return 10;};// 包装成 FutureTaskFutureTask<Integer> futureTask = new FutureTask<>(callable);// 启动线程执行任务new Thread(futureTask).start();System.out.println("主线程可以做其他事情...");// 获取任务执行结果(如果任务未完成,会阻塞)Integer result = futureTask.get();System.out.println("任务执行结果: " + result);
结合线程池执行FutureTask
java">import java.util.concurrent.*;public class FutureTaskWithThreadPool {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executor = Executors.newFixedThreadPool(2);FutureTask<Integer> futureTask = new FutureTask<>(() -> {Thread.sleep(2000);return 200;});// 提交 FutureTask 到线程池executor.submit(futureTask);System.out.println("主线程继续执行...");Integer result = futureTask.get(); // 阻塞等待结果System.out.println("异步任务结果: " + result);executor.shutdown();}
}
FutureTask源码分析
FutureTask的run流程和get流程图
了解FutureTask的枚举值
java"> /*** NEW -> COMPLETING -> NORMAL 任务正常执行,并且返回结果也正常返回* NEW -> COMPLETING -> EXCEPTIONAL 任务正常执行,但是结果是异常* NEW -> CANCELLED 任务被取消 * NEW -> INTERRUPTING -> INTERRUPTED 任务被中断*///状态标识private volatile int state;//初始状态private static final int NEW = 0;//中间状态private static final int COMPLETING = 1;//正常执行完成private static final int NORMAL = 2;//任务报错private static final int EXCEPTIONAL = 3;//任务取消private static final int CANCELLED = 4;//任务中断执行中private static final int INTERRUPTING = 5;//中断完成private static final int INTERRUPTED = 6;/** 需要执行的任务,会被赋值到这个全局变量 */private Callable<V> callable;/** 任务执行结果,也会被赋值到这个全局对象中 */private Object outcome; // non-volatile, protected by state reads/writes/** 执行任务的线程 */private volatile Thread runner;/** 等待返回结果的线程WaiteNode */private volatile WaitNode waiters;static final class WaitNode {//线程volatile Thread thread;volatile WaitNode next;//有参构造WaitNode() { thread = Thread.currentThread(); }}
有参构造及run()方法的源码解析
java"> public FutureTask(Callable<V> callable) {//健壮性校验if (callable == null)throw new NullPointerException();//给callable赋值this.callable = callable;//将当前的状态置为NEWthis.state = NEW; // ensure visibility of callable}//run方法的执行流程public void run() {//判断state==new,并且cas将runnerOffset赋值给当前线程if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {//将全局变量赋值给局部变量Callable<V> c = callable;//健壮性校验,dclif (c != null && state == NEW) {V result;//执行成功的标志,默认falseboolean ran;try {//执行callable中的call()方法result = c.call();//将标志位设置为trueran = true;} catch (Throwable ex) {//任务执行报错,将结果置为nullresult = null;//任务执行完成设置为falseran = false;//设置报错信息setException(ex);}//任务成功执行完成if (ran)//设置结果set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}//任务报错,设置报错信息protected void setException(Throwable t) {//CAS,先将当前状态从NEW设置为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//全局结果设置为toutcome = t;//将当前的state,设置为EXCEPTIONALUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state//唤醒下一个节点finishCompletion();}}//任务成功执行完成protected void set(V v) {//cas 将stateOffset从new设置为completingif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//设置结果outcome = v;//设置为normalUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state//唤醒此节点的后续节点finishCompletion();}}//唤醒下一个节点的操作private void finishCompletion() {// 设置局部变量q,并且给局部变量赋值waitersfor (WaitNode q; (q = waiters) != null;) {//将waiterOffset从q设置为nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//死循环唤醒后续所有的节点for (;;) {Thread t = q.thread;if (t != null) {//help GCq.thread = null;//唤醒此节点LockSupport.unpark(t);}//获取到下一个节点WaitNode next = q.next;//如果后续节点为nullif (next == null)//跳出此节点,结束break;//去除上一个节点,help gcq.next = null; // unlink to help gc//将下一个节点,赋值给qq = next;}//结束break;}}//待实现done();//将callable置为nullcallable = null; // to reduce footprint}
get()方法的源码分析
java"> public V get() throws InterruptedException, ExecutionException {//将局部变量设置为全局变量int s = state;//判读那状态if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}//awaitDone方法private int awaitDone(boolean timed, long nanos)throws InterruptedException {//如果有超时时间,设置超时时间final long deadline = timed ? System.nanoTime() + nanos : 0L;//声明局部变量WaitNode q = null;//声明标志位boolean queued = false;//死循环for (;;) {//线程中断if (Thread.interrupted()) {//移除当前节点removeWaiter(q);//抛出异常throw new InterruptedException();}//将全局变量赋值给局部变量int s = state;//判断当前任务是否执行if (s > COMPLETING) {//健壮性校验if (q != null)//help gcq.thread = null;//返回状态return s;}//状态为completingelse if (s == COMPLETING) // cannot time out yet//当前线程让出cpuThread.yield();//q为null,封装新的waitNodeelse if (q == null)q = new WaitNode();//第一次进来else if (!queued)// 没放队列的话,直接放到waiters的前面queued = UNSAFE.compareAndSwapObject(this, waitersOffset,//判断是否有超时限制 q.next = waiters, q);else if (timed) {//判断剩余超时时间nanos = deadline - System.nanoTime();//已经超时if (nanos <= 0L) {//移除节点removeWaiter(q);//返回statereturn state;}//挂起当前线程,设置超时时间LockSupport.parkNanos(this, nanos);}//如果没有超时时间限制,直接将当前线程挂起elseLockSupport.park(this);}}//封装当前的结果private V report(int s) throws ExecutionException {//将全局的返回结果,赋值给局部变量Object x = outcome;//判断是否是正常执行完结束if (s == NORMAL)//返回结果return (V)x;//非正常执行我那结束,手动取消if (s >= CANCELLED)throw new CancellationException();//抛出异常throw new ExecutionException((Throwable)x);}//移除waiterNode节点private void removeWaiter(WaitNode node) {//健壮性校验if (node != null) {//help GCnode.thread = null;retry://死循环for (;;) { // restart on removeWaiter race//pred 前置节点 q 当前节点 s: next的节点for (WaitNode pred = null, q = waiters, s; q != null; q = s) {//赋值s=q.nexts = q.next;if (q.thread != null)//前置节点赋值pred = q;//前置节点不为nullelse if (pred != null) {//移除q节点pred.next = s;//判断pred.thread是否wieldnullif (pred.thread == null) // check for racecontinue retry;}//q节点置为selse if (!UNSAFE.compareAndSwapObject(this, waitersOffset,//失败重试 q, s))continue retry;}break;}}}
FutureTask的cacel()方法源码分析和流程图
源码分析
java"> public boolean cancel(boolean mayInterruptIfRunning) {// 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning// 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTINGif (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { // 如果mayInterruptIfRunning为true,需要中断线程if (mayInterruptIfRunning) {try {//将全局变量赋值给局部变量Thread t = runner;//健壮性校验if (t != null)//中断线程t.interrupt();} finally { // final state//cas将状态改为中断UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {//唤醒后续的线程finishCompletion();}//返回结果return true;}
CompletableFuture
CompletableFuture介绍
CompletableFuture 是 Java 8 引入的 异步编程工具,提供了更强大的功能来处理异步任务、组合任务、并行计算,并支持非阻塞编程。
supplyAsync() 适用于有返回值的异步任务。
runAsync() 适用于没有返回值的异步任务。
thenApply() 接收前一个任务的返回值,然后转换返回新值。
thenAccept() 只消费结果,但不返回新值。
thenRun() 不接收前面任务的返回值,只是在任务完成后执行某些操作。
thenCombine() 合并两个 CompletableFuture 的结果。
allOf() 等待所有 CompletableFuture 任务完成,但不会收集返回值。
anyOf() 只要有一个 CompletableFuture 任务完成,就返回结果。
exceptionally() 捕获异常,并提供默认值。
handle() 可同时处理成功和失败情况,更灵活。
场景应用
有返回值的场景
java"> CompletableFuture completableFuture=CompletableFuture.supplyAsync(()->{System.out.println("task1开始执行");return "abc";}).thenApply(result->{System.out.println("task1的结果:"+result+",开始执行task2");return "任务完成";});System.out.println("获取task2的返回结果:"+completableFuture.get());
无返回值的场景:
java"> CompletableFuture completableFuture=CompletableFuture.runAsync(()->{System.out.println("task1开始执行");return ;}).thenRun(()->{System.out.println("开始执行task2");return ;});
源码分析
我们源码分析,主要分析runAsync()方法和thenRun()方法
流程图
分析runAsync()源码
流程图:
源码分析
java">//传入Runnable方法public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}//异步执行static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {//健壮性校验if (f == null) throw new NullPointerException();//封装返回的结果CompletableFuture<Void> d = new CompletableFuture<Void>();//扔到线程池中执行e.execute(new AsyncRun(d, f));//返回结果return d;}//AsyncRun()方法static final class AsyncRun extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {//声明全局变量CompletableFuture<Void> dep; Runnable fn;//有参构造AsyncRun(CompletableFuture<Void> dep, Runnable fn) {//给全局变量赋值this.dep = dep; this.fn = fn;}public final Void getRawResult() { return null; }public final void setRawResult(Void v) {}public final boolean exec() { run(); return true; }//在线程池中执行此方法public void run() {//声明局部变量CompletableFuture<Void> d; Runnable f;//健壮性校验,并且给局部变量赋值if ((d = dep) != null && (f = fn) != null) {//将之前的全部变量置为null,help GCdep = null; fn = null;//判断当前任务是否执行if (d.result == null) {try {//任务未执行,此时执行run方法f.run();//封装返回结果d.completeNull();} catch (Throwable ex) {//封装报错信息d.completeThrowable(ex);}}//触发后续任务d.postComplete();}}}//后续任务源码分析final void postComplete() { //声明局部变量CompletableFuture<?> f = this; Completion h;//进入循环,并且给h赋值,最后进行参数校验while ((h = f.stack) != null ||//任务栈被改变,需要重新办检查(f != this && (h = (f = this).stack) != null)) {//声明两个局部变量CompletableFuture<?> d; Completion t;//cas 将h节点换成h.next,给t赋值为h.nextif (f.casStack(h, t = h.next)) {//健壮性校验if (t != null) {//如果栈发生了新的改变if (f != this) {//将h节点重新压入栈中pushStack(h);//跳过continue;}// help gch.next = null; // detach}//执行 `Completion` 任务f = (d = h.tryFire(NESTED)) == null ? this : d;}}}//执行UniRun中的tryFire方法static final class UniRun<T> extends UniCompletion<T,Void> {//声明变量Runnable fn;//有参构造src:前置任务,fn:Runnable方法UniRun(Executor executor, CompletableFuture<Void> dep,CompletableFuture<T> src, Runnable fn) {super(executor, dep, src); this.fn = fn;}final CompletableFuture<Void> tryFire(int mode) {//声明两个局部变量CompletableFuture<Void> d; CompletableFuture<T> a;//给局部变量d赋值,并且进行健壮性校验if ((d = dep) == null ||//执行失败,直接返回null!d.uniRun(a = src, fn, mode > 0 ? null : this))return null;//将变量全部赋值为null help gcdep = null; src = null; fn = null;//清理任务栈 stack,并调用 postComplete() 处理后续任务。return d.postFire(a, mode);}}//执行UniRun方法final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {//声明局部变量Object r; Throwable x;//校验前置任务信息,和执行结果if (a == null || (r = a.result) == null || f == null)return false;//当前任务还没有执行if (result == null) {//判断前置任务的执行是否报错if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)//封装错误信息completeThrowable(x, r);elsetry {//校验健壮性 && 异步执行任务if (c != null && !c.claim())return false;//执行run()方法f.run();//封装执行结果completeNull();} catch (Throwable ex) {//报错的话,也需要封装报错信息completeThrowable(ex);}}//返回结果return true;}
// 异步的线程池处理任务
final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {// 只要有线程池对象,不为nullif (e == null)return true;executor = null; // disable// 基于线程池的execute去执行任务e.execute(this);}return false;
}final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {// 如果 `a` 存在,并且 `a.stack` 还有未执行的回调任务if (a != null && a.stack != null) {if (mode < 0 || a.result == null) // mode < 0 表示异步执行a.cleanStack(); // 清理 `stack`elsea.postComplete(); // 处理 `stack` 中的任务}// 如果当前 `CompletableFuture` 已经有结果,并且 `stack` 还有未执行任务if (result != null && stack != null) {if (mode < 0) // 如果 mode < 0,返回当前 `CompletableFuture`return this;elsepostComplete(); // 触发 `postComplete()` 继续执行}return null;
}
分析threnRun()源码
java"> //调用thenRun()方法,传入Runnablepublic CompletableFuture<Void> thenRun(Runnable action) {//调用此方法return uniRunStage(null, action);}//uniRunStage源码分析
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {// 后继任务不能为null,健壮性判断if (f == null) throw new NullPointerException();// 创建CompletableFuture对象d,与后继任务f绑定CompletableFuture<Void> d = new CompletableFuture<Void>();// 如果线程池不为null,代表异步执行,将任务压栈// 如果线程池是null,先基于uniRun尝试下,看任务能否执行if (e != null || !d.uniRun(this, f, null)) {// 如果传了线程池,这边需要走一下具体逻辑// e:线程池// d:后继任务的CompletableFuture// this:前继任务的CompletableFuture// f:后继任务UniRun<T> c = new UniRun<T>(e, d, this, f);// 将封装好的任务,push到stack栈结构// 只要前继任务没结束,这边就可以正常的将任务推到栈结构中// 放入栈中可能会失败push(c);// 无论压栈成功与否,都要尝试执行以下。c.tryFire(SYNC);}// 无论任务执行完毕与否,都要返回后继任务的CompletableFuturereturn d;
}