【异步编程解析】

news/2025/2/21 5:27:11/

文章目录

  • FutureTask分析
    • FutureTask介绍
    • FutureTask应用
    • FutureTask源码分析
  • CompletableFuture
    • CompletableFuture介绍
    • 场景应用
    • 源码分析

FutureTask分析

FutureTask介绍

FutureTask 是 Java 并发包 (java.util.concurrent) 中的一个 可取消的异步计算任务,它实现了 Runnable 和 Future 接口,可以用于 异步任务执行 和 获取结果。

FutureTask应用

java">        Callable<Integer> callable = () -> {System.out.println("任务开始执行...");Thread.sleep(2000);return 10;};// 包装成 FutureTaskFutureTask<Integer> futureTask = new FutureTask<>(callable);// 启动线程执行任务new Thread(futureTask).start();System.out.println("主线程可以做其他事情...");// 获取任务执行结果(如果任务未完成,会阻塞)Integer result = futureTask.get();System.out.println("任务执行结果: " + result);

结合线程池执行FutureTask

java">import java.util.concurrent.*;public class FutureTaskWithThreadPool {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executor = Executors.newFixedThreadPool(2);FutureTask<Integer> futureTask = new FutureTask<>(() -> {Thread.sleep(2000);return 200;});// 提交 FutureTask 到线程池executor.submit(futureTask);System.out.println("主线程继续执行...");Integer result = futureTask.get(); // 阻塞等待结果System.out.println("异步任务结果: " + result);executor.shutdown();}
}

FutureTask源码分析

FutureTask的run流程和get流程图
在这里插入图片描述

了解FutureTask的枚举值

java"> /*** NEW -> COMPLETING -> NORMAL           任务正常执行,并且返回结果也正常返回* NEW -> COMPLETING -> EXCEPTIONAL      任务正常执行,但是结果是异常* NEW -> CANCELLED                      任务被取消   * NEW -> INTERRUPTING -> INTERRUPTED    任务被中断*///状态标识private volatile int state;//初始状态private static final int NEW          = 0;//中间状态private static final int COMPLETING   = 1;//正常执行完成private static final int NORMAL       = 2;//任务报错private static final int EXCEPTIONAL  = 3;//任务取消private static final int CANCELLED    = 4;//任务中断执行中private static final int INTERRUPTING = 5;//中断完成private static final int INTERRUPTED  = 6;/** 需要执行的任务,会被赋值到这个全局变量 */private Callable<V> callable;/** 任务执行结果,也会被赋值到这个全局对象中 */private Object outcome; // non-volatile, protected by state reads/writes/** 执行任务的线程 */private volatile Thread runner;/** 等待返回结果的线程WaiteNode */private volatile WaitNode waiters;static final class WaitNode {//线程volatile Thread thread;volatile WaitNode next;//有参构造WaitNode() { thread = Thread.currentThread(); }}

有参构造及run()方法的源码解析

java">  public FutureTask(Callable<V> callable) {//健壮性校验if (callable == null)throw new NullPointerException();//给callable赋值this.callable = callable;//将当前的状态置为NEWthis.state = NEW;       // ensure visibility of callable}//run方法的执行流程public void run() {//判断state==new,并且cas将runnerOffset赋值给当前线程if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {//将全局变量赋值给局部变量Callable<V> c = callable;//健壮性校验,dclif (c != null && state == NEW) {V result;//执行成功的标志,默认falseboolean ran;try {//执行callable中的call()方法result = c.call();//将标志位设置为trueran = true;} catch (Throwable ex) {//任务执行报错,将结果置为nullresult = null;//任务执行完成设置为falseran = false;//设置报错信息setException(ex);}//任务成功执行完成if (ran)//设置结果set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}//任务报错,设置报错信息protected void setException(Throwable t) {//CAS,先将当前状态从NEW设置为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//全局结果设置为toutcome = t;//将当前的state,设置为EXCEPTIONALUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state//唤醒下一个节点finishCompletion();}}//任务成功执行完成protected void set(V v) {//cas 将stateOffset从new设置为completingif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//设置结果outcome = v;//设置为normalUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state//唤醒此节点的后续节点finishCompletion();}}//唤醒下一个节点的操作private void finishCompletion() {// 设置局部变量q,并且给局部变量赋值waitersfor (WaitNode q; (q = waiters) != null;) {//将waiterOffset从q设置为nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//死循环唤醒后续所有的节点for (;;) {Thread t = q.thread;if (t != null) {//help GCq.thread = null;//唤醒此节点LockSupport.unpark(t);}//获取到下一个节点WaitNode next = q.next;//如果后续节点为nullif (next == null)//跳出此节点,结束break;//去除上一个节点,help gcq.next = null; // unlink to help gc//将下一个节点,赋值给qq = next;}//结束break;}}//待实现done();//将callable置为nullcallable = null;        // to reduce footprint}

