JUC并发编程18 | AQS分析

news/2024/11/7 9:32:20/

尚硅谷(140-155)

18 AQS

前置知识

  • 公平锁和非公平锁
  • 可重入锁
  • 自旋思想
  • LockSupport
  • 双向链表
  • 设计模式——模块设计

18.1 AQS入门级别理论知识

AQS一般指的是 AbstractQueuedSynchronized

AQS 是用来实现锁或者其他同步器组件的公共基础部分的抽象实现,是重量级基础架构以及整个JUC体系的基石,主要用于解决锁分配给谁的问题。

整体就是一个FIFO队列来完成资源获取线程的排队工作,并通过一个int类型变量表示持有锁的状态。

在这里插入图片描述

与AQS有关的:ReentrantLockCountDownLatchReentrantReadWriteLockSemaphore

在这里插入图片描述

既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH队列(双向链表) 的变体实现的.将暂时获取不到锁的线程加入到队列中,这个队列就是AQS同步队列的抽象表现。它将要请求共享资源的线程及自身的等待状态封装成队列的结点对象(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。

AbstractQueuedSynchronized 部分源码

在这里插入图片描述

  • Node 等待线程的保存位置
  • head 前指针
  • tail 后指针(所以是双向队列)
  • state 线程状态位

在这里插入图片描述

18.2 AQS源码分析前置知识储备

AQS内部体系架构

在这里插入图片描述

18.2.1 AQS 自身

state

  • 0,线程可以进入
  • 大于等于1,有人占用资源,需要等待

CLH 队列

在这里插入图片描述

  • Node 等待线程的保存位置
  • head 前指针
  • tail 后指针(所以是双向队列)

总结:有阻塞就需要排队,实现排队必然需要队列,state 变量 + CLH 双端队列

18.2.2 ASQ 内部 Node 类

static final class Node {// 表示线程以共享的模式等待锁static final Node SHARED = new Node();// 表示线程正在以独占的方式等待锁static final Node EXCLUSIVE = null;// 表示当前线程获取锁的请求已经取消了static final int CANCELLED =  1;// 表示当前线程已经准备好了,就等资源释放了static final int SIGNAL    = -1;// 表示结点在等待队列中,结点线程等待唤醒static final int CONDITION = -2;// 当前线程处在SHARED情况下,该字段才会使用,传播static final int PROPAGATE = -3;// 当前结点在队列中的状态(上面四种)volatile int waitStatus;// 前驱指针volatile Node prev;// 后继指针volatile Node next;// 表示醋鱼该结点的线程volatile Thread thread;// 指向下一个处于condition状态的结点Node nextWaiter;final boolean isShared() {return nextWaiter == SHARED;}final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() {    }Node(Thread thread, Node mode) {this.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus;this.thread = thread;}}

18.3 AQS源码深度讲解和分析

Lock 接口的实现类,基本都是通过【聚合】一个【队列同步器】的子类完成线程访问控制的

ReentrantLock的原理

在这里插入图片描述

默认使用的是非公平锁,传入的是true的话则是公平锁

public ReentrantLock() {sync = new NonfairSync();
}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}

进入非公平锁

static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;final void lock() {// 我们希望是没有线程占领资源的,如果没有占领资源,把0置为1if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());// 如果有线程占用,acquire置1,去排队elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
}

公平锁:直接置1

static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}...
}

所以,创建代码的时候

// 非公平
Lock lock = new ReentrantLock();
// 公平
Lock lock = new ReentrantLock(true);
// 非公平
Lock lock = new ReentrantLock(false);

在尝试获得锁的过程中 公平锁和非公平锁,就差异了一个 !hasQueuedPredecessors() ,有没有前驱对象。

如果公平锁中有前驱对象,需要在队列中等待;非公平锁则需要去抢夺

// 公平锁
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}// 非公平锁
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {	// 由 0 变 1 setExclusiveOwnerThread(current);	// 设置当前的独占线程return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}

由于 FairSync 和 NonfairSync 都是要调用 acquire 方法,置 1 表示有线程抢到了锁

lock()方法

以 NonfairSync 为例,AQS acquire 主要的三条流程

// ReentrantLock 中的 lock 方法
public void lock() {sync.lock();
}
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;// Sync中的抽象方法abstract void lock();...
}
// FairSync 和 NonfairSync 两个方法都继承了 Sync方法的 lock 操作,下面的代码是 NonfairSync 内部类的 lock 方法实现
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);
}
// 进入acquire方法
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))	// Node.EXCLUSIVE 独占的 node 结点selfInterrupt();
}

