FutureTask源码

news/2024/11/29 10:01:18/

介绍

在创建线程的方式中,我们可以直接继承Thread和实现Callable接口来创建线程,但是这两种创建线程的方式不能返回执行的结果。于是从JDK1.5开始提供了Callable接口和Future接口,这两种创建线程的方式可以在执行完任务之后返回执行结果。

Future模式可以让线程异步获得执行的结果,而不用等到线程等到run()方法里面执行完再执行之后的逻辑。

public class FutureTask<V> implements RunnableFuture<V>

image-20230622200425251

我们看到FutureTask实现了RunnableFuture接口,其中RunnableFuture接口继承了Runnable和Future接口,因此我们可以说FutureTask实现了Future接口和Runnable接口,但是我们需要注意的是,FutureTask也实现了Callable接口,虽然这并没有在继承体系中看出,但是可以从FutureTask的源码中看出。

FutureTask用于异步执行或取消任务的场景。通过传入Runnable的实现类或Callable的实现类给FutureTask对象,

之后将FutureTask对象传给Thread对象或线程池。可以通过FutureTask对象.get()方法可以异步返回执行结果,

如果任务还没有执行完,则调用get()方法的线程会陷入阻塞,直到任务执行完。无论有多少个线程调用get()方法,

FutureTask中的run()逻辑只会执行一次。我们还可以调用FutureTask对象的cancel()方法取消掉当前任务。

常量&变量

    /*** The run state of this task, initially NEW.  The run state* transitions to a terminal state only in methods set,* setException, and cancel.  During completion, state may take on* transient values of COMPLETING (while outcome is being set) or* INTERRUPTING (only while interrupting the runner to satisfy a* cancel(true)). Transitions from these intermediate to final* states use cheaper ordered/lazy writes because values are unique* and cannot be further modified.** Possible state transitions:* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*///表示当前任务的状态private volatile int state;//表示新创建状态,任务尚未执行。private static final int NEW          = 0;//表示当前任务即将结束,但是还未完全结束,返回值还未写入,处于一种临界状态。private static final int COMPLETING   = 1;//表示当前任务处于正常结束状态(没有发生异常,中断,取消)。private static final int NORMAL       = 2;// 表示当前任务执行过程中出现了异常,处于非正常结束状态。内部封装的callable.call()向上抛出异常了private static final int EXCEPTIONAL  = 3;//表示当前任务因调用cancel而处于被取消状态。private static final int CANCELLED    = 4;//表示当前任务处于中断中,但是还没有完全中断的阶段。private static final int INTERRUPTING = 5;//表示当前任务处于已完全中断的阶段。private static final int INTERRUPTED  = 6;/** The underlying callable; nulled out after running */// 我们在使用FutureTask对象的时候,会传入一个Callable实现类或Runnable实现类,这个callable存储的就是// 传入的Callable实现类或Runnable实现类(Runnable会被使用修饰者设计模式伪装为)callable//submit(callable/runnable):其中runnable使用了装饰者设计模式伪装成了callableprivate Callable<V> callable;/** The result to return or exception to throw from get() */// 正常情况下,outcome保存的是任务的返回结果// 不正常情况下,outcome保存的是任务抛出的异常private Object outcome; // non-volatile, protected by state reads/writes/** The thread running the callable; CASed during run() */// 保存的是当前任务执行期间,执行任务的线程的引用private volatile Thread runner;/** Treiber stack of waiting threads */// 因为会有很多线程去get结果,这里把线程封装成WaitNode,一种数据结构:栈,头插头取private volatile WaitNode waiters;/*** Simple linked list nodes to record waiting threads in a Treiber* stack.  See other classes such as Phaser and SynchronousQueue* for more detailed explanation.*/static final class WaitNode {// 线程对象volatile Thread thread;// 下一个WaitNode结点volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}

