JAVA多线程之AQS分析(1)

ReentrantLock的源码分析独占锁的实现

Posted by Jason Lee on 2019-04-21

概述

谈到并发,不得不谈ReentrantLock;而谈到ReentrantLock,不得不谈AbstractQueuedSynchronized(AQS)!,类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch…。我们以ReentrantLock作为讲解切入点。

  • AQS 思维导图

AOS详解

什么是AQS

AQS的全称为(AbstractQueuedSynchronizer),是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器

AQS与ReentrantLock的关系

ReentrantLock 会保证do something在同一时间只有一个线程在执行这段代码,或者说,同一时刻只有一个线程的 lock 方法会返回。其余线程会被挂起,直到获取锁。

从这里可以看出,其实 ReentrantLock 实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。没错,ReentrantLock 使用的就是 AQS 的独占 API 实现的。

1
2
3
4
ReentrantLock lock = new ReentrantLock();
lock.lock();
// do somethings
lock.unlock();

我们来看 lock的源码

1
2
3
4
5
// ...
private final Sync sync;
public void lock() {
sync.lock();
}

lock 方法是调用的是 sync.lock() 的方法。而SyncReentrantLock 的一个继承了AbstractQueuedSynchronizer 的一个内部类
这里的Sync定义了一个抽象类,目的就是要让继承类来实现他的lock方法。ReentrantLock 有公平和非公平的锁机制,所以在该类的代码中,我们可以看到他继承了Lock

1
2
3
4
5
6
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
// ...
}
static final class NonfairSync extends Sync {...}
static final class FairSync extends Sync {

继承关系如下

AQS 的原理

所以,具体的同步机制就是Sync的具体实现类来管理的。要了解Sync的原理,我们首先要先了解他的父类 AbstractQueuedSynchronizer (AQS)

AQS核心思想是

  • 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
  • 如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

AQS 的功能可以分为两类:独占功能共享功能,它的所有子类中,要么实现并使用了它独占功能的 API,要么使用了共享锁的功能,而不会同时使用两套 API,即便是它最有名的子类 ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套 API 来实现的。

AbstractQueuedSynchronizer 分析

AbstractQueuedSynchronizer 类图如下

AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
static final class Node {/*... */}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;//同步状态
/*
状态信息通过protected类型的getState,setState,compareAndSetState进行操作返回同步状态的当前值
*/
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。

Sync的实现公平锁和非公平锁

公平锁:每个线程抢占锁的顺序为先后调用 lock 方法的顺序依次获取锁,类似于排队吃饭。

非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁,和调用 lock 方法的先后顺序无关。

公平锁的实现

我们来看一下公平锁的代码

1
2
3
4
5
6
7
8
9
10
11
12
static final class FairSync extends Sync {
// ...
final void lock() {
acquire(1); // 在AbstractQueuedSynchronizer 类当中实现
}
// ...
// 在AbstractQueuedSynchronizer 实现的
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

上面的代码表示的是 一个 volatile 修饰的标志位叫做key ,用来表示有没有线程拿走了锁(tryAcquire),当锁不存在的时候,将线程放到一个安全的队列,队列里维护一堆被挂起的线程,当锁被归还时,能通知到这些被挂起的线程,可以来竞争获取锁了。

  • 获取到锁的情景
    我们来看 tryAcquire

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // AbstractQueuedSynchronizer 留空了,需要让子类去实现
    protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
    }
    // FairSync 子类实现的 获取锁的方法
    protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    int c = getState(); // 拿到标志位
    if (c == 0) {
    //如果队列中没有其他线程 说明没有线程正在占有锁!
    if (!hasQueuedPredecessors() &&
    //修改一下状态位,注意:这里的 acquires 是在 lock 的时候传递来的,从上面的图中可以知道,这个值是写死的 1
    compareAndSetState(0, acquires)) {
    // 如果通过 CAS 操作将状态为更新成功则代表当前线程获取锁,因此,将当前线程设置到 AQS 的一个变量中,说明这个线程拿走了锁。
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    // 如果不为 0 意味着,锁已经被拿走了,但是,因为 ReentrantLock 是重入锁,是可以重复 lock,unlock 的,只要成对出现行。一次。这里还要再判断一次 获取锁的线程是不是当前请求锁的线程。
    else if (current == getExclusiveOwnerThread()) {
    // 如果是的,累加在 state 字段上就可以了。
    int nextc = c + acquires;
    if (nextc < 0)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }
  • 没有获取到锁的情景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 如果 已经有一个线程获取到了锁,但是没有线程排队
    // 即 AQS head == tail == null
    // 这里就是 tail == null 因此 这里执行enq方法
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    // 表示目前没有线程争抢 pred 节点 如果有 则执行enq方法
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    enq(node);
    return node;
    }
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) {
    // 因为end 发生 addWaiter 当中,必定是获取锁失败的情况下
    // 所以为tail 为空 表示有线程获取到了锁,所以这里新建了一个
    // 当head 为空的时候 head 会新建一个Node 节点 治理 head = tail = Node(empty)
    // Node 节点 然后在 acquireQueued 方法中获取到锁的线程set到了head 当中 完成更新
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    // 当首节点为新建好之后 会变成如下 Node(empty) -> <- Node 这样
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }
  • AQS 节点的变化

