面试高频——JUC并发工具包快速上手(超详细总结)

news/2024/11/21 1:41:48/

目录

  • 一、什么是JUC
  • 二、基本知识
    • 2.1、进程和线程
    • 2.2、Java默认有两个进程
    • 2.3、Java能够开启线程吗?
    • 2.4、并发和并行
    • 2.5、线程的状态
    • 2.6、wait和sleep的区别
    • 2.7、什么是可重入锁
    • 2.8、synchronized买票案例回顾
  • 三、Lock锁
    • 3.1、简介
    • 3.2、买票问题重现
    • 3.3、和Synchronized的区别
  • 四、生产者消费者问题(Lock版)
    • 4.1、Synchronized实现
      • 虚假唤醒问题
    • 4.2、Lock实现
      • Condition精准通知唤醒
  • 五、8个案例彻底理解锁的对象
  • 六、安全集合类
    • 6.1、CopyOnWriteArrayList
      • ArrayList不安全
      • 引入CopyOnWriteArrayList
    • 6.2、CopyOnWriteArraySet
      • HashSet不安全
      • 引入CopyOnWriteArraySet
    • 6.3、ConcurrentHashMap
      • HashMap不安全
      • 引入ConcurrentHashMap
  • 七、Callable
    • 7.1、同Runnable的区别
    • 7.2、怎么实现
  • 八、三大常用辅助类
    • 8.1、CountDownLatch
    • 8.2、CyclicBarrier
    • 8.3、Semaphore
  • 九、读写锁
  • 十、阻塞队列
    • 10.1、关系图
    • 10.2、ArrayBlockingQueue四组API
      • 抛出异常(add、remove、element)
      • 不跑出异常(offer、poll、pick)
      • 等待阻塞(put、take)
      • 限时等待()
    • 10.3、SynchronousQueue
  • 十一、线程池
    • 11.1、创建线程池三大方法
      • newSingleThreadExecutor
      • newFixedThreadPool
      • newCachedThreadPool
    • 11.2、七大参数
    • 11.3、四种拒绝策略
    • 11.4、最大线程数怎么定义?
      • CPU密集型
      • IO密集型
  • 十二、四大函数式接口
    • 什么是函数式接口
    • 12.1、Function
    • 12.2、Predicate
    • 12.3、Consumer
    • 12.4、Supplier
  • 十三、Stream流式计算
    • 13.1、什么是Stream流式计算
    • 13.2、案例测试
  • 十四、ForkJoin
    • 14.1、什么是ForkJoin
    • 14.2、ForkJoin的特点
    • 14.3、案例:计算求和任务
  • 十五、异步回调
  • 十六、理解JMM&Volatile
    • 16.1、请你谈谈对Volatile的理解
    • 16.2、什么是JMM
    • 16.3、Volatile特点
      • 1. 保证可见性
      • 2. 不保证原子性
      • 3. 禁止指令重排
  • 十七、彻底玩转单例模式
    • 17.1、饿汉式
    • 17.2、懒汉式
    • 17.3、双重检测锁式
    • 17.4、静态内部类式
    • 反射破坏单例模式
    • 17.5、枚举单例
  • 十八、深入理解CAS
    • 18.1、什么是CAS
    • 18.2、ABA问题
  • 十九、各种锁的理解
    • 19.1、乐观锁/悲观锁
      • 悲观锁(Pessimistic Lock)
      • 乐观锁(Optimistic Locking)
    • 19.2、公平/非公平锁
    • 19.3、可重入锁
    • 19.4、自旋锁
    • 19.5、死锁
      • 什么是死锁?
      • 死锁问题排查


参考

  • 狂神说Java_JUC并发编程最新版通俗易懂
  • 什么是乐观锁什么是悲观锁

一、什么是JUC

JUC就是java.util.concurrent工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。

image-20210321110312209

image-20210321115140571



二、基本知识

2.1、进程和线程

进程是程序的一次执行过程,包含进程控制块、程序段、数据三部分

1️⃣ 动态性

  • 动态性是进程最基本的特征,表现为:由创建而产生,由调度而执行,得不到资源而暂停执行,由撤销而消亡;有一定的生命周期
  • 程序只是一组有序的指令集合

2️⃣ 并发性

  • 引入进程的目的就是和其他进程能并发执行
  • 程序不能并发执行

3️⃣ 独立性

  • 进程实体是一个能独立运行的基本单位,是系统中独立获得资源和独立调度的基本单位
  • 程序不能作为一个独立的单位进行运行

2.2、Java默认有两个进程

MainGC

2.3、Java能够开启线程吗?

不行,Java是通过native本地方法调底层C++写的方法,Java无法直接操作硬件

2.4、并发和并行

  • 并发:多个事件在同一时间间隔内发生(cpu一核,模拟出来多条线程快速交替运行)
  • 并行:多个事件在同一时刻发生(cpu多核,多个线程可以同时执行)

查看cpu的核数

image-20210321111429010

image-20210321111527079

image-20210321111556525

2.5、线程的状态

NEW尚未启动的线程处于此状态
RUNNABLE在Java虚拟机中执行的线程处于此状态
BLOCKED被阻塞等待监视器锁定的线程处于此状态(IO操作,wait,juc锁定)
WAITING正在等待另一个线程执行特定动作的线程处于此状态(sleep,join)
TIMED_WAITING正在等待另一个线程执行动作达到指定等待时间的线程处于此状态(sleep,join)
TERMINATED已退出的线程处于此状态。

2.6、wait和sleep的区别

  • sleep不释放锁,wait释放锁
  • 来自不同的类:sleep()函数在Thread类中,wait()函数属于Object类
  • 使用范围不同:sleep可以在任何地方使用,wait只能使用在同步代码块中

2.7、什么是可重入锁

可重入,就是可以重复获取相同的锁而不会出现死锁;synchronized和ReentrantLock都是可重入的

// 演示可重入锁是什么意思,可重入,就是可以重复获取相同的锁
// synchronized和ReentrantLock都是可重入的
// 可重入降低了编程复杂性
public class WhatReentrant {public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {synchronized (this) {System.out.println("第1次获取锁,这个锁是:" + this);int index = 1;while (true) {synchronized (this) {System.out.println("第" + (++index) + "次获取锁,这个锁是:" + this);}if (index == 10) {break;}}}}}).start();}
}

image-20210321153436838

import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;// 演示可重入锁是什么意思
public class WhatReentrant2 {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();new Thread(new Runnable() {@Overridepublic void run() {try {lock.lock();System.out.println("第1次获取锁,这个锁是:" + lock);int index = 1;while (true) {try {lock.lock();System.out.println("第" + (++index) + "次获取锁,这个锁是:" + lock);try {Thread.sleep(new Random().nextInt(200));} catch (InterruptedException e) {e.printStackTrace();}if (index == 10) {break;}} finally {lock.unlock();}}} finally {lock.unlock();}}}).start();}
}

image-20210321153415399

2.8、synchronized买票案例回顾

真正的开发中,线程只是一个资源类(包含属性),没有任何附属的操作

模拟卖票

  • 以前我们会将Ticket类继承Runnable接口
  • 现在会将Ticket作为一个资源类,里面不添加关于线程的操作
package com.zsr;public class saleTicket {public static void main(String[] args) {//一份资源Ticket ticket = new Ticket();//多个线程new Thread(() -> {for (int i = 0; i < 30; i++) {ticket.sale();}}, "A").start();new Thread(() -> {for (int i = 0; i < 20; i++) {ticket.sale();}}, "B").start();new Thread(() -> {for (int i = 0; i < 80; i++) {ticket.sale();}}, "C").start();}
}//资源类
class Ticket {//票数private int number = 100;public synchronized void sale() {if (number > 0)System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余" + number);}
}


三、Lock锁

3.1、简介

官方文档地址:https://tool.oschina.net/apidocs/apidoc?api=jdk-zh

image-20210321133621186
使用方法:创建锁、加锁、业务代码、解锁
image-20210321134513971

3.2、买票问题重现

我们接下来用Lock锁的方式来解决上述买票问题,Lock接口最常用的实现类就是ReentrantLock可重入锁

  • 可以看到看到ReentrantLock有两个构造方法,可以指定使用 公平锁/非公平锁
  • 公平锁:十分公平,先来后到
  • 非公平锁:不公平,可以插队

image-20210321133926406
修改Ticket

//资源类
class Ticket2 {//票数private int number = 100;//Lock锁Lock lock = new ReentrantLock();public void sale() {lock.lock();//加锁try {if (number > 0)System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余" + number);} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();//解锁}}
}

3.3、和Synchronized的区别

  • Synchronized是内置的关键字,Lock是一个Java类

  • Synchronized无法判断锁的状态,Lock可以判断是否获取到了锁

  • Synchronized会自动释放锁,Lock需要手动释放锁(如果不释放锁,会造成死锁)

  • 假如有两个线程:线程1、线程2;线程1获得了锁

    Synchronized:如果线程1阻塞了,线程2就会一直等待,造成死锁

    Lock:如果线程1阻塞了,线程2不会一直等待,可以通过trylock()方法尝试获取锁

  • 两者都是可重入锁,但是Synchronized不可中断,为非公平锁;而Lock锁可判断锁状态,并且可以设置为公平锁/非公平锁

  • Synchronized适合锁少量代码同步代码,Lock适合锁大量代码同步代码



四、生产者消费者问题(Lock版)

生产者消费者——线程之间的通信问题

  • 通过Synchronized实现,我们常用 object.wait() + Object.notify()
  • 通过Lock怎么实现呢,用condition.await() + condition.signal()

image-20210321143716982

4.1、Synchronized实现

两个线程A、B实现:

如果number不等于0,则number-1;如果number等于0,则number+1

package com.zsr;public class communicate {public static void main(String[] args) {Data data = new Data();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "B").start();}
}class Data {private int number = 0;//如果number=0,则number+1public synchronized void plus() throws InterruptedException {if (number != 0)this.wait();//等待number++;System.out.println(Thread.currentThread().getName() + "=>" + number);this.notifyAll();//唤醒其他线程}//如果number!=0,则number-1public synchronized void minor() throws InterruptedException {if (number == 0)this.wait();number--;System.out.println(Thread.currentThread().getName() + "=>" + number);this.notifyAll();//唤醒其他线程}
}