构造方法

    /*** Creates a {@code FutureTask} that will, upon running, execute the* given {@code Callable}.** @param  callable the callable task* @throws NullPointerException if the callable is null* 传入一个callable,调用get返回的结果就是callable返回的结果*/public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;// 设置状态为新创建this.state = NEW;       // ensure visibility of callable}/*** Creates a {@code FutureTask} that will, upon running, execute the* given {@code Runnable}, and arrange that {@code get} will return the* given result on successful completion.** @param runnable the runnable task* @param result the result to return on successful completion. If* you don't need a particular result, consider using* constructions of the form:* {@code Future<?> f = new FutureTask<Void>(runnable, null)}* @throws NullPointerException if the runnable is null* 传入一个runnable,一个result变量,调用get方法返回的结果就是传入的result变量*/public FutureTask(Runnable runnable, V result) {// 装饰者模式,将runnable转化为callable接口this.callable = Executors.callable(runnable, result);// 设置状态为新创建this.state = NEW;       // ensure visibility of callable}

内部类

WaitNode

WaitNode如下所示以内部类的形式存在于FutureTask中,主要是为了优化get方法阻塞主线程的问题,如果没有WaitNode,那么主线程将会一直轮询重试,如果子任务执行时间较长或者获取get结果的主线程数量过多,肯定会大量占用cpu资源,因此引入WaitNode作为所有等待结果的线程链表或队列。

