【Java 并发编程】线程池理解与使用

embedded/2024/10/19 0:19:09/

前言


        在进入本章的学习之前,先来回顾一下,在没有使用线程池一直是如何执行任务的:

class Task implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread().getName());}
}class Main{public static void main(String[] args) {// 创建任务Task task = new Task();// 创建线程Thread t = new Thread(task);// 启动线程t.start();}
}

        一般都是创建任务、创建线程、启动线程,任务结束后系统就会销毁线程,但是这种方式一个线程只能执行一个任务并不能复用。如果还有任务的话,还需要再创建一个线程去执行它。在数据量庞大的情况,就需要频繁创建与销毁线程,我们知道构造一个线程的开销是非常大的,这样的话就可能会导致系统崩溃

        那么一个线程能不能复用呢?答案是可以的,线程池就提供了线程回收机制,可以重复利用已创建的线程降低线程创建和销毁造成的消耗。

    public static void main(String[] args) {// 创建任务Task task1 = new Task();Task task2 = new Task();Task task3 = new Task();// 创建一个线程的线程池ExecutorService threadPool = Executors.newSingleThreadExecutor();// 提交多个任务threadPool.execute(task1);threadPool.execute(task2);threadPool.execute(task3);// 关闭线程池threadPool.shutdown();}

打印结果:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

        通过打印结果可以知道通过线程池,可以做到一个线程执行多个任务。所以线程池就是一个可以复用线程的技术。想要了解线程池的更多内容,请继续查看本篇文章。


前期回顾:【Java 并发编程】阻塞队列与仿真餐厅


目录

前言

线程池的概念 

线程池的使用

Executors  工厂类

ThreadPoolExecutor 构造器

ThreadPoolExecutor  线程工厂

ThreadPoolExecutor  拒绝策略

使用 ThreadPoolExecutor 创建线程的原因

 FixedThreadPool 源码

 SingleThreadPool 源码

 CachedThreadPool 源码

使用线程池处理 Runnable 任务

线程池的终止

使用线程池处理 Callable 任务

总结

 

线程池的概念 


        线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。

         通过前言的线程池的例子,我们可以总结出以下线程池的优点:

降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的使用


        Java标准库提供了 ExecutorService 接口接口表示线程池,它的用法我们刚刚已经使用过了,这里就不再过多赘述:

    public static void main(String[] args) {// 创建任务Task task1 = new Task();Task task2 = new Task();Task task3 = new Task();// 创建固定大小的线程池:ExecutorService threadPool = Executors.newFixedThreadPool(3);// 提交任务threadPool.execute(task1);threadPool.execute(task2);threadPool.execute(task3);// 关闭线程池threadPool.shutdown();}

运行结果:

pool-1-thread-2
pool-1-thread-1
pool-1-thread-3

Executors  工厂类

         Executors 是 Java 中用于创建线程池的工厂类,它提供了一系列的静态工厂方法,用于创建不同类型的线程池。这些工厂方法隐藏了线程池的复杂性,使得线程池的创建变得非常简单:

newCachedThreadPool():创建一个可缓存的线程池。
newFixedThreadPool(int nThreads):创建一个固定大小的线程池,其中包含指定数量的线程。
newSingleThreadExecutor():创建一个单线程的线程池。这个线程池中只包含一个线程,用于串行执行任务。适用于需要按顺序执行任务的场景。
newScheduledThreadPool(int corePoolSize):创建一个固定大小的线程池,用于定时执行任务。
newSingleThreadScheduledExecutor():创建一个单线程的定时执行线程池。只包含一个线程,用于串行定时执行任务。
newWorkStealingPool(int parallelism):创建一个工作窃取线程池,线程数量根据CPU核心数动态调整。适用于CPU密集型的任务。

        详细可自行参考JDK文档:Interface Executor 


        因为 ExecutorService  只是接口无法实例出对象,Java标准库提供的几个常用实现类有:

AbstractExecutorService
ForkJoinPool
ScheduledThreadPoolExecutor
ThreadPoolExecutor

        接下来将会重点讲 ThreadPoolExecutor 如何自创建一个线程池对象,它也是我们日常中经常使用的一种方式。

ThreadPoolExecutor 构造器


        接下来我们看一下 ThreadPoolExecutor 是如何构造出线程池来的

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

 参数说明:

corePoolSize:线程池中用来工作的核心线程数量。
maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
keepAliveTime:超出 corePoolSize 后创建的线程存活时间或者是所有线程最大存活时间,取决于配置。
unit:keepAliveTime 的时间单位。
workQueue:任务队列,是一个阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。
threadFactory :线程池内部创建线程所用的工厂。
handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理任务。

理解以上参数的含义:

        举个例子 ~

        模拟个在银行办理业务场景,corePoolSize 相当于当前正在工作的窗口,maximumPoolSize 相当于总窗口数,workQueue 相当于等候区,当工作窗口满人了,等待用户就在这里等待。

<1> 场景一:营业窗口没满

        如果营业窗口没有满,用户就可以不用等待直接办理业务。相当于我们的任务直接就可以交给线程去执行。

class Main{public static void main(String[] args) {ExecutorService executor =  new ThreadPoolExecutor(4,6,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());for(int i = 0;i < 4;i++){executor.execute(()->{System.out.println(Thread.currentThread().getName()+"正在办理业务");});}executor.shutdown();}
}

 运行结果:

pool-1-thread-2正在办理业务
pool-1-thread-1正在办理业务
pool-1-thread-3正在办理业务
pool-1-thread-4正在办理业务

 <2> 场景二:营业窗口满了

        如果营业窗口满了,用户就需要在等候区等待有空闲的窗口。相当于线程满了就让等待执行的任务进入阻塞队列,当线程空闲就将该任务弹出阻塞队列。


 <3> 场景三:等候区满了

        如果等候区满了,银行看见今天需要办理的用户那么多就会多开一个窗口进行业务办理,然后等候区的用户就能到这个窗口进行业务办理。相当于突然唤醒一个线程去执行任务。

class Main{public static void main(String[] args) {ExecutorService executor =  new ThreadPoolExecutor(4,6,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());for(int i = 0;i < 8;i++){executor.execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName()+"正在办理业务");});}executor.shutdown();}
}

 运行结果:

