并发编程(六)—AbstractExecutorService源码分析

news/2024/10/25 11:32:17/

一、AbstractExecutorService简介


AbstractExecutorService是一个抽象类,实现了ExecutorService接口,提供了线程池的基本实现。它是Java Executor框架的核心类,提供了线程池的基本操作,如提交任务、管理线程池、执行任务等。

自定义执行器服务可用扩展AbreactExecutorService 并覆盖其方法以提供自己的实现。这使开发人员可以创建适合其特定需求的执行器服务。

二、AbstractExecutorService如何使用


AbstractExecutorService是一个抽象类,不能直接实例化,需要通过继承它的子类ThreadPoolExecutorScheduledThreadPoolExecutor来使用。使用AbstractExecutorService创建线程池的步骤如下:

2.1 没有返回值

// 1、创建ThreadPoolExecutor  创建一个固定大小的线程池,核心线程数为1,最大线程数为5,任务队列大小为10
ExecutorService executorService = new ThreadPoolExecutor(1, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));
// 2、提交任务
executorService.submit(new Runnable() {@Overridepublic void run() {//执行任务}
});
// 3、关闭线程池
executorService.shutdown();

2.2 有返回值

// 1、创建ThreadPoolExecutor  创建一个固定大小的线程池,核心线程数为1,最大线程数为5,任务队列大小为10
ExecutorService executorService = new ThreadPoolExecutor(1, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));
// 2、提交一个Callable任务
Future<String> future = executorService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {//执行任务,返回结果return "hello ExecutorService";}
});
// 3、获取值
String result = future.get();
// 4、关闭线程池
executorService.shutdown();

总之,使用AbstractExecutorService创建线程池非常简单,只需要创建线程池对象、提交任务、关闭线程池即可。同时,AbstractExecutorService也提供了一些高级功能,如异步任务、定时任务等,可以根据具体需求选择使用。

三、如何自定义AbstractExecutorService


要自定义一个延迟AbstractExecutorService,以下步骤:

  1. 继承AbstractExecutorService类

  1. 实现 submit 方法 覆盖 submit 方法,将任务添加到队列中,其中在任务执行前应用延迟。

  1. 实现 execute 方法 覆盖 execute 方法,调用 submit 方法并返回 Future 对象,等待任务完成。

  1. 实现shutdown 方法 覆盖 shutdown 方法,停止所有任务并等待它们完成。

  1. 实现 DelayedTask 类 创建一个 DelayedTask 类,用于封装任务和延迟时间。

  1. 实现 RunnableTask 和 CallableTask 类 创建 RunnableTask 和 CallableTask 类,用于封装任务。

public class CustomDelayedExecutorService extends AbstractExecutorService {private final DelayQueue<DelayedTask<?>> queue = new DelayQueue<>();@Overridepublic <T> Future<T> submit(Callable<T> task) {return schedule(new CallableTask<>(task));}@Overridepublic Future<?> submit(Runnable task) {return schedule(new RunnableTask(task));}private <T> Future<T> schedule(DelayedTask<T> task) {queue.offer(task);return task;}@Overridepublic void execute(Runnable command) {submit(command);}@Overridepublic void shutdown() {for (DelayedTask<?> task : queue) {task.cancel(false);}}@Overridepublic List<Runnable> shutdownNow() {List<Runnable> tasks = new ArrayList<>();for (DelayedTask<?> task : queue) {if (task.cancel()) {tasks.add(task.getTask());}}return tasks;}@Overridepublic boolean isShutdown() {return false;}@Overridepublic boolean isTerminated() {return false;}@Overridepublic boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {return false;}
​
}
class DelayedTask<T> implements RunnableFuture<T>, Delayed {private final long delay;private final long expire;private final RunnableFuture<T> task;public DelayedTask(RunnableFuture<T> task, long delay) {this.task = task;this.delay = delay;this.expire = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, TimeUnit.MILLISECONDS);}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(expire - System.nanoTime(), TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(Delayed o) {return Long.compare(expire, ((DelayedTask<?>) o).expire);}@Overridepublic void run() {if (!isCancelled()) {task.run();}}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return task.cancel(mayInterruptIfRunning);}@Overridepublic boolean isCancelled() {return task.isCancelled();}@Overridepublic boolean isDone() {return task.isDone();}@Overridepublic T get() throws InterruptedException, ExecutionException {return task.get();}@Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return task.get(timeout, unit);}public RunnableFuture<T> getTask() {return task;}
}
​
class RunnableTask implements RunnableFuture<Void> {private final Runnable task;public RunnableTask(Runnable task) {this.task = task;}@Overridepublic void run() {task.run();}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return false;}@Overridepublic boolean isCancelled() {return false;}@Overridepublic boolean isDone() {return true;}@Overridepublic Void get() throws InterruptedException, ExecutionException {return null;}@Overridepublic Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return null;}
}
class CallableTask<T> extends FutureTask<T> {public CallableTask(Callable<T> callable) {super(callable);}
}