get()方法的源码分析

java">  public V get() throws InterruptedException, ExecutionException {//将局部变量设置为全局变量int s = state;//判读那状态if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}//awaitDone方法private int awaitDone(boolean timed, long nanos)throws InterruptedException {//如果有超时时间,设置超时时间final long deadline = timed ? System.nanoTime() + nanos : 0L;//声明局部变量WaitNode q = null;//声明标志位boolean queued = false;//死循环for (;;) {//线程中断if (Thread.interrupted()) {//移除当前节点removeWaiter(q);//抛出异常throw new InterruptedException();}//将全局变量赋值给局部变量int s = state;//判断当前任务是否执行if (s > COMPLETING) {//健壮性校验if (q != null)//help gcq.thread = null;//返回状态return s;}//状态为completingelse if (s == COMPLETING) // cannot time out yet//当前线程让出cpuThread.yield();//q为null,封装新的waitNodeelse if (q == null)q = new WaitNode();//第一次进来else if (!queued)// 没放队列的话,直接放到waiters的前面queued = UNSAFE.compareAndSwapObject(this, waitersOffset,//判断是否有超时限制                                      q.next = waiters, q);else if (timed) {//判断剩余超时时间nanos = deadline - System.nanoTime();//已经超时if (nanos <= 0L) {//移除节点removeWaiter(q);//返回statereturn state;}//挂起当前线程,设置超时时间LockSupport.parkNanos(this, nanos);}//如果没有超时时间限制,直接将当前线程挂起elseLockSupport.park(this);}}//封装当前的结果private V report(int s) throws ExecutionException {//将全局的返回结果,赋值给局部变量Object x = outcome;//判断是否是正常执行完结束if (s == NORMAL)//返回结果return (V)x;//非正常执行我那结束,手动取消if (s >= CANCELLED)throw new CancellationException();//抛出异常throw new ExecutionException((Throwable)x);}//移除waiterNode节点private void removeWaiter(WaitNode node) {//健壮性校验if (node != null) {//help GCnode.thread = null;retry://死循环for (;;) {          // restart on removeWaiter race//pred 前置节点 q 当前节点 s: next的节点for (WaitNode pred = null, q = waiters, s; q != null; q = s) {//赋值s=q.nexts = q.next;if (q.thread != null)//前置节点赋值pred = q;//前置节点不为nullelse if (pred != null) {//移除q节点pred.next = s;//判断pred.thread是否wieldnullif (pred.thread == null) // check for racecontinue retry;}//q节点置为selse if (!UNSAFE.compareAndSwapObject(this, waitersOffset,//失败重试                            q, s))continue retry;}break;}}}

FutureTask的cacel()方法源码分析和流程图
在这里插入图片描述
源码分析

java">  public boolean cancel(boolean mayInterruptIfRunning) {// 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning// 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTINGif (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {    // 如果mayInterruptIfRunning为true,需要中断线程if (mayInterruptIfRunning) {try {//将全局变量赋值给局部变量Thread t = runner;//健壮性校验if (t != null)//中断线程t.interrupt();} finally { // final state//cas将状态改为中断UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {//唤醒后续的线程finishCompletion();}//返回结果return true;}

CompletableFuture

CompletableFuture介绍

CompletableFuture 是 Java 8 引入的 异步编程工具,提供了更强大的功能来处理异步任务、组合任务、并行计算,并支持非阻塞编程。
supplyAsync() 适用于有返回值的异步任务。
runAsync() 适用于没有返回值的异步任务。
thenApply() 接收前一个任务的返回值,然后转换返回新值。
thenAccept() 只消费结果,但不返回新值。
thenRun() 不接收前面任务的返回值,只是在任务完成后执行某些操作。
thenCombine() 合并两个 CompletableFuture 的结果。
allOf() 等待所有 CompletableFuture 任务完成,但不会收集返回值。
anyOf() 只要有一个 CompletableFuture 任务完成,就返回结果。
exceptionally() 捕获异常,并提供默认值。
handle() 可同时处理成功和失败情况,更灵活。

场景应用

有返回值的场景

java">  CompletableFuture completableFuture=CompletableFuture.supplyAsync(()->{System.out.println("task1开始执行");return "abc";}).thenApply(result->{System.out.println("task1的结果:"+result+",开始执行task2");return "任务完成";});System.out.println("获取task2的返回结果:"+completableFuture.get());