pool-1-thread-3正在办理业务
pool-1-thread-2正在办理业务
pool-1-thread-1正在办理业务
pool-1-thread-5正在办理业务
pool-1-thread-4正在办理业务
pool-1-thread-1正在办理业务
pool-1-thread-2正在办理业务
pool-1-thread-3正在办理业务

        此时我们就可以看见 线程5 在执行任务了,当然也可以看到一个线程执行了多次任务,这主要来至于线程池的复用机制。


<4> 场景四:用户量减少

        当过了一段时间,没有那么多人需要办理业务了,那么银行就会考虑关掉一个窗口。相当于 keepAliveTime 过了多少以 unit 为单位的时间,就需要关闭非核心的线程。 例如,以上设计的等待时间就是 1 秒,TimeUnit.SECONDS 是以秒为单位的。


  <5> 场景五:用户量增多

         当所有的窗口与等候区都满了后,此时又来一个用户要办理业务,就没有位置了,此时你就要拒绝这位用户。而 handler 就是拒绝策略。AbortPolicy() 方法的策略就是直接抛出异常。

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thz.Main$$Lambda/0x00000151bd003200@9807454 rejected from java.util.concurrent.ThreadPoolExecutor@6e8cf4c6[Running, pool size = 6, active threads = 6, queued tasks = 3, completed tasks = 0]at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)at com.thz.Main.main(Test.java:37)

扩展:

ThreadPoolExecutor  线程工厂

        在工作中使用线程池,万一线程抛出异常了,日志不好记录到底是哪个线程池抛出的异常;所以为了方便排查,给线程池的线程自定义命名。下面代码示范:

class Test{private static int Count;static ExecutorService executor =  new ThreadPoolExecutor(3,3,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1),new ThreadFactory(){ public Thread newThread(Runnable r) {Thread t =  new Thread(r, "线程" + Count);Count++;return t;}},new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {for(int i=0;i<4;i++){executor.execute(()->{System.out.println(Thread.currentThread().getName()+"正在执行任务");});}executor.shutdown();}
}

 运行结果:

线程0正在执行任务
线程1正在执行任务
线程2正在执行任务
线程0正在执行任务

ThreadPoolExecutor  拒绝策略

