线程池原理(一)

embedded/2024/10/20 19:20:09/

一、常用线程池体系结构图如下:

        

        由上边的体系图可以知道,要想了解线程池 ThreadPoolExecutor 的实现原理,则需要先

        了解下 Executor、ExecutorService、AbstractExecutorService 的实现,下面就分别看下

         这3个类的实现

二、Executor

       Executor 是线程池体系中的顶层接口,在Executor 中 定义了一个执行无返回值任务的方法;

       Executor 定义如下: 

java">public interface Executor {/*** Executor 是线程池顶级接口,定义了一个执行无返回值任务的方法** execute()方法是一个执行器* 执行无返回值任务* 根据Executor 的实现判断,执行器可能是在新线程、线程池或线程调用中执行*/void execute(Runnable command);
}

三、ExecutorService

       ExecutorService 接口继承了 Executor,ExecutorService主要定义了线程池的一些基本操作;

       在平时开发中 ExecutorService 也指代一个线程池,ExecutorService的每个实现子类都表示

       一个具体的线程池。

      ExecutorService 定义如下:

java">public interface ExecutorService extends Executor {//关闭线程池(但不会立即关闭线程池),不会接收新的任务,但已经提交的任务会等待执行完成void shutdown();//立即关闭线程池,尝试停止正在执行的任务,未执行的任务将不在执行,//被迫停止的任务以及未执行的任务以列表的形式返回List<Runnable> shutdownNow();//检查线程池是否已经关闭boolean isShutdown();//检查线程池是否已经终止,只有在 shutdown()或 shutdownNow() 方法调用之后执行该方法//才有可能返回trueboolean isTerminated();//线程池在指定的时间内达到终止状态才会返回trueboolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;//执行有返回值的任务,任务的返回结果是task.call() 的执行结果,//执行结果保存在 Future 中<T> Future<T> submit(Callable<T> task);//执行有返回值的任务,任务的返回值保存在传入的result 中,//当然只有任务完成了调用get()之后才会返回<T> Future<T> submit(Runnable task, T result);//执行有返回值的任务,任务的返回值位null//当然只有任务完成了调用get()之后才会返回Future<?> submit(Runnable task);//批量执行任务,只有当所有的任务都执行完成了后,这个方法才会返回<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;//在指定的时间内批量执行任务,在指定时间内没有执行完成的任务将被取消//这里的timeout 是所有的任务时间,而不是单个任务时间<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;//返回任意一个已经完成的任务的执行结果,未执行完成的任务将被取消<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;//在指定的时间内,如果有任务完成了,则任意返回一个任务已经完成的任务结果,//未执行完成的任务将被取消<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

四、AbstractExecutorService

       AbstractExecutorService 是接口 ExecutorService 的抽象实现类,主要实现了

       ExecutorService中定义的方法;

      AbstractExecutorService 提供了线程池基本实现,如:提交任务、管理线程池、执行任务等

      AbstractExecutorService 是一个抽象类,不能直接使用,需要通过它的各个子类来使用

      AbstractExecutorService 代码如下:

java">public abstract class AbstractExecutorService implements ExecutorService {/** 包装任务,并返回一个FutureTask 对象*/protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {/*** RunnableFuture 是接口 Future 和Runnable 的组合,用来表示异步执行的结果* FutureTask是RunnableFuture的一个具体实现,它表示一个可取消的异步计算,可以通过调用get方法获取计算结果。* 在创建FutureTask对象时,需要传入一个Callable对象或Runnable对象,用于执行异步计算*/return new FutureTask<T>(runnable, value);}/*** * 将 Callable 包装成一个 RunnableFuture*/protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}/** 基础模板方法:* 提交任务*/public Future<?> submit(Runnable task) {//第一步:判空if (task == null) throw new NullPointerException();//包装taskRunnableFuture<Void> ftask = newTaskFor(task, null);//执行task,execute() 方法由子类实现execute(ftask);//返回执行结果return ftask;}/*** 提交有返回值的任务 Runnable*/public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}/***提交有返回值的任务 Callable*/public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}/*** invokeAny 方法主要实现部分*/private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {//判空if (tasks == null)throw new NullPointerException();//任务数int ntasks = tasks.size();if (ntasks == 0)throw new IllegalArgumentException();ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);/****/ExecutorCompletionService<T> ecs =new ExecutorCompletionService<T>(this);try {ExecutionException ee = null;final long deadline = timed ? System.nanoTime() + nanos : 0L;Iterator<? extends Callable<T>> it = tasks.iterator();// 确定地开始一项任务;其余的增量futures.add(ecs.submit(it.next()));//提交一个任务,将任务提交到 ExecutorCompletionService--ntasks;//任务数减1,每提交一个任务,ntasks就减1,int active = 1;//活动的任务数,每提交一个任务(即开启一个任务),则表示活动的任务数加1for (;;) {Future<T> f = ecs.poll();if (f == null) {//表示任务还没执行完成if (ntasks > 0) {//表示还有任务没提交--ntasks;futures.add(ecs.submit(it.next()));++active;}else if (active == 0) //表示任务全部执行完成,则直接退出break;else if (timed) {f = ecs.poll(nanos, TimeUnit.NANOSECONDS);//若任务还在执行,但超时了,则直接抛出超时异常if (f == null)throw new TimeoutException();nanos = deadline - System.nanoTime();}else//获取队列头部的任务(即当前最先执行的任务)执行结果f = ecs.take();}if (f != null) {//f不等于null,表示任务f已经执行完成--active;try {//返回任务的执行结果return f.get();} catch (ExecutionException eex) {ee = eex;} catch (RuntimeException rex) {ee = new ExecutionException(rex);}}}if (ee == null)ee = new ExecutionException();throw ee;} finally {//最后,取消任务for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}}public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {try {return doInvokeAny(tasks, false, 0);} catch (TimeoutException cannotHappen) {assert false;return null;}}public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {return doInvokeAny(tasks, true, unit.toNanos(timeout));}/*** 批量执行任务,只有当所有的任务都执行完成了后,这个方法才会返回** @param tasks* @param <T>* @return* @throws InterruptedException*/public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();//保存future结果//每个任务都有一个futureArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {/** 这个for循环相当于把任务放到线程池中执行**/for (Callable<T> t : tasks) {//包装任务task,并返回一个futureRunnableFuture<T> f = newTaskFor(t);futures.add(f);//执行任务//将任务f扔到线程池中去执行execute(f);}for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try {//get是一个阻塞方法,若任务没有执行完成,则会阻塞在这儿,直至该任务执行完成f.get();} catch (CancellationException ignore) {//取消异常} catch (ExecutionException ignore) {//执行异常}}}done = true;return futures;} finally {/*** todo 注意:*   若在上面执行过程中出现了不是上面两个异常 CancellationException、ExecutionException,则*   异常直接抛出,这里done=false,然后取消所有任务的执行(这里是取消未执行完成的任务),这就是future 的意义所在*/if (!done)for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {if (tasks == null)throw new NullPointerException();long nanos = unit.toNanos(timeout);ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {/*** 先创建,所有的任务创建Future完成后,再拿出来一起执行,这样是为了计算超时时间更准确*/for (Callable<T> t : tasks)futures.add(newTaskFor(t));//计算等待时间点final long deadline = System.nanoTime() + nanos;final int size = futures.size();for (int i = 0; i < size; i++) {execute((Runnable)futures.get(i));nanos = deadline - System.nanoTime();//超时,直接结束结束执行,futures 中可能有未执行的任务将不再执行if (nanos <= 0L)return futures;}/*** 判断任务是否执行完成*/for (int i = 0; i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {//若任务f没有执行完成//超过超时时间,则直接返回if (nanos <= 0L)return futures;try {//若超时时间还有,则等待超时时间nanosf.get(nanos, TimeUnit.NANOSECONDS);} catch (CancellationException ignore) {//忽略异常} catch (ExecutionException ignore) {} catch (TimeoutException toe) {return futures;}nanos = deadline - System.nanoTime();}}done = true;return futures;} finally {if (!done)for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}}}

      