无返回值的场景:

java"> CompletableFuture completableFuture=CompletableFuture.runAsync(()->{System.out.println("task1开始执行");return ;}).thenRun(()->{System.out.println("开始执行task2");return ;});

源码分析

我们源码分析,主要分析runAsync()方法和thenRun()方法
流程图
在这里插入图片描述

分析runAsync()源码
流程图:
在这里插入图片描述
源码分析

java">//传入Runnable方法public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}//异步执行static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {//健壮性校验if (f == null) throw new NullPointerException();//封装返回的结果CompletableFuture<Void> d = new CompletableFuture<Void>();//扔到线程池中执行e.execute(new AsyncRun(d, f));//返回结果return d;}//AsyncRun()方法static final class AsyncRun extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {//声明全局变量CompletableFuture<Void> dep; Runnable fn;//有参构造AsyncRun(CompletableFuture<Void> dep, Runnable fn) {//给全局变量赋值this.dep = dep; this.fn = fn;}public final Void getRawResult() { return null; }public final void setRawResult(Void v) {}public final boolean exec() { run(); return true; }//在线程池中执行此方法public void run() {//声明局部变量CompletableFuture<Void> d; Runnable f;//健壮性校验,并且给局部变量赋值if ((d = dep) != null && (f = fn) != null) {//将之前的全部变量置为null,help GCdep = null; fn = null;//判断当前任务是否执行if (d.result == null) {try {//任务未执行,此时执行run方法f.run();//封装返回结果d.completeNull();} catch (Throwable ex) {//封装报错信息d.completeThrowable(ex);}}//触发后续任务d.postComplete();}}}//后续任务源码分析final void postComplete() {	//声明局部变量CompletableFuture<?> f = this; Completion h;//进入循环,并且给h赋值,最后进行参数校验while ((h = f.stack) != null ||//任务栈被改变,需要重新办检查(f != this && (h = (f = this).stack) != null)) {//声明两个局部变量CompletableFuture<?> d; Completion t;//cas 将h节点换成h.next,给t赋值为h.nextif (f.casStack(h, t = h.next)) {//健壮性校验if (t != null) {//如果栈发生了新的改变if (f != this) {//将h节点重新压入栈中pushStack(h);//跳过continue;}// help gch.next = null;    // detach}//执行 `Completion` 任务f = (d = h.tryFire(NESTED)) == null ? this : d;}}}//执行UniRun中的tryFire方法static final class UniRun<T> extends UniCompletion<T,Void> {//声明变量Runnable fn;//有参构造src:前置任务,fn:Runnable方法UniRun(Executor executor, CompletableFuture<Void> dep,CompletableFuture<T> src, Runnable fn) {super(executor, dep, src); this.fn = fn;}final CompletableFuture<Void> tryFire(int mode) {//声明两个局部变量CompletableFuture<Void> d; CompletableFuture<T> a;//给局部变量d赋值,并且进行健壮性校验if ((d = dep) == null ||//执行失败,直接返回null!d.uniRun(a = src, fn, mode > 0 ? null : this))return null;//将变量全部赋值为null help gcdep = null; src = null; fn = null;//清理任务栈 stack,并调用 postComplete() 处理后续任务。return d.postFire(a, mode);}}//执行UniRun方法final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {//声明局部变量Object r; Throwable x;//校验前置任务信息,和执行结果if (a == null || (r = a.result) == null || f == null)return false;//当前任务还没有执行if (result == null) {//判断前置任务的执行是否报错if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)//封装错误信息completeThrowable(x, r);elsetry {//校验健壮性 && 异步执行任务if (c != null && !c.claim())return false;//执行run()方法f.run();//封装执行结果completeNull();} catch (Throwable ex) {//报错的话,也需要封装报错信息completeThrowable(ex);}}//返回结果return true;}
// 异步的线程池处理任务
final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {// 只要有线程池对象,不为nullif (e == null)return true;executor = null; // disable// 基于线程池的execute去执行任务e.execute(this);}return false;
}final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {// 如果 `a` 存在,并且 `a.stack` 还有未执行的回调任务if (a != null && a.stack != null) {if (mode < 0 || a.result == null) // mode < 0 表示异步执行a.cleanStack();  // 清理 `stack`elsea.postComplete();  // 处理 `stack` 中的任务}// 如果当前 `CompletableFuture` 已经有结果,并且 `stack` 还有未执行任务if (result != null && stack != null) {if (mode < 0)  // 如果 mode < 0,返回当前 `CompletableFuture`return this;elsepostComplete();  // 触发 `postComplete()` 继续执行}return null;
}

分析threnRun()源码