AbortPolicy丢弃任务并抛出 RejectedExecutionException 异常。是默认的策略。
DiscardPolicy丢弃任务,但是不抛出异常 这是不推荐的做法。
DiscardoldestPolicy抛弃队列中等待最久的任务 然后把当前任务加入队列中。
CallerRunsPolicy由主线程负责调用任务的run()方法从而绕过线程池直接执行。

 这里举个 CallerRunsPolicy 的例子:

class Main{public static void main(String[] args) {ExecutorService executor =  new ThreadPoolExecutor(1,1,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());for(int i = 0;i < 3;i++){executor.execute(()->{System.out.println(Thread.currentThread().getName()+"正在办理业务");});}executor.shutdown();}
}

 运行结果

pool-1-thread-1正在办理业务
pool-1-thread-1正在办理业务
main正在办理业务

        此时,我们的主线程就会帮助我们去执行任务了。

总结:

临时线程创建条件:新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程
拒绝新任务的条件:核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会开始拒绝任务

使用 ThreadPoolExecutor 创建线程的原因

        阿里巴巴《Java开发手册》中提到,最好使用 ThreadPoolExecutor 去创建线程,关于这个建议我们这里可以探究一下 FixedThreadPool 和 SingleThreadPool 以及 CachedThreadPool 有何风险。

FixedThreadPool固定大小的线程池
SingleThreadPool单个线程的线程池
CachedThreadPool可缓存的线程池
FixedThreadPool 源码
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}

        根据上述源码,我们知道,核心线程数与最大线程数相等,意味着它里面全是核心线程。空闲线程存活时间为 0 毫秒,空闲线程就不会被销毁。任务队列采用的是 LinkedBlockingQueue 需要注意的是,此队列具有一定的风险:

    public final class Integer extends Number implements Comparable<Integer>, Constable, ConstantDesc {public static final int MAX_VALUE = 2147483647;...}public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {...public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}...}

       LinkedBlockingQueue 内部默认长度是 MAX_VALUE,所以我们感觉不到它的长度限制。注意,如果在并发量比较大的情况下,线程池中几乎可以无限制的添加任务,容易导致内存溢出问题。

 SingleThreadPool 源码
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new AutoShutdownDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory));}

        我们发现其内部也是使用 ThreadPoolExecutor 来创建线程池,其中核心线程与最大线程一样都是 1,说明这是一个单个线程的线程池。空闲线程存活时间为 0 毫秒,空闲线程就不会被销毁。任务队列采用的是 LinkedBlockingQueue,与上述一样的风险。

 CachedThreadPool 源码
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory);}

        根据上述源码,我们知道,核心线程数为 0,说明里面全是非核心线程,非核心线程空闲下来是需要被销毁的。最大线程数是 Integer.MAX_VALUE,风险不言而喻,同样是 “无限制的添加任务,容易导致内存溢出问题”。

        所以以上三种创建线程的方式不推荐大家使用,因为会有内存溢出风险。

使用线程池处理 Runnable 任务


代码展示: 

class MyRunnable implements Runnable {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"正在执行任务");}
}class Main{public static void main(String[] args) {ExecutorService executor =  new ThreadPoolExecutor(3,3,5, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());Runnable r = new MyRunnable();executor.execute(r);}
}
void execute(Runnable command)执行任务,没有返回值,一般用来执行 Runnable 任务

        这里需要注意的是:execute 这个方法会使线程池会自动创建一个新线程,自动处理这个任务。

扩展

线程池的终止

shutdown()
等着线程池的任务全部执行完毕后,再关闭线程池
shutdownNow()
不管任务是否执行完毕,立即关闭线程池!

使用线程池处理 Callable 任务


代码展示: 

