ThreadPoolExecutor应用源码剖析(一)

news/2024/11/7 16:58:52/

前面讲到的Executors中的构建线程池的方式,大多数还是基于ThreadPoolExecutor去new出来的。

3.1 为什么要自定义线程池

        首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。
        但是如果直接采用JDK提供的方式去构建,可以设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己去自定义线程池。手动的去newThreadPoolExecutor设置他的一些核心属性。
        自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。查看一下ThreadPoolExecutor提供的七个核心参数。

public ThreadPoolExecutor(
int corePoolSize, // 核心工作线程(当前任务执行结束后,不会被销毁)
int maximumPoolSize, // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程)
long keepAliveTime, // 非核心工作线程在阻塞队列位置等待的时间
TimeUnit unit, // 非核心工作线程在阻塞队列位置等待时间的单位
BlockingQueue<Runnable> workQueue, // 任务在没有核心工作线程处理时,任务先扔到阻塞队列中
ThreadFactory threadFactory, // 构建线程的线程工作,可以设置thread的一些信息
RejectedExecutionHandler handler) { // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略
// 初始化线程池的操作
}

3.2 ThreadPoolExecutor应用

手动new一下,处理的方式还是执行execute或者submit方法。
JDK提供的几种拒绝策略:
● AbortPolicy:当前拒绝策略会在无法处理任务时,直接抛出一个异
常 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
● CallerRunsPolicy:当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处
理 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
● DiscardPolicy:当前拒绝策略会在线程池无法处理任务时,直接将任务丢弃
掉 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
● DiscardOldestPolicy:当前拒绝策略会在线程池无法处理任务时,将队列中最早的任务丢弃掉,将当前任务再次尝试交给线程池处理 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
● 自定义Policy:根据自己的业务,可以将任务扔到数据库,也可以做其他操
作。 private static class MyRejectedExecution implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("根据自己的业务情况,决定编写的代码!"); } }
代码构建线程池,并处理有无返回结果的任务

public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 构建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {Thread thread = new Thread(r);
thread.setName("test-ThreadPoolExecutor");
return thread;
}
},
new MyRejectedExecution()
);
//2. 让线程池处理任务,没返回结果
threadPool.execute(() -> {
System.out.println("没有返回结果的任务");
});
//3. 让线程池处理有返回结果的任务
Future<Object> future = threadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("我有返回结果!");
return "返回结果";
}
});
Object result = future.get();
System.out.println(result);
//4. 如果是局部变量的线程池,记得用完要shutdown
threadPool.shutdown();
}
private static class MyRejectedExecution implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("根据自己的业务情况,决定编写的代码!");
}
}

3.3 ThreadPoolExecutor源码剖析

线程池的源码内容会比较多一点,需要一点一点的去查看,内部比较多。

3.3.1 ThreadPoolExecutor的核心属性

核心属性主要就是ctl,基于ctl拿到线程池的状态以及工作线程个数
在整个线程池的执行流程中,会基于ctl判断上述两个内容

// 当前是线程池的核心属性
// 当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子性的。
// ctl表示着线程池中的2个核心状态:
// 线程池的状态:ctl的高3位,表示线程池状态
// 工作线程的数量:ctl的低29位,表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE:在获取Integer的bit位个数
// 声明了一个常量:COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
00000000 00000000 00000000 00000001
00100000 00000000 00000000 00000000
00011111 11111111 11111111 11111111
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态的表示
// 当前五个状态中,只有RUNNING状态代表线程池没问题,可以正常接收任务处理
// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。
private static final int RUNNING = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP = 1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。
private static final int TIDYING = 2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED = 3 << COUNT_BITS;
// 在使用下面这几个方法时,需要传递ctl进来
// 基于&运算的特点,保证只会拿到ctl高三位的值。
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值。
private static int workerCountOf(int c) { return c & CAPACITY; }


http://www.ppmy.cn/news/1255150.html

相关文章

Linux 下命令行启动与关闭WebLogic的相关服务

WebLogic 的服务器类型 WebLogic提供了三种类型的服务器&#xff1a; 管理服务器节点服务器托管服务器 示例和关系如下图&#xff1a; 对应三类服务器&#xff0c; 就有三种启动和关闭的方式。本篇介绍使用命令行脚本的方式启动和关闭这三种类型的服务器。 关于WebLogic 的…

Java架构师技术为业务赋能

目录 1 概论2 天猫的难言之隐3 如何拆解技术难点-三段论4 天猫线的破局之道-双引擎回归测试框架5 架构师的心理游戏-解决问题从转换思维开始6 技术助力业务的两个方向7 阿里新零售部门如何培养技术团队的业务知识8 如何围绕业务特点制定技术发展路线-阿里系和抖音案例9 阿里系业…

一个用c#瞎写的sftp工具

0.下载地址 https://wwus.lanzouj.com/iOZUv1gkgpze 密码:123456 1.能进行单个和批量下载, 没有弄上传 2.速度奇差,可能是某些地方没弄好.有一定的进度显示,但是不太准. 3.很多地方没弄好,有能力的自己弄一下 4.在app.config文件配置sftp

详细了解 MOSFET 晶体管

MOSFET 开关晶体管 MOS 管是 “金属&#xff08;Metal&#xff09;氧化物&#xff08;Oxide&#xff09;半导体&#xff08;Semi&#xff09;” 场效应晶体管&#xff0c;或者称是 “金属&#xff08;Metal&#xff09;绝缘体&#xff08;Insulator&#xff09;半导体&#xf…

代码随想录算法训练营 ---第五十二天

第一题&#xff1a; 简介&#xff1a; 动态规划五部曲&#xff1a; 1.确定 dp数组下标的定义 dp[i] 到达 i 时 最长递增子序列的长度 2.确定递推公式 我们确定当前的最大长度需要遍历前面所有的最大长度&#xff0c;然后如果序列最后一个值小于nums[i]那就dp[j] 1&#xf…

Task中Wait()和Result造成死锁

在使用Task的时候&#xff0c;一不留神就会造成死锁&#xff0c;而且难以发现&#xff0c;尤其是业务繁多的情况下&#xff0c;一个Task嵌套另一个Task的时候&#xff0c;下面就演示一下&#xff0c;在什么情况下&#xff0c;会产生Wait()和Result的死锁&#xff0c;因此&#…

Chat-GPT原理

GPT原理 核心是基于Transformer 架构 英文原文&#xff1a; ​ Transformers are based on the “attention mechanism,” which allows the model to pay more attention to some inputs than others, regardless of where they show up in the input sequence. For exampl…

掌握视频剪辑技巧,轻松自定义视频速率,打造个性化出彩视频

你是否曾经因为视频节奏平淡而缺乏吸引力而苦恼&#xff1f;现在&#xff0c;我们为你推荐一款视频批量剪辑工具&#xff0c;让你轻松自定义视频速率&#xff0c;实现出彩个性化视频。 首先第一步&#xff0c;我们要打开好简单批量智剪&#xff0c;并登录账号。 第二步&#x…