概述
谈到并发,不得不谈ReentrantLock
;而谈到ReentrantLock
,不得不谈AbstractQueuedSynchronized(AQS
)!,类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch
…。我们以ReentrantLock
作为讲解切入点。
- AQS 思维导图
AOS详解
什么是AQS
AQS的全称为(AbstractQueuedSynchronizer),是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器
AQS与ReentrantLock的关系
ReentrantLock
会保证do something
在同一时间只有一个线程在执行这段代码,或者说,同一时刻只有一个线程的 lock
方法会返回。其余线程会被挂起,直到获取锁。
从这里可以看出,其实 ReentrantLock
实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。没错,ReentrantLock
使用的就是 AQS 的独占 API 实现的。
1 | ReentrantLock lock = new ReentrantLock(); |
我们来看 lock的源码
1 | // ... |
lock 方法是调用的是 sync.lock()
的方法。而Sync
是 ReentrantLock
的一个继承了AbstractQueuedSynchronizer
的一个内部类
这里的Sync定义了一个抽象类,目的就是要让继承类来实现他的lock方法。ReentrantLock
有公平和非公平的锁机制,所以在该类的代码中,我们可以看到他继承了Lock
1 | abstract static class Sync extends AbstractQueuedSynchronizer { |
继承关系如下
AQS 的原理
所以,具体的同步机制就是Sync的具体实现类来管理的。要了解Sync的原理,我们首先要先了解他的父类 AbstractQueuedSynchronizer
(AQS)
AQS核心思想是
- 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
- 如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
AQS 的功能可以分为两类:独占功能和共享功能,它的所有子类中,要么实现并使用了它独占功能的 API,要么使用了共享锁的功能,而不会同时使用两套 API,即便是它最有名的子类 ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套 API 来实现的。
AbstractQueuedSynchronizer 分析
AbstractQueuedSynchronizer
类图如下
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。
1 |
|
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。
Sync的实现公平锁和非公平锁
公平锁:每个线程抢占锁的顺序为先后调用 lock 方法的顺序依次获取锁,类似于排队吃饭。
非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁,和调用 lock 方法的先后顺序无关。
公平锁的实现
我们来看一下公平锁的代码
1 | static final class FairSync extends Sync { |
上面的代码表示的是 一个 volatile
修饰的标志位叫做key
,用来表示有没有线程拿走了锁(tryAcquire
),当锁不存在的时候,将线程放到一个安全的队列,队列里维护一堆被挂起的线程,当锁被归还时,能通知到这些被挂起的线程,可以来竞争获取锁了。
-
获取到锁的情景
我们来看tryAcquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30// AbstractQueuedSynchronizer 留空了,需要让子类去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// FairSync 子类实现的 获取锁的方法
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
int c = getState(); // 拿到标志位
if (c == 0) {
//如果队列中没有其他线程 说明没有线程正在占有锁!
if (!hasQueuedPredecessors() &&
//修改一下状态位,注意:这里的 acquires 是在 lock 的时候传递来的,从上面的图中可以知道,这个值是写死的 1
compareAndSetState(0, acquires)) {
// 如果通过 CAS 操作将状态为更新成功则代表当前线程获取锁,因此,将当前线程设置到 AQS 的一个变量中,说明这个线程拿走了锁。
setExclusiveOwnerThread(current);
return true;
}
}
// 如果不为 0 意味着,锁已经被拿走了,但是,因为 ReentrantLock 是重入锁,是可以重复 lock,unlock 的,只要成对出现行。一次。这里还要再判断一次 获取锁的线程是不是当前请求锁的线程。
else if (current == getExclusiveOwnerThread()) {
// 如果是的,累加在 state 字段上就可以了。
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
} -
没有获取到锁的情景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 如果 已经有一个线程获取到了锁,但是没有线程排队
// 即 AQS head == tail == null
// 这里就是 tail == null 因此 这里执行enq方法
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 表示目前没有线程争抢 pred 节点 如果有 则执行enq方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 因为end 发生 addWaiter 当中,必定是获取锁失败的情况下
// 所以为tail 为空 表示有线程获取到了锁,所以这里新建了一个
// 当head 为空的时候 head 会新建一个Node 节点 治理 head = tail = Node(empty)
// Node 节点 然后在 acquireQueued 方法中获取到锁的线程set到了head 当中 完成更新
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 当首节点为新建好之后 会变成如下 Node(empty) -> <- Node 这样
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
} -
AQS 节点的变化
-
当我们只有一个线程进行加锁的时候 如果加锁成功,即是
tryAcquire
进行hasQueuedPredecessors
的判断。1
2
3
4
5
6
7
8
9
10
11
12
13public final boolean hasQueuedPredecessors() {
// 首个线程进行竞争的时候, AQS 的tail 和 head 均为空 则会走判断 h!=t 则表示 加锁成功,直接返回。
// 当第二个线程来竞争的时候 发现法 statue 不是 0, 则直接进入大 addWaiter 的排队进程中
// 当 head == tail 的时候 证明 头结点已经有一个 在排队了
// 这里有个问题 h <-> t h -> next == null 为什么表示 不需要排队,
// 这里涉及到解锁的过程
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
} -
用当前线程去构造一个
Node
对象,mode
是一个表示Node
类型的字段,仅仅表示这个节点是独占的,还是共享的,或者说,AQS 的这个队列中,哪些节点是独占的,哪些是共享的。(这里 lock 调用的是 AQS 独占的 API,当然,可以写死是独占状态的节点。) -
创建好节点后,将节点加入到队列尾部,此处,在队列不为空的时候,先尝试通过 cas 方式修改尾节点为最新的节点,如果修改失败,意味着有并发,这个时候才会进入 enq 中死循环,“自旋”方式修改。
-
将线程的节点接入到队里中后,当然还需要做一件事: 将当前线程挂起!这个事,由 acquireQueued 来做。
-
-
Node节点类型
在解释acquireQueued
之前,我们需要先看下 AQS 中队列的内存结构,我们知道,队列由 Node 类型的节点组成,其中至少有两个变量,一个封装线程,一个封装节点类型。1
2
3
4Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode; // 节点类型
this.thread = thread; // 封装的线程
}而实际上,它的内存结构是这样的(第一次节点插入时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点):
黄色节点为队列默认的头节点,每次有线程竞争失败,进入队列后其实都是插入到队列的尾节点(tail 后面)后面。这个从 enq 方法可以看出来,上文中有提到 enq 方法为将节点插入队列的方法:
-
循环获取锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
/*
仔细看看这个方法是个无限循环,感觉如果p == head && tryAcquire(arg)条件不满足循环将永远无法结束,当然不会出现死循环,奥秘在于第12行的parkAndCheckInterrupt会把当前线程挂起,从而阻塞住线程的调用栈。
*/
for (;;) {
// 这里的 表示 他的上一个节点 如果排队成功 那么 node 的上一个节点应该是有的 头结点 始终是个Node(empty)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 如果当前的节点是 head 说明他是队列中第一个“有效的”节点,因此尝试获取,上文中有提到这个类是交给子类去扩展的。
setHead(node);// 成功后,将上图中的黄色节点移除,Node1 变成头节点。 也就是将头结点变成 empty
p.next = null; // help GC
failed = false;
return interrupted; // 这里返回 false 表示上层方法不需要改变 当前线程的 interrupted 状态
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 否则,检查前一个节点的状态为,看当前获取锁失败的线程是否需要挂起。
parkAndCheckInterrupt())
/*
上面的方法是调用 LockSupport.park(this)
如前面所述,LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。当然也不是马上把请求不到锁的线程进行阻塞,还要检查该线程的状态,比如如果该线程处于Cancel状态则没有必要,具体的检查在shouldParkAfterFailedAcquire中
*/
interrupted = true;
}
} finally {
if (failed) // 如果有异常
cancelAcquire(node);// 取消请求,对应到队列操作,就是将当前节点从队列中移除。
}
}Node
节点中,除了存储当前线程,节点类型,队列中前后元素的变量,还有一个叫waitStatus
的变量,改变量用于描述节点的状态,为什么需要这个状态呢?原因是:
- AQS 的队列中,在有并发时,肯定会存取一定数量的节点,每个节点代表了一个线程的状态,有的线程可能“等不及”获取锁了,需要放弃竞争,退出队列,有的线程在等待一些条件满足,满足后才恢复执行(这里的描述很像某个 J.U.C 包下的工具类,ReentrankLock 的 Condition,事实上,Condition 同样也是 AQS 的子类)等等,总之,各个线程有各个线程的状态,但总需要一个变量来描述它,这个变量就叫 waitStatus, 它有四种状态:
1
2
3
4
5
6
7
8
9// 0 代表无状态
/** 节点取消 因为超时或中断,该线程已经被取消 */
static final int CANCELLED = 1;
/** 节点等待触发 线程的后继线程正/已被阻塞,当该线程release或cancel时要重新这个后继线程(unpark)*/
static final int SIGNAL = -1;
/** 等待条件触发 表明该线程被处于条件队列,就是因为调用了>- Condition.await而被阻塞 */
static final int CONDITION = -2;
/** 节点状态需要向后传播 传播共享锁*/
static final int PROPAGATE = -3;只有当前节点的前一个节点为
SIGNAL
时,才能当前节点才能被挂起。- 规则1:如果前继的节点状态为
SIGNAL
,表明当前节点需要unpark
,则返回成功,此时acquireQueued
方法的第12行(parkAndCheckInterrupt
)将导致线程阻塞 - 规则2:如果前继节点状态为
CANCELLED
(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquireQueued
方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞 - 规则3:如果前继节点状态为非
SIGNA
L、非CANCELLED
,则设置前继的状态为SIGNAL
,返回false后进入acquireQueued
的无限循环,与规则2同 - 总体看来,
shouldParkAfterFailedAcquire
就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED
状态,则顺便删除这些节点重新构造队列。
我们来看一下代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前继的节点状态为`SIGNAL`,表明当前节点需要`unpark` 返回 true
return true;
if (ws > 0) {
// 如果前继节点状态为`CANCELLED`(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,
// 返回false,`acquireQueued`方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前继节点状态为非`SIGNA`L、非`CANCELLED`,则设置前继的状态为`SIGNAL`,返回false后进入`acquireQueued`的无限循环,
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
到此为止,一个线程对于锁的一次竞争才告于段落,结果有两种
- 要么成功获取到锁(不用进入到 AQS 队列中
- 要么,获取失败,被挂起,等待下次唤醒后继续循环尝试获取锁
值得注意的是,AQS 的队列为 FIFO 队列,所以,每次被 CPU 假唤醒,且当前线程不是出在头节点的位置,也是会被挂起的。AQS 通过这样的方式,实现了竞争的排队策略
-
释放锁
我们来看一下释放锁的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29// AbstractQueuedSynchronizer 当中定义的
public final boolean release(int arg) {
// 同样 release 由实现类来实现 这里用是Sync抽象类的实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 释放锁,成功后,找到 AQS 的头节点,并唤醒它即可:
unparkSuccessor(h);
return true;
}
return false;
}
// Sync 的实现
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常。
// 这段可以理解为没有 lock 直接unlock 则会直接报错
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 因为是重入的关系,不是每次释放锁 c 都等于 0,直到最后一次释放锁时,才通知 AQS 不需要再记录哪个线程正在获取锁。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 更新状态
setState(c);
return free;
}当发现 锁已经需要释放的时候,我们调用
unparkSuccessor
寻找的顺序是从队列尾部开始往前去找的最前面的一个 waitStatus 小于 0 的节点。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 从尾到头寻找最后一个节点 表示 s是个沉睡的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 找到第一个阻塞的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
到此,ReentrantLock
的 lock
和 unlock
方法已经基本解析完毕了,唯独还剩下一个非公平锁 NonfairSync
没说,其实,它和公平锁的唯一区别就是获取锁的方式不同,一个是按前后顺序一次获取锁,一个是抢占式的获取锁,那 ReentrantLock 是怎么实现的呢?再看两段代码:
非公平锁的实现
非公平锁的 lock 方法的处理方式是: 在 lock
的时候先直接 cas
修改一次 state
变量(尝试获取锁),成功就返回,不成功再排队,从而达到不排队直接抢占的目的。
1 | static final class NonfairSync extends Sync { |
参考
- 深度解析 Java 8:AbstractQueuedSynchronizer 的实现分析(上)
- 深度解析 Java 8:AbstractQueuedSynchronizer 的实现分析(下)
- java AQS的实现原理
- 深入学习java同步器AQS
- Java并发之AQS详解
- Java 多线程 AQS
- 【死磕Java并发】—–J.U.C之AQS(一篇就够了)
- JDK源码系列 AbstractQueuedSynchronizer源码剖析
赞赏一下