结果:
image-20210321141752317

虚假唤醒问题

如果再加两个线程呢?

public static void main(String[] args) {Data data = new Data();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "B").start();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "C").start();new Thread(() -> {for (int i = 0; i < 5; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "D").start();
}
}

看结果,会出现2、3的情况;这就是因为if判断只判断一次,两个线程可能同时+1;造成了虚假唤醒的问题
image-20210321142530487
image-20210321142942399
修改代码:if判断改成while判断

class Data {private int number = 0;//如果number=0,则number+1public synchronized void plus() throws InterruptedException {while (number != 0)this.wait();//等待number++;System.out.println(Thread.currentThread().getName() + "=>" + number);this.notifyAll();//唤醒其他线程}//如果number!=0,则number-1public synchronized void minor() throws InterruptedException {while (number == 0)this.wait();number--;System.out.println(Thread.currentThread().getName() + "=>" + number);this.notifyAll();//唤醒其他线程}
}

4.2、Lock实现

image-20210321143908744
修改代码:

package com.zsr;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class communicate2 {public static void main(String[] args) {Data2 data = new Data2();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "B").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "C").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "D").start();}
}class Data2 {private int number = 0;//创建Lock锁private Lock lock = new ReentrantLock();//获得conditionCondition condition = lock.newCondition();//如果number=0,则number+1public void plus() throws InterruptedException {lock.lock();//加锁try {while (number != 0)condition.await();//等待number++;System.out.println(Thread.currentThread().getName() + "=>" + number);condition.signalAll();//唤醒其他线程} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();//解锁}}//如果number!=0,则number-1public void minor() throws InterruptedException {lock.lock();try {while (number == 0)condition.await();number--;System.out.println(Thread.currentThread().getName() + "=>" + number);condition.signalAll();//唤醒其他线程} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}
}

image-20210321145522108
根据结果,成功实现!但是发现一个问题,线程的执行都是随机的,怎么进行有序的实现呢?A=>B=>C=>D

Condition精准通知唤醒

可以设置多个condition监视器,每个监视器监视一个线程,精确等待唤醒某个线程

package com.zsr;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class communicate3 {public static void main(String[] args) {Data3 data = new Data3();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "A").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "B").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.plus();} catch (InterruptedException e) {e.printStackTrace();}}}, "C").start();new Thread(() -> {for (int i = 0; i < 10; i++) {try {data.minor();} catch (InterruptedException e) {e.printStackTrace();}}}, "D").start();}
}class Data3 {private int number = 0;//创建Lock锁private Lock lock = new ReentrantLock();//获得conditionCondition conditionA = lock.newCondition();Condition conditionB = lock.newCondition();Condition conditionC = lock.newCondition();Condition conditionD = lock.newCondition();//如果number=0,则number+1public void plus() throws InterruptedException {lock.lock();//加锁try {if (Thread.currentThread().getName()=="A") {while (number != 0)conditionA.await();//A等待number++;System.out.println(Thread.currentThread().getName() + "=>" + number);conditionB.signal();//唤醒B线程}if (Thread.currentThread().getName()=="C") {while (number != 0)conditionC.await();//C等待number++;System.out.println(Thread.currentThread().getName() + "=>" + number);conditionD.signal();//唤醒D线程}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();//解锁}}//如果number!=0,则number-1public void minor() throws InterruptedException {lock.lock();try {if (Thread.currentThread().getName()=="B") {while (number == 0)conditionB.await();//B等待number--;System.out.println(Thread.currentThread().getName() + "=>" + number);conditionC.signal();//唤醒C线程}if (Thread.currentThread().getName()=="D") {while (number == 0)conditionD.await();//D等待number--;System.out.println(Thread.currentThread().getName() + "=>" + number);conditionA.signal();//唤醒A线程}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}
}

结果:
image-20210321151156051



五、8个案例彻底理解锁的对象

如何判断锁的是谁!永远的知道什么锁,锁到底锁的是谁(对象、 Class)

创建两个线程A、B,A线程执行发短信方法,B线程执行打电话方法,谁先执行?

package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();//线程A发消息new Thread(() -> {phone.send_message();}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);//线程B打电话new Thread(() -> {phone.call();}, "B").start();}
}class Phone {public synchronized void send_message() {System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}

结果:先发短信再打电话
image-20210321190231935

那如果让发短信休眠2s呢?

package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();new Thread(() -> {try {phone.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone.call();}, "B").start();}
}class Phone {public synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}

结果:还是先发短信再打电话
image-20210321190405716
这是为什么呢?并不是因为A先执行,而是有锁的存在

  • synchronized锁的对象是方法的调用者,也就是phone对象,因此打电话和发短信方法锁的是同一个对象,谁先拿到锁谁就先执行

如果新增一个普通方法hello,那先是hello还是发短信呢?

package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();new Thread(() -> {try {phone.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone.hello();}, "B").start();}
}class Phone {public synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}public void hello() {System.out.println("hello");}
}

image-20210321193511631
根据结果,先执行hello,这是因为hello是一个普通方法,没有锁,不受锁的影响

如果有两个phone对象,是先发短信还是先打电话

package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone1 = new Phone();Phone phone2 = new Phone();new Thread(() -> {try {phone1.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone2.call();}, "B").start();}
}class Phone {public synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}

两个对象,两个调用者,所以有两把锁,所以按时间顺序执行

image-20210321194003558

修改两个方法为静态方法,只有一个对象,是先打电话还是发短信

package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();new Thread(() -> {try {phone.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone.call();}, "B").start();}
}class Phone {public static synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public static synchronized void call() {System.out.println("打电话");}
}

根据结果,先发短信
image-20210321194256128
因为static静态方法在类加载的时候就有了,因此这里synchronized锁的是Class模板,也就是Phone.class,全局唯一;也就是两个方法拿的仍是同一把锁

那如果两个方法为静态方法,有两个对象,是先打电话还是发短信?

package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone1 = new Phone();Phone phone2 = new Phone();new Thread(() -> {try {phone1.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone2.call();}, "B").start();}
}class Phone {public static synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public static synchronized void call() {System.out.println("打电话");}
}

根据结果,仍是先发短信
image-20210321194617283
因为锁的是Phone.class,也就是说两个方法仍是同一把锁

如果是一个普通的同步方法和一个静态的同步方法,只有一个对象

package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();new Thread(() -> {try {phone.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone.call();}, "B").start();}
}class Phone {public static synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}

结果:
image-20210322095928191
一个锁的是Phone.class模板,一个锁的是phone对象,因此两个方法不是一把锁,因此按时间顺序运行

如果是一个普通的同步方法和一个静态的同步方法,有两个对象