http://www.ppmy.cn/embedded/129068.html

相关文章

6.计算机网络_UDP

UDP的主要特点&#xff1a; 无连接&#xff0c;发送数据之前不需要建立连接。不保证可靠交付。面向报文。应用层给UDP报文后&#xff0c;UDP并不会抽象为一个一个的字节&#xff0c;而是整个报文一起发送。没有拥塞控制。网络拥堵时&#xff0c;发送端并不会降低发送速率。可以…

MongoDB如何查找数据以及条件运算符使用的详细说明

以下是关于MongoDB如何查找数据以及条件运算符使用的详细说明&#xff1a; 查找数据的基本方法 在MongoDB中&#xff0c;使用db.collection.find()方法来查找集合中的数据。如果不添加任何条件&#xff0c;直接使用db.collection.find()会返回集合中的所有文档。例如&#xf…

【STM32 HAL库】MPU6050姿态解算 卡尔曼滤波

【STM32 HAL库】MPU6050姿态解算 卡尔曼滤波 前言MPU6050寄存器代码详解mpu6050.cmpu6050.h 使用说明 前言 本篇文章基于卡尔曼滤波的原理详解与公式推导&#xff0c;来详细的解释下如何使用卡尔曼滤波来解算MPU6050的姿态 参考资料&#xff1a;Github_mpu6050 MPU6050寄存器…

26备战秋招day6——计算机视觉概述

计算机视觉&#xff08;Computer Vision&#xff09;概述 计算机视觉是一个研究如何让机器理解、分析和生成视觉信息的领域。它涉及从图像、视频中获取有意义的信息&#xff0c;目的是通过自动化的方式“看懂”世界。其典型的任务包括&#xff1a;物体识别、图像理解、目标检测…

AI金融攻防赛:YOLO理论学习及赛题进阶思路(DataWhale组队学习)

引言 大家好&#xff0c;我是GISer Liu&#x1f601;&#xff0c;一名热爱AI技术的GIS开发者。本系列文章是我跟随DataWhale 2024年10月学习赛的AI金融攻防赛学习总结文档。本文主要讲解如何在金融场景凭证篡改检测中应用YOLO算法。我们将从模型概述、数据准备、训练流程以及模…

得物iOS函数调用栈及符号化调优实践|得物技术

一、背景 随着《个人信息保护法》等法律法规的逐步发布实施&#xff0c;个人隐私保护受到越来越多人的关注。在这个整体的大背景下&#xff0c;得物持续完善App的各项合规属性&#xff0c;而在这个过程中&#xff0c;绕不开法务、安全、产品、设计、研发、测试几个重要环节&am…

【NLP】GloVe模型

一、Glove简介 GloVe (Global Vectors for Word Representation) 是一种基于词共现矩阵的词向量生成方法&#xff0c;由斯坦福大学的 Jeffrey Pennington、Richard Socher 和 Christopher D. Manning 提出。与 Word2Vec 不同&#xff0c;GloVe 通过全局统计信息&#xff08;词…

Spring Boot在中小型医院网站中的应用

1 绪论 1.1研究背景 随着计算机技术的成熟、普及&#xff0c;现代信息技术革命的迅猛发展,正冲击并进而改变着经济和社会结构。信息化的程度已经成为一个国家&#xff0c;一个企业&#xff0c;一个组织仍至一个人发展的基础和竞争成败的关键。 在实际的生活中&#xff0c;用户都…