背景:在并发编程中,我们可以使用Future来获取子线程执行的结果,然后在主线程一起进行业务处理; 那么Future是如何来工作的;
1 使用:
demo1:使用Future每次都阻塞获取任务的执行结果:
public static void main(String[] args) {// 声明线程池ExecutorService executorService = Executors.newFixedThreadPool(1);// 声明 CallableCallable<Map<String, Object>> commonUseQuatoCall = () -> testGetFutureMap("param");// 开启一个线程进行业务处理Future<Map<String, Object>> submitcommonCall = executorService.submit(commonUseQuatoCall);Map<String, Object> commonUseQuatoData = null;try {// 阻塞获取结果commonUseQuatoData = submitcommonCall.get(50000, TimeUnit.MILLISECONDS);}catch (Exception ex){}finally {// 最后关闭线程池executorService.shutdown();}if (null != commonUseQuatoData){/*** do some thing*/}}// 业务处理private static Map<String, Object> testGetFutureMap(String param) {// 处理业务逻辑Map<String, Object> mapData = new HashMap<>();/*** do some thing*/mapData.put("flag","sucess");return mapData;}
demo2:使用ExecutorCompletionService 优先处理返回结果最快的任务:
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {ExecutorService executorService = Executors.newFixedThreadPool(2);ExecutorCompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<>(executorService);completionService.submit(() -> {return methodA();});completionService.submit(() -> {return methodB();});for (int i = 0; i < 2; i++) {// 获得结果并处理try {Map<String, Object> oneMapResult = completionService.take().get(5,TimeUnit.SECONDS);if ("methodAResult".equalsIgnoreCase(oneMapResult.get("type").toString())) {// 方法A 返回的结果System.out.println("\"methodAResult\" = " + "methodAResult");}if ("methodBResult".equalsIgnoreCase(oneMapResult.get("type").toString())) {// 方法B 返回的结果System.out.println("\"methodBResult\" = " + "methodBResult");}}catch (Exception ex){ex.printStackTrace();}}System.out.println("\"finish\" = " + "finish");executorService.shutdown();}private static Map<String, Object> methodB() throws InterruptedException {Map<String, Object> mapData = new HashMap<>(3);Object data = null;/*** 业务处理* data = xxx;*/Thread.sleep(10000);// 返回结果mapData.put("type", "methodBResult");mapData.put("data", data);return mapData;}private static Map<String, Object> methodA() throws InterruptedException {Map<String, Object> mapData = new HashMap<>(3);Object data = null;/*** 业务处理* data = xxx;*/Thread.sleep(20000);// 返回结果mapData.put("type", "methodAResult");mapData.put("data", data);return mapData;}
2 工作过程:
2.1 封装线程:
声明线程池
ExecutorService executorService = Executors.newFixedThreadPool(1);
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
2.1.1 Future 提交任务:
Callable<Map<String, Object>> commonUseQuatoCall = () -> testGetFutureMap("param");
Future<Map<String, Object>> submitcommonCall = executorService.submit(commonUseQuatoCall);
调用 AbstractExecutorService.submit(Callable task)
/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();// 回调为空抛出异常RunnableFuture<T> ftask = newTaskFor(task);// 包装回调execute(ftask);// 开启线程执行ftask 的任务return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}
/*** Creates a {@code FutureTask} that will, upon running, execute the* given {@code Callable}.** @param callable the callable task* @throws NullPointerException if the callable is null*/public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable}
private static final int NEW = 0;
2.1.2 ExecutorCompletionService 提交任务:
ExecutorCompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<>(executorService);completionService.submit(() -> {return methodA();});
调用 ExecutorCompletionService.submit:
public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);// 使用 QueueingFuture executor.execute(new QueueingFuture(f));return f;}
private RunnableFuture<V> newTaskFor(Callable<V> task) {if (aes == null)return new FutureTask<V>(task);elsereturn aes.newTaskFor(task);}
private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}// 在执行任务获取获取结果后调用protected void done() { completionQueue.add(task); }private final Future<V> task;}
2.2 线程执行:
ThreadPoolExecutor. execute(Runnable command):
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {// 当前工作的线程数小于声明的核心现车数if (addWorker(command, true))// 添加任务return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {// 将任务添加到 BlockingQueue 队列中int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
addWorker(command, true):
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();// 线程执行workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
2.3 线程执行的结果数据填充:
FutureTask.run():
public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {// 调用目标方法并阻塞获的获取执行结果result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)// 获取结果后设置结果set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;// 赋值结果UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}
private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null; // to reduce footprint}
done() 方法 :将结果放入到 ExecutorCompletionService 下BlockingQueue<Future> completionQueue 中
private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}// 在执行任务获取获取结果后调用protected void done() { completionQueue.add(task); }private final Future<V> task;}
2.4 获取线程执行结果:
2.4.1 future 获取结果
/**
* @throws CancellationException {@inheritDoc}*/
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}
/*** Returns result or throws exception for completed task.** @param s completed state value*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}
2.4.2 ExecutorCompletionService 获取结果:
Map<String, Object> oneMapResult = completionService.take().get(5,TimeUnit.SECONDS);
ExecutorCompletionService 下take() 方法:
public Future<V> take() throws InterruptedException {return completionQueue.take();
}
// 获取FutureTask的结果
/*** @throws CancellationException {@inheritDoc}*/
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}/*** @throws CancellationException {@inheritDoc}*/
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);
}
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}
3 过程总结:
3.1 线程异步任务的执行,通过Callable 封装目标方法,通过FutureTask 发起线程完成任务的执行;执行完成将结果放入到FutureTask 的outcome中;再通过FutureTask获取线程异步执行的结果;
3.2 ExecutorCompletionService 通过将线程返回的结果放入到一个队列中,然后在从队列中获取到结果,使用ExecutorCompletionService时,需要注意每次从队列中获取结果后,将改结果从队列中移除,否则改队列中元素的容量会越来越大;