目录
- JUC(java.util.concurrent)的常见类
- Callable 接口
- ReentrantLock
- 原子类
- 线程池
- ExecutorService 和 Executors
- ThreadPoolExecutor
- 信号量 Semaphore
- CountDownLatch
- 集合类
- 多线程环境使用 ArrayList
- 多线程环境使用队列
- 多线程环境使用哈希表
javautilconcurrent_5">JUC(java.util.concurrent)的常见类
concurrent:并发(多线程)
Callable 接口
Callable
是一个 interface
。也是一种创建多线程的方式,相当于把线程封装了一个 “返回值”。方便程序员借助多线程的方式计算结果。
Runnable
能表示一个任务(run方法),返回void
Callable
也能表示一个任务(call方法),返回一个具体的值,类型可以通过泛型参数来指定(Object)
如果进行多线程操作,如果你关心多线程执行的过程,使用Runnable
,比如线程池,定时器,就是用的Runnable
只关心过程
如果进行多线程操作,如果你关心多线程的计算结果,使用Callable
,比如通过多线程的方式计算一个公式,计算1+2+…+1000
代码示例: 创建线程计算 1 + 2 + 3 + … + 1000
不使用 Callable 版本
- 创建一个类 Result , 包含一个 sum 表示最终结果, lock 表示线程同步使用的锁对象.
- main 方法中先创建 Result 实例, 然后创建一个线程 t. 在线程内部计算 1 + 2 + 3 + … + 1000.
- 主线程同时使用 wait 等待线程 t 计算结束. (注意, 如果执行到 wait 之前, 线程 t 已经计算完了, 就不必等待了).
- 当线程 t 计算完毕后, 通过 notify 唤醒主线程, 主线程再打印结果.
java">static class Result {public int sum = 0;public Object lock = new Object();
}
public static void main(String[] args) throws InterruptedException {Result result = new Result();Thread t = new Thread() {@Overridepublic void run() {int sum = 0;for (int i = 1; i <= 1000; i++) {sum += i;}synchronized (result.lock) {result.sum = sum;result.lock.notify();}}};t.start();synchronized (result.lock) {while (result.sum == 0) {result.lock.wait();}System.out.println(result.sum);}
}
可以看到, 上述代码需要一个辅助类 Result, 还需要使用一系列的加锁和 wait notify 操作, 代码复杂, 容易出错.
使用 Callable 版本
- 创建一个匿名内部类, 实现 Callable 接口. Callable 带有泛型参数. 泛型参数表示返回值的类型.
- 重写 Callable 的 call 方法, 完成累加的过程. 直接通过返回值返回计算结果.
- 把 callable 实例使用 FutureTask 包装一下.
- 创建线程, 线程的构造方法传入 FutureTask . 此时新线程就会执行 FutureTask 内部的 Callable 的call 方法, 完成计算. 计算结果就放到了 FutureTask 对象中.
- 在主线程中调用
futureTask.get()
能够阻塞等待新线程计算完毕. 并获取到 FutureTask 中的结果.
java">Callable<Integer> callable = new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int sum = 0;for (int i = 1; i <= 1000; i++) {sum += i;}return sum;}
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
Thread t = new Thread(futureTask);//使用Callable不能作为Thread的构造方法参数,是借助FutureTask
t.start();
int result = futureTask.get();//通过FutureTask获取Callable的call方法的结果,get类似join一样,如果call方法没算完会阻塞等待
System.out.println(result);
可以看到, 使用 Callable 和 FutureTask 之后, 代码简化了很多, 也不必手动写线程同步代码了.
理解Callable
Callable 和 Runnable 相对, 都是描述一个 “任务”. Callable 描述的是带有返回值的任务, Runnable 描述的是不带返回值的任务.
Callable 通常需要搭配 FutureTask 来使用. FutureTask 用来保存 Callable 的返回结果. 因为Callable 往往是在另一个线程中执行的, 啥时候执行完并不确定.
FutureTask 就可以负责这个等待结果出来的工作.
理解FutureTask
想象去吃麻辣烫. 当餐点好后, 后厨就开始做了. 同时前台会给你一张 “小票” . 这个小票就是FutureTask. 后面我们可以随时凭这张小票去查看自己的这份麻辣烫做出来了没
目前为止学到的创建线程的方式:
- 直接继承Thread
- 实现Runnable
- 使用lambda
- 使用线程池
- 使用Callable
其中1、2、5搭配匿名内部类使用
ReentrantLock
可重入互斥锁,和 synchronized 定位类似,都是用来实现互斥效果,保证线程安全
ReentrantLock的用法:
- lock():加锁,如果获取不到锁就死等
- trylock(超时时间):加锁,如果获取不到锁,等待一定的时间之后就放弃加锁
- unlock():解锁
ReentrantLock具有一些特点,是synchronized 不具备的功能:
- 提供了一个tryLock方法进行加锁:
对于lock操作,如果加锁不成功,就会阻塞等待(死等)
对于tryLock,如果加锁失败,直接返回false/也可以设定等待时间
- ReentrantLock有两种模式,可以在工作在公平锁状态下,也可以工作在非公平锁状态下。构造方法中通过参数设定的公平/非公平模式
- ReentrantLock也有等待通知机制,搭配Condition这样的类来完成,这里的等待通知要比wait、notify功能更强
- 但是ReentrantLock也可能容易遗漏
unlock()
,通常使用finally
来执行
ReentrantLock 和 synchronized 的区别:
- synchronized 是一个关键字,是 JVM 内部实现的(大概率是基于 C++ 实现)。ReentrantLock 是标准库的一个类,在 JVM 外实现的(基于 Java 实现)。
- synchronized 使用时不需要手动释放锁。ReentrantLock 使用时需要手动释放。使用起来更灵活,但是也容易遗漏 unlock.
- synchronized 在申请锁失败时,会死等. ReentrantLock 可以通过 trylock 的方式等待一段时间就放弃.
- synchronized 是非公平锁。ReentrantLock 默认是非公平锁,可以通过构造方法传入一个 true 开启公平锁模式.。
如何选择使用哪个锁?
- 锁竞争不激烈的时候,使用 synchronized,效率更高,自动释放更方便(实际开发也是用synchronized多)
- 锁竞争激烈的时候,使用 ReentrantLock,搭配 trylock 更灵活控制加锁的行为,而不是死等
- 如果需要使用公平锁,使用 ReentrantLock
原子类
原子类内部用的是 CAS 实现,所以性能要比加锁实现 i++ 高很多。原子类有以下几个
- AtomicBoolean
- AtomicInteger
- AtomicIntegerArray
- AtomicLong
- AtomicReference
- AtomicStampedReference
以 AtomicInteger 举例,常见方法有
java">addAndGet(int delta); i += delta;
decrementAndGet(); --i;
getAndDecrement(); i--;
incrementAndGet(); ++i;
getAndIncrement(); i++;
应用场景:
- 计数需求
播放量、点赞量…
- 统计效果
统计出现错误的请求数目、统计收到的请求数目(衡量服务器的压力)、统计每个请求的响应时间=>平均响应时间(衡量服务器的运行效率)…
通过上述统计内容,实现监控服务器,获取/统计/展示/报警
线程池
虽然创建销毁线程比创建销毁进程更轻量,但是在频繁创建销毁线程的时候还是会比较低效.
线程池就是为了解决这个问题。如果某个线程不再使用了,并不是真正把线程释放,而是放到一个 “池子” 中,下次如果需要用到线程就直接从池子中取,不必通过系统来创建了。
ExecutorService 和 Executors
代码示例:
- ExecutorService 表示一个线程池实例.
- Executors 是一个工厂类,能够创建出几种不同风格的线程池.
- ExecutorService 的 submit 方法能够向线程池中提交若干个任务.
java">ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Runnable() {@Overridepublic void run() {System.out.println("hello");}
});
Executors 创建线程池的几种方式
- newFixedThreadPool:创建固定线程数的线程池
- newCachedThreadPool:创建线程数目动态增长的线程池.
- newSingleThreadExecutor:创建只包含单个线程的线程池.
- newScheduledThreadPool:设定 延迟时间后执行命令,或者定期执行命令。是进阶版的 Timer.
Executors 本质上是 ThreadPoolExecutor 类的封装.
ThreadPoolExecutor
ThreadPoolExecutor 提供了更多的可选参数,可以进一步细化线程池行为的设定.
ThreadPoolExecutor 的构造方法
信号量 Semaphore
信号量, 用来表示 “可用资源的个数”. 本质上就是一个计数器,描述的是当前这个线程,是否“有临界资源可以用”
临界资源:多个线程/进程等并发执行的实体可以公共使用到的资源(多个线程修改同一个资源,这个变量就可以认为是临界资源)
理解信号量
可以把信号量想象成是停车场的展示牌: 当前有车位 100 个. 表示有 100 个可用资源.
当有车开进去的时候, 就相当于申请一个可用资源, 可用车位就 -1 (这个称为信号量的 P 操作)
当有车开出来的时候, 就相当于释放一个可用资源, 可用车位就 +1 (这个称为信号量的 V 操作)
如果计数器的值已经为 0 了, 还尝试申请资源, 就会阻塞等待, 直到有其他线程释放资源.
这个阻塞等待的过程有点像锁是吧。
锁,本质上就是一个特殊的信号量(里面的数值,非0即1,二元信号量)
信号量要比锁更广义,不仅可以描述一个资源,还可以描述N个资源。
但还是锁用的更多一些
Semaphore 的 PV 操作中的加减计数器操作都是原子的, 可以在多线程环境下直接使用.
CountDownLatch
针对特定场景一个组件
同时等待 N 个任务执行结束
好像跑步比赛,10个选手依次就位,哨声响才同时出发;所有选手都通过终点,才能公布成绩。
- 构造 CountDownLatch 实例,初始化 10 表示有 10 个任务需要完成.
- 每个任务执行完毕, 都调用 latch.countDown(),在 CountDownLatch 内部的计数器同时自减.
- 主线程中使用 latch.await(),阻塞等待所有任务执行完毕,相当于计数器为 0 了。(这里await的a是all的意思)
集合类
原来的集合类,大部分都不是线程安全的
Vector,Stack,HashTable,是线程安全的(不建议用),其他的集合类不是线程安全的
Vector、HashTable是上古时期搞出来的集合类
加了锁不一定就线程安全了,不加锁也不一定就线程不安全了
多线程环境使用 ArrayList
- 自己使用同步机制 (synchronized 或者 ReentrantLock)
Collections.synchronizedList(new ArrayList)
synchronizedList 是标准库提供的一个基于 synchronized 进行线程同步的 List.
synchronizedList 的关键操作上都带有 synchronized
相当于让ArrayList像Vector一样使用
- 使用 CopyOnWriteArrayList
CopyOnWrite容器即写时复制的容器。
- 当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,
- 添加完元素之后,再将原容器的引用指向新的容器。
这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会
添加任何元素。
所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
优点:
在读多写少的场景下, 性能很高, 不需要加锁竞争.
缺点:
- 占用内存较多.
- 新写的数据不能被第一时间读取到.
多线程环境使用队列
- ArrayBlockingQueue
基于数组实现的阻塞队列
- LinkedBlockingQueue
基于链表实现的阻塞队列
- PriorityBlockingQueue
基于堆实现的带优先级的阻塞队列
- TransferQueue
最多只包含一个元素的阻塞队列
多线程环境使用哈希表
HashMap 本身不是线程安全的
在多线程环境下使用哈希表可以使用:
- Hashtable
- ConcurrentHashMap
- Hashtable
只是简单的把关键方法加上了 synchronized 关键字
java">public synchronized V put(K key, V value) {
java">public synchronized V get(Object key) {
这相当于直接针对 Hashtable 对象本身加锁
- 如果多线程访问同一个 Hashtable 就会直接造成锁冲突
- size 属性也是通过 synchronized 来控制同步,也是比较慢的
- 一旦触发扩容, 就由该线程完成整个扩容过程。这个过程会涉及到大量的元素拷贝,效率会非常低
- ConcurrentHashMap
相比于 Hashtable 做出了一系列的改进和优化. 以 Java1.8 为例
- 读操作没有加锁(但是使用了 volatile 保证从内存读取结果), 只对写操作进行加锁. 加锁的方式仍然是用 synchronized, 但是不是锁整个对象, 而是 “锁桶” (用每个链表的头结点作为锁对象), 大大降低了锁冲突的概率.
- 充分利用 CAS 特性. 比如 size 属性通过 CAS 来更新. 避免出现重量级锁的情况.
- 优化了扩容方式: 化整为零
- 发现需要扩容的线程, 只需要创建一个新的数组, 同时只搬几个元素过去.
- 扩容期间, 新老数组同时存在.
- 后续每个来操作 ConcurrentHashMap 的线程, 都会参与搬家的过程. 每个操作负责搬运一小
- 部分元素.
- 搬完最后一个元素再把老数组删掉.
- 这个期间, 插入只往新数组加.
- 这个期间, 查找需要同时查新数组和老数组