    • 当我们只有一个线程进行加锁的时候 如果加锁成功,即是tryAcquire 进行 hasQueuedPredecessors 的判断。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      public final boolean hasQueuedPredecessors() {
      // 首个线程进行竞争的时候, AQS 的tail 和 head 均为空 则会走判断 h!=t 则表示 加锁成功,直接返回。
      // 当第二个线程来竞争的时候 发现法 statue 不是 0, 则直接进入大 addWaiter 的排队进程中

      // 当 head == tail 的时候 证明 头结点已经有一个 在排队了
      // 这里有个问题 h <-> t h -> next == null 为什么表示 不需要排队,
      // 这里涉及到解锁的过程
      Node t = tail;
      Node h = head;
      Node s;
      return h != t &&
      ((s = h.next) == null || s.thread != Thread.currentThread());
      }
    • 用当前线程去构造一个 Node 对象,mode 是一个表示 Node 类型的字段,仅仅表示这个节点是独占的,还是共享的,或者说,AQS 的这个队列中,哪些节点是独占的,哪些是共享的。(这里 lock 调用的是 AQS 独占的 API,当然,可以写死是独占状态的节点。)

    • 创建好节点后,将节点加入到队列尾部,此处,在队列不为空的时候,先尝试通过 cas 方式修改尾节点为最新的节点,如果修改失败,意味着有并发,这个时候才会进入 enq 中死循环,“自旋”方式修改。

    • 将线程的节点接入到队里中后,当然还需要做一件事: 将当前线程挂起!这个事,由 acquireQueued 来做。

  • Node节点类型
    在解释 acquireQueued 之前,我们需要先看下 AQS 中队列的内存结构,我们知道,队列由 Node 类型的节点组成,其中至少有两个变量,一个封装线程,一个封装节点类型。