package com.zsr.lock8;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {Phone phone1 = new Phone();Phone phone2 = new Phone();new Thread(() -> {try {phone1.send_message();} catch (InterruptedException e) {e.printStackTrace();}}, "A").start();//休眠1sTimeUnit.SECONDS.sleep(1);new Thread(() -> {phone2.call();}, "B").start();}
}class Phone {public static synchronized void send_message() throws InterruptedException {//休眠2sTimeUnit.SECONDS.sleep(2);System.out.println("发短信");}public synchronized void call() {System.out.println("打电话");}
}

同样两个方法不是一把锁,因此按时间顺序运行
image-20210322100003462



六、安全集合类

image-20210322100452113

6.1、CopyOnWriteArrayList

ArrayList不安全

并发情况下,ArrayList不安全,我们通过一个简单的案例来测试:

package com.zsr.collection;import java.util.ArrayList;
import java.util.UUID;public class ListTest {public static void main(String[] args) {//并发情况下ArrayList<String> list = new ArrayList<>();for (int i = 0; i < 10; i++) {new Thread(() -> {list.add(UUID.randomUUID().toString().substring(0, 8));System.out.println(list);}, String.valueOf(i)).start();}}
}


根据结果,发现报错了ConcurrentModificationException(并发修改异常)

那么怎么解决呢?

  • 方案一:换成线程安全的vector

    Vector<String> lists = new Vector<>();
    
  • 方案二:利用Collections工具类

    List<String> list = Collections.synchronizedList(new ArrayList<>());
    
  • 方案三:利用JUC包中的类CopyOnWriteArrayList

    CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
    

引入CopyOnWriteArrayList

image-20210322141229600

  • CopyOnWrite简称COW,是计算机程序设计领域的一种优化策略
  • 多个线程调用的时候,读取的时候固定,但写入时会复制,避免写入造成的数据覆盖问题

其效率比Vector更高,因为Vector在方法上都用了Synchronized关键字,会降低效率

CopyOnWriteArratList是用Lock锁实现的,底层也是数组实现,不过添加的时候会先拷贝一份新的数组,最后再拷贝回去
image-20210322134138150
image-20210322134213815


6.2、CopyOnWriteArraySet

HashSet不安全

并发情况下,HashSet不安全,我们通过一个简单的案例来测试:

package com.zsr.collection;import java.util.HashSet;
import java.util.UUID;public class SetTest {public static void main(String[] args) {//并发情况下HashSet<String> set = new HashSet<>();for (int i = 0; i < 30; i++) {new Thread(() -> {set.add(UUID.randomUUID().toString().substring(0, 8));System.out.println(set);}, String.valueOf(i)).start();}}
}

同样,ConcurrentModificationException:并发修改异常
image-20210322134948437
那么怎么解决呢?

  • 方案一:利用Collections工具类

    Set<String> set = Collections.synchronizedSet(new HashSet<String>());
    
  • 方案三:利用JUC包中的类CopyOnWriteArrayList

    CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
    

引入CopyOnWriteArraySet

image-20210322141215322

  • CopyOnWrite简称COW,是计算机程序设计领域的一种优化策略
  • 多个线程调用的时候,读取的时候固定,但写入时会复制,避免写入造成的数据覆盖问题

CopyOnWriteArratSet同样是用Lock锁实现的,底层也是数组实现,不过添加的时候会先拷贝一份新的数组,最后再拷贝回去
image-20210322135648975
image-20210322135621456


6.3、ConcurrentHashMap

HashMap不安全

并发情况下,HashMap不安全,我们通过一个简单的案例来测试:

package com.zsr.collection;import java.util.HashMap;
import java.util.UUID;public class MapTest {public static void main(String[] args) {//并发情况下HashMap<String, String> map = new HashMap<>();for (int i = 0; i < 30; i++) {new Thread(() -> {map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 8));System.out.println(map);}, String.valueOf(i)).start();}}
}

同样,ConcurrentModificationException:并发修改异常
image-20210322140807339
那么怎么解决呢?

  • 方案一:利用Collections工具类

    Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
    
  • 方案三:利用JUC包中的类ConcurrentHashMap

    ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
    

引入ConcurrentHashMap

image-20210322141145100
ConcurrentHashMap融合了hashtablehashmap二者的优势

但是hashtable每次同步执行的时候都要锁住整个结构。看下图:

image-20210414145603675
ConcurrentHashMap正是为了解决这个问题而诞生的,其锁的方式是稍微细粒度的,引入了分段锁的概念;

  • 可以理解为把一个大的Map拆分成N(默认为16)个小的HashTable,根据key.hashCode()来决定把key放到哪个HashTable中。

  • 在ConcurrentHashMap中,就是把Map分成了N个Segment,put和get的时候,都是现根据key.hashCode()算出放到哪个Segment中:

通过把整个Map分为N个Segment(类似HashTable),可以提供相同的线程安全;原来只能一个线程进入,现在却能同时16个写线程进入(写线程才需要锁定,而读线程几乎不受限制),并发性的提升是显而易见的



七、Callable

image-20210322144924183

image-20210323075320880

7.1、同Runnable的区别

image-20210323075243997

  • 可以有返回值
  • 可以抛出异常
  • 重写call()方法而不是run()

7.2、怎么实现

实现Runnable接口时,我们通过Thread.start()进行启动,因为Thread的构造方法可以传入Runnable对象
image-20210323080136708
那么怎么实现Callable呢?我们无法直接通过Thread进行启动,但是我们可以通过Runnable间接的启动
image-20210323080306487
查看帮助文档,可以看到Runnable接口有一个FutureTask启动类,我们点进去看看
image-20210323080439782
可以看到,它有两个构造方法,分别可以传入CallableRunnable对象,这就将两者联系了起来
image-20210323081206052
于事我们就可以通过Thread来启动Callable接口的实现类

package com.zsr;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class dome1 {public static void main(String[] args) throws ExecutionException, InterruptedException {myThread myThread = new myThread();FutureTask<String> futureTask = new FutureTask<String>(myThread);new Thread(futureTask).start();System.out.println(futureTask.get());//获取call方法返回值}
}class myThread implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("调用call方法");return "call方法执行完成";}
}

image-20210323080953589
注意

  • 通过FutureTask的get()获取call方法的返回值,该方法可能会产生阻塞(可能返回结果需要大量的计算,很耗时),一般情况下将其放在最后一行或者使用异步通信来处理

  • FutureTask任务多线程并发访问时为啥只会被执行一次
    image-20210323081830058



八、三大常用辅助类

8.1、CountDownLatch

就是一个减法计数器

image-20210323082713513

image-20210323083302699

package com.zsr.countDown;import java.util.concurrent.CountDownLatch;public class Test {public static void main(String[] args) throws InterruptedException {//创建一个计数器,初始化为6CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <= 6; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + "线程执行结束");countDownLatch.countDown();//计数器-1}, String.valueOf(i)).start();}countDownLatch.await();//等待计数器归0再往下执行System.out.println("所有线程执行完毕");}
}

如果不加countDownLatch.await()
image-20210323083117066
加了之后
image-20210323083141410

8.2、CyclicBarrier

相当于加法计数器

image-20210323083357327

image-20210323083929351

package com.zsr.Cyclic;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class Test {//模拟集齐7课龙珠召唤神龙public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {System.out.println("召唤神龙!");});for (int i = 1; i <= 7; i++) {final int temp = i;new Thread(() -> {System.out.println(Thread.currentThread().getName() + "收集了第" + temp + "颗龙珠");try {cyclicBarrier.await();//计数不断+1,直到为7} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}).start();}}
}

8.3、Semaphore

计数信号量

image-20210323084317870

  • semaphore.acquire(),获得许可正,如果许可证已经满了,等待其他线程释放许可证
  • semaphore.release(),释放许可证

作用:多个共享资源互斥使用,并发限流,控制最大的线程数

package com.zsr.Sema;import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) {//模拟3个车位,6辆车要停车Semaphore semaphore = new Semaphore(3);for (int i = 1; i <= 6; i++) {new Thread(() -> {try {semaphore.acquire();//获取许可System.out.println(Thread.currentThread().getName() + "抢到车位");TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName() + "离开车位");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();//释放许可}}, String.valueOf(i)).start();}}
}

image-20210323084704901



九、读写锁

image-20210325230129179
代码测试:定义一个缓存区用于读写操作,然后启动5个线程分别进行读和写,测试

  • 首先测试不加锁的情况下

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteLockDemo {public static void main(String[] args) {UnlockCache unlockCache = new UnlockCache();//5个线程写入for (int i = 1; i <= 5; i++) {final int temp = i;new Thread(() -> {unlockCache.put(temp, temp);}, String.valueOf(i)).start();}//5个线程读出for (int i = 1; i <= 5; i++) {final int temp = i;new Thread(() -> {unlockCache.get(temp);}, String.valueOf(i)).start();}}
    }//不加锁的
    class UnlockCache {private volatile Map<Integer, Object> map = new HashMap<>();//写入public void put(int key, Object value) {System.out.println("开始写入" + key);map.put(key, value);System.out.println(key + "写入完成");}//读出public void get(int key) {System.out.println("开始读取" + key);map.get(key);System.out.println(key + "读取完毕");}
    }
    

    image-20210325233019529
    根据结果,可以看到写入时被插队,这是不允许的!

  • 加读写锁,实现只能同时有一个线程写,多个线程读

    也就是写锁为独占锁(一次只能被一个线程占有),读锁为共享锁(多个线程可以同时占有)

    • 读-读:可共存
    • 读-写:不可共存
    • 写-写:不可共存
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteLockDemo {public static void main(String[] args) {LockCache lockCache = new LockCache();//5个线程写入for (int i = 1; i <= 5; i++) {final int temp = i;new Thread(() -> {lockCache.put(temp, temp);}, String.valueOf(i)).start();}//5个线程读出for (int i = 1; i <= 5; i++) {final int temp = i;new Thread(() -> {lockCache.get(temp);}, String.valueOf(i)).start();}}
    }//加锁:实现同时只能有一个线程写,多个线程读
    class LockCache {private volatile Map<Integer, Object> map = new HashMap<>();//读写锁:实现只能有一个线程写,多个线程写,更细粒度的控制ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();//写入:只能有一个线程写public void put(int key, Object value) {readWriteLock.writeLock().lock();try {System.out.println("开始写入" + key);map.put(key, value);System.out.println(key + "写入完成");} catch (Exception e) {e.printStackTrace();} finally {readWriteLock.writeLock().unlock();}}//读出:可以多个线程读public void get(int key) {readWriteLock.readLock().lock();try {System.out.println("开始读取" + key);map.get(key);System.out.println(key + "读取完毕");} catch (Exception e) {e.printStackTrace();} finally {readWriteLock.readLock().unlock();}}
    }
    

    image-20210325233255584
    根据结果,我们实现了写入时不能被插队,但是读取可以多个线程读取



十、阻塞队列

image-20210325234511948

10.1、关系图

BlockingQueue关系图:

image-20210325235643917

队列阻塞

image-20210325235738373

10.2、ArrayBlockingQueue四组API

方式抛出异常有返回值、不抛出异常阻塞等待限时等待
添加add()offer()put()offer( , )
移除remove()poll()take()poll( , )
判断队列首element()peek()\\

抛出异常(add、remove、element)

//抛出异常
public static void test1() {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.add(1));System.out.println(blockingQueue.add(2));System.out.println(blockingQueue.add(3));//如果添加第四个元素,则发生异常java.lang.IllegalStateException: Queue fullSystem.out.println(blockingQueue.add(4));
}

image-20210409190803875

//抛出异常
public static void test1() {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.add(1));System.out.println(blockingQueue.add(2));System.out.println(blockingQueue.add(3));System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());//如果取出第四个元素,则发生异常java.util.NoSuchElementExceptionSystem.out.println(blockingQueue.remove());
}

image-20210409190850740

//抛出异常
public static void test1() {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.add(1));System.out.println(blockingQueue.add(2));System.out.println(blockingQueue.add(3));System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());//如果取出队首元素,则发生异常java.util.NoSuchElementExceptionSystem.out.println(blockingQueue.element());
}