1、调用tryAcquire

// 由AQS中的tryAcquire方法
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}// 在非公平的NonfairSync中的实现
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 从希望值的0 修改为1if (compareAndSetState(0, acquires)) {// 第一个线程抢占到了setExclusiveOwnerThread(current);return true;}}// 如果是被占用的线程就进入,不是的话返回falseelse if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}

2、调用addWaiter方法:入队

private Node addWaiter(Node mode) {// 封装一个当前线程,mode 是 排他模式Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {		// 等待队列中有线程在等待了node.prev = pred;		// 把当前队列中的尾指针指向的结点赋值给新进线程的前驱结点if (compareAndSetTail(pred, node)) {	// 将新进的node设置为尾指针pred.next = node;		// 把之前尾结点的后继结点设置为新进结点nodereturn node;}}// 进入结点enq(node);return node;
}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))	// 入队的结点不是传入的结点(虚拟节点&哨兵节点,作用就是占位),new Node(){Thread = null;waitStatus = 0;},而是一个新的结点,设置为头/尾结点tail = head;} else {node.prev = t;	// 把 t(也就是伪指针,指的是新的node)赋值给 node (传入的结点)的前指针if (compareAndSetTail(t, node)) {	// 把node设置为尾指针t.next = node;return t;}}}
}

在这里插入图片描述
在这里插入图片描述

3、调用 acquireQueued 方法

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 正常的入队出队boolean interrupted = false;for (;;) {final Node p = node.predecessor();	// B结点的前驱结点是虚拟结点,也是头结点if (p == head && tryAcquire(arg)) {	// 如果当前还有线程占用,就往下走,如果没有进入代码块。tryAcquire(arg) 可能来夺锁setHead(node);	// 将当前结点设置为头结点p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&	// B结点的前驱结点也就是虚拟结点,node就是B结点parkAndCheckInterrupt())	interrupted = true;}} finally {// 意外中断,取消排队if (failed)cancelAcquire(node);}
}
private void setHead(Node node) {head = node;	// 将当前结点B设置为头结点node.thread = null;		// 将头结点的thread置为null,没有前驱指针node.prev = null;
}// shouldParkAfterFailedAcquire(p, node) 的源码,前驱结点为SIGNAL返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;	// 获取虚拟结点在队列中的状态,有0(默认值),1,-1,-2,-3等状态if (ws == Node.SIGNAL)	// 如果当前状态为-1,就是线程准备好了,返回truereturn true;if (ws > 0) {	// 如果线程获取锁的活动取消了do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);	// 前置节点(头结点)设置 waitStatus 为 Node.SIGNAL(-1)}return false;
}
// parkAndCheckInterrupt() 方法,也就是this就是B(NonfairSync对象)转着圈,等待这争取锁
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();
}

在这里插入图片描述

unlock() 方法

unlock --> release --> tryRelease/unparkSuccessor

// ReentrantLock 中的 unlock 方法
public void unlock() {sync.release(1);
}public final boolean release(int arg) {if (tryRelease(arg)) {	// 释放锁Node h = head;	// h 为虚拟结点if (h != null && h.waitStatus != 0)	// 等待队列中有线程再等待同时 waitStatus = 0unparkSuccessor(h);return true;}return false;
}protected final boolean tryRelease(int releases) {int c = getState() - releases;		// 1 - 1 = 0 = cif (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);	// 设置 state = 0;说明已经解锁return free;	// 返回 true
}private void unparkSuccessor(Node node) {int ws = node.waitStatus;	// 头结点为-1if (ws < 0)compareAndSetWaitStatus(node, ws, 0);	// 设置头结点(虚拟结点)status为0Node s = node.next;	// 来到真实的结点Bif (s == null || s.waitStatus > 0) {	// 队列中没有等待的线程或者当前线程获取锁的请求已经取消了s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)	// 有线程在等待LockSupport.unpark(s.thread);
}
// 对队列中的等待进行唤醒
public static void unpark(Thread thread) {if (thread != null)UNSAFE.unpark(thread);
}

在这里插入图片描述

cancelAcquire()

cancelAcquire 方法取消流程

private void cancelAcquire(Node node) {		// 比如5号节点if (node == null)return;node.thread = null;		// 线程为nullNode pred = node.prev;	//	5号节点的前驱结点就是4号结点while (pred.waitStatus > 0)	// 如果4号结点也取消,指针继续前移node.prev = pred = pred.prev;Node predNext = pred.next;	node.waitStatus = Node.CANCELLED;	// 5号结点的waitStatus设置为1if (node == tail && compareAndSetTail(node, pred)) {	// 5号结点是尾结点并且把尾指针指向4号结点compareAndSetNext(pred, predNext, null);	// 把4号结点的后继结点设为null} else {		// 删除的不是尾结点int ws;		if (pred != head &&		// 前驱不是头结点((ws = pred.waitStatus) == Node.SIGNAL ||	// 如果前驱结点的 waitStatus 是 -1 (随时待命)(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&	// 线程没有被取消,把前驱结点的线程的waitStatus置为-1pred.thread != null) {	// 前驱结点的thread不为空Node next = node.next;	// 把5号结点的后继结点给nextif (next != null && next.waitStatus <= 0)	// 后继结点为不空compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);	// }node.next = node; // help GC}
}// 
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);	// 设为默认值Node s = node.next;if (s == null || s.waitStatus > 0) {	// 取消线程s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);	// 出去抢夺锁
}

