【Java 并发编程】线程池理解与使用

devtools/2024/10/18 22:26:56/

前言


        在进入本章的学习之前,先来回顾一下,在没有使用线程池一直是如何执行任务的:

class Task implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread().getName());}
}class Main{public static void main(String[] args) {// 创建任务Task task = new Task();// 创建线程Thread t = new Thread(task);// 启动线程t.start();}
}

        一般都是创建任务、创建线程、启动线程,任务结束后系统就会销毁线程,但是这种方式一个线程只能执行一个任务并不能复用。如果还有任务的话,还需要再创建一个线程去执行它。在数据量庞大的情况,就需要频繁创建与销毁线程,我们知道构造一个线程的开销是非常大的,这样的话就可能会导致系统崩溃

        那么一个线程能不能复用呢?答案是可以的,线程池就提供了线程回收机制,可以重复利用已创建的线程降低线程创建和销毁造成的消耗。

    public static void main(String[] args) {// 创建任务Task task1 = new Task();Task task2 = new Task();Task task3 = new Task();// 创建一个线程的线程池ExecutorService threadPool = Executors.newSingleThreadExecutor();// 提交多个任务threadPool.execute(task1);threadPool.execute(task2);threadPool.execute(task3);// 关闭线程池threadPool.shutdown();}

打印结果:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

        通过打印结果可以知道通过线程池,可以做到一个线程执行多个任务。所以线程池就是一个可以复用线程的技术。想要了解线程池的更多内容,请继续查看本篇文章。


前期回顾:【Java 并发编程】阻塞队列与仿真餐厅


目录

前言

线程池的概念 

线程池的使用

Executors  工厂类

ThreadPoolExecutor 构造器

ThreadPoolExecutor  线程工厂

ThreadPoolExecutor  拒绝策略

使用 ThreadPoolExecutor 创建线程的原因

 FixedThreadPool 源码

 SingleThreadPool 源码

 CachedThreadPool 源码

使用线程池处理 Runnable 任务

线程池的终止

使用线程池处理 Callable 任务

总结

 

线程池的概念 


        线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。

         通过前言的线程池的例子,我们可以总结出以下线程池的优点:

降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的使用


        Java标准库提供了 ExecutorService 接口接口表示线程池,它的用法我们刚刚已经使用过了,这里就不再过多赘述:

    public static void main(String[] args) {// 创建任务Task task1 = new Task();Task task2 = new Task();Task task3 = new Task();// 创建固定大小的线程池:ExecutorService threadPool = Executors.newFixedThreadPool(3);// 提交任务threadPool.execute(task1);threadPool.execute(task2);threadPool.execute(task3);// 关闭线程池threadPool.shutdown();}

运行结果:

pool-1-thread-2
pool-1-thread-1
pool-1-thread-3

Executors  工厂类

         Executors 是 Java 中用于创建线程池的工厂类,它提供了一系列的静态工厂方法,用于创建不同类型的线程池。这些工厂方法隐藏了线程池的复杂性,使得线程池的创建变得非常简单:

newCachedThreadPool():创建一个可缓存的线程池。
newFixedThreadPool(int nThreads):创建一个固定大小的线程池,其中包含指定数量的线程。
newSingleThreadExecutor():创建一个单线程的线程池。这个线程池中只包含一个线程,用于串行执行任务。适用于需要按顺序执行任务的场景。
newScheduledThreadPool(int corePoolSize):创建一个固定大小的线程池,用于定时执行任务。
newSingleThreadScheduledExecutor():创建一个单线程的定时执行线程池。只包含一个线程,用于串行定时执行任务。
newWorkStealingPool(int parallelism):创建一个工作窃取线程池,线程数量根据CPU核心数动态调整。适用于CPU密集型的任务。

        详细可自行参考JDK文档:Interface Executor 


        因为 ExecutorService  只是接口无法实例出对象,Java标准库提供的几个常用实现类有:

AbstractExecutorService
ForkJoinPool
ScheduledThreadPoolExecutor
ThreadPoolExecutor

        接下来将会重点讲 ThreadPoolExecutor 如何自创建一个线程池对象,它也是我们日常中经常使用的一种方式。

ThreadPoolExecutor 构造器


        接下来我们看一下 ThreadPoolExecutor 是如何构造出线程池来的

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

 参数说明:

corePoolSize:线程池中用来工作的核心线程数量。
maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
keepAliveTime:超出 corePoolSize 后创建的线程存活时间或者是所有线程最大存活时间,取决于配置。
unit:keepAliveTime 的时间单位。
workQueue:任务队列,是一个阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。
threadFactory :线程池内部创建线程所用的工厂。
handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理任务。

理解以上参数的含义:

        举个例子 ~

        模拟个在银行办理业务场景,corePoolSize 相当于当前正在工作的窗口,maximumPoolSize 相当于总窗口数,workQueue 相当于等候区,当工作窗口满人了,等待用户就在这里等待。

<1> 场景一:营业窗口没满

        如果营业窗口没有满,用户就可以不用等待直接办理业务。相当于我们的任务直接就可以交给线程去执行。

class Main{public static void main(String[] args) {ExecutorService executor =  new ThreadPoolExecutor(4,6,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());for(int i = 0;i < 4;i++){executor.execute(()->{System.out.println(Thread.currentThread().getName()+"正在办理业务");});}executor.shutdown();}
}

 运行结果:

pool-1-thread-2正在办理业务
pool-1-thread-1正在办理业务
pool-1-thread-3正在办理业务
pool-1-thread-4正在办理业务

 <2> 场景二:营业窗口满了

        如果营业窗口满了,用户就需要在等候区等待有空闲的窗口。相当于线程满了就让等待执行的任务进入阻塞队列,当线程空闲就将该任务弹出阻塞队列。


 <3> 场景三:等候区满了

        如果等候区满了,银行看见今天需要办理的用户那么多就会多开一个窗口进行业务办理,然后等候区的用户就能到这个窗口进行业务办理。相当于突然唤醒一个线程去执行任务。

class Main{public static void main(String[] args) {ExecutorService executor =  new ThreadPoolExecutor(4,6,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());for(int i = 0;i < 8;i++){executor.execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName()+"正在办理业务");});}executor.shutdown();}
}

 运行结果:

pool-1-thread-3正在办理业务
pool-1-thread-2正在办理业务
pool-1-thread-1正在办理业务
pool-1-thread-5正在办理业务
pool-1-thread-4正在办理业务
pool-1-thread-1正在办理业务
pool-1-thread-2正在办理业务
pool-1-thread-3正在办理业务

        此时我们就可以看见 线程5 在执行任务了,当然也可以看到一个线程执行了多次任务,这主要来至于线程池的复用机制。


<4> 场景四:用户量减少

        当过了一段时间,没有那么多人需要办理业务了,那么银行就会考虑关掉一个窗口。相当于 keepAliveTime 过了多少以 unit 为单位的时间,就需要关闭非核心的线程。 例如,以上设计的等待时间就是 1 秒,TimeUnit.SECONDS 是以秒为单位的。


  <5> 场景五:用户量增多

         当所有的窗口与等候区都满了后,此时又来一个用户要办理业务,就没有位置了,此时你就要拒绝这位用户。而 handler 就是拒绝策略。AbortPolicy() 方法的策略就是直接抛出异常。

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thz.Main$$Lambda/0x00000151bd003200@9807454 rejected from java.util.concurrent.ThreadPoolExecutor@6e8cf4c6[Running, pool size = 6, active threads = 6, queued tasks = 3, completed tasks = 0]at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)at com.thz.Main.main(Test.java:37)

扩展:

ThreadPoolExecutor  线程工厂

        在工作中使用线程池,万一线程抛出异常了,日志不好记录到底是哪个线程池抛出的异常;所以为了方便排查,给线程池的线程自定义命名。下面代码示范:

class Test{private static int Count;static ExecutorService executor =  new ThreadPoolExecutor(3,3,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1),new ThreadFactory(){ public Thread newThread(Runnable r) {Thread t =  new Thread(r, "线程" + Count);Count++;return t;}},new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {for(int i=0;i<4;i++){executor.execute(()->{System.out.println(Thread.currentThread().getName()+"正在执行任务");});}executor.shutdown();}
}

 运行结果:

线程0正在执行任务
线程1正在执行任务
线程2正在执行任务
线程0正在执行任务

ThreadPoolExecutor  拒绝策略

AbortPolicy丢弃任务并抛出 RejectedExecutionException 异常。是默认的策略。
DiscardPolicy丢弃任务,但是不抛出异常 这是不推荐的做法。
DiscardoldestPolicy抛弃队列中等待最久的任务 然后把当前任务加入队列中。
CallerRunsPolicy由主线程负责调用任务的run()方法从而绕过线程池直接执行。

 这里举个 CallerRunsPolicy 的例子:

class Main{public static void main(String[] args) {ExecutorService executor =  new ThreadPoolExecutor(1,1,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());for(int i = 0;i < 3;i++){executor.execute(()->{System.out.println(Thread.currentThread().getName()+"正在办理业务");});}executor.shutdown();}
}

 运行结果

pool-1-thread-1正在办理业务
pool-1-thread-1正在办理业务
main正在办理业务

        此时,我们的主线程就会帮助我们去执行任务了。

总结:

临时线程创建条件:新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程
拒绝新任务的条件:核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会开始拒绝任务

使用 ThreadPoolExecutor 创建线程的原因

        阿里巴巴《Java开发手册》中提到,最好使用 ThreadPoolExecutor 去创建线程,关于这个建议我们这里可以探究一下 FixedThreadPool 和 SingleThreadPool 以及 CachedThreadPool 有何风险。

FixedThreadPool固定大小的线程池
SingleThreadPool单个线程的线程池
CachedThreadPool可缓存的线程池
FixedThreadPool 源码
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}

        根据上述源码,我们知道,核心线程数与最大线程数相等,意味着它里面全是核心线程。空闲线程存活时间为 0 毫秒,空闲线程就不会被销毁。任务队列采用的是 LinkedBlockingQueue 需要注意的是,此队列具有一定的风险:

    public final class Integer extends Number implements Comparable<Integer>, Constable, ConstantDesc {public static final int MAX_VALUE = 2147483647;...}public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {...public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}...}

       LinkedBlockingQueue 内部默认长度是 MAX_VALUE,所以我们感觉不到它的长度限制。注意,如果在并发量比较大的情况下,线程池中几乎可以无限制的添加任务,容易导致内存溢出问题。

 SingleThreadPool 源码
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new AutoShutdownDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory));}

        我们发现其内部也是使用 ThreadPoolExecutor 来创建线程池,其中核心线程与最大线程一样都是 1,说明这是一个单个线程的线程池。空闲线程存活时间为 0 毫秒,空闲线程就不会被销毁。任务队列采用的是 LinkedBlockingQueue,与上述一样的风险。

 CachedThreadPool 源码
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory);}

        根据上述源码,我们知道,核心线程数为 0,说明里面全是非核心线程,非核心线程空闲下来是需要被销毁的。最大线程数是 Integer.MAX_VALUE,风险不言而喻,同样是 “无限制的添加任务,容易导致内存溢出问题”。

        所以以上三种创建线程的方式不推荐大家使用,因为会有内存溢出风险。

使用线程池处理 Runnable 任务


代码展示: 

class MyRunnable implements Runnable {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"正在执行任务");}
}class Main{public static void main(String[] args) {ExecutorService executor =  new ThreadPoolExecutor(3,3,5, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());Runnable r = new MyRunnable();executor.execute(r);}
}
void execute(Runnable command)执行任务,没有返回值,一般用来执行 Runnable 任务

        这里需要注意的是:execute 这个方法会使线程池会自动创建一个新线程,自动处理这个任务。

扩展

线程池的终止

shutdown()
等着线程池的任务全部执行完毕后,再关闭线程池
shutdownNow()
不管任务是否执行完毕,立即关闭线程池!

使用线程池处理 Callable 任务


代码展示: 

