目录
- 一、什么是JUC
- 二、基本知识
- 2.1、进程和线程
- 2.2、Java默认有两个进程
- 2.3、Java能够开启线程吗?
- 2.4、并发和并行
- 2.5、线程的状态
- 2.6、wait和sleep的区别
- 2.7、什么是可重入锁
- 2.8、synchronized买票案例回顾
- 三、Lock锁
- 3.1、简介
- 3.2、买票问题重现
- 3.3、和Synchronized的区别
- 四、生产者消费者问题(Lock版)
- 4.1、Synchronized实现
- 虚假唤醒问题
- 4.2、Lock实现
- Condition精准通知唤醒
- 五、8个案例彻底理解锁的对象
- 六、安全集合类
- 6.1、CopyOnWriteArrayList
- ArrayList不安全
- 引入CopyOnWriteArrayList
- 6.2、CopyOnWriteArraySet
- HashSet不安全
- 引入CopyOnWriteArraySet
- 6.3、ConcurrentHashMap
- HashMap不安全
- 引入ConcurrentHashMap
- 七、Callable
- 7.1、同Runnable的区别
- 7.2、怎么实现
- 八、三大常用辅助类
- 8.1、CountDownLatch
- 8.2、CyclicBarrier
- 8.3、Semaphore
- 九、读写锁
- 十、阻塞队列
- 10.1、关系图
- 10.2、ArrayBlockingQueue四组API
- 抛出异常(add、remove、element)
- 不跑出异常(offer、poll、pick)
- 等待阻塞(put、take)
- 限时等待()
- 10.3、SynchronousQueue
- 十一、线程池
- 11.1、创建线程池三大方法
- newSingleThreadExecutor
- newFixedThreadPool
- newCachedThreadPool
- 11.2、七大参数
- 11.3、四种拒绝策略
- 11.4、最大线程数怎么定义?
- CPU密集型
- IO密集型
- 十二、四大函数式接口
- 什么是函数式接口
- 12.1、Function
- 12.2、Predicate
- 12.3、Consumer
- 12.4、Supplier
- 十三、Stream流式计算
- 13.1、什么是Stream流式计算
- 13.2、案例测试
- 十四、ForkJoin
- 14.1、什么是ForkJoin
- 14.2、ForkJoin的特点
- 14.3、案例:计算求和任务
- 十五、异步回调
- 十六、理解JMM&Volatile
- 16.1、请你谈谈对Volatile的理解
- 16.2、什么是JMM
- 16.3、Volatile特点
- 1. 保证可见性
- 2. 不保证原子性
- 3. 禁止指令重排
- 十七、彻底玩转单例模式
- 17.1、饿汉式
- 17.2、懒汉式
- 17.3、双重检测锁式
- 17.4、静态内部类式
- 反射破坏单例模式
- 17.5、枚举单例
- 十八、深入理解CAS
- 18.1、什么是CAS
- 18.2、ABA问题
- 十九、各种锁的理解
- 19.1、乐观锁/悲观锁
- 悲观锁(Pessimistic Lock)
- 乐观锁(Optimistic Locking)
- 19.2、公平/非公平锁
- 19.3、可重入锁
- 19.4、自旋锁
- 19.5、死锁
- 什么是死锁?
- 死锁问题排查
参考:
- 狂神说Java_JUC并发编程最新版通俗易懂
- 什么是乐观锁什么是悲观锁
一、什么是JUC
JUC
就是java.util.concurrent
工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。
二、基本知识
2.1、进程和线程
进程是程序的一次执行过程,包含进程控制块、程序段、数据三部分
1️⃣ 动态性
- 动态性是进程最基本的特征,表现为:由创建而产生,由调度而执行,得不到资源而暂停执行,由撤销而消亡;有一定的生命周期
- 程序只是一组有序的指令集合
2️⃣ 并发性
- 引入进程的目的就是和其他进程能并发执行
- 程序不能并发执行
3️⃣ 独立性
- 进程实体是一个能独立运行的基本单位,是系统中独立获得资源和独立调度的基本单位
- 程序不能作为一个独立的单位进行运行
2.2、Java默认有两个进程
Main
和GC
2.3、Java能够开启线程吗?
不行,Java是通过native本地方法调底层C++写的方法,Java无法直接操作硬件
2.4、并发和并行
- 并发:多个事件在同一时间间隔内发生(cpu一核,模拟出来多条线程快速交替运行)
- 并行:多个事件在同一时刻发生(cpu多核,多个线程可以同时执行)
查看cpu的核数
2.5、线程的状态
NEW | 尚未启动的线程处于此状态 |
---|---|
RUNNABLE | 在Java虚拟机中执行的线程处于此状态 |
BLOCKED | 被阻塞等待监视器锁定的线程处于此状态(IO操作,wait,juc锁定) |
WAITING | 正在等待另一个线程执行特定动作的线程处于此状态(sleep,join) |
TIMED_WAITING | 正在等待另一个线程执行动作达到指定等待时间的线程处于此状态(sleep,join) |
TERMINATED | 已退出的线程处于此状态。 |
2.6、wait和sleep的区别
- sleep不释放锁,wait释放锁
- 来自不同的类:sleep()函数在Thread类中,wait()函数属于Object类
- 使用范围不同:sleep可以在任何地方使用,wait只能使用在同步代码块中
2.7、什么是可重入锁
可重入,就是可以重复获取相同的锁而不会出现死锁;synchronized和ReentrantLock都是可重入的
// 演示可重入锁是什么意思,可重入,就是可以重复获取相同的锁
// synchronized和ReentrantLock都是可重入的
// 可重入降低了编程复杂性
public class WhatReentrant {public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {synchronized (this) {System.out.println("第1次获取锁,这个锁是:" + this);int index = 1;while (true) {synchronized (this) {System.out.println("第" + (++index) + "次获取锁,这个锁是:" + this);}if (index == 10) {break;}}}}}).start();}
}
import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;// 演示可重入锁是什么意思
public class WhatReentrant2 {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();new Thread(new Runnable() {@Overridepublic void run() {try {lock.lock();System.out.println("第1次获取锁,这个锁是:" + lock);int index = 1;while (true) {try {lock.lock();System.out.println("第" + (++index) + "次获取锁,这个锁是:" + lock);try {Thread.sleep(new Random().nextInt(200));} catch (InterruptedException e) {e.printStackTrace();}if (index == 10) {break;}} finally {lock.unlock();}}} finally {lock.unlock();}}}).start();}
}
2.8、synchronized买票案例回顾
真正的开发中,线程只是一个资源类(包含属性),没有任何附属的操作
模拟卖票:
- 以前我们会将
Ticket
类继承Runnable接口 - 现在会将
Ticket
作为一个资源类,里面不添加关于线程的操作
package com.zsr;public class saleTicket {public static void main(String[] args) {//一份资源Ticket ticket = new Ticket();//多个线程new Thread(() -> {for (int i = 0; i < 30; i++) {ticket.sale();}}, "A").start();new Thread(() -> {for (int i = 0; i < 20; i++) {ticket.sale();}}, "B").start();new Thread(() -> {for (int i = 0; i < 80; i++) {ticket.sale();}}, "C").start();}
}//资源类
class Ticket {//票数private int number = 100;public synchronized void sale() {if (number > 0)System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余" + number);}
}
三、Lock锁
3.1、简介
官方文档地址:https://tool.oschina.net/apidocs/apidoc?api=jdk-zh
使用方法:创建锁、加锁、业务代码、解锁
3.2、买票问题重现
我们接下来用Lock锁
的方式来解决上述买票问题,Lock接口
最常用的实现类就是ReentrantLock
可重入锁
- 可以看到看到
ReentrantLock
有两个构造方法,可以指定使用 公平锁/非公平锁 - 公平锁:十分公平,先来后到
- 非公平锁:不公平,可以插队
修改Ticket
类
//资源类
class Ticket2 {//票数private int number = 100;//Lock锁Lock lock = new ReentrantLock();public void sale() {lock.lock();//加锁try {if (number > 0)System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余" + number);} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();//解锁}}
}
3.3、和Synchronized的区别
-
Synchronized
是内置的关键字,Lock
是一个Java类 -
Synchronized
无法判断锁的状态,Lock
可以判断是否获取到了锁 -
Synchronized
会自动释放锁,Lock
需要手动释放锁(如果不释放锁,会造成死锁) -
假如有两个线程:线程1、线程2;线程1获得了锁
Synchronized
:如果线程1阻塞了,线程2就会一直等待,造成死锁Lock
:如果线程1阻塞了,线程2不会一直等待,可以通过trylock()
方法尝试获取锁 -
两者都是可重入锁,但是
Synchronized
不可中断,为非公平锁;而Lock
锁可判断锁状态,并且可以设置为公平锁/非公平锁 -
Synchronized
适合锁少量代码同步代码,Lock
适合锁大量代码同步代码
四、生产者消费者问题(Lock版)
生产者消费者——线程之间的通信问题
- 通过
Synchronized
实现,我们常用object.wait()
+Object.notify()
- 通过
Lock
怎么实现呢,用condition.await()
+condition.signal()
4.1、Synchronized实现
两个线程A、B实现:
如果number不等于0,则number-1;如果number等于0,则number+1
package com.zsr;public class communicate {public static void main(String[] args) {Data data = new Data();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "B").start();}
}class Data {private int number = 0;//如果number=0,则number+1public synchronized void plus() throws InterruptedException {if (number != 0)this.wait();//等待number++;System.out.println(Thread.currentThread().getName() + "=>" + number);this.notifyAll();//唤醒其他线程}//如果number!=0,则number-1public synchronized void minor() throws InterruptedException {if (number == 0)this.wait();number--;System.out.println(Thread.currentThread().getName() + "=>" + number);this.notifyAll();//唤醒其他线程}
}
结果:
虚假唤醒问题
如果再加两个线程呢?
public static void main(String[] args) {Data data = new Data();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "B").start();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "C").start();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "D").start();
}
}
看结果,会出现2、3的情况;这就是因为if判断只判断一次,两个线程可能同时+1;造成了虚假唤醒的问题
修改代码:if
判断改成while
判断
class Data {private int number = 0;//如果number=0,则number+1public synchronized void plus() throws InterruptedException {while (number != 0)this.wait();//等待number++;System.out.println(Thread.currentThread().getName() + "=>" + number);this.notifyAll();//唤醒其他线程}//如果number!=0,则number-1public synchronized void minor() throws InterruptedException {while (number == 0)this.wait();number--;System.out.println(Thread.currentThread().getName() + "=>" + number);this.notifyAll();//唤醒其他线程}
}
4.2、Lock实现
修改代码:
package com.zsr;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class communicate2 {public static void main(String[] args) {Data2 data = new Data2();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "B").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "C").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "D").start();}
}class Data2 {private int number = 0;//创建Lock锁private Lock lock = new ReentrantLock();//获得conditionCondition condition = lock.newCondition();//如果number=0,则number+1public void plus() throws InterruptedException {lock.lock();//加锁try {while (number != 0)condition.await();//等待number++;System.out.println(Thread.currentThread().getName() + "=>" + number);condition.signalAll();//唤醒其他线程} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();//解锁}}//如果number!=0,则number-1public void minor() throws InterruptedException {lock.lock();try {while (number == 0)condition.await();number--;System.out.println(Thread.currentThread().getName() + "=>" + number);condition.signalAll();//唤醒其他线程} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}
}
根据结果,成功实现!但是发现一个问题,线程的执行都是随机的,怎么进行有序的实现呢?A=>B=>C=>D
Condition精准通知唤醒
可以设置多个condition监视器,每个监视器监视一个线程,精确等待唤醒某个线程
package com.zsr;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class communicate3 {public static void main(String[] args) {Data3 data = new Data3();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "B").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "C").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "D").start();}
}class Data3 {private int number = 0;//创建Lock锁private Lock lock = new ReentrantLock();//获得conditionCondition conditionA = lock.newCondition();Condition conditionB = lock.newCondition();Condition conditionC = lock.newCondition();Condition conditionD = lock.newCondition();//如果number=0,则number+1public void plus() throws InterruptedException {lock.lock();//加锁try {if (Thread.currentThread().getName()=="A") {while (number != 0)conditionA.await();//A等待number++;System.out.println(Thread.currentThread().getName() + "=>" + number);conditionB.signal();//唤醒B线程}if (Thread.currentThread().getName()=="C") {while (number != 0)conditionC.await();//C等待number++;System.out.println(Thread.currentThread().getName() + "=>" + number);conditionD.signal();//唤醒D线程}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();//解锁}}//如果number!=0,则number-1public void minor() throws InterruptedException {lock.lock();try {if (Thread.currentThread().getName()=="B") {while (number == 0)conditionB.await();//B等待number--;System.out.println(Thread.currentThread().getName() + "=>" + number);conditionC.signal();//唤醒C线程}if (Thread.currentThread().getName()=="D") {while (number == 0)conditionD.await();//D等待number--;System.out.println(Thread.currentThread().getName() + "=>" + number);conditionA.signal();//唤醒A线程}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}
}
结果:
五、8个案例彻底理解锁的对象
如何判断锁的是谁!永远的知道什么锁,锁到底锁的是谁(对象、 Class)
创建两个线程A、B,A线程执行发短信方法,B线程执行打电话方法,谁先执行?
package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();//线程A发消息new Thread(() -> {phone.send_message();}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);//线程B打电话new Thread(() -> {phone.call();}, "B").start();}
}class Phone {public synchronized void send_message() {System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}
结果:先发短信再打电话
那如果让发短信休眠2s呢?
package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();new Thread(() -> {try {phone.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone.call();}, "B").start();}
}class Phone {public synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}
结果:还是先发短信再打电话
这是为什么呢?并不是因为A先执行,而是有锁的存在
synchronized
锁的对象是方法的调用者,也就是phone
对象,因此打电话和发短信方法锁的是同一个对象,谁先拿到锁谁就先执行
如果新增一个普通方法hello,那先是hello还是发短信呢?
package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();new Thread(() -> {try {phone.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone.hello();}, "B").start();}
}class Phone {public synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}public void hello() {System.out.println("hello");}
}
根据结果,先执行hello,这是因为hello
是一个普通方法,没有锁,不受锁的影响
如果有两个
phone
对象,是先发短信还是先打电话
package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone1 = new Phone();Phone phone2 = new Phone();new Thread(() -> {try {phone1.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone2.call();}, "B").start();}
}class Phone {public synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}
两个对象,两个调用者,所以有两把锁,所以按时间顺序执行
修改两个方法为静态方法,只有一个对象,是先打电话还是发短信
package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();new Thread(() -> {try {phone.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone.call();}, "B").start();}
}class Phone {public static synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public static synchronized void call() {System.out.println("打电话");}
}
根据结果,先发短信
因为static静态方法在类加载的时候就有了,因此这里synchronized
锁的是Class模板,也就是Phone.class
,全局唯一;也就是两个方法拿的仍是同一把锁
那如果两个方法为静态方法,有两个对象,是先打电话还是发短信?
package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone1 = new Phone();Phone phone2 = new Phone();new Thread(() -> {try {phone1.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone2.call();}, "B").start();}
}class Phone {public static synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public static synchronized void call() {System.out.println("打电话");}
}
根据结果,仍是先发短信
因为锁的是Phone.class
,也就是说两个方法仍是同一把锁
如果是一个普通的同步方法和一个静态的同步方法,只有一个对象
package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();new Thread(() -> {try {phone.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone.call();}, "B").start();}
}class Phone {public static synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}
结果:
一个锁的是Phone.class
模板,一个锁的是phone
对象,因此两个方法不是一把锁,因此按时间顺序运行
如果是一个普通的同步方法和一个静态的同步方法,有两个对象
package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone1 = new Phone();Phone phone2 = new Phone();new Thread(() -> {try {phone1.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone2.call();}, "B").start();}
}class Phone {public static synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}
同样两个方法不是一把锁,因此按时间顺序运行
六、安全集合类
6.1、CopyOnWriteArrayList
ArrayList不安全
并发情况下,ArrayList
不安全,我们通过一个简单的案例来测试:
package com.zsr.collection;import java.util.ArrayList;
import java.util.UUID;public class ListTest {public static void main(String[] args) {//并发情况下ArrayList<String> list = new ArrayList<>();for (int i = 0; i < 10; i++) {new Thread(() -> {list.add(UUID.randomUUID().toString().substring(0, 8));System.out.println(list);}, String.valueOf(i)).start();}}
}
根据结果,发现报错了ConcurrentModificationException
(并发修改异常)
那么怎么解决呢?
-
方案一:换成线程安全的
vector
Vector<String> lists = new Vector<>();
-
方案二:利用Collections工具类
List<String> list = Collections.synchronizedList(new ArrayList<>());
-
方案三:利用JUC包中的类
CopyOnWriteArrayList
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
引入CopyOnWriteArrayList
CopyOnWrite
简称COW
,是计算机程序设计领域的一种优化策略- 多个线程调用的时候,读取的时候固定,但写入时会复制,避免写入造成的数据覆盖问题
其效率比Vector
更高,因为Vector在方法上都用了Synchronized
关键字,会降低效率
而CopyOnWriteArratList
是用Lock
锁实现的,底层也是数组实现,不过添加的时候会先拷贝一份新的数组,最后再拷贝回去
6.2、CopyOnWriteArraySet
HashSet不安全
并发情况下,HashSet
不安全,我们通过一个简单的案例来测试:
package com.zsr.collection;import java.util.HashSet;
import java.util.UUID;public class SetTest {public static void main(String[] args) {//并发情况下HashSet<String> set = new HashSet<>();for (int i = 0; i < 30; i++) {new Thread(() -> {set.add(UUID.randomUUID().toString().substring(0, 8));System.out.println(set);}, String.valueOf(i)).start();}}
}
同样,ConcurrentModificationException
:并发修改异常
那么怎么解决呢?
-
方案一:利用Collections工具类
Set<String> set = Collections.synchronizedSet(new HashSet<String>());
-
方案三:利用JUC包中的类
CopyOnWriteArrayList
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
引入CopyOnWriteArraySet
CopyOnWrite
简称COW
,是计算机程序设计领域的一种优化策略- 多个线程调用的时候,读取的时候固定,但写入时会复制,避免写入造成的数据覆盖问题
CopyOnWriteArratSet
同样是用Lock
锁实现的,底层也是数组实现,不过添加的时候会先拷贝一份新的数组,最后再拷贝回去
6.3、ConcurrentHashMap
HashMap不安全
并发情况下,HashMap
不安全,我们通过一个简单的案例来测试:
package com.zsr.collection;import java.util.HashMap;
import java.util.UUID;public class MapTest {public static void main(String[] args) {//并发情况下HashMap<String, String> map = new HashMap<>();for (int i = 0; i < 30; i++) {new Thread(() -> {map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 8));System.out.println(map);}, String.valueOf(i)).start();}}
}
同样,ConcurrentModificationException
:并发修改异常
那么怎么解决呢?
-
方案一:利用Collections工具类
Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
-
方案三:利用JUC包中的类
ConcurrentHashMap
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
引入ConcurrentHashMap
ConcurrentHashMap
融合了hashtable
和hashmap
二者的优势
但是hashtable每次同步执行的时候都要锁住整个结构。看下图:
ConcurrentHashMap正是为了解决这个问题而诞生的,其锁的方式是稍微细粒度的,引入了分段锁
的概念;
-
可以理解为把一个大的Map拆分成N(默认为16)个小的HashTable,根据key.hashCode()来决定把key放到哪个HashTable中。
-
在ConcurrentHashMap中,就是把Map分成了N个Segment,put和get的时候,都是现根据key.hashCode()算出放到哪个Segment中:
通过把整个Map分为N个Segment(类似HashTable),可以提供相同的线程安全;原来只能一个线程进入,现在却能同时16个写线程进入(写线程才需要锁定,而读线程几乎不受限制),并发性的提升是显而易见的
七、Callable
7.1、同Runnable的区别
- 可以有返回值
- 可以抛出异常
- 重写call()方法而不是run()
7.2、怎么实现
实现Runnable接口时,我们通过Thread.start()
进行启动,因为Thread
的构造方法可以传入Runnable
对象
那么怎么实现Callable呢?我们无法直接通过Thread
进行启动,但是我们可以通过Runnable
间接的启动
查看帮助文档,可以看到Runnable
接口有一个FutureTask
启动类,我们点进去看看
可以看到,它有两个构造方法,分别可以传入Callable
和Runnable
对象,这就将两者联系了起来
于事我们就可以通过Thread
来启动Callable
接口的实现类
package com.zsr;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class dome1 {public static void main(String[] args) throws ExecutionException, InterruptedException {myThread myThread = new myThread();FutureTask<String> futureTask = new FutureTask<String>(myThread);new Thread(futureTask).start();System.out.println(futureTask.get());//获取call方法返回值}
}class myThread implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("调用call方法");return "call方法执行完成";}
}
注意:
-
通过
FutureTask
的get()获取call方法的返回值,该方法可能会产生阻塞(可能返回结果需要大量的计算,很耗时),一般情况下将其放在最后一行或者使用异步通信来处理 -
FutureTask
任务多线程并发访问时为啥只会被执行一次
八、三大常用辅助类
8.1、CountDownLatch
就是一个减法计数器
package com.zsr.countDown;import java.util.concurrent.CountDownLatch;public class Test {public static void main(String[] args) throws InterruptedException {//创建一个计数器,初始化为6CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <= 6; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + "线程执行结束");countDownLatch.countDown();//计数器-1}, String.valueOf(i)).start();}countDownLatch.await();//等待计数器归0再往下执行System.out.println("所有线程执行完毕");}
}
如果不加countDownLatch.await()
加了之后
8.2、CyclicBarrier
相当于加法计数器
package com.zsr.Cyclic;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class Test {//模拟集齐7课龙珠召唤神龙public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {System.out.println("召唤神龙!");});for (int i = 1; i <= 7; i++) {final int temp = i;new Thread(() -> {System.out.println(Thread.currentThread().getName() + "收集了第" + temp + "颗龙珠");try {cyclicBarrier.await();//计数不断+1,直到为7} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}).start();}}
}
8.3、Semaphore
计数信号量
semaphore.acquire()
,获得许可正,如果许可证已经满了,等待其他线程释放许可证semaphore.release()
,释放许可证作用:多个共享资源互斥使用,并发限流,控制最大的线程数
package com.zsr.Sema;import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) {//模拟3个车位,6辆车要停车Semaphore semaphore = new Semaphore(3);for (int i = 1; i <= 6; i++) {new Thread(() -> {try {semaphore.acquire();//获取许可System.out.println(Thread.currentThread().getName() + "抢到车位");TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName() + "离开车位");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();//释放许可}}, String.valueOf(i)).start();}}
}
九、读写锁
代码测试:定义一个缓存区用于读写操作,然后启动5个线程分别进行读和写,测试
-
首先测试不加锁的情况下
import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteLockDemo {public static void main(String[] args) {UnlockCache unlockCache = new UnlockCache();//5个线程写入for (int i = 1; i <= 5; i++) {final int temp = i;new Thread(() -> {unlockCache.put(temp, temp);}, String.valueOf(i)).start();}//5个线程读出for (int i = 1; i <= 5; i++) {final int temp = i;new Thread(() -> {unlockCache.get(temp);}, String.valueOf(i)).start();}} }//不加锁的 class UnlockCache {private volatile Map<Integer, Object> map = new HashMap<>();//写入public void put(int key, Object value) {System.out.println("开始写入" + key);map.put(key, value);System.out.println(key + "写入完成");}//读出public void get(int key) {System.out.println("开始读取" + key);map.get(key);System.out.println(key + "读取完毕");} }
根据结果,可以看到写入时被插队,这是不允许的! -
加读写锁,实现只能同时有一个线程写,多个线程读
也就是写锁为
独占锁
(一次只能被一个线程占有),读锁为共享锁
(多个线程可以同时占有)- 读-读:可共存
- 读-写:不可共存
- 写-写:不可共存
import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteLockDemo {public static void main(String[] args) {LockCache lockCache = new LockCache();//5个线程写入for (int i = 1; i <= 5; i++) {final int temp = i;new Thread(() -> {lockCache.put(temp, temp);}, String.valueOf(i)).start();}//5个线程读出for (int i = 1; i <= 5; i++) {final int temp = i;new Thread(() -> {lockCache.get(temp);}, String.valueOf(i)).start();}} }//加锁:实现同时只能有一个线程写,多个线程读 class LockCache {private volatile Map<Integer, Object> map = new HashMap<>();//读写锁:实现只能有一个线程写,多个线程写,更细粒度的控制ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();//写入:只能有一个线程写public void put(int key, Object value) {readWriteLock.writeLock().lock();try {System.out.println("开始写入" + key);map.put(key, value);System.out.println(key + "写入完成");} catch (Exception e) {e.printStackTrace();} finally {readWriteLock.writeLock().unlock();}}//读出:可以多个线程读public void get(int key) {readWriteLock.readLock().lock();try {System.out.println("开始读取" + key);map.get(key);System.out.println(key + "读取完毕");} catch (Exception e) {e.printStackTrace();} finally {readWriteLock.readLock().unlock();}} }
根据结果,我们实现了写入时不能被插队,但是读取可以多个线程读取
十、阻塞队列
10.1、关系图
BlockingQueue
关系图:
队列阻塞:
10.2、ArrayBlockingQueue四组API
方式 | 抛出异常 | 有返回值、不抛出异常 | 阻塞等待 | 限时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer( , ) |
移除 | remove() | poll() | take() | poll( , ) |
判断队列首 | element() | peek() | \ | \ |
抛出异常(add、remove、element)
//抛出异常
public static void test1() {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.add(1));System.out.println(blockingQueue.add(2));System.out.println(blockingQueue.add(3));//如果添加第四个元素,则发生异常java.lang.IllegalStateException: Queue fullSystem.out.println(blockingQueue.add(4));
}
//抛出异常
public static void test1() {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.add(1));System.out.println(blockingQueue.add(2));System.out.println(blockingQueue.add(3));System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());//如果取出第四个元素,则发生异常java.util.NoSuchElementExceptionSystem.out.println(blockingQueue.remove());
}
//抛出异常
public static void test1() {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.add(1));System.out.println(blockingQueue.add(2));System.out.println(blockingQueue.add(3));System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());//如果取出队首元素,则发生异常java.util.NoSuchElementExceptionSystem.out.println(blockingQueue.element());
}
不跑出异常(offer、poll、pick)
//不抛出异常
public static void test1() {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.offer(1));System.out.println(blockingQueue.offer(2));System.out.println(blockingQueue.offer(3));//如果添加第四个元素,返回falseSystem.out.println(blockingQueue.offer(4));System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());//如果取出第四个元素,返回nullSystem.out.println(blockingQueue.poll());//如果取出队首元素,返回nullSystem.out.println(blockingQueue.peek());
}
等待阻塞(put、take)
//等待阻塞
public static void test1() throws InterruptedException {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);blockingQueue.put(1);blockingQueue.put(2);blockingQueue.put(3);//如果添加第四个元素,程序阻塞blockingQueue.put(4);System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());//如果取出第四个元素,程序阻塞blockingQueue.take();
}
限时等待()
//限时等待
public static void test1() throws InterruptedException {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.offer(1));System.out.println(blockingQueue.offer(2));System.out.println(blockingQueue.offer(3));//如果添加第四个元素,等待2s后程序结束System.out.println(blockingQueue.offer(4, 2, TimeUnit.SECONDS));System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());//如果取出第四个元素,等待2s后程序结束System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
}
10.3、SynchronousQueue
一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek
,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll()
将会返回 null
。对于其他 Collection
方法(例如 contains
),SynchronousQueue
作为一个空 collection。此队列不允许 null
元素。
public static void main(String[] args) throws InterruptedException {SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();//线程T1,存元素new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " put 1");synchronousQueue.put("1");System.out.println(Thread.currentThread().getName() + " put 2");synchronousQueue.put("2");System.out.println(Thread.currentThread().getName() + " put 3");synchronousQueue.put("3");} catch (InterruptedException e) {e.printStackTrace();}}, "T1").start();//线程T2,取元素new Thread(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}, "T2").start();
}
十一、线程池
池化技术:程序的运行就会占用系统资源就会占用系统资源,为了优化资源的使用,就引入了池化技术,事先准备好一些资源,需要使用就来取,用完即放回;例如 线程池、连接池、内存池、对象池
线程池的好处:线程复用、可以控制最大并发数、管理线程
- 降低资源消耗
- 提高响应速度
- 方便管理
11.1、创建线程池三大方法
如何创建线程池呢?java.util.concurrent
中提供了Executors
类
其中有一些静态方法用于创建线程池
newSingleThreadExecutor
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ThreadPool {public static void main(String[] args) {//创建单一线程池,只有一个线程执行ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();//创建线程池,可指定固定线程数量同时执行for (int i = 0; i < 20; i++) {singleThreadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}//线程池用完,关闭线程池singleThreadPool.shutdown();}
}
newFixedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ThreadPool {public static void main(String[] args) {//创建线程池,可指定固定线程数量同时执行ExecutorService fixThreadPool = Executors.newFixedThreadPool(5);//使用线程池创建线程for (int i = 0; i < 20; i++) {fixThreadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}//线程池用完,关闭线程池 fixThreadPool.shutdown();}
}
newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程, 那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ThreadPool {public static void main(String[] args) {//创建可伸缩的线程池ExecutorService cachedThreadPool = Executors.newCachedThreadPool();//使用线程池创建线程for (int i = 0; i < 20; i++) {cachedThreadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}//线程池用完,关闭线程池cachedThreadPool.shutdown();}
}
11.2、七大参数
查看newSingleThreadExecutor
、newFixedThreadPool
、newCachedThreadPool
的源码,可以发现本质上就是创建了一个ThreadPoolExcutor对象
再查看ThreadPoolExecutor
的源码,可以看到七大参数
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小int maximumPoolSize, //最大线程池大小long keepAliveTime, //存活时间(超时未调用则释放)TimeUnit unit, //超时单位BlockingQueue<Runnable> workQueue, //阻塞队列ThreadFactory threadFactory, //线程工程:创建线程,一般默认不改动RejectedExecutionHandler handler) //拒绝策略
{if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}
各种参数的含义好比银行办理业务,corePoolSize
是已经开放的服务窗口,BlockingQueue
是候客区,假设人流量非常大,就需要多开放几个服务窗口,maximumPoolSize
就是最大开放的服务窗口数;再假如很少有人办理业务,过了一定的时间就关闭窗口,keepAliveTime
就是要关闭窗口的事件;RejectedExecutionHandler
拒绝策略就好比银行满了,再来人就不让进了
可以看到newCachedThreadPool
的核心线程池大小设置未0,最大线程池大小设置为Integer.MAX_VALUE
,约等于21亿;也就是说通过Executors.newCachedThreadPool()创建的线程池可以支持并发的线程数介于0~21亿之间,这是十分耗费资源的
因此,阿里巴巴官方手册有以下规定:
11.3、四种拒绝策略
我们来自定义一个线程池,拒绝策略为AbortPolicy
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPool {public static void main(String[] args) {//自定义线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常for (int i = 1; i <= 8; i++) {threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}}
}
如果i<=9,超过了最大承载,则会抛出异常
修改拒绝策略为CallerRunsPolicy
:可以看到没有抛出异常,而是由main线程处理
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPool {public static void main(String[] args) {//自定义线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常for (int i = 1; i <= 9; i++) {threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}}
}
修改拒绝策略为DiscardPolicy
:可以看到不抛出异常不执行
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPool {public static void main(String[] args) {//自定义线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常for (int i = 1; i <= 9; i++) {threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}}
}
策略DiscardOldestPolicy
同DiscardPolicy
类似:不抛出异常不执行,但是会尝试竞争
11.4、最大线程数怎么定义?
CPU密集型
电脑的cpu是几核就定义为几,定义为常数换台电脑就不行了
//自定义线程池:CPU密集型
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,Runtime.getRuntime().availableProcessors(),3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy()
);
IO密集型
大于 判断程序中耗费IO的线程 即可
十二、四大函数式接口
新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算
什么是函数式接口
函数式接口(Functional Interface)是jdk8引入的,有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。并且这类接口使用了
@FunctionalInterface
进行注解,函数式接口可以被隐式转换为 lambda 表达式JDK 1.8 之前已有的函数式接口:
java.lang.Runnable
@FunctionalInterface public interface Runnable {public abstract void run(); }
java.util.concurrent.Callable
java.security.PrivilegedAction
java.util.Comparator
java.io.FileFilter
java.nio.file.PathMatcher
java.lang.reflect.InvocationHandler
java.beans.PropertyChangeListener
java.awt.event.ActionListener
javax.swing.event.ChangeListener
JDK 1.8 新增加的函数接口:
java.util.function
这个package中的接口大致分为了以下四类:
Function:接收参数,并返回结果,主要方法
R apply(T t)
Consumer:接收参数,无返回结果, 主要方法为
void accept(T t)
// forEach的参数就是消费者类型函数式接口Consumer @Override public void forEach(Consumer<? super E> action) {Objects.requireNonNull(action);final int expectedModCount = modCount;final Object[] es = elementData;final int size = this.size;for (int i = 0; modCount == expectedModCount && i < size; i++)action.accept(elementAt(es, i));if (modCount != expectedModCount)throw new ConcurrentModificationException(); }
Supplier:不接收参数,但返回结构,主要方法为
T get()
Predicate:接收参数,x返回boolean值,主要方法为
boolean test(T t)
12.1、Function
函数式接口:有参数且需要返回值
@FunctionalInterface
public interface Function<T, R> {//有参数且有返回值R apply(T t);default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {Objects.requireNonNull(before);return (V v) -> apply(before.apply(v));}default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {Objects.requireNonNull(after);return (T t) -> after.apply(apply(t));}static <T> Function<T, T> identity() {return t -> t;}
}
实现一个Function接口
package 函数式接口;import java.util.function.Function;public class Demo {public static void main(String[] args) {//匿名内部类,没有类的名称Function function1 = new Function<String, String>() {@Overridepublic String apply(String str) {return str;}};//修改为lambda表达式Function function2 = (str) -> {return str;};System.out.println(function1.apply("hello"));System.out.println(function2.apply("hello lambda"));}
}
12.2、Predicate
判断型接口:有参数,返回值为布尔型
@FunctionalInterface
public interface Predicate<T> {//有参数,返回值为布尔型boolean test(T t);default Predicate<T> and(Predicate<? super T> other) {Objects.requireNonNull(other);return (t) -> test(t) && other.test(t);}default Predicate<T> negate() {return (t) -> !test(t);}default Predicate<T> or(Predicate<? super T> other) {Objects.requireNonNull(other);return (t) -> test(t) || other.test(t);}static <T> Predicate<T> isEqual(Object targetRef) {return (null == targetRef)? Objects::isNull: object -> targetRef.equals(object);}@SuppressWarnings("unchecked")static <T> Predicate<T> not(Predicate<? super T> target) {Objects.requireNonNull(target);return (Predicate<T>)target.negate();}
}
实现一个Predicate接口
package 函数式接口;import java.util.function.Predicate;public class Demo1 {public static void main(String[] args) {//匿名内部类,没有类的名称Predicate<String> predicate1 = new Predicate<>() {@Overridepublic boolean test(String s) {return s.isEmpty();//判断字符串是否为空}};//修改为lambdaPredicate<String> predicate2 = (s) -> {return s.isEmpty();};System.out.println(predicate1.test("hello"));System.out.println(predicate2.test(""));}
}
12.3、Consumer
消费性接口:没有返回值,有参数
@FunctionalInterface
public interface Consumer<T> {//没有返回值,有参数void accept(T t);default Consumer<T> andThen(Consumer<? super T> after) {Objects.requireNonNull(after);return (T t) -> { accept(t); after.accept(t); };}
}
实现Consumer接口
package 函数式接口;import java.util.function.Consumer;public class Demo2 {public static void main(String[] args) {//匿名内部类,没有类的名称Consumer<String> consumer1 = new Consumer<>() {@Overridepublic void accept(String s) {System.out.println(s);}};//修改为lambdaConsumer consumer2 = (s) -> {System.out.println(s);};consumer1.accept("hello1");consumer1.accept("hello2");}
}
12.4、Supplier
供给型接口:无参数,指定返回值类型
@FunctionalInterface
public interface Supplier<T> {//无参数,指定返回值类型T get();
}
实现Supplier接口
package 函数式接口;import java.util.function.Supplier;public class Demo3 {public static void main(String[] args) {//匿名内部类,没有类的名称Supplier<String> supplier1 = new Supplier<>() {@Overridepublic String get() {return "hello1";}};//修改为lambdaSupplier supplier2 = () -> {return "hello2";};System.out.println(supplier1.get());System.out.println(supplier2.get());}
}
十三、Stream流式计算
13.1、什么是Stream流式计算
大数据时代分为存储+计算,存储交给数据库、集合等来处理,计算就交给流来做
Java中就提供了java.util.stream
用于流式计算
13.2、案例测试
package 流式计算;import java.util.Arrays;
import java.util.List;public class Test {public static void main(String[] args) {User user1 = new User("a", 1, 18);User user2 = new User("b", 2, 19);User user3 = new User("c", 3, 21);User user4 = new User("d", 4, 30);User user5 = new User("e", 5, 28);User user6 = new User("f", 6, 27);//list来存储数据List<User> users = Arrays.asList(user1, user2, user3, user4, user5, user6);//stream来计算//筛选出以下条件的用户://1.id为偶数//2.age>23//3.name转换为大写字母//4.name字母倒序排序//5.只输出一个用户users.stream().filter(user -> {//判断型接口Predicatereturn user.getId() % 2 == 0;}).filter(user -> {//判断型接口Predicatereturn user.getAge() > 23;}).map(user -> {//函数式接口Functionreturn user.getName().toUpperCase();}).sorted((u1, u2) -> {//Comparator函数式接口return u2.compareTo(u1);}).limit(1).forEach(System.out::println);}
}
十四、ForkJoin
14.1、什么是ForkJoin
ForkJoin出现于jdk1.7,用于并行执行任务 ,提高效率,用于大数据量的情况(分支合并)
大数据:Map Reduce——将大任务拆分成小任务
14.2、ForkJoin的特点
工作窃取:里面维护的都是双端队列
14.3、案例:计算求和任务
我们可以在JUC中找到ForkJoinPool
类
其中有一个方法,可用于执行一个ForkJoinTask
我们需要返回值,查看RecursiveTask
,可以找到compute
方法进行计算
1️⃣ 编写任务
package forkjoin;import java.util.concurrent.RecursiveTask;//计算任务
public class ForkJoinTask extends RecursiveTask<Long> {private long begin;private long end;public ForkJoinTask(long begin, long end) {this.begin = begin;this.end = end;}//计算方法:计算1加到1_0000_0000@Overrideprotected Long compute() {long sum = 0;if ((end - begin) < 10000) {//如果差值小于10000则暴力加for (long i = begin; i <= end; i++)sum += i;return sum;} else {//数据量大于10000采用forkjoin来计算long mid = (begin + end) / 2;//任务一ForkJoinTask task1 = new ForkJoinTask(begin, mid);task1.fork();//拆分任务,将线程压入队列//任务二ForkJoinTask task2 = new ForkJoinTask(mid + 1, end);task2.fork();//拆分任务,将线程压入队列return task1.join() + task2.join();}}
}
2️⃣ 测试比较
package forkjoin;import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;public class Test {public static void main(String[] args) throws ExecutionException, InterruptedException {test1();test2();test3();}//普通遍历方式public static void test1() {long sum = 0;long start = System.currentTimeMillis();for (long i = 0; i <= 10_0000_0000; i++) {sum += i;}long end = System.currentTimeMillis();System.out.println("普通遍历法耗时" + (end - start) + "结果为:" + sum);}//ForkJoin方式public static void test2() throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask task = new ForkJoinTask(0, 10_0000_0000);java.util.concurrent.ForkJoinTask<Long> submit = forkJoinPool.submit(task);Long sum = submit.get();long end = System.currentTimeMillis();System.out.println("通过ForkJoin方式耗时" + (end - start) + "结果为:" + sum);}//Stream并行流方式public static void test3() {long start = System.currentTimeMillis();long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);//rangeClosed(]long end = System.currentTimeMillis();System.out.println("通过Stream并行流方式耗时" + (end - start) + "结果为:" + sum);}
}
十五、异步回调
异步回调通常用CompletableFuture
发起两个异步请求,一个有返回结果,一个没有返回结果
package future;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;//异步调用异步执行:成功回调/失败回调
public class Demo1 {public static void main(String[] args) throws ExecutionException, InterruptedException {//发起一个请求(没有返回值的异步回调)CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "runAsync=>Void");});completableFuture1.get();//阻塞获取执行结果//有返回值的异步回调CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");return 1024;});System.out.println(completableFuture2.whenComplete((t, u) -> {//成功回调:正确的返回结果System.out.println("t=" + t);System.out.println("u=" + u);}).exceptionally((e) -> {//失败回调:错误的返回结果System.out.println(e.getMessage());return 233;}).get());}
}
如果有返回结果的异步回调报错,就会走失败回调的方法,返回233
十六、理解JMM&Volatile
16.1、请你谈谈对Volatile的理解
Volatile
是 JVM 提供的轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
怎么保证可见性?就需要和JMM挂钩
16.2、什么是JMM
Java内存模型,不存在的东西,是一种概念一种约定
关于JMM同步的约定:
- 线程解锁前,必须立刻把自己的共享变量刷回主存
- 线程加锁前,必须读取主存中的最新值到工作内存中
- 加锁和解锁是同一把锁
线程分为:工作内存、主内存
16.3、Volatile特点
Volatile
可以保证可见性,不能保证原子性,可以避免指令重排的现象
1. 保证可见性
package jmm;import java.util.concurrent.TimeUnit;public class Demo {private static int num;public static void main(String[] args) throws InterruptedException {new Thread(() -> {while (num == 0) ;}, "t").start();//main线程休眠1sTimeUnit.SECONDS.sleep(1);num = 1;System.out.println(num);}
}
开启一个线程,当num=0时不停的死循环;然后让主线程休眠1s后修改num=1,也就是将主内存中的num修改为1;看到结果程序并没有停止,这是因为t线程并没有拿到主内存中num最新的值,不知套其发生了变化,也就是t线程对main线程的变化不可见
如何解决呢?只需要通过volatile
关键字修饰num即可保证其的可见性
可以看到,程序立马停止了
2. 不保证原子性
什么是原子性?也就是一个线程执行的时候不能被打扰分割,要么同时成功要么同时失败
package jmm;public class Demo2 {//通过volatile不能保证原子性private volatile static int num;public static void add() {num++;}public static void main(String[] args) throws InterruptedException {//理论上num=20000for (int i = 0; i < 20; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {add();}}).start();}//为了让20000条线程跑完,让main线程进行礼让(这里的2表示java固有的两个线程main和gc)while (Thread.activeCount() > 2) {Thread.yield();}System.out.println(Thread.currentThread().getName() + ":" + num);}
}
根据结果,发现volatile
并不保证原子性的操作,为什么不安全呢?
我们反编译看看,可以看到num++
一行代码在底层是多行操作,因此不能保证原子性,所以是不安全的
D:\学习\IDEA project\Test\out\production\Test\jmm>javap -c Demo2.class
Compiled from "Demo2.java"
public class jmm.Demo2 {public jmm.Demo2();Code:0: aload_01: invokespecial #1 // Method java/lang/Object."<init>":()V4: returnpublic static synchronized void add();Code:0: getstatic #7 // Field num:I3: iconst_14: iadd5: putstatic #7 // Field num:I8: returnpublic static void main(java.lang.String[]) throws java.lang.InterruptedException;Code:0: iconst_01: istore_12: iload_13: bipush 205: if_icmpge 298: new #13 // class java/lang/Thread11: dup12: invokedynamic #15, 0 // InvokeDynamic #0:run:()Ljava/lang/Runnable;17: invokespecial #19 // Method java/lang/Thread."<init>":(Ljava/lang/Runnable;)V20: invokevirtual #22 // Method java/lang/Thread.start:()V23: iinc 1, 126: goto 229: invokestatic #25 // Method java/lang/Thread.activeCount:()I32: iconst_233: if_icmple 4236: invokestatic #29 // Method java/lang/Thread.yield:()V
那如果不通过Lock和Synchronized 怎么保证原子性呢?
可以通过java.util.concurrent.atomic
包中的原子类解决原子问题 ,这些类的底层都直接和操作系统挂钩,在内存中修改值,这些类是特殊的存在
package jmm;import java.util.concurrent.atomic.AtomicInteger;public class Demo {//通过volatile不能保证原子性private volatile static AtomicInteger num = new AtomicInteger();public static void add() {num.getAndIncrement();//并不是简单的+1,而是利用了底层的CAS}public static void main(String[] args) throws InterruptedException {//理论上num=20000for (int i = 0; i < 20; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {add();}}).start();}//为了让20000条线程跑完,让main线程进行礼让(这里的2表示java固有的两个线程main和gc)while (Thread.activeCount() > 2) {Thread.yield();}System.out.println(Thread.currentThread().getName() + ":" + num);}
}
3. 禁止指令重排
指令重排:计算机并不是按照我们编写的程序去执行
源代码–》编译器优化代码重排–》指令并行重排–》内存系统重排–》执行
处理器在指令重排的时候,会考虑数据之间的依赖性
int x = 1; //1
int y = 2; //2
x = x + 5; //3
y = x * x; //4按我们所期望的执行顺序是1->2->3->4
但实际上可能是2143或者1324,但不可能是4123
指令重排可能会导致一些错误的结果,如下图所示:
使用volatile
可以避免指令重排,底层实现是通过 内存屏障 实现的,可以保证特定的操作执行顺序,也可以保证某些变量的内存可见性
十七、彻底玩转单例模式
volatile
在单例模式中使用的最多
17.1、饿汉式
package 单例模式;//饿汉式
public class Hungry {private static Hungry hungry = new Hungry();//构造器私有private Hungry() {}public static Hungry getInstance() {return hungry;}
}
优点: static变量会在类装载时初始化,不涉及多个线程访问该对象的问题,可以省略synchronized关键字
缺点:类初始化时就创建了对象,如果只是加载本类,而不是要调用 getinstance(),甚至永远没有调用,则会造成资源浪费!
17.2、懒汉式
package 单例模式;//懒汉式
public class Lazy {private static Lazy lazy;private Lazy() {}public static Lazy getInstance() {if (lazy == null)lazy = new Lazy();return lazy;}
}
优点: 延迟加载,真正用的时候才实例化对象,提高了资源的利用率
缺点:存在并发访问的问题,以下测试并发访问情况
package 单例模式;//懒汉式
public class Lazy {private static Lazy lazy;private Lazy() {System.out.println("创建示例");}public static Lazy getInstance() {if (lazy == null)lazy = new Lazy();return lazy;}public static void main(String[] args) {//10条线程并发访问下for (int i = 0; i < 10; i++) {new Thread(() -> {Lazy.getInstance();}).start();}}
}
根据结果,可以看到有5个线程打印了结果,也就说进行了5次初始化,这是非常大的漏洞,出现了并发访问的问题
17.3、双重检测锁式
为了解决懒汉式并发访问的问题,加入了sychronized
关键字
package 单例模式;//双重检测锁式
public class DoubleLock {private static DoubleLock doubleLock;private DoubleLock() {System.out.println("创建示例");}public static DoubleLock getInstance() {if (doubleLock == null) {synchronized (Lazy.class) {if (doubleLock == null)doubleLock = new DoubleLock();}}return doubleLock;}public static void main(String[] args) {//10条线程并发访问下for (int i = 0; i < 10; i++) {new Thread(() -> {DoubleLock.getInstance();}).start();}}
}
根据打印结果,解决了并发访问的问题;但是这样仍然会存在问题,因为我们new
对象时并不是一个完整的原子性操作,而是分为以下三部:
- 分配内存空间
- 执行构造方法,初始化对象
- 把这个对象指向这个空间
单个线程A执行的情况下可以123按顺序执行,也可能由于指令重排按132执行;但是如果线程A按132顺序执行到3时来了一个线程B,此时该对象已经指向了分配的空间,因此B判断对象不是null,就会直接返回对象,但其实对象并没有进行初始化,就造成了错误
因此指令重排也会导致错误,因此完整的双重检测锁式
还加入了Volatile
关键字来避免指令重排,完整代码如下:
package 单例模式;//双重检测锁式
public class DoubleLock {private volatile static DoubleLock doubleLock;private DoubleLock() {System.out.println("创建示例");}public static DoubleLock getInstance() {if (doubleLock == null) {synchronized (Lazy.class) {if (doubleLock == null)doubleLock = new DoubleLock();}}return doubleLock;}
}
17.4、静态内部类式
package 单例模式;public class InnerClass {private InnerClass() {}//静态内部类里面创建对象public static class inner {private static final InnerClass innerClass = new InnerClass();}public static InnerClass getInstance() {return inner.innerClass;}
}
反射破坏单例模式
package 单例模式;import java.lang.reflect.Constructor;//双重检测锁式
public class DoubleLock {private volatile static DoubleLock doubleLock;private DoubleLock() {System.out.println("创建示例");}public static DoubleLock getInstance() {if (doubleLock == null) {synchronized (Lazy.class) {if (doubleLock == null)doubleLock = new DoubleLock();}}return doubleLock;}public static void main(String[] args) throws Exception {DoubleLock instance1 = doubleLock.getInstance();Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);constructor.setAccessible(true);DoubleLock instance2 = constructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}
根据结果,看到创建了两个实例,也就是单例模式被破坏,那么怎么解决呢?
可以在私有构造中加锁
package 单例模式;import java.lang.reflect.Constructor;//双重检测锁式
public class DoubleLock {private volatile static DoubleLock doubleLock;private DoubleLock() {synchronized (DoubleLock.class){if(doubleLock!=null){throw new RuntimeException("不要试图使用反射破坏异常");}}System.out.println("创建示例");}public static DoubleLock getInstance() {if (doubleLock == null) {synchronized (Lazy.class) {if (doubleLock == null)doubleLock = new DoubleLock();}}return doubleLock;}public static void main(String[] args) throws Exception {DoubleLock instance1 = doubleLock.getInstance();Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);constructor.setAccessible(true);DoubleLock instance2 = constructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}
根据结果,可以看到避免了单例模式的破坏?可是上述两个对象一个是通过单例获取,一个通过反射获取;
那如果两个对象都是通过反射获取呢?
public static void main(String[] args) throws Exception {Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);constructor.setAccessible(true);DoubleLock instance1= constructor.newInstance();DoubleLock instance2 = constructor.newInstance();System.out.println(instance1);System.out.println(instance2);
}
根据结果,可以看到单例模式又被破坏了,创建了两个对象!这种情况如何解决呢?
可以通过红绿灯方法实现,定义一个标志位记录对象是否创建
package 单例模式;import java.lang.reflect.Constructor;//双重检测锁式
public class DoubleLock {private volatile static DoubleLock doubleLock;//标志位private static boolean flag = false;private DoubleLock() {synchronized (DoubleLock.class) {if (flag == false)flag = true;elsethrow new RuntimeException("不要试图使用反射破坏异常");}System.out.println("创建示例");}public static DoubleLock getInstance() {if (doubleLock == null) {synchronized (Lazy.class) {if (doubleLock == null)doubleLock = new DoubleLock();}}return doubleLock;}public static void main(String[] args) throws Exception {Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);constructor.setAccessible(true);DoubleLock instance1 = constructor.newInstance();DoubleLock instance2 = constructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}
可以看到我们通过设置标志位flag
再次解决了这个问题,但是一旦被获取了这个关键字,单例模式仍然可以通过反射被破解,如下所示
public static void main(String[] args) throws Exception {Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);Field declaredField = DoubleLock.class.getDeclaredField("flag");constructor.setAccessible(true);declaredField.setAccessible(true);DoubleLock instance1 = constructor.newInstance();declaredField.set(instance1, false);//第一个对象创建完毕后将flag改为falseDoubleLock instance2 = constructor.newInstance();System.out.println(instance1);System.out.println(instance2);
}
可以看到单例模式再次被破坏;因此为了让程序更加安全,通常对flag
关键字进行加密处理
那么到底如何完全的避免反射破坏单例模式呢?我们查看newInstance
的源码
可以看到,如果是枚举类型的话,就不能通过反射获取枚举;
因此引入了第5种单例模式
17.5、枚举单例
package 单例模式;import java.lang.reflect.Constructor;//enum本质上就是一个Class类
public enum EnumSingle {INSTANCE;public static void main(String[] args) throws Exception {EnumSingle instance1 = EnumSingle.INSTANCE;Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);declaredConstructor.setAccessible(true);EnumSingle instance2 = declaredConstructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}
我们再次通过反射创建对象,根据结果报错没有EnumSingle的空构造方法,这不是我们希望看到的
我们对EnumSingle
的class文件进行反编译,可以看到明明有空构造方法
但是执行明明报错没有无参构造,我们使用更专业的反编译工具jad
对class文件再进行反编译
可以看到枚举类本质上就是继承了Enum
类,本身就是一个Class,而且没有无参构造,而是含两个参数的有参构造,我们修改代码在测试
public static void main(String[] args) throws Exception {EnumSingle instance1 = EnumSingle.INSTANCE;Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class, int.class);declaredConstructor.setAccessible(true);EnumSingle instance2 = declaredConstructor.newInstance();System.out.println(instance1);System.out.println(instance2);
}
这才正确显示了报错的信息:无法反射地创建枚举对象
十八、深入理解CAS
18.1、什么是CAS
CAS
是 compareAndSet 的缩写:比较并交换,是CPU的并发原语
package CAS探究;import java.util.concurrent.atomic.AtomicInteger;public class CASDemo {public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger(2020);//底层是CAS/*** public final boolean compareAndSet(int expectedValue, int newValue)* 期望、更新* 如果期望的值达到了就更新,否则不更新*/System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回是否修改成功System.out.println(atomicInteger.get());System.out.println(atomicInteger.compareAndSet(2020, 2022));System.out.println(atomicInteger.get());}
}
我们再来看看 atomicInteger.getAndIncrement() 方法是怎么实现的?我们该方法的源码
可以看到是由U
调用了getAndAddInt()
方法,而U
就是Unsafe
类的一个实例
什么是
Unsafe
类
- Java无法操作内存,只能通过调用C++来操作内存,
Unsafe
就是Java通过C++操作内存的接口- 就类似于Java通过native关键字来调用C++本地方法来和操作系统交互
可以看到,底层是一个do while循环,也就是一个自旋锁
因此:CAS
就是比较当前工作内存中的值和主内存中的值,如果这个值是期望的,就执行操作;如果不是,就一直循环,因为底层是一个do while循环(自旋锁)
CAS有三个操作数:
- 期望的值
- 比较的值
- 更新的值
缺点:
- 底层是自旋锁,循环耗时
- 一次性只能保证一个共享变量的原子性
- 会存在ABA问题
18.2、ABA问题
比如有两个线程A,B同时向修改A的内容,但是B线程执行速度快,首先cas(1,3)将A修改为3,然后又执行cas(3,1)将A修改为1,这之后线程A再cas(1,2)将A修改为2,但此时A=1已经不是原来的1了;
这就是ABA问题
我们来个代码模拟以下
package CAS探究;import java.util.concurrent.atomic.AtomicInteger;public class ABADemo {//CAS是compareAndSet的缩写:比较并交换,是CPU的并发原语public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger(1);//底层是CAS//线程BSystem.out.println(atomicInteger.compareAndSet(1, 3));System.out.println(atomicInteger.get());System.out.println(atomicInteger.compareAndSet(3, 1));System.out.println(atomicInteger.get());//线程ASystem.out.println(atomicInteger.compareAndSet(1, 2));System.out.println(atomicInteger.get());}
}
可以看到三个结果都为true,但不是我们期望的,我们希望知道谁动过A的值
可以通过类似乐观锁的方案来解决,使用 原子引用类AtomicReference
/AtomicStampedReference
(带时间)
我们使用AtomicStampedReference
测试以下
package CAS探究;import java.util.concurrent.atomic.AtomicStampedReference;public class ABADemo {//CAS是compareAndSet的缩写:比较并交换,是CPU的并发原语public static void main(String[] args) {//public AtomicStampedReference(V initialRef, int initialStamp):这里的第二个参数等同于乐观锁的version,初始值设为1AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);//线程B,多了连个参数:期望的版本号,更新的版本号System.out.println(atomicStampedReference.compareAndSet(1, 3,atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));System.out.println(atomicStampedReference.compareAndSet(3, 1,atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));//线程ASystem.out.println(atomicStampedReference.compareAndSet(1, 2, 1, 2));}
}
可以看到A成功察觉到了B修改过数据,所以执行失败;和乐观锁原理相同
注意:如果泛型是包装类,注意对象引用问题(正常业务都是对象,这里是使用包装类Integer进行测试)
如果我们的范围不再-128~127,则会失败
十九、各种锁的理解
19.1、乐观锁/悲观锁
悲观锁(Pessimistic Lock)
1️⃣ 简介
当要对数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。这种借助数据库锁机制,在修改数据之前先锁定,再修改的方式被称之为悲观并发控制【Pessimistic Concurrency Control,缩写“PCC”,又名“悲观锁”】
悲观锁,正如其名,具有强烈的独占和排他特性。它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度。因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。
之所以叫做悲观锁,是因为这是一种对数据的修改持有悲观态度的并发控制方式。总是假设最坏的情况,每次读取数据的时候都默认其他线程会更改数据,因此需要进行加锁操作,当其他线程想要访问数据时,都需要阻塞挂起。悲观锁的实现:
- 传统的关系型数据库使用这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
- Java 里面的同步 synchronized 关键字的实现。
2️⃣ 分类
悲观锁主要分为 共享锁
和 排他锁
共享锁
【shared locks】又称为读锁,简称S锁。顾名思义,共享锁就是多个事务对于同一数据可以共享一把锁,都能访问到数据,但是只能读不能修改。排他锁
【exclusive locks】又称为写锁,简称X锁。顾名思义,排他锁就是不能与其他锁并存,如果一个事务获取了一个数据行的排他锁,其他事务就不能再获取该行的其他锁,包括共享锁和排他锁,但是获取排他锁的事务是可以对数据行读取和修改。
3️⃣ 说明
悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。但是在效率方面,处理加锁的机制会让数据库产生额外的开销,还有增加产生死锁的机会。另外还会降低并行性,一个事务如果锁定了某行数据,其他事务就必须等待该事务处理完才可以处理那行数据。
乐观锁(Optimistic Locking)
1️⃣ 简介
乐观锁是相对悲观锁而言的,乐观锁假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则返回给用户错误的信息,让用户决定如何去做。乐观锁适用于读操作多的场景,这样可以提高程序的吞吐量。
乐观锁机制采取了更加宽松的加锁机制。乐观锁是相对悲观锁而言,也是为了避免数据库幻读、业务处理时间过长等原因引起数据处理错误的一种机制,但乐观锁不会刻意使用数据库本身的锁机制,而是依据数据本身来保证数据的正确性。
2️⃣ 实现
- CAS实现:Java中java.util.concurrent.atomic包下面的原子变量使用了乐观锁的一种 CAS 实现方式
- 版本号控制:一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数。当数据被修改时,version 值会+1。当线程A要更新数据值时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值与当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功
3️⃣ 说明
乐观并发控制相信事务之间的数据竞争(data race)的概率是比较小的,因此尽可能直接做下去,直到提交的时候才去锁定,所以不会产生任何锁和死锁
19.2、公平/非公平锁
- 公平锁:非常公平,不能插队,线程的执行必须先来后到
- 非公平锁:非常不公平,可以插队,默认都为非公平锁!(比如一个线程3s执行完,一个线程1min执行完,如果使用公平锁严重影响某个线程的效率)
19.3、可重入锁
可重入锁(递归锁)
代码示例:synchronized版
执行结果:
代码示例:Lock版
19.4、自旋锁
不断的尝试,直到成功为止!
我们来编写一个自旋锁
package 自旋锁;import java.util.concurrent.atomic.AtomicReference;//自定义自旋锁
public class SpinLock {//锁线程AtomicReference<Thread> atomicReference = new AtomicReference<>();//加锁public void myLock() {Thread thread = Thread.currentThread();System.out.println(thread.getName() + "==>myLock");//自旋锁while (!atomicReference.compareAndSet(null, thread)) ;}//解锁public void myUnlock() {Thread thread = Thread.currentThread();System.out.println(thread.getName() + "==>myUnLock");atomicReference.compareAndSet(thread, null);}
}
然后编写一段测试代码
package 自旋锁;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {SpinLock spinLock = new SpinLock();//线程T1new Thread(() -> {spinLock.myLock();try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();} finally {spinLock.myUnlock();}}, "T1").start();TimeUnit.SECONDS.sleep(1);//线程T2new Thread(() -> {spinLock.myLock();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} finally {spinLock.myUnlock();}}, "T2").start();}
}
根据结果,总是T1
线程解锁后,T2
线程才能解锁;因为如果T1
线程不解锁,T2
就会卡住在while循环不停的尝试cas直到thread=null为止
19.5、死锁
什么是死锁?
是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象
简单的死锁案例
package 死锁;import java.util.concurrent.TimeUnit;public class DeadLockDemo {public static void main(String[] args) {String lockA = "lockA";String lockB = "lockB";new Thread(new MyThread(lockA, lockB), "T1").start();new Thread(new MyThread(lockB, lockA), "T2").start();}
}class MyThread implements Runnable {private String lockA;private String lockB;public MyThread(String lockA, String lockB) {this.lockA = lockA;this.lockB = lockB;}@Overridepublic void run() {synchronized (lockA) {System.out.println(Thread.currentThread().getName() + "持有锁" + lockA + "尝试获取" + lockB);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}synchronized (lockB) {System.out.println(Thread.currentThread().getName() + "持有锁" + lockB + "尝试获取" + lockA);}}}
}
根据运行结果,可以看到程序卡死,因为发生了死锁,因为T1和T2分别持有lockA和lockB,但又都试图获取对方的锁!
死锁问题排查
-
使用
jps -l
命令定位进程号 -
使用
jstack 进程号
查看指定进程的堆栈信息,找到死锁问题可以看到,控制台清晰的打印了找到死锁,并可以看到产生的原因就是
T1
和T2
互相尝试获取对方的锁