java">	//调用thenRun()方法,传入Runnablepublic CompletableFuture<Void> thenRun(Runnable action) {//调用此方法return uniRunStage(null, action);}//uniRunStage源码分析
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {// 后继任务不能为null,健壮性判断if (f == null) throw new NullPointerException();// 创建CompletableFuture对象d,与后继任务f绑定CompletableFuture<Void> d = new CompletableFuture<Void>();// 如果线程池不为null,代表异步执行,将任务压栈// 如果线程池是null,先基于uniRun尝试下,看任务能否执行if (e != null || !d.uniRun(this, f, null)) {// 如果传了线程池,这边需要走一下具体逻辑// e:线程池// d:后继任务的CompletableFuture// this:前继任务的CompletableFuture// f:后继任务UniRun<T> c = new UniRun<T>(e, d, this, f);// 将封装好的任务,push到stack栈结构// 只要前继任务没结束,这边就可以正常的将任务推到栈结构中// 放入栈中可能会失败push(c);// 无论压栈成功与否,都要尝试执行以下。c.tryFire(SYNC);}// 无论任务执行完毕与否,都要返回后继任务的CompletableFuturereturn d;
}

http://www.ppmy.cn/news/1573807.html

相关文章

在高流量下保持WordPress网站的稳定和高效运行

随着流量的不断增加&#xff0c;网站的稳定和高效运行变得越来越重要&#xff0c;特别是使用WordPress搭建的网站。流量过高时&#xff0c;网站加载可能会变慢&#xff0c;甚至崩溃&#xff0c;直接影响用户体验和网站正常运营。因此&#xff0c;我们需要采取一些有效的措施&am…

华为固态电池引发的思索

华为固态电池真牛&#xff01; 超长续航&#xff1a;单次充电即可行驶3000公里 极速充电&#xff1a;五分钟内充满80% 极致安全&#xff1a;不可燃、不漏液 长寿命设计&#xff1a;循环寿命达10000次以上 如上是华为电池展示的优势项&#xff0c;每一条都让我们心动不已。…

为什么外贸办公需要跨境专线网络?

你好&#xff0c;今天我们来聊聊SD-WAN技术在出海企业办公中的应用以及其带来的诸多优势。当今出海企业在与海外分支机构或合作伙伴开展高效的网络通讯和数据传输时&#xff0c;面临着许多挑战。此时&#xff0c;SD-WAN作为一种新兴的网络优化技术&#xff0c;正在改变这些企业…

WPS的AI助手进化跟踪(灵犀+插件)

Ver V0.0 250216: 如何给WPS安装插件用以支持其他大模型LLM V0.1 250217: WPS的灵犀AI现在是DeepSeek R1(可能是全参数671B) 前言 WPS也有内置的AI&#xff0c;叫灵犀&#xff0c;之前应是自已的LLM模型&#xff0c;只能说是属于“能用&#xff0c;有好过无”&#xff0c;所…

[uniapp] 实现扫码功能,含APP、h5、小程序

&#x1f680; 个人简介&#xff1a;某大型国企资深软件开发工程师&#xff0c;信息系统项目管理师、CSDN优质创作者、阿里云专家博主&#xff0c;华为云云享专家&#xff0c;分享前端后端相关技术与工作常见问题~ &#x1f49f; 作 者&#xff1a;码喽的自我修养&#x1f9…

【核心算法篇十三】《DeepSeek自监督学习:图像补全预训练方案》

引言:为什么自监督学习成为AI新宠? 在传统监督学习需要海量标注数据的困境下,自监督学习(Self-Supervised Learning)凭借无需人工标注的特性异军突起。想象一下,如果AI能像人类一样通过观察世界自我学习——这正是DeepSeek图像补全方案的技术哲学。根据,自监督学习通过…

如何使用 DeepSpeed-Chat 和自定义数据集训练类 ChatGPT 模型

如果你想使用自己的数据集进行训练&#xff0c;可以按照以下步骤操作&#xff1a; 1. 数据集格式要求 DeepSpeed-Chat 的数据集需要符合特定的格式。每个数据项应该是一个 JSON 对象&#xff0c;包含以下字段&#xff1a; JSON复制 {"prompt": "Human: 你的…

zookeeper有序临时结点实现公平锁的实践例子

目录 实践例子1. 先创建一个持久结点2. 创建一个结点监听程序3. 锁程序4. 测试和输出截图测试说明 回顾zkNode类型zookeeper分布式锁的优缺点 实践例子 1. 先创建一个持久结点 ./bin/zkServer.sh start conf/zoo_local.cfg ./bin/zkCli.sh -server 127.0.0.1:21812. 创建一个…