从ReentrantLock学习AQS -- JDK17版
前言
技术无止境,无论任何时刻,通过加强我们对技术的了解,都能为我们带来信心!
当前大部分的课程,或生产环境,可能都还在使用JDK8,但到现在这个时间点,包括Spring、Jenkins等等核心框架或核心生态,都已经在逐渐的升级到JDK17。
而JDK17对于JDK8来说的升级,简单来说,就是性能上的提升,无论还是底层逻辑,还是新的GC,任何事物,想长久发展,就必须与时俱进,自我升级!
AQS可以说,是所有Java高级工程师必须要掌握的知识点,掌握AQS对于在代码中如何并发编程,可以说是Java并发编码的基础,与Synchronized共同支撑着Java并发的整个架构。
本文概述
AQS在JDK17的代码有了一些调整。
已经掌握JDK8的大佬可以对比一下,看有哪些区别;
从前未研究过的,可以尝试一下,希望能帮助你了解AQS;
如有疑问,欢迎评论共同探讨。
这篇文章,是希望找一个角度来分析AQS究竟是如何实现锁的,主要的内容包括:
- 简单的说一下AQS是什么
- ReentrantLock是如何基于AQS实现锁管理的
AQS是什么
AQS是Java的JUC包中的AbstractQueueSynchronizer类,他是一个抽象类,基本上Java所有与锁有关的,都是基于AQS实现的。
常见类中包含:
- ReentrantRock
- ReentrantReadWriteRock
- CountDownLatch
- Semaphore
- 线程池
数据结构
AQS通过定义了一些链表内部类,结合自身,实现了线程的状态管理与线程执行顺序
AQS定义了head与tail来表示头部和尾部,还有state属于用于表示当前的锁状态,AQS里面定义了一个抽象静态内部类Node,AQS与Node共同实现了一个双端列表;
同时在AQS中,还定义了一个内部静态类ConditionNode继承Node,与AQS的另一个内部类ConditionObject可以实现一个单向链表;
注意:JDK8中,AQS使用的waiter字来保存当前持有锁的线程, 在JDK17中通过父类AbstractOwnableSynchronizer来管理
JDK17的AQS简单代码:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 等待队列头部,懒汉式初始化
private transient volatile Node head;
// 等待队列尾部
private transient volatile Node tail;
// 锁状态对于可重入锁,落锁的时候可以一直递增,只有释放锁直到为0时才真正的释放
private volatile int state;
// JDK8使用的waiter来保存的线程
// 在JDK17中通过父类AbstractOwnableSynchronizer来管理了
abstract static class Node {
// ... 省略部分代码
volatile Node prev; // 上一节点
volatile Node next; // 下一节点
Thread waiter; // 等待的线程
/**
* 节点抢锁状态
* WAITING 1表示待获取锁并在阻塞前重新检查状态准备重试
* CANCELLER 0x80000000 表示取消抢锁
* 16进制,对应二进制1000 0000 0000 0000 0000 0000 0000 0000,在java中表示-1
* 2表示正在等待
*/
volatile int status;
}
public class ConditionObject implements Condition, java.io.Serializable {
// ...省略很多代码
/** First node of condition queue. */
private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
private transient ConditionNode lastWaiter;
}
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
// ...省略部分代码
ConditionNode nextWaiter; // 用来保存下一节点,单向链表没有使用父类的 next作为下一节点
}
// ...省略很多代码
}
AQS与Node搭配实现的双端链表:
AQS、ConditionObject与CondtionNode搭配实现的单向链表:
AQS的核心方法acquire
所有基于AQS的锁,都是通过AQS的acquire方法获取锁,这个方法是final的,不可重写的
这里是摘抄源码,只是增加了一些翻译,最好结合实现来食用,才会得到比较好的效果,比较复杂,可以暂时跳过:
/**
* Main acquire method, invoked by all exported acquire methods.
* 主要的获取锁方法,所有实现类获取锁的方法
* @param node null unless a reacquiring Condition 除非是有设置条件,默认是空的
* @param arg the acquire argument 获取锁的参数
* @param shared true if shared mode else exclusive 共享锁还是排他锁,true为共享锁
* @param interruptible if abort and return negative on interrupt 如果运行时中断锁,将返回负值
* @param timed if true use timed waits 是否定时等待
* @param time if timed, the System.nanoTime value to timeout 如果设置了定时,使用System.nanoTime定义时间
* @return positive if acquired, 0 if timed out, negative if interrupted 如果获取到锁返回正数,如果超时返回0,如果中断返回负数
*/
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly: 判断逻辑
* Check if node now first 判断当前节点是否为首节点
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
// 如果非首节点,找到获取锁node(参数中的node)的前置节点,如果node的前置节点不为空
// 并且AQS的首节点不是当前node的前置节点,这里还会给first赋值,用于下面的首节点判断
// 这里最终要判断的逻辑就是,node不为首节点后的第一个节点,AQS的head节点是虚拟的
// 默认first就是false,所以此处肯定会执行
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status = 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
//如果获取锁成功
if (acquired) {
// 如果是“第一节点”
if (first) {
// 此处是把节点的前置设为空,这样就解决了node对前置节点的引用
node.prev = null;
// 抢到锁了就表示当前节点成为了首节点
head = node;
// 前面赋值的前置节点,通过把next设为空,解除与当前node的引用,
// 此时node的prev不指向pred,pred的next也不指向node,
// pred没有了使用场景,即可帮助GC
pred.next = null;
// node的线程已经获取到了锁,不需要再等待了,把waiter置成空,
// 待此线程执行完后,就可以帮助线程回收
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
// 如果中断,即中断当前线程
current.interrupt();
}
// 拿到锁,返回1,表示成功
return 1;
}
}
// 如果node为空,则创建一个初始node
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
// pred==null表示是新的,新的抢锁没成功,所以尝试排队
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
// 尝试初始化head
tryInitializeHead();
else if (!casTail(t, node)) // 如果不能把node设置为队尾
node.setPrevRelaxed(null); // back out
else
// 把node设为队尾后,t已经不是队尾了,node才是队尾,所以t.next = node
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
// 如果不是新的,但是状态为0,0是一个初始值,表示还需要继续等待下一轮抢锁
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
// 此循环中,如果没有设置超时或中断等待就会有一个持续获取锁的循环
}
// 取消获取锁
return cancelAcquire(node, interrupted, interruptible);
}
其他方法
AQS的hasQueuedThreads方法
此方法的作用是判断当前排队链表中,是否有线程在等待获取锁:
public final boolean hasQueuedThreads() {
// 只要head不等于tail,且tail不为空则一直循环
// 从tail往head循环
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
if (p.status >= 0)
return true; // 只要有线程不为负,即表示有线程在排队
return false;
}
AQS的cleanQueue方法
这个方法是用来整理AQS的链表的,这里也是回收线程,通知下一链表准备的位置之一:
private void cleanQueue() {
for (;;) {
for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
// 本段逻辑中,通过最后的语义,会从tail开始,不断的让q = q.prev
// 所以这段逻辑是从尾到头的反向遍历
// pqs的关系(要记住,后面处理都有关系)
if (q == null || (p = q.prev) == null)
// 只有q为空了,或者q的前面为空了,才会中止清理队列
// 此方法只有这一个退出点
return; // end of list
if (s == null ? tail != q : (s.prev != q || s.status < 0))
// 经过循环,会对s赋值为上一循环的q,所以s不一定为空
// 如果s为空,且tail != q表示上一个循环的q被清除了
// 或者上一循环的q的前置不等于新的q时,表示s与q的关系发生了变化
// 或者s的状态表示要回收了
// 此时s需要被清除,或者重新处理逻辑
// 这些都表示此时链表发生了变化,所以需要跳出此次循环
// 但上一级是无限循环,所以还是会重新开始从尾到头开始处理
break; // inconsistent
if (q.status < 0) { // cancelled
// 这里表示q结点是需要清除的
if ((s == null ? casTail(q, p) : s.casPrev(q, p))
&& q.prev == p) {
// 如果s为空,表示上一循环的s已经被回收了,
// 如果设置成功把尾节点换成p
// 或者s不为空,把s的前置替换为p(s.casPrev(q, p))成功,
// 并且q.prev == p,理论上q.prev == p是成立的
// 那么把p的下一节点替换成s
// 为什么替换成s,因为这里的逻辑是要把q干掉
p.casNext(q, s); // OK if fails
if (p.prev == null)
// 此处的逻辑是通知p的下一个节点
// 如果p的前置为空了,表示p是head,而p此时的next是s
signalNext(p);
}
// 这些都表示此时链表发生了变化,所以需要跳出此次循环
// 但上一级是无限循环,所以还是会重新开始从尾到头开始处理
break;
}
// 因为前面,p为q.prev,所以理论上p.next应该为q,
// 所以如果不相等,就需要作处理
if ((n = p.next) != q) { // help finish
if (n != null && q.prev == p) {
// n不为空,但是q.prev又等于p
// 把p的next设置为q
p.casNext(n, q);
if (p.prev == null)
// 通知q准备
signalNext(p);
}
break;
}
// 对s,q重新赋值,让遍历的p,q,s各向前移一位
s = q;
q = q.prev;
}
}
}
继承者们基于AQS需要实现的重要方法
AQS在获取锁或释放锁的过程中,AQS无法单独完成,因为部分的尝试机制并没有具体实现,而是只有抽象方法,
他们分别是:
方法名 | 语义 | AQS的调用位置 |
---|---|---|
tryAcquire | 尝试获取锁 | 在acquire中调用 |
tryRelease | 尝试释放锁 | 在release中调用 |
tryAcquireShared | 尝试获取共享锁 | 在acquireShared中调用 |
tryReleaseShared | 尝试释放共享锁 | 在releaseShared中调用 |
isHeldExclusively | 是否为独占锁 | 调用的地方较多,主要用于判断 |
ReentrantLock
ReentrantLock是JUC中定义的一个可重入锁,他并没有通过直接继承AQS来实现锁的功能,而是通过定义一个Sync内部来继承AQS来实现锁的功能的,
Sync也是一个抽象类,所以他也不是最终的使用实例,在ReentrantLock中,还分别有FairSync与NonfairSync两个内部静态类继承Sync来实现公平锁与非公平锁,
ReentrantLock使用代理模式通过sync来实现锁的管理,通过构造函数传参决定是公平锁还是非公平锁:
public class ReentrantLock implements Lock, java.io.Serializable {
// ReetrantLock实现锁功能的代理属性
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
// 实现AQS的Sync
}
static final class NonfairSync extends Sync {
// 非公平锁实现
}
static final class FairSync extends Sync {
// 公平锁实现
}
// 两个构造参数,默认是非公平锁实现
public ReentrantLock() {
sync = new NonfairSync();
}
// 有参构造,true为公平锁,false为非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 通过sync来实现具体的锁管理,此处仅列举一项
public void lock() {
sync.lock();
}
}
ReentrantLock的Sync
Sync实现了tryRelease,但未对tryAcquire进行实现:
abstract static class Sync extends AbstractQueuedSynchronizer {
// 默认的lock方法,ReentrantLock的lock方法就是这里实现的
@ReservedStackAccess
final void lock() {
if (!initialTryLock())
// 如果初始尝试获取不成功,则使用AQS的acquire的重写方法获取锁
acquire(1);
}
/**
* Checks for reentrancy and acquires if lock immediately
* available under fair vs nonfair rules. Locking methods
* perform initialTryLock check before relaying to
* corresponding AQS acquire methods.
* 初次尝试锁定
* 实现类需要实现的方法,用来检查是否可以重入,同时判断锁是否可用
* 实现类需要在调用AQS的acquire获取锁之前进行前置的判断
*/
abstract boolean initialTryLock();
// 实现的尝试释放锁方法
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
// 具体实现
}
// 非公平的尝试获取锁
@ReservedStackAccess
final boolean tryLock() {
// 具体实现
}
}
Sync的两个实现NonfairSync与FairSync
NonfairSync与FairSync的区别在于对Sync的initialTryLock以及AQS的tryAcquire有着些微不同的实现,绝大部分都是相同的:
static final class xxxxSync extends Sync {
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState(); // 公平锁才会判断当前的锁状态
if (c == 0) {
// FairSync会通过hasQueuedThreads()判断是否有其他线程持有锁,而NonfairSync不会
// 同时两种锁都尝试能不能把AQS的state从0变成1,
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
// 如果设置成功,则表示获取锁,设置当前线程拥有锁,并返回获取锁成功
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
// 如果state不为0且持有线程为当前线程,进入此逻辑
// 这里是判断有没有存在死循环,这里直接抛出异常
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 这里对state完成了递增,所以通过lock方法加锁多少次,state就会递增多少
setState(c);
return true;
}
return false;
}
/**
* Acquire for non-reentrant cases after initialTryLock prescreen
* 参数 acquires基本都为1
*/
protected final boolean tryAcquire(int acquires) {
// NonfairSync与FairSync的区别与initialTryLock一样,
// FairSync会通过hasQueuedThreads()判断是否有其他线程持有锁
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 如果设置成功,则表示获取锁,设置当前线程拥有锁,并返回获取锁成功
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
ReentrantLock的抢锁(Lock)流程
流程图如下:
主要的函数如下:
// 第一步:ReentrantLock的lock方法其实是使用代理代工,通过sync实现获取锁的
final void lock() {
sync.lock()
}
@ReservedStackAccess
final void lock() {
// 第二步:sync根据具体的实现类,初始化并尝试获取锁
if (!initialTryLock())
// 第三步:如果初始化并获取锁失败,则进行AQS的重写的,简化的获取锁逻辑
acquire(1);
}
// AQS的获取锁的方法acquire, arg = 1
public final void acquire(int arg) {
// 第四步:尝试获取锁,这里是sync的子类FairSync或NonfairSync
if (!tryAcquire(arg))
/** 第五步:尝试获取锁失败,进入到原始的获取锁逻辑,所传参数表示
* node 节点是空的,
* ags表示其实是是用于state重入锁用的
* 三个false分别代表,不是共享锁,不是打断的,永不超时的,最后的超时时间参数没有意义
*/
acquire(null, arg, false, false, false, 0L);
}
xxxSync的initialTryLock方法
NonfairSync与FairSync在方法上只有些微区别,绝大部分实现是一样的:
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 公平锁才会提前判断当前锁是否被占用
// 公平锁才会判断队列中是否有线程在排队,没有线程排队才尝试获取能否通过CAS设置state的值
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
// 设置当前线程为锁持有者
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) { // 如果c != 0,而持有锁的线程就是当前线程
if (++c < 0) // overflow 如果c递增后,变为负数了,表示超过上限2^32-1了,直接抛异常
throw new Error("Maximum lock count exceeded");
// 设置锁的重入,state递增
setState(c);
return true;
}
return false;
}
通过这个方法,也可以看到,无论是公平锁,还是非公平锁,尝试初始化锁,都只是对state的尝试赋值,并没有排队的概念,AQS的head和tail都是空的
xxxSync的tryAcquire方法
NonfairSync与FairSync在方法上只有些微区别,绝大部分实现是一样的:
protected final boolean tryAcquire(int acquires) {
// 公平锁才会判断队列中是否有线程在排队,没有线程排队才尝试获取能否通过CAS设置state的值
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
在ReentrantLock中AQS的acquire方法
根据ReentrantLock中调用的acquire方法的入参,对AQS的原acquire的方法可以清除后可以简化为以下代码:
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
// 取消标记第一个线程时重试
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean first = false;
Node pred = null; // predecessor of node when enqueued
// 以下循环查看顺序请按循环次序来看,只有抢到锁才会退出,异常了才会跳出循环取消锁获取
for (;;) {
// 第一次循环时
// 此时node为空,所有pred也一定为空,所以pred != null一定为false,所以一定不会进入这个逻辑
// 注意,因为这里是pred != null后面接的是&&,在pred != null就停止判断了,
// 所以first不会重新赋值,所以为flase
// 第二次循环
// 此时node虽然不为空了,但是node.prev还是为空的,所有pred依旧为null,
// 所以first依然不会赋值,里面的逻辑也不会执行
// 第三次循环
// 此时node的prev指向的是原来的tail,所以pred指向了原来的tail,
// 而上一次循环中,原tail可能等于head,也有可能不等于head,
// 当tail等于head时,first会赋值为true,但不会进入以下逻辑
// 当tail不等于head,first还是为false,所以会进入以下逻辑
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
// 如果前置节点的status为-1,表示需要取消等待
// 第三次循环如果进入到此逻辑,
// 详细的逻辑就是从尾部一直往前找
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
// 第一次循环
// 此时pred == null,所以一定会进来尝试获取锁
// 第二次循环
// pred依旧为null,所以会再一次尝试获取锁
if (first || pred == null) {
boolean acquired;
try {
// 第六步,所以AQS也是通过实现类的tryAcquire尝试获取锁的
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
// 第一次循环时
// first在上面的逻辑中,因为不会触发重新赋值,所以以下逻辑是不会进入的
// 第二次循环时,first依然不会重新赋值,所以以下逻辑还是不会进入
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
current.interrupt();
}
// 此时返回1,表示抢到锁了,就直接退出了方法
return 1;
}
}
// 第一次循环
// 此时node是空的,这里的逻辑肯定是执行这里,所以走完逻辑后,进入下一次循环
if (node == null) { // allocate; retry before enqueue
// 初始化一个独享节点,这只是一个类型定义,没有具体实现
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
// 第二次循环时,node不为空了,但是pred == null,所以会进入此逻辑
// 这个逻辑是尝试加入队列的操作
// 这里的waiter设置为当前线程
node.waiter = current;
Node t = tail;
// 让node的前置指向AQS的tail
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
// 如果是空的,尝试初始化head
// 里面的逻辑会给head创建一个独享节点,同时把tail也指向head,即链表的头尾相等
tryInitializeHead();
else if (!casTail(t, node)) // 如果tail不为空,就尝试把当前节点设为tail
// 如果设置为tail失败,让node的前置节点指向空,
node.setPrevRelaxed(null); // back out
else
// 如果tail不为空,让tail的next指向当前node
t.next = node;
// 走完逻辑,将进入第三次循环
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
通过代码的分析,可以发现,AQS的acquire方法逻辑,最终的获取锁的实现,还是通过子类实现类来实现的,自身只是定义了对链表的的维护逻辑,包括清理、初始化、节点的增加、通知
ReentrantLock的AQS的cleanQueue方法解析
// 第三次循环如果进入到此逻辑
private void cleanQueue() {
for (;;) { // restart point
// 这里又是一个无限循环,因为有多次进入cleanQuue的可能,为使逻辑的连续性
// 这里的循环采用 父循环 - 当前循环 - 以下循环 的结构说明
for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
if (q == null || (p = q.prev) == null)
// 3-1-1循环q,即tail 肯定不等于 null,所以不会进入此逻辑
// q为空,或q的前置为空,才会中止清理队列
return; // end of list
if (s == null ? tail != q : (s.prev != q || s.status < 0))
// 3-1-1循环的s是为空的,所以这里判断tail != q,但此时q==tail,所以不会进入此逻辑
break; // inconsistent
// 3-1-1循环中,q的status==0,所以不会进入以下逻辑
if (q.status < 0) { // cancelled
// 这里表示q结点是需要清除的
if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && q.prev == p) {
// s一般不为空,所以把p变成tail的前置(s.casPrev(q, p)),
// 理论上q.prev == p是成立的,所以把p的next转为上一个循环的q
p.casNext(q, s); // OK if fails
if (p.prev == null)
// 此处的逻辑是通知p的下一个节点
// 如果p的前置为空了,表示p是head,而p此时的next是s
signalNext(p);
}
break;
}
// 在3-1-1循环中,p = tail.prev, 所以通过赋值n=p.next,此时n == q,所以不会进入以下逻辑
// 经过多此循环后,p.next会不等于q,此时进入逻辑
if ((n = p.next) != q) { // help finish
if (n != null && q.prev == p) {
// 如果n不为tail,及已经循环过大于等于2次了,
// 因为没有进入q.status