四、AbstractExecutorService源码分析


4.1 AbstractExecutorService dragrams

4.2 方法分析

4.2.1 newTaskFor(Runnable runnable, T value) 方法

AbstractExecutorService中的newTaskFor方法,用于创建一个RunnableFuture对象,其中RunnableFuture是Future和Runnable接口的结合体,可以用来表示异步执行的结果。该方法的参数包括一个Runnable对象和一个泛型T的值。通过传入的Runnable对象和T类型的值,创建了一个FutureTask对象,并将其返回。

FutureTask是RunnableFuture的一个具体实现,它表示一个可取消的异步计算,可以通过调用get方法获取计算结果。在创建FutureTask对象时,需要传入一个Callable对象或Runnable对象,用于执行异步计算。而在这里,我们传入了一个Runnable对象和一个T类型的值,表示该Runnable对象的执行结果为T类型的值。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {// 将runnable包装成FutureTask类型return new FutureTask<T>(runnable, value);}

4.2.2 submit(Runnable task)方法

AbstractExecutorService中的submit方法是ExecutorService接口中的一个方法,用于提交一个Runnable任务,并返回一个表示该任务异步执行结果的Future对象。

  1. 如果传入的task为null,则抛出NullPointerException异常;

  1. 调用newTaskFor(Runnable runnable, T value)方法创建一个RunnableFuture对象ftask,其中RunnableFuture是Future和Runnable接口的结合体,用于表示异步执行的结果。在该方法中,我们将泛型T的值设置为null,表示该Runnable任务没有返回值;

  1. 通过调用execute(Runnable command)方法提交任务进行执行,该方法是一个抽象方法,需要由子类实现。在该方法中,我们将创建的ftask对象作为参数传入,表示需要执行的任务为ftask对象。

  1. 最后,将ftask对象返回,作为Future对象,用于获取任务执行结果或取消任务。

public Future<?> submit(Runnable task) {// 如果任务为null,则抛出nullPointerExceptionif (task == null) throw new NullPointerException();// 将task包装成RunnableFutureRunnableFuture<Void> ftask = newTaskFor(task, null);// 通过调用execute(Runnable command)方法提交任务进行执行execute(ftask);// 返回RunnableFuturereturn ftask;}

4.2.3 doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)方法

AbstractExecutorService中doInvokeAny方法用于执行一个任务集合中的所有任务并返回第一个成功执行的任务的结果,如果所有任务都执行失败则抛出ExecutionException异常。其中,方法参数tasks为任务集合,timed和nanos用于指定超时时间,如果超时则抛出TimeoutException异常。

  1. 如果任务集合是否为空,则抛出NullPointerException异常

  1. 如果任务集合的大小为0,则抛出IllegalArgumentException异常

  1. futures用于存储所有任务的Future对象, ecs对象用于异步执行任务集合中的任务。

  1. 从迭代器中获取一个任务xecutorCompletionService,并将该任务的执行结果的Future添加到futures列表中

  1. 死循环不断从ExecutorCompletionService中获取已完成的任务的结果Future,直到有一个任务完成为止,具体细流程如下:

  • 调用ExecutorCompletionService的poll()方法获取一个已完成的任务的结果Future,如果poll()返回null,表示当前没有已完成的任务。

  • 当前没有已完成的任务

  • 还有未执行的任务(ntasks > 0),则从任务集合中获取下一个任务,并将其提交给ExecutorCompletionService执行。提交成功后,活跃线程数active加一;

  • 没有执行的任务,并且活跃线程数为0(即所有任务都已执行完毕),则退出循环;

  • 指定超时时间(timed为true),则调用poll(nanos, TimeUnit.NANOSECONDS)方法获取已完成的任务的结果Future,并等待最多nanos纳秒的时间。如果超时则抛出TimeoutException;

  • 没有指定超时时间,则调用take()方法阻塞等待已完成的任务的结果Future。

  • 如果获取到了已完成的任务的结果Future,则将活跃线程数active减一,并尝试获取该任务的执行结果。

  • 如果获取结果时出现异常,则将异常保存在ExecutionException中,继续下一轮循环;

  • 如果成功获取到了一个任务的执行结果,则直接返回该结果。

  1. 如果所有任务都已经完成,但是没有找到任何一个成功的任务,则抛出ExecutionException异常。

  1. 最后,取消所有尚未完成的任务,以便节省资源并提高效率

private<T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {// 如果任务集合是否为空,则抛出NullPointerException异常if (tasks == null)throw new NullPointerException();int ntasks = tasks.size();// 如果任务集合的大小为0,则抛出IllegalArgumentException异常if (ntasks == 0)throw new IllegalArgumentException();// futures用于存储所有任务的Future对象   ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);ExecutorCompletionService<T> ecs =new ExecutorCompletionService<T>(this);try {ExecutionException ee = null;// 计算deadline时间final long deadline = timed ? System.nanoTime() + nanos : 0L;Iterator<? extends Callable<T>> it = tasks.iterator();// 从迭代器中获取一个任务xecutorCompletionService,并将该任务的执行结果的Future添加到futures列表中。futures.add(ecs.submit(it.next()));// 任务数减1--ntasks;// 退出循环标识(也就是调用ecs.sumbit的任务数)int active = 1;// 自旋、死循环for (;;) {// 获取Future对象Future<T> f = ecs.poll();// 判断future 任务是否已完成if (f == null) {// 任务没有完成,继续判断是否还有未执行的任务if (ntasks > 0) {// 任务s数减1--ntasks;// 再从tasks中获取一个任务,提交到ExecutorCompletionService中进行执行。futures.add(ecs.submit(it.next()));// 提交ecs中加1++active;}else if (active == 0)break;// 有超时时间限制    else if (timed) {//调用ExecutorCompletionService的poll(long timeout, TimeUnit unit)方法等待指定的超时时间。f = ecs.poll(nanos, TimeUnit.NANOSECONDS);// 在等待过程中已经超过了指定的超时时间,因此会抛出 TimeoutException 异常if (f == null)throw new TimeoutException();// 将剩余的超时时间重新计算,并继续执行后续的代码。nanos = deadline - System.nanoTime();}// 如果没有超时时间限制,则调用ExecutorCompletionService的take()方法一直等待,直到有任务执行完成。elsef = ecs.take(); }// 任务已完成(获取到Future对象)if (f != null) {--active;try {// 尝试获取Future对象的执行结果,return f.get();} catch (ExecutionException eex) {// 如果获取失败,则将异常保存在ee变量中,继续等待下一个任务的执行结果ee = eex;} catch (RuntimeException rex) {ee = new ExecutionException(rex);}}}// 如果所有任务都已经完成,但是没有找到任何一个成功的任务,则抛出ExecutionException异常。if (ee == null)ee = new ExecutionException();throw ee;} finally {// 取消所有尚未完成的任务,以便节省资源并提高效率for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}}

4.2.4 invokeAll(Collection<? extends Callable<T>> tasks)方法

AbstractExecutorService中的invokeAll方法,它的作用是在执行给定的任务集合tasks中的所有任务,并等待所有任务完成后返回一个包含Future对象的列表,Future对象可以用来获取每个任务的执行结果。如果其中某个任务抛出异常,则该异常将传播到调用者。

  • 判断任务是否为空,如果空,则抛出NullPointerException异常;

  • 遍历参数tasks中的每个Callable任务,将每个任务转换成一个RunnableFuture对象,并把任务添加到futures中,然后调用execute方法执行任务。

  • 循环遍历futures中的每个Future对象,如果某个Future对象的任务还没有完成,则调用get()方法等待任务完成,如果任务抛出异常,则忽略异常。

  • 如果所有的任务都成功完成,则将done标记为true,并返回futures;

  • 否则,取消所有未完成的任务,并抛出InterruptedException异常。

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {// 判断任务是否为空,如果空,则抛出NullPointerException异常;if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {// 遍历参数tasks中的每个Callable任务for (Callable<T> t : tasks) {// 将任务转换成一个RunnableFuture对象RunnableFuture<T> f = newTaskFor(t);futures.add(f);// 调用execute方法执行任务execute(f);}// 循环遍历futures中的每个Future对象for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);// 如果某个Future对象的任务还没有完成if (!f.isDone()) {try {// 调用get()方法等待任务完成,如果任务抛出异常,则忽略异常。f.get();} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}}// 如果所有的任务都成功完成,则将done标记为true,并返回futures;done = true;return futures;} finally {if (!done)// 如果所有的任务没有全部成功完成,取消所有未完成的任务for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}
}

http://www.ppmy.cn/news/34940.html

相关文章

四平方和题解(二分习题)

四平方和 暴力做法 Y总暴力做法&#xff0c;蓝桥云里能通过所有数据 总结&#xff1a;暴力也分好坏&#xff0c;下面这份代码就是写的好的暴力 如何写好暴力:1. 按组合枚举 2. 写好循环结束条件&#xff0c;没必要循环那么多次 #include<iostream> #include<cmath>…

求最大公约数和最小公倍数,附python实现

1、求最大公约数 1&#xff09;采用辗转相除法 例如&#xff0c;需要求63和117的最大公约数 [117]1∗[63]54[63]1∗[54]9[54]6∗[9]0[117] 1*[63]54\\ [63] 1*[54]9\\ [54] 6*[9]0 [117]1∗[63]54[63]1∗[54]9[54]6∗[9]0 可知&#xff0c;最大公约数为9 验证&#xff1a…

图神经网络(GCN)

一、GCN的起源 曾经深度学习一直都是被几大经典模型给统治着&#xff0c;如CNN、RNN等等&#xff0c;它们无论再CV还是NLP领域都取得了优异的效果。 但是对于图结构的数据&#xff0c;无论是CNN还是RNN都无法解决或者效果不好。 &#xff08;1&#xff09;CV中的CNN&#xff1…

9大 HIVE SQL 最频繁被问到的面试题

SQL是用于数据分析和数据处理的最重要的编程语言之一&#xff0c;因此与数据科学相关的工作&#xff08;例如数据分析师、数据科学家和数据工程师&#xff09;在面试时总会问到关于 SQL 的问题。 SQL面试问题旨在评估应聘者的技术和解决问题的能力。因此对于应聘者来说&#x…

每个开发人员都需要掌握的10 个基本 SQL 命令

SQL 是一种非常常见但功能强大的工具&#xff0c;它可以帮助从任何数据库中提取、转换和加载数据。数据查询的本质在于SQL。随着公司和组织发现自己处理的数据量迅速增加&#xff0c;开发人员越来越需要有效地使用数据库来处理这些数据。所以想要暗恋数据领域&#xff0c;SQL是…

TCP和UDP协议的区别?

是否面向连接&#xff1a; TCP 是面向连接的传输&#xff0c;UDP 是面向无连接的传输。 是否是可靠传输&#xff1a;TCP是可靠的传输服务&#xff0c;在传递数据之前&#xff0c;会有三次握手来建立连接&#xff1b;在数据传递时&#xff0c;有确认、窗口、重传、拥塞控制机制…

验证码——vue中后端返回的图片流如何显示

目录 前言 一、p调用接口获取验证码 canvas画布渲染&#xff1f; 二、后端返回图片&#xff08;图片流&#xff09;&#xff0c;前端显示 1.blob 2.arraybuffer 总结 前言 登录界面经常会有验证码&#xff0c;验证码的实现方式也有很多&#xff0c;我目前做过以下两种&…

Android开发-Android UI与布局

01 Android UI 1.1 UI 用户界面(User Interface&#xff0c;简称 UI&#xff0c;亦称使用者界面)是系统和用户之间进行交互和信息交换的媒介&#xff0c;它实现信息的内部形式与人类可以接受形式之间的转换。软件设计可分为两个部分&#xff1a;编码设计与UI设计。 1.2 Andr…