http://www.ppmy.cn/news/68011.html

相关文章

Learn RabbitMQ with SpringBoot

文章目录 What is RabbitMQ?RabbitMQ Core conceptRabbitMQ ArchitectureInstall and setup RabbitMQ using DockerExplore RabbitMQ using management UICreate and setup Springboot3 project in intellijSpringboot and RabbitMQ Basic MessageConnection between Springbo…

filter和handlerInterceptor的各个方法执行的排序

Filter和HandlerInterceptor的方法执行顺序如下&#xff1a; Filter 在请求进入Servlet容器之前&#xff0c;先经过Filter的过滤器链。Filter的init()方法只会在容器启动时执行一次。Filter的doFilter()方法是每次请求都会执行的&#xff0c;如果需要放行请求&#xff0c;需要…

搭建高效微服务架构:Kubernetes、Prometheus和ELK Stack的完美组合

搭建高效微服务架构&#xff1a;Kubernetes、Prometheus和ELK Stack的完美组合 一、前言1 微服务架构简介2 Kubernetes 简介3 Kubernetes 与微服务 二、准备工作1 安装 Kubernetes1.1 搭建 Kubernetes 集群1.2 安装 kubectl 工具 2 准备 Docker 镜像2.1 编写 Dockerfile 文件2.…

华为OD机试(21-40)老题库解析Java源码系列连载ing

华为OD机试算法题新老题库练习及源码 老题库21.字符串序列判定22.最长的指定瑕疵度的元音子串 郑重声明&#xff1a; 1.博客中涉及题目为网上搜索而来&#xff0c;若侵权&#xff0c;请联系作者删除。 源码内容为个人原创&#xff0c;仅允许个人学习使用。 2.博客中涉及的源…

多优先级(笔记)

目录 支持多优先级的方法通用方法优化方法1、修改任务控制块2、修改xTaskCerateStactic()修改 prvInitialiseNewTask() 函数prvAddTaskToReadyList()初始化任务列表prvAddTaskToReadyList()vTaskStartScheduler()vTaskDelay()vTaskSwitchContext()xTaskIncrementTick() 实验实验…

Java EE 进阶---多线程(一)

目录 一、常见的锁策略 乐观锁 vs 悲观锁 重量级锁 vs 轻量级锁 读写锁&#xff06;普通互斥锁 自旋锁&#xff06;挂起等待锁 可重入锁&#xff06;不可重入锁 公平锁&#xff06;非公平锁 synchronized实现了哪些锁策略&#xff1f; 二、Compare And Swap 比较并交换…

swift语言特性,swift语法介绍,swift使用技巧

Swift语言特性、Swift语法介绍、Swift使用技巧 Swift是一种由苹果公司开发的编程语言&#xff0c;于2014年首次发布。它是一种现代、快速、安全的编程语言&#xff0c;用于iOS、macOS、watchOS和tvOS应用程序的开发。Swift语言具有许多特性和优点&#xff0c;使得它成为一种非…

【Tkinter.Floodgauge】当程序需要长时间运行,可以用这个组件显示进度【文末附源码地址】

文章目录 效果展示源码解析导包Floodgauge组件界面初始化创建窗口修改数值运行 源码地址 效果展示 我在使用tkinter进行界面化操作的时候&#xff0c;会遇到运行很慢的程序&#xff0c;比如&#xff1a;爬虫下载视频、压缩解压文件&#xff0c;这些操作会很耗时间。 Floodgau…