JAVA并发编程工具篇--1.1理解Future获取线程执行结果

news/2024/11/9 0:05:18/

背景:在并发编程中,我们可以使用Future来获取子线程执行的结果,然后在主线程一起进行业务处理; 那么Future是如何来工作的;

1 使用:
demo1:使用Future每次都阻塞获取任务的执行结果:

public static void main(String[] args) {// 声明线程池ExecutorService executorService = Executors.newFixedThreadPool(1);// 声明 CallableCallable<Map<String, Object>> commonUseQuatoCall = () -> testGetFutureMap("param");// 开启一个线程进行业务处理Future<Map<String, Object>> submitcommonCall = executorService.submit(commonUseQuatoCall);Map<String, Object> commonUseQuatoData = null;try {// 阻塞获取结果commonUseQuatoData = submitcommonCall.get(50000, TimeUnit.MILLISECONDS);}catch (Exception ex){}finally {// 最后关闭线程池executorService.shutdown();}if (null != commonUseQuatoData){/*** do some thing*/}}// 业务处理private static Map<String, Object> testGetFutureMap(String param) {// 处理业务逻辑Map<String, Object>  mapData = new HashMap<>();/*** do some thing*/mapData.put("flag","sucess");return mapData;}

demo2:使用ExecutorCompletionService 优先处理返回结果最快的任务:

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {ExecutorService executorService = Executors.newFixedThreadPool(2);ExecutorCompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<>(executorService);completionService.submit(() -> {return methodA();});completionService.submit(() -> {return methodB();});for (int i = 0; i < 2; i++) {// 获得结果并处理try {Map<String, Object> oneMapResult = completionService.take().get(5,TimeUnit.SECONDS);if ("methodAResult".equalsIgnoreCase(oneMapResult.get("type").toString())) {// 方法A 返回的结果System.out.println("\"methodAResult\" = " + "methodAResult");}if ("methodBResult".equalsIgnoreCase(oneMapResult.get("type").toString())) {// 方法B 返回的结果System.out.println("\"methodBResult\" = " + "methodBResult");}}catch (Exception ex){ex.printStackTrace();}}System.out.println("\"finish\" = " + "finish");executorService.shutdown();}private static Map<String, Object> methodB() throws InterruptedException {Map<String, Object> mapData = new HashMap<>(3);Object data = null;/*** 业务处理* data = xxx;*/Thread.sleep(10000);// 返回结果mapData.put("type", "methodBResult");mapData.put("data", data);return mapData;}private static Map<String, Object> methodA() throws InterruptedException {Map<String, Object> mapData = new HashMap<>(3);Object data = null;/*** 业务处理* data = xxx;*/Thread.sleep(20000);// 返回结果mapData.put("type", "methodAResult");mapData.put("data", data);return mapData;}

2 工作过程:
2.1 封装线程:
声明线程池

ExecutorService executorService = Executors.newFixedThreadPool(1);
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

2.1.1 Future 提交任务:

Callable<Map<String, Object>> commonUseQuatoCall = () -> testGetFutureMap("param");
Future<Map<String, Object>> submitcommonCall = executorService.submit(commonUseQuatoCall);

调用 AbstractExecutorService.submit(Callable task)

/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException       {@inheritDoc}*/
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();// 回调为空抛出异常RunnableFuture<T> ftask = newTaskFor(task);// 包装回调execute(ftask);// 开启线程执行ftask 的任务return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}
/*** Creates a {@code FutureTask} that will, upon running, execute the* given {@code Callable}.** @param  callable the callable task* @throws NullPointerException if the callable is null*/public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;       // ensure visibility of callable}
private static final int NEW   = 0;

2.1.2 ExecutorCompletionService 提交任务:

 ExecutorCompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<>(executorService);completionService.submit(() -> {return methodA();});

调用 ExecutorCompletionService.submit:

public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);// 使用 QueueingFuture executor.execute(new QueueingFuture(f));return f;}
private RunnableFuture<V> newTaskFor(Callable<V> task) {if (aes == null)return new FutureTask<V>(task);elsereturn aes.newTaskFor(task);}
private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}// 在执行任务获取获取结果后调用protected void done() { completionQueue.add(task); }private final Future<V> task;}

2.2 线程执行:
ThreadPoolExecutor. execute(Runnable command):

 public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {// 当前工作的线程数小于声明的核心现车数if (addWorker(command, true))// 添加任务return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {// 将任务添加到 BlockingQueue 队列中int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

addWorker(command, true):

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();// 线程执行workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

2.3 线程执行的结果数据填充:

FutureTask.run():

public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {// 调用目标方法并阻塞获的获取执行结果result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)// 获取结果后设置结果set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;// 赋值结果UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}
private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null;        // to reduce footprint}

done() 方法 :将结果放入到 ExecutorCompletionService 下BlockingQueue<Future> completionQueue 中

private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}// 在执行任务获取获取结果后调用protected void done() { completionQueue.add(task); }private final Future<V> task;}

2.4 获取线程执行结果:
2.4.1 future 获取结果

/**
* @throws CancellationException {@inheritDoc}*/
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}
/*** Returns result or throws exception for completed task.** @param s completed state value*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}

2.4.2 ExecutorCompletionService 获取结果:

 Map<String, Object> oneMapResult = completionService.take().get(5,TimeUnit.SECONDS);

ExecutorCompletionService 下take() 方法:

public Future<V> take() throws InterruptedException {return completionQueue.take();
}
// 获取FutureTask的结果
/*** @throws CancellationException {@inheritDoc}*/
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}/*** @throws CancellationException {@inheritDoc}*/
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);
}
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}