image-20210409191247343

不跑出异常(offer、poll、pick)

//不抛出异常
public static void test1() {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.offer(1));System.out.println(blockingQueue.offer(2));System.out.println(blockingQueue.offer(3));//如果添加第四个元素,返回falseSystem.out.println(blockingQueue.offer(4));System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());//如果取出第四个元素,返回nullSystem.out.println(blockingQueue.poll());//如果取出队首元素,返回nullSystem.out.println(blockingQueue.peek());
}

等待阻塞(put、take)

//等待阻塞
public static void test1() throws InterruptedException {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);blockingQueue.put(1);blockingQueue.put(2);blockingQueue.put(3);//如果添加第四个元素,程序阻塞blockingQueue.put(4);System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());//如果取出第四个元素,程序阻塞blockingQueue.take();
}

限时等待()

//限时等待
public static void test1() throws InterruptedException {//初始化阻塞队列大小为3ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.offer(1));System.out.println(blockingQueue.offer(2));System.out.println(blockingQueue.offer(3));//如果添加第四个元素,等待2s后程序结束System.out.println(blockingQueue.offer(4, 2, TimeUnit.SECONDS));System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());//如果取出第四个元素,等待2s后程序结束System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
}

10.3、SynchronousQueue

image-20210409200508319

一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。

public static void main(String[] args) throws InterruptedException {SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();//线程T1,存元素new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " put 1");synchronousQueue.put("1");System.out.println(Thread.currentThread().getName() + " put 2");synchronousQueue.put("2");System.out.println(Thread.currentThread().getName() + " put 3");synchronousQueue.put("3");} catch (InterruptedException e) {e.printStackTrace();}}, "T1").start();//线程T2,取元素new Thread(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}, "T2").start();
}


十一、线程池

池化技术:程序的运行就会占用系统资源就会占用系统资源,为了优化资源的使用,就引入了池化技术,事先准备好一些资源,需要使用就来取,用完即放回;例如 线程池、连接池、内存池、对象池

线程池的好处:线程复用、可以控制最大并发数、管理线程

  • 降低资源消耗
  • 提高响应速度
  • 方便管理

11.1、创建线程池三大方法

如何创建线程池呢?java.util.concurrent中提供了Executors

image-20210412140530673
其中有一些静态方法用于创建线程池
image-20210412140902077