    1
    2
    3
    4
    Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode; // 节点类型
    this.thread = thread; // 封装的线程
    }

    而实际上,它的内存结构是这样的(第一次节点插入时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点):

    黄色节点为队列默认的头节点,每次有线程竞争失败,进入队列后其实都是插入到队列的尾节点(tail 后面)后面。这个从 enq 方法可以看出来,上文中有提到 enq 方法为将节点插入队列的方法:

  • 循环获取锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    /*
    仔细看看这个方法是个无限循环,感觉如果p == head && tryAcquire(arg)条件不满足循环将永远无法结束,当然不会出现死循环,奥秘在于第12行的parkAndCheckInterrupt会把当前线程挂起,从而阻塞住线程的调用栈。
    */
    for (;;) {
    // 这里的 表示 他的上一个节点 如果排队成功 那么 node 的上一个节点应该是有的 头结点 始终是个Node(empty)
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    // 如果当前的节点是 head 说明他是队列中第一个“有效的”节点,因此尝试获取,上文中有提到这个类是交给子类去扩展的。
    setHead(node);// 成功后,将上图中的黄色节点移除,Node1 变成头节点。 也就是将头结点变成 empty
    p.next = null; // help GC
    failed = false;
    return interrupted; // 这里返回 false 表示上层方法不需要改变 当前线程的 interrupted 状态
    }

    if (shouldParkAfterFailedAcquire(p, node) &&
    // 否则,检查前一个节点的状态为,看当前获取锁失败的线程是否需要挂起。
    parkAndCheckInterrupt())
    /*
    上面的方法是调用 LockSupport.park(this)
    如前面所述,LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。当然也不是马上把请求不到锁的线程进行阻塞,还要检查该线程的状态,比如如果该线程处于Cancel状态则没有必要,具体的检查在shouldParkAfterFailedAcquire中
    */
    interrupted = true;
    }
    } finally {
    if (failed) // 如果有异常
    cancelAcquire(node);// 取消请求,对应到队列操作,就是将当前节点从队列中移除。
    }
    }

    Node 节点中,除了存储当前线程,节点类型,队列中前后元素的变量,还有一个叫 waitStatus 的变量,改变量用于描述节点的状态,为什么需要这个状态呢?

    原因是:

    • AQS 的队列中,在有并发时,肯定会存取一定数量的节点,每个节点代表了一个线程的状态,有的线程可能“等不及”获取锁了,需要放弃竞争,退出队列,有的线程在等待一些条件满足,满足后才恢复执行(这里的描述很像某个 J.U.C 包下的工具类,ReentrankLock 的 Condition,事实上,Condition 同样也是 AQS 的子类)等等,总之,各个线程有各个线程的状态,但总需要一个变量来描述它,这个变量就叫 waitStatus, 它有四种状态:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 0 代表无状态
    /** 节点取消 因为超时或中断,该线程已经被取消 */
    static final int CANCELLED = 1;
    /** 节点等待触发 线程的后继线程正/已被阻塞,当该线程release或cancel时要重新这个后继线程(unpark)*/
    static final int SIGNAL = -1;
    /** 等待条件触发 表明该线程被处于条件队列,就是因为调用了>- Condition.await而被阻塞 */
    static final int CONDITION = -2;
    /** 节点状态需要向后传播 传播共享锁*/
    static final int PROPAGATE = -3;

    只有当前节点的前一个节点为 SIGNAL 时,才能当前节点才能被挂起。

    • 规则1:如果前继的节点状态为SIGNAL,表明当前节点需要unpark,则返回成功,此时acquireQueued方法的第12行(parkAndCheckInterrupt)将导致线程阻塞
    • 规则2:如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquireQueued方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞
    • 规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,返回false后进入acquireQueued的无限循环,与规则2同
    • 总体看来,shouldParkAfterFailedAcquire就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列。

    我们来看一下代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
    // 如果前继的节点状态为`SIGNAL`,表明当前节点需要`unpark` 返回 true
    return true;
    if (ws > 0) {
    // 如果前继节点状态为`CANCELLED`(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,
    // 返回false,`acquireQueued`方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    // 如果前继节点状态为非`SIGNA`L、非`CANCELLED`,则设置前继的状态为`SIGNAL`,返回false后进入`acquireQueued`的无限循环,
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }

到此为止,一个线程对于锁的一次竞争才告于段落,结果有两种

  • 要么成功获取到锁(不用进入到 AQS 队列中
  • 要么,获取失败,被挂起,等待下次唤醒后继续循环尝试获取锁

值得注意的是,AQS 的队列为 FIFO 队列,所以,每次被 CPU 假唤醒,且当前线程不是出在头节点的位置,也是会被挂起的。AQS 通过这样的方式,实现了竞争的排队策略

  • 释放锁

    我们来看一下释放锁的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    // AbstractQueuedSynchronizer 当中定义的
    public final boolean release(int arg) {
    // 同样 release 由实现类来实现 这里用是Sync抽象类的实现
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    // 释放锁,成功后,找到 AQS 的头节点,并唤醒它即可:
    unparkSuccessor(h);
    return true;
    }
    return false;
    }
    // Sync 的实现
    protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常。
    // 这段可以理解为没有 lock 直接unlock 则会直接报错
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    // 因为是重入的关系,不是每次释放锁 c 都等于 0,直到最后一次释放锁时,才通知 AQS 不需要再记录哪个线程正在获取锁。
    if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
    }
    // 更新状态
    setState(c);
    return free;
    }

    当发现 锁已经需要释放的时候,我们调用 unparkSuccessor 寻找的顺序是从队列尾部开始往前去找的最前面的一个 waitStatus 小于 0 的节点。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    // 从尾到头寻找最后一个节点 表示 s是个沉睡的节点
    if (s == null || s.waitStatus > 0) {
    s = null;
    // 找到第一个阻塞的节点
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread);
    }

到此,ReentrantLocklockunlock 方法已经基本解析完毕了,唯独还剩下一个非公平锁 NonfairSync 没说,其实,它和公平锁的唯一区别就是获取锁的方式不同,一个是按前后顺序一次获取锁,一个是抢占式的获取锁,那 ReentrantLock 是怎么实现的呢?再看两段代码:

非公平锁的实现

非公平锁的 lock 方法的处理方式是: 在 lock 的时候先直接 cas 修改一次 state 变量(尝试获取锁),成功就返回,不成功再排队,从而达到不排队直接抢占的目的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);

}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// Sync 的非公平锁获取方式
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 抢占式获取锁 这里是和公平锁的区别
if (/*!hasQueuedPredecessors() &&*/compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

参考



支付宝打赏 微信打赏

赞赏一下