前面讲到的Executors中的构建线程池的方式,大多数还是基于ThreadPoolExecutor去new出来的。
3.1 为什么要自定义线程池
首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。
但是如果直接采用JDK提供的方式去构建,可以设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己去自定义线程池。手动的去newThreadPoolExecutor设置他的一些核心属性。
自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。查看一下ThreadPoolExecutor提供的七个核心参数。
public ThreadPoolExecutor(
int corePoolSize, // 核心工作线程(当前任务执行结束后,不会被销毁)
int maximumPoolSize, // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程)
long keepAliveTime, // 非核心工作线程在阻塞队列位置等待的时间
TimeUnit unit, // 非核心工作线程在阻塞队列位置等待时间的单位
BlockingQueue<Runnable> workQueue, // 任务在没有核心工作线程处理时,任务先扔到阻塞队列中
ThreadFactory threadFactory, // 构建线程的线程工作,可以设置thread的一些信息
RejectedExecutionHandler handler) { // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略
// 初始化线程池的操作
}
3.2 ThreadPoolExecutor应用
手动new一下,处理的方式还是执行execute或者submit方法。
JDK提供的几种拒绝策略:
● AbortPolicy:当前拒绝策略会在无法处理任务时,直接抛出一个异
常 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
● CallerRunsPolicy:当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处
理 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
● DiscardPolicy:当前拒绝策略会在线程池无法处理任务时,直接将任务丢弃
掉 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
● DiscardOldestPolicy:当前拒绝策略会在线程池无法处理任务时,将队列中最早的任务丢弃掉,将当前任务再次尝试交给线程池处理 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
● 自定义Policy:根据自己的业务,可以将任务扔到数据库,也可以做其他操
作。 private static class MyRejectedExecution implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("根据自己的业务情况,决定编写的代码!"); } }
代码构建线程池,并处理有无返回结果的任务
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 构建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {Thread thread = new Thread(r);
thread.setName("test-ThreadPoolExecutor");
return thread;
}
},
new MyRejectedExecution()
);
//2. 让线程池处理任务,没返回结果
threadPool.execute(() -> {
System.out.println("没有返回结果的任务");
});
//3. 让线程池处理有返回结果的任务
Future<Object> future = threadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("我有返回结果!");
return "返回结果";
}
});
Object result = future.get();
System.out.println(result);
//4. 如果是局部变量的线程池,记得用完要shutdown
threadPool.shutdown();
}
private static class MyRejectedExecution implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("根据自己的业务情况,决定编写的代码!");
}
}
3.3 ThreadPoolExecutor源码剖析
线程池的源码内容会比较多一点,需要一点一点的去查看,内部比较多。
3.3.1 ThreadPoolExecutor的核心属性
核心属性主要就是ctl,基于ctl拿到线程池的状态以及工作线程个数
在整个线程池的执行流程中,会基于ctl判断上述两个内容
// 当前是线程池的核心属性
// 当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子性的。
// ctl表示着线程池中的2个核心状态:
// 线程池的状态:ctl的高3位,表示线程池状态
// 工作线程的数量:ctl的低29位,表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE:在获取Integer的bit位个数
// 声明了一个常量:COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
00000000 00000000 00000000 00000001
00100000 00000000 00000000 00000000
00011111 11111111 11111111 11111111
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态的表示
// 当前五个状态中,只有RUNNING状态代表线程池没问题,可以正常接收任务处理
// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。
private static final int RUNNING = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP = 1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。
private static final int TIDYING = 2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED = 3 << COUNT_BITS;
// 在使用下面这几个方法时,需要传递ctl进来
// 基于&运算的特点,保证只会拿到ctl高三位的值。
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值。
private static int workerCountOf(int c) { return c & CAPACITY; }