一旦有线程执行get方法获取结果,并且任务并未完成,就需要新建WaitNode节点添加到等待队列中;后续的任务完成,会依次通知等待队列中的所有线程。WaitNode的数据结构也很简单,一个是当前线程对象,一个是等待链表的next指针。

    /*** Simple linked list nodes to record waiting threads in a Treiber* stack.  See other classes such as Phaser and SynchronousQueue* for more detailed explanation.*/static final class WaitNode {// 线程对象volatile Thread thread;// 下一个WaitNode结点volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}

run()方法及与其相关的方法

run()

调用Callable对象run()方法任务的具体逻辑和一些关于任务状态返回结果的逻辑。

    public void run() {// 当前任务状态不为new或者runner旧值不为null,说明已经启动过了,直接返回,// 这里也说明了run()里面的具体逻辑只会被执行一次。if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;// 只有当任务状态为new并且runner旧值为null才会执行到这里try {// 传入的callable任务Callable<V> c = callable;// 当任务不为null并且当前任务状态为新建时才会往下执行// 条件1:防止空指针异常// 条件2:防止外部线程cacle掉当前任务if (c != null && state == NEW) {// 储存任务的返回结果V result;// 储存执行是否成功boolean ran;try {// 调用callable.run()并返回结果result = c.call();// 正常执行设置ran为trueran = true;} catch (Throwable ex) {// callable的run()方法抛出了异常// 设置结果为nullresult = null;// 执行失败设置ran为falseran = false;// 内部设置outcome为抛出的异常,//并且更新任务状态为EXCEPTIONAL(执行过程中出现了异常)并且唤醒阻塞的线程setException(ex);}// 如果执行成功(正常执行)if (ran)// 内部设置outcome为callable执行的结果,并且更新任务的状态为NORMAL(任务正常执行)并且唤醒阻塞的线程set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()// 将当前任务的线程设置为nullrunner = null;// state must be re-read after nulling runner to prevent// leaked interrupts// 当前任务的状态int s = state;// 如果state>=INTERRUPTING,说明当前任务处于中断中或已中断状态if (s >= INTERRUPTING)// 如果当前任务处于中,则执行这个方法线程会不断让出cpu直到任务处于已中断状态handlePossibleCancellationInterrupt(s);}}

setException(Throwable t)

如果在执行run()方法的过程中出现了异常会执行这个方法,里面具体的逻辑是:

  1. 设置任务的状态为EXCEPTIONAL(表示因为出现异常而非正常完成)
  2. 设置outcome(返回结果)为Callable对象的run()方法抛出的异常
  3. 执行finishCompletion()方法唤醒因为调用get()方法而陷入阻塞的线程。
    /*** Causes this future to report an {@link ExecutionException}* with the given throwable as its cause, unless this future has* already been set or has been cancelled.** <p>This method is invoked internally by the {@link #run} method* upon failure of the computation.** @param t the cause of failure*/protected void setException(Throwable t) {// 如果当前任务的状态是新建状态,则设置任务状态为临界状态(即将完成)if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置outcome(结果)为callable.run()抛出的异常outcome = t;// 设置当前任务的状态为出现中断异常UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state// 唤醒调用get()的所有等待的线程并清空栈finishCompletion();}}

set(V v)

如果执行run()方法正常结束(没有出现异常)会执行这个方法,里面的具体逻辑是:

  1. 设置任务的状态为NORMAL(表示正常结束)
  2. 设置outcome(返回结果)为Callable对象调用run()方法返回的结果
  3. 唤醒因为调用get()方法陷入阻塞的线程
    /*** Sets the result of this future to the given value unless* this future has already been set or has been cancelled.** <p>This method is invoked internally by the {@link #run} method* upon successful completion of the computation.** @param v the value*/protected void set(V v) {// 如果当前任务的状态为新建状态,则设置当前任务的状态为临界状态(即将完成)if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置outcome(结果)为callable.run()返回的结果outcome = v;// 设置当前任务的状态为正常结束UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state// 唤醒调用get()的所有等待的线程并清空栈finishCompletion();}}

finishCompletion()

任务执行完成(正常结束和非正常结束都代表任务执行完成)会调用这个方法来唤醒所有因调用get()方法陷入阻塞的线程

    /*** Removes and signals all waiting threads, invokes done(), and* nulls out callable.*/private void finishCompletion() {// assert state > COMPLETING;// 如果条件成立,说明当前有陷入阻塞的线程for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {// 获取当前节点封装的threadThread t = q.thread;// 防止空指针异常if (t != null) {// 设置q.thread为null,方便GCq.thread = null;// 唤醒当前节点封装的线程LockSupport.unpark(t);}// 获取下一个WaitNode节点WaitNode next = q.next;// 如果next为空,说明栈现在已经为空,调用get()陷入阻塞的线程已经全部唤醒,直接breakif (next == null)break;// 执行到这里说明还有因调用get()而陷入阻塞的线程,自旋接着唤醒// 这里q.next设置为null帮助GC(垃圾回收)q.next = null; // unlink to help gc// 下一个WaitNode节点q = next;}// 中断break;}}// 空方法,子类可以重写done();// 将callable设置为null,方便GCcallable = null;        // to reduce footprint}

handlePossibleCancellationInterrupt(int s)

这个方法在run()方法里可能会执行,当任务的状态为中断中时,抢到cpu的线程会释放cpu资源,直到任务状态更新为已中断状态。

    /*** Ensures that any interrupt from a possible cancel(true) is only* delivered to a task while in run or runAndReset.*/private void handlePossibleCancellationInterrupt(int s) {// It is possible for our interrupter to stall before getting a// chance to interrupt us.  Let's spin-wait patiently.if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield(); // wait out pending interrupt// assert state == INTERRUPTED;// We want to clear any interrupt we may have received from// cancel(true).  However, it is permissible to use interrupts// as an independent mechanism for a task to communicate with// its caller, and there is no way to clear only the// cancellation interrupt.//// Thread.interrupted();}

get()方法及与其相关的方法

get()

get()方法获取的是任务执行完后返回的结果。对于空参的get()方法来说,如果任务还没有执行完就有线程调用get()方法获取结果,则该线程会陷入阻塞,阻塞的具体方法是awaitDone方法

    /*** @throws CancellationException {@inheritDoc}*/public V get() throws InterruptedException, ExecutionException {int s = state;// 判断当前任务的状态是否小于COMPLETING,如果成立说明当前任务的状态要么为新建状态要么为临界状态if (s <= COMPLETING)// 条件成立会调用awaitDone方法自旋等待直到任务完成s = awaitDone(false, 0L);return report(s);}

对于指定时间含参的get方法来说,如果在指定时间没有返回结果,则会抛出时间超时异常(TimeoutException)

    /*** @throws CancellationException {@inheritDoc}*/public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}

awaitDone(boolean timed, long nanos)

这个方法是用来阻塞所有因调用get()方法获取结果但是FutureTask任务还没有执行完的线程awaitDone方法在**get()**方法里面会被调用。

awaitDone用来阻塞线程时需要满足的条件:

  1. 任务还没有执行完
  2. 线程调用了**get()**方法
    /*** Awaits completion or aborts on interrupt or timeout.** @param timed true if use timed waits* @param nanos time to wait, if timed* @return state upon completion* 这个方法的作用是等待任务被完成(正常完成或出现异常完成都算完成),被中断,或是被超时*/private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;// 这个WaitNode其实存储的是当前线程WaitNode q = null;// 表示当前线程代表的WaitNode对象是否入栈成功boolean queued = false;for (;;) {// 如果当前线程出现中断异常,则将该线程代表的WaitNode结点移出栈并抛出中断异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}// 获取当前任务的状态int s = state;// 如果当前任务状态大于COMPLETING,说明当前任务已经有结果了(任务完成、中断、取消),直接返回任务状态if (s > COMPLETING) {if (q != null)// 设置q.thread为null,方便GCq.thread = null;return s;}// 当前任务处于临界状态,即将完成,则当前线程释放cpuelse if (s == COMPLETING) // cannot time out yetThread.yield();// 第一次自旋,如果当前WitNode为null,new一个WaitNode结点else if (q == null)q = new WaitNode();// 第二次自旋,如果当前WaitNode节点没有入队,则尝试入队else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 第三次自旋,到这里表示是否定义了超时时间else if (timed) {nanos = deadline - System.nanoTime();// 超出了指定时间,就移除当前节点并返回任务状态if (nanos <= 0L) {removeWaiter(q);return state;}// 未超出时间,挂起当前线程一定时间LockSupport.parkNanos(this, nanos);}else// 挂起当前线程,该线程会休眠(什么时候该线程会继续执行呢?除非有其他线程调用unpark()或者中断该线程)LockSupport.park(this);}}

removeWaiter

出现中断时,清空栈中的结点

    /*** Tries to unlink a timed-out or interrupted wait node to avoid* accumulating garbage.  Internal nodes are simply unspliced* without CAS since it is harmless if they are traversed anyway* by releasers.  To avoid effects of unsplicing from already* removed nodes, the list is retraversed in case of an apparent* race.  This is slow when there are a lot of nodes, but we don't* expect lists to be long enough to outweigh higher-overhead* schemes.*/private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null;retry:for (;;) {          // restart on removeWaiter racefor (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;// 后继节点不为空if (q.thread != null)pred = q;// 前驱结点不为空else if (pred != null) {// 前驱结点的后继结点指向当前结点的后继结点,就相当于将当前节点删去pred.next = s;if (pred.thread == null) // check for racecontinue retry;}else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}}

report(int s)

这个方法是真正用来获取任务的返回结果的,这个方法在**get()**方法里面会被调用,如果该方法被调用,说明任务已经执行完了。

    /*** Returns result or throws exception for completed task.** @param s completed state value*/@SuppressWarnings("unchecked")private V report(int s) throws ExecutionException {// 获取outcome的值Object x = outcome;// 如果当前任务的状态为正常结束,则返回outcome的值if (s == NORMAL)return (V)x;// 如果当前任务的状态 >= CANCELLED 说明当前任务状态为被取消、或是在中断中、或是已经中断完成if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}

cancel(boolean mayInterruptIfRunning)

这个方法可以取消当前任务

    public boolean cancel(boolean mayInterruptIfRunning) {// 条件1成立说明当前任务正在运行中或者处于线程池队列中// 条件2成立说明CAS成功可以执行下面的逻辑了,否则返回false,代表cancel失败if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {    // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {// 获取当前正在执行的线程,也有可能是null,是null的情况代表当前任务正在队列中,线程还没有获取到它呢Thread t = runner;// 给Thread一个中断信号,如果你的程序是响应中断的,则走中断逻辑;如果你的程序不是响应中断的,则什么也不做if (t != null)t.interrupt();} finally { // final state// 设置任务状态为已中断UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 唤醒所有因调用get()而陷入阻塞的线程finishCompletion();}return true;}

http://www.ppmy.cn/news/526756.html

相关文章

PHP网站整合支付宝网页接口的代码

最近做了个商城需要支付宝网页支付&#xff0c;实现了网站的在线支付和订单通知回调功能。 语言有障碍直接看实现代码&#xff1a; 在支付宝SDK文件夹里建立一个文件名称自定。我的是Alipay.php require_once dirname ( __FILE__ ).DIRECTORY_SEPARATOR.wappay/service/Ali…

为什么说数据不动代码动?移动计算比移动数据更划算?

写在前面 本文隶属于专栏《100个问题搞定大数据理论体系》&#xff0c;该专栏为笔者原创&#xff0c;引用请注明来源&#xff0c;不足和错误之处请在评论区帮忙指出&#xff0c;谢谢&#xff01; 本专栏目录结构和文献引用请见100个问题搞定大数据理论体系 解答 在分布式系统…

蛮力0-1背包问题c语言实现,蛮力动规贪心回溯法的01背包和TSP问题

《蛮力动规贪心回溯法的01背包和TSP问题》由会员分享&#xff0c;可在线阅读&#xff0c;更多相关《蛮力动规贪心回溯法的01背包和TSP问题(8页珍藏版)》请在人人文库网上搜索。 1、华北科技学院计算机学院综合性实验实 验 报 告 课程名称 算法分析与设计 实验学期 2014 至 2015…

大学物理:补充-能量

文章目录 知识点非保守力保守力动能势能机械能能量守恒 Some tips不能单独考虑某个质点的能量可以单独考虑某个质点的能量 杂项 知识点 非保守力 做功与路径有关的力如摩擦力等 保守力 做功与路径无关&#xff0c;只与位矢的起始点和终止点有关的力&#xff08;只和位置有关…

我的人生感悟

我的人生感悟 年幼无知&#xff0c;只随父母脚步&#xff0c; 叛逆失落&#xff0c;迷茫整个高中&#xff1b; 收拾心情&#xff0c;寻答案于大学&#xff0c; 人间冷暖&#xff0c;重拾难为自信&#xff1b; 偏见自封&#xff0c;误把消极当理性&#xff0c; 固执狭隘&…

logo设计如何增加附加价值

如今&#xff0c;是品牌观念盛行的时代&#xff0c;logo以及标志设计对企业公司起着重要的作用。logo不仅是企业品牌传达&#xff0c;更是对企业文化以及企业理念的传达&#xff0c;因此&#xff0c;企业logo设计很重要。优秀的公司Logo设计可提高销售力&#xff0c;扩展品牌效…

学习坐标【javascript】拼图小游戏 -java教程

发一下牢骚和主题无关&#xff1a; 兴致才是学习的力动源泉。始终欢喜游戏类的编程&#xff0c;因为得觉好玩。。。 周末这2天预备写个拼图的小游戏&#xff0c;只为学习面向对象&#xff0c;虽然对面向对象里的很多西东还不太熟悉。 昨天&#xff08;星期六&#xff09;开始写…

云计算、系统-什么是云计算?-by小雨

最近研讨云计算、系统-&#xff0c;稍微总结一下&#xff0c;以后继续补充&#xff1a; 说到云盘算&#xff0c;大部份人都云里雾里。有人以为它是一台巨无霸电脑&#xff0c;供给了超级盘算能力&#xff1b;也有人将其比作物流系统&#xff0c;把云盘算带来的利便比作一个个包…