class MyCallable implements Callable<String> {private int n;public MyCallable(int n){this.n = n;}@Overridepublic String call() throws Exception {// 描述线程的任务,返回线程执行返回后的结果int sum = 0;for (int i = 0; i <= n; i++) {sum += i;}return Thread.currentThread().getName() + "求出了1-" + n + "的和是:" + sum;}
}class ThreadPoolTest2 {public static void main(String[] args) throws Exception{// 1.通过ThreadPoolExecutor创建一个线程池对象ExecutorService pool = new ThreadPoolExecutor(3, 5, 8,TimeUnit.SECONDS, new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2.使用线程处理Callable任务。Future<String> f1 = pool.submit(new MyCallable(100));Future<String> f2 = pool.submit(new MyCallable(200));Future<String> f3 = pool.submit(new MyCallable(300));Future<String> f4 = pool.submit(new MyCallable(400));System.out.println(f1.get());System.out.println(f2.get());System.out.println(f3.get());System.out.println(f4.get());pool.shutdown();}
}

运行结果:

pool-1-thread-1求出了1-100的和是:5050
pool-1-thread-2求出了1-200的和是:20100
pool-1-thread-3求出了1-300的和是:45150
pool-1-thread-2求出了1-400的和是:80200
Future<T>submit(Callable<T> task)执行任务,返回未来任务对象获取线程结果,一般拿来执行Callable 任务

        Callable 接口代表一段可以调用并返回结果的代码;Future 接口表示异步任务,是还没有完成的任务给出的未来结果。所以说 Callable 用于产生结果,Future 用于获取结果。Future 提供了 get() 方法让我们可以等待 Callable 结束并获取它的执行结果。

 

总结

线程池中的线程可以复用,降低了高并发的资源消耗
通常使用 ThreadPoolExecutor 来创建线程,要注意其他方法创建线程池的风险
要熟练掌握 ThreadPoolExecutor 构造的七个参数


http://www.ppmy.cn/devtools/126841.html

相关文章

如何识别并防范网络诈骗?

华企网安警示&#xff1a; 1.提高警惕性&#xff1a;对所有在线通信保持警惕&#xff0c;特别是那些要求提供个人信息或财务信息的请求。 2.检查发件人信息&#xff1a;在电子邮件或消息中&#xff0c;检查发件人的电子邮件地址或电话号码是否与他们声称的身份相符。 3.验证…

无人机视角下火灾检测数据集 共12736张 标注文件为YOLO适用的txt格式。已划分为训练集、验证集、测试集。类别:Fire yolov5-v10通用

无人机视角下火灾检测数据集 共12736张 标注文件为YOLO适用的txt格式。已划分为训练集、验证集、测试集。类别&#xff1a;Fire yolov5-v10通用 无人机视角下火灾检测数据集 共12736张 标注文件为YOLO适用的txt格式。已划分为训练集、验证集、测试集。类别&#xff1a;Fire yol…

【微服务】微服务API网关详解:提升系统效率与安全性的关键策略

目录 引言一、什么是API网关&#xff1f;二、API网关的架构三、API网关的优势与劣势分析3.1 API网关的优势3.2 API网关的劣势 四、常见的API网关工具五、实现API网关的最佳实践结论 引言 在微服务架构中&#xff0c;API网关作为客户端与后端服务之间的中介&#xff0c;充当客户…

鸿蒙开发(NEXT/API 12)【公共事件发布】蜂窝通信服务

场景介绍 当需要发布某个自定义公共事件时&#xff0c;可以通过[publish()]方法发布事件。发布的公共事件可以携带数据&#xff0c;供订阅者解析并进行下一步处理。 注意 已发出的粘性公共事件后来订阅者也可以接收到&#xff0c;其他公共事件都需要先订阅再接收 接口说明 …

flutter assets配置加载本地图片报错

首选列出我在照着网上说的设置assets怎么搞都报错&#xff0c;错误如下&#xff0c;搞的我想骂娘。 flutter: uses-material-design: true assets: - assets/images 后来找到了下面这个教程&#xff0c;才终于解决&#xff0c;就是要在后面加一个"/" 。 flutter这个…

算法(C++实现)

从现在开始&#xff0c;开辟一个新的专题----算法&#xff0c;这篇文章介绍双指针&#xff0c;滑动窗口&#xff0c;二分查找这几种算法以及附有相应的练习题。 1.双指针 常见的双指针形式有两种&#xff0c;一种是对撞指针&#xff0c;一种是同向双指针。下面是三种对应情况…

MySQL 中 LIKE 语句的 `%` 和 `_` 以及 BLOB 和 TEXT 的详细解析和案例示范

1. LIKE 语句中的 % 和 _ 用法 1.1 % 通配符的用法 % 通配符代表零个或多个字符。它是 MySQL 中用于模糊匹配的强大工具之一&#xff0c;可以在任何字符的位置使用。 示例 1&#xff1a;查找以特定字符开头的记录 假设我们有一个电商订单系统的 orders 表&#xff0c;其中包…

OpenFeign 入门与实战:快速搭建 Spring Cloud 微服务客户端

1. 前言 随着微服务架构的流行&#xff0c;服务之间的通信变得越来越重要。Spring Cloud 提供了一系列工具来帮助开发者构建分布式系统&#xff0c;其中 OpenFeign 是一个轻量级的 HTTP 客户端&#xff0c;它简化了 Web 服务客户端的开发。本文将介绍如何在 Spring Cloud 应用…