class MyCallable implements Callable<String> {private int n;public MyCallable(int n){this.n = n;}@Overridepublic String call() throws Exception {// 描述线程的任务,返回线程执行返回后的结果int sum = 0;for (int i = 0; i <= n; i++) {sum += i;}return Thread.currentThread().getName() + "求出了1-" + n + "的和是:" + sum;}
}class ThreadPoolTest2 {public static void main(String[] args) throws Exception{// 1.通过ThreadPoolExecutor创建一个线程池对象ExecutorService pool = new ThreadPoolExecutor(3, 5, 8,TimeUnit.SECONDS, new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2.使用线程处理Callable任务。Future<String> f1 = pool.submit(new MyCallable(100));Future<String> f2 = pool.submit(new MyCallable(200));Future<String> f3 = pool.submit(new MyCallable(300));Future<String> f4 = pool.submit(new MyCallable(400));System.out.println(f1.get());System.out.println(f2.get());System.out.println(f3.get());System.out.println(f4.get());pool.shutdown();}
}

运行结果:

pool-1-thread-1求出了1-100的和是:5050
pool-1-thread-2求出了1-200的和是:20100
pool-1-thread-3求出了1-300的和是:45150
pool-1-thread-2求出了1-400的和是:80200
Future<T>submit(Callable<T> task)执行任务,返回未来任务对象获取线程结果,一般拿来执行Callable 任务

        Callable 接口代表一段可以调用并返回结果的代码;Future 接口表示异步任务,是还没有完成的任务给出的未来结果。所以说 Callable 用于产生结果,Future 用于获取结果。Future 提供了 get() 方法让我们可以等待 Callable 结束并获取它的执行结果。

 

总结

线程池中的线程可以复用,降低了高并发的资源消耗
通常使用 ThreadPoolExecutor 来创建线程,要注意其他方法创建线程池的风险
要熟练掌握 ThreadPoolExecutor 构造的七个参数


http://www.ppmy.cn/embedded/128591.html

相关文章

利用编程思维做题之求节点 x 在二叉树中的双亲节点算法

在二叉树中查找某个节点 x 的双亲节点是一个典型的树操作问题。双亲节点的概念是指某个节点的直接上级节点&#xff0c;即它的父节点。我们将通过遍历树的结构&#xff0c;查找并返回 x 的双亲节点。 1. 理解问题 我们需要设计一个算法来查找给定二叉树中节点 x 的双亲节点。目…

spring Jdbc

--------------------------------SpringJdbc-------------------------------- 创建数据库 DROP DATABASE IF EXISTS studb; CREATE DATABASE studb; USE studb; CREATE TABLE student ( id INT PRIMARY KEY, NAME VARCHAR(50) NOT NULL, gender VARCHAR(10) NO…

Vue3新特性合集

Vue3 简介 ‌‌Vue 3‌ 是‌Vue.js的最新版本&#xff0c;它带来了许多改进和新的特性&#xff0c;旨在提供更好的性能、更强的类型支持以及更灵活的组件开发方式。Vue 3的推出是为了解决Vue 2中存在的一些限制&#xff0c;如响应式系统的限制和虚拟DOM的效率问题。Vue 3在多…

Dinky 字段模式演变 PIPELINE 同步MySQL到Doris

背景 用Dinky数据平台 FlinkCDC收集Mysql BinLog 至 Doris 搭建实时数仓 问题 用Dinky CDCSOURCE 字段模式演变 整库同步Mysql到Doris 字段新增删除不生效 组件信息 Flink 1.17 FlinkCDC 3.1 dinky 1.1 Doris 2.1.6 Mysql 8.0Dinky MySQLCDC 整库到 Doris需要的依赖 Flink/…

开发一个微信小程序要多少钱?

在当今数字化时代&#xff0c;微信小程序成为众多企业和个人拓展业务、提供服务的热门选择。那么&#xff0c;开发一个微信小程序究竟需要多少钱呢&#xff1f; 开发成本主要取决于多个因素。首先是功能需求的复杂程度。如果只是一个简单的信息展示小程序&#xff0c;功能仅限…

【C++标准模版库】unordered_map和unordered_set的介绍及使用

unordered_map和unordered_set 一.unordered_set1.unordered_set类的介绍2.unordered_set和set的使用差异 二.unordered_map1.unordered_map和map的使用差异 三.unordered_multimap/unordered_multiset四.unordered_map/unordered_set的哈希相关接口 一.unordered_set 1.unord…

Git 可视化的实现:提升版本控制体验的利器

Git 是目前最流行的分布式版本控制系统&#xff0c;广泛应用于软件开发和项目管理中。然而&#xff0c;对于许多人来说&#xff0c;Git 命令行操作可能有些复杂且难以直观理解&#xff0c;特别是当涉及到复杂的分支和合并操作时。为了更好地帮助开发者掌握 Git 的操作过程&…

electron-vite_5打包后跳转失效?请用hash

关于打包后跳转失效, 请检查你的vue和react路由模式是不是hash模式&#xff1b; 注意必须使用hash模式&#xff0c;否则打包后路由跳转失效 React 版本 // "react-router-dom": "^6.11.2", import { HashRouter } from react-router-dom ReactDOM.createR…