3 过程总结:
3.1 线程异步任务的执行,通过Callable 封装目标方法,通过FutureTask 发起线程完成任务的执行;执行完成将结果放入到FutureTask 的outcome中;再通过FutureTask获取线程异步执行的结果;
3.2 ExecutorCompletionService 通过将线程返回的结果放入到一个队列中,然后在从队列中获取到结果,使用ExecutorCompletionService时,需要注意每次从队列中获取结果后,将改结果从队列中移除,否则改队列中元素的容量会越来越大;


http://www.ppmy.cn/news/9900.html

相关文章

android 换肤框架搭建及使用 (3 完结篇)

本系列计划3篇: Android 换肤之资源(Resources)加载(一)setContentView() / LayoutInflater源码分析(二)换肤框架搭建(三) — 本篇 tips: 本篇只说实现思路,以及使用,具体细节请下载代码查看! 本篇实现效果: fragment换肤recyclerView换肤自定义view属性换肤打开打开打开动…

Java Bean Validation

JSR303 是一套JavaBean参数校验的标准&#xff0c;它定义了很多常用的校验注解&#xff0c;我们可以直接将这些注解加在我们JavaBean的属性上面&#xff0c;就可以在需要校验的时候进行校验了。校验框架注解如下&#xff1a; 注解解释Null被注释的元素必须为nullNotNull被注释…

成为有钱人的终极秘诀:做到这7步,你也可以成为富人!

经常有人问&#xff1a;互联网有什么快速赚钱的方法?大多数人内心浮躁&#xff0c;总想以最快的方式搞到钱。因为浮躁&#xff0c;所以沉不下心来去搞钱。做一个项目赚不到钱&#xff0c;然后又开始找项目&#xff0c;换项目&#xff0c;做项目&#xff0c;一直恶性循环中。最…

whistle的使用【前端抓包】

前言 抓包工具看起来只是测试要用的东西&#xff0c;其实对前端作用也很多&#xff0c;因为我们也要模拟请求、mock数据、调试。站在巨人肩膀上永远不亏! whistle能解决的痛点 一、看请求不方便 跳页、支付时候上一页的请求结果看不到&#xff0c;h5、小程序newWork不能在电…

SQL用法详解

1.SQL语言是什么?有什么作用?SQL:结构化查询语言&#xff0c;用于操作数据库&#xff0c;通用于绝大多数的数据库软件2.SQL的特征大小写不敏感需以;号结尾支持单行、多行注释3操作数据库的SQL语言基于功能可以划分为4类:数据定义:DDL ( Data Definition Language)&#xff1a…

GO——函数(一)

函数函数声明多返回值错误错误处理策略文件结尾错误(EOF)函数值函数声明 函数声明包括函数名、形式参数列表、返回值列表&#xff08;可省略&#xff09;以及函数体。 func name(parameter-list) (result-list) {body }返回值也可以像形式参数一样被命名。在这种情况下&#…

[VP]河南第十三届ICPC大学生程序竞赛 L.手动计算

前言 传送门 : 题意 : 给定两个椭圆 的方程 , 要求 求出椭圆并集的面积之和 思路 : 本题很显然是积分 或者 计算几何的问题 对于积分的做法, 无非就是根据积分公式求出第一象限的面积 之后拓展到后面四个象限。(奈何我懒, 连两个椭圆的焦点都不想求更别提后面的积分公式了)…

舆情监测技术方案,网络舆情分析技术手段有哪些?

网络舆情分析技术手段着力于利用技术实现对海量的网络舆情信息进行深度挖掘与分析&#xff0c;以快速汇总成舆情信息&#xff0c;从而代替人工阅读和分析网络舆情信息的繁复工作&#xff0c;接下来TOOM舆情监测小编带您简单了解舆情监测技术方案&#xff0c;网络舆情分析技术手…