newSingleThreadExecutor

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ThreadPool {public static void main(String[] args) {//创建单一线程池,只有一个线程执行ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();//创建线程池,可指定固定线程数量同时执行for (int i = 0; i < 20; i++) {singleThreadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}//线程池用完,关闭线程池singleThreadPool.shutdown();}
}

image-20210409214330526

newFixedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ThreadPool {public static void main(String[] args) {//创建线程池,可指定固定线程数量同时执行ExecutorService fixThreadPool = Executors.newFixedThreadPool(5);//使用线程池创建线程for (int i = 0; i < 20; i++) {fixThreadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}//线程池用完,关闭线程池  fixThreadPool.shutdown();}
}

image-20210409220213831

newCachedThreadPool

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程, 那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ThreadPool {public static void main(String[] args) {//创建可伸缩的线程池ExecutorService cachedThreadPool = Executors.newCachedThreadPool();//使用线程池创建线程for (int i = 0; i < 20; i++) {cachedThreadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}//线程池用完,关闭线程池cachedThreadPool.shutdown();}
}

image-20210409220319946

11.2、七大参数

查看newSingleThreadExecutornewFixedThreadPoolnewCachedThreadPool的源码,可以发现本质上就是创建了一个ThreadPoolExcutor对象

image-20210409232955804

image-20210409233027397
image-20210409233052778
再查看ThreadPoolExecutor的源码,可以看到七大参数

public ThreadPoolExecutor(int corePoolSize,	//核心线程池大小int maximumPoolSize,	//最大线程池大小long keepAliveTime,	//存活时间(超时未调用则释放)TimeUnit unit,	//超时单位BlockingQueue<Runnable> workQueue,	//阻塞队列ThreadFactory threadFactory,	//线程工程:创建线程,一般默认不改动RejectedExecutionHandler handler)	//拒绝策略
{if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

各种参数的含义好比银行办理业务,corePoolSize是已经开放的服务窗口,BlockingQueue是候客区,假设人流量非常大,就需要多开放几个服务窗口,maximumPoolSize就是最大开放的服务窗口数;再假如很少有人办理业务,过了一定的时间就关闭窗口,keepAliveTime就是要关闭窗口的事件;RejectedExecutionHandler拒绝策略就好比银行满了,再来人就不让进了

可以看到newCachedThreadPool的核心线程池大小设置未0,最大线程池大小设置为Integer.MAX_VALUE,约等于21亿;也就是说通过Executors.newCachedThreadPool()创建的线程池可以支持并发的线程数介于0~21亿之间,这是十分耗费资源的

因此,阿里巴巴官方手册有以下规定:
image-20210409233957716

11.3、四种拒绝策略

image-20210410002637368
image-20210410002432207
我们来自定义一个线程池,拒绝策略为AbortPolicy

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPool {public static void main(String[] args) {//自定义线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常for (int i = 1; i <= 8; i++) {threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}}
}

image-20210410001714211
如果i<=9,超过了最大承载,则会抛出异常
image-20210410001755571
修改拒绝策略为CallerRunsPolicy:可以看到没有抛出异常,而是由main线程处理

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPool {public static void main(String[] args) {//自定义线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常for (int i = 1; i <= 9; i++) {threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}}
}

image-20210410002758331
修改拒绝策略为DiscardPolicy:可以看到不抛出异常不执行

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPool {public static void main(String[] args) {//自定义线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常for (int i = 1; i <= 9; i++) {threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + " ok");});}}
}

image-20210410002912020
策略DiscardOldestPolicyDiscardPolicy类似:不抛出异常不执行,但是会尝试竞争

11.4、最大线程数怎么定义?

CPU密集型

电脑的cpu是几核就定义为几,定义为常数换台电脑就不行了

//自定义线程池:CPU密集型
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,Runtime.getRuntime().availableProcessors(),3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy()
);

IO密集型

大于 判断程序中耗费IO的线程 即可



十二、四大函数式接口

新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算

什么是函数式接口

函数式接口(Functional Interface)是jdk8引入的,有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。并且这类接口使用了@FunctionalInterface进行注解,函数式接口可以被隐式转换为 lambda 表达式

JDK 1.8 之前已有的函数式接口:

  • java.lang.Runnable

    @FunctionalInterface
    public interface Runnable {public abstract void run();
    }
    
  • java.util.concurrent.Callable

  • java.security.PrivilegedAction

  • java.util.Comparator

  • java.io.FileFilter

  • java.nio.file.PathMatcher

  • java.lang.reflect.InvocationHandler

  • java.beans.PropertyChangeListener

  • java.awt.event.ActionListener

  • javax.swing.event.ChangeListener

JDK 1.8 新增加的函数接口:

  • java.util.function

    image-20210410111741105

这个package中的接口大致分为了以下四类:

  • Function:接收参数,并返回结果,主要方法 R apply(T t)

  • Consumer:接收参数,无返回结果, 主要方法为 void accept(T t)

    // forEach的参数就是消费者类型函数式接口Consumer
    @Override
    public void forEach(Consumer<? super E> action) {Objects.requireNonNull(action);final int expectedModCount = modCount;final Object[] es = elementData;final int size = this.size;for (int i = 0; modCount == expectedModCount && i < size; i++)action.accept(elementAt(es, i));if (modCount != expectedModCount)throw new ConcurrentModificationException();
    }
    
  • Supplier:不接收参数,但返回结构,主要方法为 T get()

  • Predicate:接收参数,x返回boolean值,主要方法为 boolean test(T t)

12.1、Function

函数式接口:有参数且需要返回值

@FunctionalInterface
public interface Function<T, R> {//有参数且有返回值R apply(T t);default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {Objects.requireNonNull(before);return (V v) -> apply(before.apply(v));}default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {Objects.requireNonNull(after);return (T t) -> after.apply(apply(t));}static <T> Function<T, T> identity() {return t -> t;}
}

实现一个Function接口

package 函数式接口;import java.util.function.Function;public class Demo {public static void main(String[] args) {//匿名内部类,没有类的名称Function function1 = new Function<String, String>() {@Overridepublic String apply(String str) {return str;}};//修改为lambda表达式Function function2 = (str) -> {return str;};System.out.println(function1.apply("hello"));System.out.println(function2.apply("hello lambda"));}
}

image-20210410113447834

12.2、Predicate

判断型接口:有参数,返回值为布尔型

@FunctionalInterface
public interface Predicate<T> {//有参数,返回值为布尔型boolean test(T t);default Predicate<T> and(Predicate<? super T> other) {Objects.requireNonNull(other);return (t) -> test(t) && other.test(t);}default Predicate<T> negate() {return (t) -> !test(t);}default Predicate<T> or(Predicate<? super T> other) {Objects.requireNonNull(other);return (t) -> test(t) || other.test(t);}static <T> Predicate<T> isEqual(Object targetRef) {return (null == targetRef)? Objects::isNull: object -> targetRef.equals(object);}@SuppressWarnings("unchecked")static <T> Predicate<T> not(Predicate<? super T> target) {Objects.requireNonNull(target);return (Predicate<T>)target.negate();}
}

实现一个Predicate接口

package 函数式接口;import java.util.function.Predicate;public class Demo1 {public static void main(String[] args) {//匿名内部类,没有类的名称Predicate<String> predicate1 = new Predicate<>() {@Overridepublic boolean test(String s) {return s.isEmpty();//判断字符串是否为空}};//修改为lambdaPredicate<String> predicate2 = (s) -> {return s.isEmpty();};System.out.println(predicate1.test("hello"));System.out.println(predicate2.test(""));}
}

image-20210410125206650

12.3、Consumer

消费性接口:没有返回值,有参数

@FunctionalInterface
public interface Consumer<T> {//没有返回值,有参数void accept(T t);default Consumer<T> andThen(Consumer<? super T> after) {Objects.requireNonNull(after);return (T t) -> { accept(t); after.accept(t); };}
}

实现Consumer接口

package 函数式接口;import java.util.function.Consumer;public class Demo2 {public static void main(String[] args) {//匿名内部类,没有类的名称Consumer<String> consumer1 = new Consumer<>() {@Overridepublic void accept(String s) {System.out.println(s);}};//修改为lambdaConsumer consumer2 = (s) -> {System.out.println(s);};consumer1.accept("hello1");consumer1.accept("hello2");}
}

image-20210410125950617

12.4、Supplier

供给型接口:无参数,指定返回值类型

@FunctionalInterface
public interface Supplier<T> {//无参数,指定返回值类型T get();
}

实现Supplier接口

package 函数式接口;import java.util.function.Supplier;public class Demo3 {public static void main(String[] args) {//匿名内部类,没有类的名称Supplier<String> supplier1 = new Supplier<>() {@Overridepublic String get() {return "hello1";}};//修改为lambdaSupplier supplier2 = () -> {return "hello2";};System.out.println(supplier1.get());System.out.println(supplier2.get());}
}

image-20210410130339781



十三、Stream流式计算

13.1、什么是Stream流式计算

大数据时代分为存储+计算,存储交给数据库、集合等来处理,计算就交给流来做

Java中就提供了java.util.stream用于流式计算
image-20210410133603700

13.2、案例测试

package 流式计算;import java.util.Arrays;
import java.util.List;public class Test {public static void main(String[] args) {User user1 = new User("a", 1, 18);User user2 = new User("b", 2, 19);User user3 = new User("c", 3, 21);User user4 = new User("d", 4, 30);User user5 = new User("e", 5, 28);User user6 = new User("f", 6, 27);//list来存储数据List<User> users = Arrays.asList(user1, user2, user3, user4, user5, user6);//stream来计算//筛选出以下条件的用户://1.id为偶数//2.age>23//3.name转换为大写字母//4.name字母倒序排序//5.只输出一个用户users.stream().filter(user -> {//判断型接口Predicatereturn user.getId() % 2 == 0;}).filter(user -> {//判断型接口Predicatereturn user.getAge() > 23;}).map(user -> {//函数式接口Functionreturn user.getName().toUpperCase();}).sorted((u1, u2) -> {//Comparator函数式接口return u2.compareTo(u1);}).limit(1).forEach(System.out::println);}
}


十四、ForkJoin

14.1、什么是ForkJoin

ForkJoin出现于jdk1.7,用于并行执行任务 ,提高效率,用于大数据量的情况(分支合并)

大数据:Map Reduce——将大任务拆分成小任务

image-20210410194839704

14.2、ForkJoin的特点

工作窃取:里面维护的都是双端队列
image-20210410195206428

14.3、案例:计算求和任务

我们可以在JUC中找到ForkJoinPool
image-20210410204610944
其中有一个方法,可用于执行一个ForkJoinTask
image-20210410204632095
image-20210410204736063
我们需要返回值,查看RecursiveTask,可以找到compute方法进行计算
image-20210410205524323

1️⃣ 编写任务

package forkjoin;import java.util.concurrent.RecursiveTask;//计算任务
public class ForkJoinTask extends RecursiveTask<Long> {private long begin;private long end;public ForkJoinTask(long begin, long end) {this.begin = begin;this.end = end;}//计算方法:计算1加到1_0000_0000@Overrideprotected Long compute() {long sum = 0;if ((end - begin) < 10000) {//如果差值小于10000则暴力加for (long i = begin; i <= end; i++)sum += i;return sum;} else {//数据量大于10000采用forkjoin来计算long mid = (begin + end) / 2;//任务一ForkJoinTask task1 = new ForkJoinTask(begin, mid);task1.fork();//拆分任务,将线程压入队列//任务二ForkJoinTask task2 = new ForkJoinTask(mid + 1, end);task2.fork();//拆分任务,将线程压入队列return task1.join() + task2.join();}}
}

2️⃣ 测试比较

package forkjoin;import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;public class Test {public static void main(String[] args) throws ExecutionException, InterruptedException {test1();test2();test3();}//普通遍历方式public static void test1() {long sum = 0;long start = System.currentTimeMillis();for (long i = 0; i <= 10_0000_0000; i++) {sum += i;}long end = System.currentTimeMillis();System.out.println("普通遍历法耗时" + (end - start) + "结果为:" + sum);}//ForkJoin方式public static void test2() throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask task = new ForkJoinTask(0, 10_0000_0000);java.util.concurrent.ForkJoinTask<Long> submit = forkJoinPool.submit(task);Long sum = submit.get();long end = System.currentTimeMillis();System.out.println("通过ForkJoin方式耗时" + (end - start) + "结果为:" + sum);}//Stream并行流方式public static void test3() {long start = System.currentTimeMillis();long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);//rangeClosed(]long end = System.currentTimeMillis();System.out.println("通过Stream并行流方式耗时" + (end - start) + "结果为:" + sum);}
}

image-20210410214523018



十五、异步回调

异步回调通常用CompletableFuture
image-20210411132654111
发起两个异步请求,一个有返回结果,一个没有返回结果

package future;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;//异步调用异步执行:成功回调/失败回调
public class Demo1 {public static void main(String[] args) throws ExecutionException, InterruptedException {//发起一个请求(没有返回值的异步回调)CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "runAsync=>Void");});completableFuture1.get();//阻塞获取执行结果//有返回值的异步回调CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");return 1024;});System.out.println(completableFuture2.whenComplete((t, u) -> {//成功回调:正确的返回结果System.out.println("t=" + t);System.out.println("u=" + u);}).exceptionally((e) -> {//失败回调:错误的返回结果System.out.println(e.getMessage());return 233;}).get());}
}

image-20210411134850651
如果有返回结果的异步回调报错,就会走失败回调的方法,返回233
image-20210411135011585



十六、理解JMM&Volatile

16.1、请你谈谈对Volatile的理解

Volatile是 JVM 提供的轻量级的同步机制

  1. 保证可见性
  2. 不保证原子性
  3. 禁止指令重排

怎么保证可见性?就需要和JMM挂钩

16.2、什么是JMM

Java内存模型,不存在的东西,是一种概念一种约定

关于JMM同步的约定

  • 线程解锁前,必须立刻把自己的共享变量刷回主存
  • 线程加锁前,必须读取主存中的最新值到工作内存中
  • 加锁和解锁是同一把锁

线程分为:工作内存、主内存

image-20210411135950221

16.3、Volatile特点

Volatile可以保证可见性,不能保证原子性,可以避免指令重排的现象

1. 保证可见性

package jmm;import java.util.concurrent.TimeUnit;public class Demo {private static int num;public static void main(String[] args) throws InterruptedException {new Thread(() -> {while (num == 0) ;}, "t").start();//main线程休眠1sTimeUnit.SECONDS.sleep(1);num = 1;System.out.println(num);}
}

image-20210411141806904

开启一个线程,当num=0时不停的死循环;然后让主线程休眠1s后修改num=1,也就是将主内存中的num修改为1;看到结果程序并没有停止,这是因为t线程并没有拿到主内存中num最新的值,不知套其发生了变化,也就是t线程对main线程的变化不可见

如何解决呢?只需要通过volatile关键字修饰num即可保证其的可见性

image-20210411142303690

可以看到,程序立马停止了

2. 不保证原子性

什么是原子性?也就是一个线程执行的时候不能被打扰分割,要么同时成功要么同时失败

package jmm;public class Demo2 {//通过volatile不能保证原子性private volatile static int num;public static void add() {num++;}public static void main(String[] args) throws InterruptedException {//理论上num=20000for (int i = 0; i < 20; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {add();}}).start();}//为了让20000条线程跑完,让main线程进行礼让(这里的2表示java固有的两个线程main和gc)while (Thread.activeCount() > 2) {Thread.yield();}System.out.println(Thread.currentThread().getName() + ":" + num);}
}

image-20210411144731153
根据结果,发现volatile并不保证原子性的操作,为什么不安全呢?

我们反编译看看,可以看到num++一行代码在底层是多行操作,因此不能保证原子性,所以是不安全的

D:\学习\IDEA project\Test\out\production\Test\jmm>javap -c Demo2.class
Compiled from "Demo2.java"
public class jmm.Demo2 {public jmm.Demo2();Code:0: aload_01: invokespecial #1                  // Method java/lang/Object."<init>":()V4: returnpublic static synchronized void add();Code:0: getstatic     #7                  // Field num:I3: iconst_14: iadd5: putstatic     #7                  // Field num:I8: returnpublic static void main(java.lang.String[]) throws java.lang.InterruptedException;Code:0: iconst_01: istore_12: iload_13: bipush        205: if_icmpge     298: new           #13                 // class java/lang/Thread11: dup12: invokedynamic #15,  0             // InvokeDynamic #0:run:()Ljava/lang/Runnable;17: invokespecial #19                 // Method java/lang/Thread."<init>":(Ljava/lang/Runnable;)V20: invokevirtual #22                 // Method java/lang/Thread.start:()V23: iinc          1, 126: goto          229: invokestatic  #25                 // Method java/lang/Thread.activeCount:()I32: iconst_233: if_icmple     4236: invokestatic  #29                 // Method java/lang/Thread.yield:()V

那如果不通过Lock和Synchronized 怎么保证原子性呢?

可以通过java.util.concurrent.atomic包中的原子类解决原子问题 ,这些类的底层都直接和操作系统挂钩,在内存中修改值,这些类是特殊的存在
image-20210411150212298

package jmm;import java.util.concurrent.atomic.AtomicInteger;public class Demo {//通过volatile不能保证原子性private volatile static AtomicInteger num = new AtomicInteger();public static void add() {num.getAndIncrement();//并不是简单的+1,而是利用了底层的CAS}public static void main(String[] args) throws InterruptedException {//理论上num=20000for (int i = 0; i < 20; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {add();}}).start();}//为了让20000条线程跑完,让main线程进行礼让(这里的2表示java固有的两个线程main和gc)while (Thread.activeCount() > 2) {Thread.yield();}System.out.println(Thread.currentThread().getName() + ":" + num);}
}

image-20210411154037289

3. 禁止指令重排

指令重排:计算机并不是按照我们编写的程序去执行

源代码–》编译器优化代码重排–》指令并行重排–》内存系统重排–》执行

处理器在指令重排的时候,会考虑数据之间的依赖性

int x = 1;	//1
int y = 2;	//2
x = x + 5;	//3
y = x * x;	//4按我们所期望的执行顺序是1->2->3->4
但实际上可能是2143或者1324,但不可能是4123

指令重排可能会导致一些错误的结果,如下图所示:

image-20210411155952539

使用volatile可以避免指令重排,底层实现是通过 内存屏障 实现的,可以保证特定的操作执行顺序,也可以保证某些变量的内存可见性

image-20210411160615817

十七、彻底玩转单例模式

volatile在单例模式中使用的最多

17.1、饿汉式

package 单例模式;//饿汉式
public class Hungry {private static Hungry hungry = new Hungry();//构造器私有private Hungry() {}public static Hungry getInstance() {return hungry;}
}

优点: static变量会在类装载时初始化,不涉及多个线程访问该对象的问题,可以省略synchronized关键字

缺点:类初始化时就创建了对象,如果只是加载本类,而不是要调用 getinstance(),甚至永远没有调用,则会造成资源浪费!

17.2、懒汉式

package 单例模式;//懒汉式
public class Lazy {private static Lazy lazy;private Lazy() {}public static Lazy getInstance() {if (lazy == null)lazy = new Lazy();return lazy;}
}

优点: 延迟加载,真正用的时候才实例化对象,提高了资源的利用率

缺点:存在并发访问的问题,以下测试并发访问情况

package 单例模式;//懒汉式
public class Lazy {private static Lazy lazy;private Lazy() {System.out.println("创建示例");}public static Lazy getInstance() {if (lazy == null)lazy = new Lazy();return lazy;}public static void main(String[] args) {//10条线程并发访问下for (int i = 0; i < 10; i++) {new Thread(() -> {Lazy.getInstance();}).start();}}
}

image-20210411162651485
根据结果,可以看到有5个线程打印了结果,也就说进行了5次初始化,这是非常大的漏洞,出现了并发访问的问题

17.3、双重检测锁式

为了解决懒汉式并发访问的问题,加入了sychronized关键字

package 单例模式;//双重检测锁式
public class DoubleLock {private static DoubleLock doubleLock;private DoubleLock() {System.out.println("创建示例");}public static DoubleLock getInstance() {if (doubleLock == null) {synchronized (Lazy.class) {if (doubleLock == null)doubleLock = new DoubleLock();}}return doubleLock;}public static void main(String[] args) {//10条线程并发访问下for (int i = 0; i < 10; i++) {new Thread(() -> {DoubleLock.getInstance();}).start();}}
}

image-20210411164551307
根据打印结果,解决了并发访问的问题;但是这样仍然会存在问题,因为我们new对象时并不是一个完整的原子性操作,而是分为以下三部:

  1. 分配内存空间
  2. 执行构造方法,初始化对象
  3. 把这个对象指向这个空间

单个线程A执行的情况下可以123按顺序执行,也可能由于指令重排按132执行;但是如果线程A按132顺序执行到3时来了一个线程B,此时该对象已经指向了分配的空间,因此B判断对象不是null,就会直接返回对象,但其实对象并没有进行初始化,就造成了错误

因此指令重排也会导致错误,因此完整的双重检测锁式还加入了Volatile关键字来避免指令重排,完整代码如下:

package 单例模式;//双重检测锁式
public class DoubleLock {private volatile static DoubleLock doubleLock;private DoubleLock() {System.out.println("创建示例");}public static DoubleLock getInstance() {if (doubleLock == null) {synchronized (Lazy.class) {if (doubleLock == null)doubleLock = new DoubleLock();}}return doubleLock;}
}

17.4、静态内部类式

package 单例模式;public class InnerClass {private InnerClass() {}//静态内部类里面创建对象public static class inner {private static final InnerClass innerClass = new InnerClass();}public static InnerClass getInstance() {return inner.innerClass;}
}

反射破坏单例模式

package 单例模式;import java.lang.reflect.Constructor;//双重检测锁式
public class DoubleLock {private volatile static DoubleLock doubleLock;private DoubleLock() {System.out.println("创建示例");}public static DoubleLock getInstance() {if (doubleLock == null) {synchronized (Lazy.class) {if (doubleLock == null)doubleLock = new DoubleLock();}}return doubleLock;}public static void main(String[] args) throws Exception {DoubleLock instance1 = doubleLock.getInstance();Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);constructor.setAccessible(true);DoubleLock instance2 = constructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}

image-20210411233747573
根据结果,看到创建了两个实例,也就是单例模式被破坏,那么怎么解决呢?

可以在私有构造中加锁

package 单例模式;import java.lang.reflect.Constructor;//双重检测锁式
public class DoubleLock {private volatile static DoubleLock doubleLock;private DoubleLock() {synchronized (DoubleLock.class){if(doubleLock!=null){throw new RuntimeException("不要试图使用反射破坏异常");}}System.out.println("创建示例");}public static DoubleLock getInstance() {if (doubleLock == null) {synchronized (Lazy.class) {if (doubleLock == null)doubleLock = new DoubleLock();}}return doubleLock;}public static void main(String[] args) throws Exception {DoubleLock instance1 = doubleLock.getInstance();Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);constructor.setAccessible(true);DoubleLock instance2 = constructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}

image-20210411234013067
根据结果,可以看到避免了单例模式的破坏?可是上述两个对象一个是通过单例获取,一个通过反射获取;

那如果两个对象都是通过反射获取呢?

public static void main(String[] args) throws Exception {Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);constructor.setAccessible(true);DoubleLock instance1= constructor.newInstance();DoubleLock instance2 = constructor.newInstance();System.out.println(instance1);System.out.println(instance2);
}

image-20210411234133836
根据结果,可以看到单例模式又被破坏了,创建了两个对象!这种情况如何解决呢?

可以通过红绿灯方法实现,定义一个标志位记录对象是否创建

package 单例模式;import java.lang.reflect.Constructor;//双重检测锁式
public class DoubleLock {private volatile static DoubleLock doubleLock;//标志位private static boolean flag = false;private DoubleLock() {synchronized (DoubleLock.class) {if (flag == false)flag = true;elsethrow new RuntimeException("不要试图使用反射破坏异常");}System.out.println("创建示例");}public static DoubleLock getInstance() {if (doubleLock == null) {synchronized (Lazy.class) {if (doubleLock == null)doubleLock = new DoubleLock();}}return doubleLock;}public static void main(String[] args) throws Exception {Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);constructor.setAccessible(true);DoubleLock instance1 = constructor.newInstance();DoubleLock instance2 = constructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}

image-20210411234412838
可以看到我们通过设置标志位flag再次解决了这个问题,但是一旦被获取了这个关键字,单例模式仍然可以通过反射被破解,如下所示

public static void main(String[] args) throws Exception {Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);Field declaredField = DoubleLock.class.getDeclaredField("flag");constructor.setAccessible(true);declaredField.setAccessible(true);DoubleLock instance1 = constructor.newInstance();declaredField.set(instance1, false);//第一个对象创建完毕后将flag改为falseDoubleLock instance2 = constructor.newInstance();System.out.println(instance1);System.out.println(instance2);
}

image-20210411234824886
可以看到单例模式再次被破坏;因此为了让程序更加安全,通常对flag关键字进行加密处理

那么到底如何完全的避免反射破坏单例模式呢?我们查看newInstance的源码
image-20210411235142204
可以看到,如果是枚举类型的话,就不能通过反射获取枚举;

因此引入了第5种单例模式

17.5、枚举单例

package 单例模式;import java.lang.reflect.Constructor;//enum本质上就是一个Class类
public enum EnumSingle {INSTANCE;public static void main(String[] args) throws Exception {EnumSingle instance1 = EnumSingle.INSTANCE;Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);declaredConstructor.setAccessible(true);EnumSingle instance2 = declaredConstructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}

我们再次通过反射创建对象,根据结果报错没有EnumSingle的空构造方法,这不是我们希望看到的
image-20210412000059691
我们对EnumSingle的class文件进行反编译,可以看到明明有空构造方法
image-20210412000353611
但是执行明明报错没有无参构造,我们使用更专业的反编译工具jad对class文件再进行反编译
image-20210412000629008
可以看到枚举类本质上就是继承了Enum类,本身就是一个Class,而且没有无参构造,而是含两个参数的有参构造,我们修改代码在测试

public static void main(String[] args) throws Exception {EnumSingle instance1 = EnumSingle.INSTANCE;Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class, int.class);declaredConstructor.setAccessible(true);EnumSingle instance2 = declaredConstructor.newInstance();System.out.println(instance1);System.out.println(instance2);
}

image-20210412000837225
这才正确显示了报错的信息:无法反射地创建枚举对象



十八、深入理解CAS

18.1、什么是CAS

CAS 是 compareAndSet 的缩写:比较并交换,是CPU的并发原语

package CAS探究;import java.util.concurrent.atomic.AtomicInteger;public class CASDemo {public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger(2020);//底层是CAS/*** public final boolean compareAndSet(int expectedValue, int newValue)* 期望、更新* 如果期望的值达到了就更新,否则不更新*/System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回是否修改成功System.out.println(atomicInteger.get());System.out.println(atomicInteger.compareAndSet(2020, 2022));System.out.println(atomicInteger.get());}
}

image-20210412002500909
我们再来看看 atomicInteger.getAndIncrement() 方法是怎么实现的?我们该方法的源码
image-20210412002756242
可以看到是由U调用了getAndAddInt()方法,而U就是Unsafe类的一个实例

什么是Unsafe

  • Java无法操作内存,只能通过调用C++来操作内存,Unsafe就是Java通过C++操作内存的接口
  • 就类似于Java通过native关键字来调用C++本地方法来和操作系统交互

image-20210412004045789
可以看到,底层是一个do while循环,也就是一个自旋锁

因此:CAS就是比较当前工作内存中的值和主内存中的值,如果这个值是期望的,就执行操作;如果不是,就一直循环,因为底层是一个do while循环(自旋锁)

CAS有三个操作数:

  • 期望的值
  • 比较的值
  • 更新的值

缺点

  • 底层是自旋锁,循环耗时
  • 一次性只能保证一个共享变量的原子性
  • 会存在ABA问题

18.2、ABA问题

比如有两个线程A,B同时向修改A的内容,但是B线程执行速度快,首先cas(1,3)将A修改为3,然后又执行cas(3,1)将A修改为1,这之后线程A再cas(1,2)将A修改为2,但此时A=1已经不是原来的1了;

这就是ABA问题

image-20210412005138299

我们来个代码模拟以下

package CAS探究;import java.util.concurrent.atomic.AtomicInteger;public class ABADemo {//CAS是compareAndSet的缩写:比较并交换,是CPU的并发原语public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger(1);//底层是CAS//线程BSystem.out.println(atomicInteger.compareAndSet(1, 3));System.out.println(atomicInteger.get());System.out.println(atomicInteger.compareAndSet(3, 1));System.out.println(atomicInteger.get());//线程ASystem.out.println(atomicInteger.compareAndSet(1, 2));System.out.println(atomicInteger.get());}
}

image-20210412005623931
可以看到三个结果都为true,但不是我们期望的,我们希望知道谁动过A的值

可以通过类似乐观锁的方案来解决,使用 原子引用类AtomicReference/AtomicStampedReference(带时间)
image-20210412010508978
我们使用AtomicStampedReference测试以下

package CAS探究;import java.util.concurrent.atomic.AtomicStampedReference;public class ABADemo {//CAS是compareAndSet的缩写:比较并交换,是CPU的并发原语public static void main(String[] args) {//public AtomicStampedReference(V initialRef, int initialStamp):这里的第二个参数等同于乐观锁的version,初始值设为1AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);//线程B,多了连个参数:期望的版本号,更新的版本号System.out.println(atomicStampedReference.compareAndSet(1, 3,atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));System.out.println(atomicStampedReference.compareAndSet(3, 1,atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));//线程ASystem.out.println(atomicStampedReference.compareAndSet(1, 2, 1, 2));}
}

image-20210412011555412
可以看到A成功察觉到了B修改过数据,所以执行失败;和乐观锁原理相同

注意:如果泛型是包装类,注意对象引用问题(正常业务都是对象,这里是使用包装类Integer进行测试)

image-20210412011922276
如果我们的范围不再-128~127,则会失败
image-20210412012249646



十九、各种锁的理解

19.1、乐观锁/悲观锁

悲观锁(Pessimistic Lock)

1️⃣ 简介

​ 当要对数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。这种借助数据库锁机制,在修改数据之前先锁定,再修改的方式被称之为悲观并发控制【Pessimistic Concurrency Control,缩写“PCC”,又名“悲观锁”】

​ 悲观锁,正如其名,具有强烈的独占和排他特性。它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度。因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。

img

之所以叫做悲观锁,是因为这是一种对数据的修改持有悲观态度的并发控制方式。总是假设最坏的情况,每次读取数据的时候都默认其他线程会更改数据,因此需要进行加锁操作,当其他线程想要访问数据时,都需要阻塞挂起。悲观锁的实现:

  1. 传统的关系型数据库使用这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
  2. Java 里面的同步 synchronized 关键字的实现。

2️⃣ 分类

悲观锁主要分为 共享锁排他锁

  • 共享锁【shared locks】又称为读锁,简称S锁。顾名思义,共享锁就是多个事务对于同一数据可以共享一把锁,都能访问到数据,但是只能读不能修改。
  • 排他锁【exclusive locks】又称为写锁,简称X锁。顾名思义,排他锁就是不能与其他锁并存,如果一个事务获取了一个数据行的排他锁,其他事务就不能再获取该行的其他锁,包括共享锁和排他锁,但是获取排他锁的事务是可以对数据行读取和修改。

3️⃣ 说明

​ 悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。但是在效率方面,处理加锁的机制会让数据库产生额外的开销,还有增加产生死锁的机会。另外还会降低并行性,一个事务如果锁定了某行数据,其他事务就必须等待该事务处理完才可以处理那行数据。

乐观锁(Optimistic Locking)

1️⃣ 简介

​ 乐观锁是相对悲观锁而言的,乐观锁假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则返回给用户错误的信息,让用户决定如何去做。乐观锁适用于读操作多的场景,这样可以提高程序的吞吐量。
img
​ 乐观锁机制采取了更加宽松的加锁机制。乐观锁是相对悲观锁而言,也是为了避免数据库幻读、业务处理时间过长等原因引起数据处理错误的一种机制,但乐观锁不会刻意使用数据库本身的锁机制,而是依据数据本身来保证数据的正确性。

2️⃣ 实现

  1. CAS实现:Java中java.util.concurrent.atomic包下面的原子变量使用了乐观锁的一种 CAS 实现方式
  2. 版本号控制:一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数。当数据被修改时,version 值会+1。当线程A要更新数据值时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值与当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功

3️⃣ 说明

​ 乐观并发控制相信事务之间的数据竞争(data race)的概率是比较小的,因此尽可能直接做下去,直到提交的时候才去锁定,所以不会产生任何锁和死锁

19.2、公平/非公平锁

  • 公平锁:非常公平,不能插队,线程的执行必须先来后到
  • 非公平锁:非常不公平,可以插队,默认都为非公平锁!(比如一个线程3s执行完,一个线程1min执行完,如果使用公平锁严重影响某个线程的效率)
    image-20210321133926406

19.3、可重入锁

可重入锁(递归锁)
image-20210412012855770

代码示例:synchronized版

image-20210412013050474

执行结果:

image-20210412013256865

代码示例:Lock版

image-20210412013522089

19.4、自旋锁

不断的尝试,直到成功为止!

image-20210412014256436

我们来编写一个自旋锁

package 自旋锁;import java.util.concurrent.atomic.AtomicReference;//自定义自旋锁
public class SpinLock {//锁线程AtomicReference<Thread> atomicReference = new AtomicReference<>();//加锁public void myLock() {Thread thread = Thread.currentThread();System.out.println(thread.getName() + "==>myLock");//自旋锁while (!atomicReference.compareAndSet(null, thread)) ;}//解锁public void myUnlock() {Thread thread = Thread.currentThread();System.out.println(thread.getName() + "==>myUnLock");atomicReference.compareAndSet(thread, null);}
}

然后编写一段测试代码

package 自旋锁;import java.util.concurrent.TimeUnit;public class Test {public static void main(String[] args) throws InterruptedException {SpinLock spinLock = new SpinLock();//线程T1new Thread(() -> {spinLock.myLock();try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();} finally {spinLock.myUnlock();}}, "T1").start();TimeUnit.SECONDS.sleep(1);//线程T2new Thread(() -> {spinLock.myLock();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} finally {spinLock.myUnlock();}}, "T2").start();}
}

image-20210412082802954
根据结果,总是T1线程解锁后,T2线程才能解锁;因为如果T1线程不解锁,T2就会卡住在while循环不停的尝试cas直到thread=null为止

19.5、死锁

什么是死锁?

是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象

image-20210412083125750

简单的死锁案例

package 死锁;import java.util.concurrent.TimeUnit;public class DeadLockDemo {public static void main(String[] args) {String lockA = "lockA";String lockB = "lockB";new Thread(new MyThread(lockA, lockB), "T1").start();new Thread(new MyThread(lockB, lockA), "T2").start();}
}class MyThread implements Runnable {private String lockA;private String lockB;public MyThread(String lockA, String lockB) {this.lockA = lockA;this.lockB = lockB;}@Overridepublic void run() {synchronized (lockA) {System.out.println(Thread.currentThread().getName() + "持有锁" + lockA + "尝试获取" + lockB);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}synchronized (lockB) {System.out.println(Thread.currentThread().getName() + "持有锁" + lockB + "尝试获取" + lockA);}}}
}

image-20210412084034340
根据运行结果,可以看到程序卡死,因为发生了死锁,因为T1和T2分别持有lockA和lockB,但又都试图获取对方的锁!

死锁问题排查

  1. 使用jps -l命令定位进程号

    image-20210412084332447

  2. 使用jstack 进程号查看指定进程的堆栈信息,找到死锁问题

    image-20210412084622766

    可以看到,控制台清晰的打印了找到死锁,并可以看到产生的原因就是T1T2互相尝试获取对方的锁


http://www.ppmy.cn/news/605610.html

相关文章

qt QSqlRelationalTableModel 详解

背景知识&#xff1a; Qt SQL的API分为不同层&#xff1a; 驱动层 驱动层 对于QT是基于C来实现的框架&#xff0c;该层主要包括QSqlDriver、QSqlDriverCreator、QSqlDriverCreatorbase、QSqlDriverPlugin and QSqlResult。这一层提供了特定数据库和SQL API层之间的底层桥梁…

python的print格式化输出,以及使用format来控制。

20210305 time.strftime("%Y%m%d%H%M%S", time.localtime()) 时间格式化 20210206 https://www.runoob.com/w3cnote/python3-print-func-b.html 重点 20210206 https://blog.csdn.net/a19990412/article/details/80149112 !r 20210205 https://www.jianshu.com/p…

LeetCode中等题之使数组中所有元素相等的最小操作数

题目 存在一个长度为 n 的数组 arr &#xff0c;其中 arr[i] (2 * i) 1 &#xff08; 0 < i < n &#xff09;。 一次操作中&#xff0c;你可以选出两个下标&#xff0c;记作 x 和 y &#xff08; 0 < x, y < n &#xff09;并使 arr[x] 减去 1 、arr[y] 加上 1…

Pycharm报错:Process finished with exit code -1073741819 (0xC0000005)

1.本来应该是文件夹目录 写成了文件 2.路径前面没有加r 识别成正则\t,\n等 记录踏坑历程 在用pycharm tensorflow-gpu环境 读tfrecord时出错 pycharm报错&#xff1a;Process finished with exit code -1073741819 (0xC0000005) 。针对这个泛泛的错误&#xff0c;网上存在应对…

LeetCode中等题之两棵二叉搜索树中的所有元素

题目 给你 root1 和 root2 这两棵二叉搜索树。请你返回一个列表&#xff0c;其中包含 两棵树 中的所有整数并按 升序 排序。. 示例 1&#xff1a; 输入&#xff1a;root1 [2,1,4], root2 [1,0,3] 输出&#xff1a;[0,1,1,2,3,4] 示例 2&#xff1a; 输入&#xff1a;root…

查错bug

1.当无法确定引用的母函数的时候,故意制造事故现场引出前面的调用函数

Redis数据类型详解(5基本+3特殊)

目录一、五大基本数据类型1.1、String(字符串)1.2、List(列表)1.3、Set(集合)1.4、Hash(哈希)1.5、Zset(有序集合)二、三种特殊数据类型2.1、geospatial(地理位置)2.1.1、GEOADD2.1.2、GEOPOS2.1.3、GEODIST2.1.4、GEORADIUS2.1.5、GEORADIUSBYMEMBER2.1.6、GEOHASH2.1.7、总结…

LeetCode简单题之1比特与2比特字符

题目 有两种特殊字符&#xff1a; 第一种字符可以用一个比特 0 来表示 第二种字符可以用两个比特(10 或 11)来表示、 给定一个以 0 结尾的二进制数组 bits &#xff0c;如果最后一个字符必须是一位字符&#xff0c;则返回 true 。 示例 1: 输入: bits [1, 0, 0] 输出: true 解…