回顾
什么是线程,什么是进程?
- 进程:是一个应用程序,里面包含很多线程
- 线程:进程执行的基本单元
java实现线程的几种方式
- 继承Thread类
- 实现Runable接口
线程的生命周期
执行线程会出现的问题
- 一个线程只能执行一个任务
- 线程执行完后销毁,不能复用
- 线程过多,会导致jvm宕机(正常一台8核【同时并行的线程最多只有8个,cpu会在2000个线程中来回切】,12g的服务器线程数正常只有2000个线程)
线程池
JUC
- JUC是一个工具类,用户高并发、处理多线程的一个包。
线程池解决了哪些问题
- 降低资源消耗
- 方便线程数的管控
- 功能强大,提供延时定时线程池
线程池引发了什么问题
- jvm宕机,提交的任务会消失
- 使用不合理,导致内存溢出
- 参数多,引入数据结构和算法,增加了学习难度
线程池的设计思想
-
线程维护
-
执行任务
-
状态监控
线程池的原理
- 线程池的结构图
- 最常用的是ThreadPoolExecutor,调度用ScheduleExecutorService;
线程池的工作状态
- RUNNING:线程池一单被创建,就是RUNNING状态。
- SHUTDOWN:不接受新的任务,但能处理已添加的任务。调用shutdown接口后,线程池的状态由RUNNING变成SHUTDOWN。
- STOP:不接受新的任务,不会去处理已经添加的任务,并且会中断住在处理的任务;【调用shutdownnow()接口之后线程池会由running和shutdown变成stop】
- TIDYING(终止状态):所有任务终止,队列中任务数量会变成0。会执行钩子函数terminated()方法。
线程池的参数定义
线程池的结构说明
- 任务提交给线程池,如果有空闲线程,则会将任务分配给空闲线程,如果没有空闲线程会先放到队列中去。
- 如果核心线程都满了,并且队列也满了,才会去只用最大线程数中的线程。
- 达到maxSize,根据拒绝策略处理。
线程池的工具类
确定线程池的线程数
创建合适的线程数才能提高性能
- io密集型任务
io操作时间长,cpu利用率不高,这类任务CPU常处于空闲状态。
此类型任务可以开cpu核心的两倍线程。比如cpu是4核的,可以开8个线程。
- cpu密集型任务
执行计算任务,cpu一直运行,cpu的利用率高 。
取相等的线程数。比如cpu是4核的,可以开4个线程。
- 混合型任务
既要执行逻辑运算,又要进行大量的io操作。针对不同类型的任务,创建不同的线程池。
最佳线程数 = ((线程等待时间+线程cpu时间)/ 线程cpu时间)* cpu核数
线程池的源码分析
- ExecutorService executorService = Executors.newFixedThreadPool(5);
- ExecutorService executorService1 = Executors.newSingleThreadExecutor();
- ExecutorService executorService2 = Executors.newCachedThreadPool();
任务先放在阻塞队列中在进行处理
- ExecutorService executorService3 = Executors.newScheduledThreadPool(5);
为什么大厂里禁止使用Executors工具类,怕使用不规范造成宕机的风险。一般都是自己new一个线程池:ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.HOURS, new LinkedBlockingQueue<>(1000));
threadPoolExecutor.execute(); //提交一个普通的任务
threadPoolExecutor.submit(); //提交一个有返回值的任务
- execute()
//提交任务代码public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();//判断工作数,如果小于coreSize -> addWork,注意第二个参数 core=trueif (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//否则,如果线程池还在运行,offer到队列if (isRunning(c) && workQueue.offer(command)) {//在检查一下状态int recheck = ctl.get();//线程池终止,直接移除任务if (! isRunning(recheck) && remove(command))reject(command);//如果没可用线程的话(coreSize=0),创建一个空work//该work创建时指派给任务(为null)。会被放入works集合,从队列获取任务去执行else if (workerCountOf(recheck) == 0)addWorker(null, false);}//队列已满,继续调addWork,但是注意,core=false,开启到maxSize的大门//超出max,addWork会返回false,进入rejectelse if (!addWorker(command, false))reject(command);}
- addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);//线程大于 2^29次方^-1,或者线程超出了规定的范围,返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();if (runStateOf(c) != rs)continue retry;}}//创建work放入works集合(一个hashSet)boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//符合条件,创建新的work包装成taskw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//加锁,works是一个hashset,保证线程安全性mainLock.lock();try {int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//add成功新的work,work立即启动t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
- 任务获取和执行
//执行runWorker()的时候,一直循环,如果携带task,就执行
while (task != null || (task = getTask()) != null)private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?//判断是不是超时处理,决定了当前线程是不是要释放boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//线程数量超出max,并且上次循环poll等待超时,说明该线程已经终止,将线程数量原子性 减if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {//计数器原子性递减,递减成功后,放回null,for终止if (compareAndDecrementWorkerCount(c))return null;//递减失败,继续下一轮循环continue;}try {//如果线程可被释放,那就poll,释放时间为keepAliveTime//否则,线程不会被释放,take一直被阻塞在这里,直到新的任务继续工作Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//到这说明可被释放的线程等待超时,已被销毁,设置标记,下次循环的线程减少timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
线程池的经典面试题
如果队列没有任务,核心线程一直阻塞获取任务的方法,直到返回任务。而任务执行完后,又会进入下一轮work.runWork()中循环。
//核心代码
//work.runWork
while (task != null || (task = getTask()) != null)//work.getTaskboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//最关键Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):workQueue.take();
没有,被销毁的线程和创建先后无关。即使是第一个被创建的核心线程,仍有可能被销毁。
**验证:**每个work在runwork()的时候去getTask(),在getTask内部,并没有针对性的区分当前work是否是核心线程。只要判断work是都大于core,就会调poll(),否则take()。
- 阅读代码,查看执行结果
结果只会执行 1 和 2,因为队列不会满,只会执行核心线程数,而核心线程在 while(true) 中一直在执行。
- 线程池7个参数作用和生效时机
- int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue workQueue,
- ThreadFactory threadFactory
- RejectedExecutionHandler handler
- 为什么线程池不用数组,而用队列?