抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

摘要:本文主要了解了JUC中的AQS抽象类。

环境

Windows 10 企业版 LTSC 21H2
Java 1.8

1 简介

1.1 定义

AQS(AbstractQueuedSynchronizer,抽象的队列同步器)定义了一套多线程访问共享资源的同步器框架,是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石。

1.2 抽象

AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。

1.3 原理

AQS最重要的组成部分是FIFO队列和state变量,通过FIFO队列来完成资源获取线程的排队工作,通过state整型变量表示持有锁的状态:
20250725092231-原理

抢到资源的线程直接处理业务逻辑,抢不到资源的线程进入等待队列,这个队列是CLH队列的变体。队列将请求共享资源的线程封装成队列的Node结点,通过CAS自旋以及LockSupport的凭证机制,维护state变量的状态,使并发达到同步的控制效果。

CLH(Craig Landin Hagersten,三个科学家名字)队列的原版是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO,头节点在初始化后变为空节点。

2 架构

源码:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
// 内部封装Node节点
static final class Node {
// 标记线程以共享的模式等待锁
static final Node SHARED = new Node();
// 标记线程以独占的模式等待锁
static final Node EXCLUSIVE = null;
// waitStatus取值为1表示线程取消,即超时和中断,被取消的节点不会阻塞
static final int CANCELLED = 1;
// waitStatus取值为-1表示后继节点已经准备完成,等待线程释放资源
static final int SIGNAL = -1;
// waitStatus取值为-2表示线程在Condition队列中阻塞,当其他线程调用了Condition中的唤醒方法后,将节点从Condition队列转移到CLH等待队列
static final int CONDITION = -2;
// waitStatus取值为-3表示线程及后续线程无条件传播
static final int PROPAGATE = -3;
// 线程的等待状态,初始值为0
volatile int waitStatus;
// 前驱节点
volatile Node prev
// 后继节点
volatile Node next;
// 线程对象
volatile Thread thread;
...
}
// 头节点
private transient volatile Node head
// 尾节点
private transient volatile Node tail;
// 资源状态,等于0表示可获取,大于等于1表示已占用
private volatile int state;
// 获取资源状态
protected final int getState() {
return state;
}
// 设置资源状态
protected final void setState(int newState) {
state = newState;
}
// CAS设置资源状态
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
...
}

AQS使用volatile修饰的state整型变量表示同步状态,通过CLH同步队列来完成线程的排队工作。

对state值的修改是通过CAS完成的,当前线程根据state的值判断能否获取资源。如果获取失败,AQS会将当前线程thread以及等待状态waitStatus等信息封装成Node节点,并将其加CLH入同步队列,同时阻塞当前线程。当state的值变为可获取资源后,会把Node节点中的线程唤醒,再次尝试获取资源。

3 逻辑

3.1 原理

Lock接口的实现类基本都是通过聚合队列同步器的子类完成线程访问控制。

源码:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class ReentrantLock implements Lock, java.io.Serializable {
...
abstract static class Sync extends AbstractQueuedSynchronizer {
...
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
...
}
static final class NonfairSync extends Sync {
...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
...
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
...
}

ReentrantLock类的内部聚合了Sync类,Sync类继承自AQS类,并且非公平锁NonfairSync和公平锁FairSync都继承自Sync,默认创建的是非公平锁NonfairSync。

3.2 阶段

整个ReentrantLock的加锁过程,可以分为三个阶段:

  • 尝试加锁。
  • 加锁失败,线程入队列。
  • 线程入队列后,进入阻赛状态。

3.3 举例

三个客户在银行办理业务,使用默认的非公平锁:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public static void main(String[] args) {
Lock lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "办理业务");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "离开");
} finally {
lock.unlock();
}
}, "A").start();
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "办理业务");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "离开");
} finally {
lock.unlock();
}
}, "B").start();
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "办理业务");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "离开");
} finally {
lock.unlock();
}
}, "C").start();
}

3.4 分析

3.4.1 线程A获取资源

线程A进入,调用lock()方法:

java
1
2
3
4
5
6
7
8
9
final void lock() {
// 使用CAS设置state为1
if (compareAndSetState(0, 1))
// 表示获取资源成功,将当前线程设为占用线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 表示获取资源失败,继续抢占资源
acquire(1);
}

因为线程A是第一个获取资源的线程,所以获取资源成功,将当前线程设为占用线程,执行业务。

3.4.2 线程B获取资源

线程B进入,调用lock()方法。

因为线程B是第二个获取资源的线程,线程A已经将state从0改为了1,所以使用compareAndSetState()方法设置失败。

3.4.3 线程B抢占资源

线程B获取失败,调用acquire()方法抢占资源:

java
1
2
3
4
5
6
7
8
public final void acquire(int arg) {
// 抢占资源
if (!tryAcquire(arg) &&
// 加入等待队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 线程阻塞
selfInterrupt();
}

调用tryAcquire()方法抢占资源:

java
1
2
3
4
protected final boolean tryAcquire(int acquires) {
// 调用非公平锁的尝试抢占方法
return nonfairTryAcquire(acquires);
}

调用非公平锁的nonfairTryAcquire()方法,返回false表示占用失败:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean nonfairTryAcquire(int acquires) {
// 记录当前线程
final Thread current = Thread.currentThread();
// 记录当前资源状态
int c = getState();
// 0表示当前资源可用
if (c == 0) {
// 使用CAS设置state为请求数
if (compareAndSetState(0, acquires)) {
// 表示获取资源成功,将当前线程设为占用线程
setExclusiveOwnerThread(current);
return true;
}
}
// 大于等于1表示当前资源被占用,判断当前线程是否为占用线程(可重入锁的情况)
else if (current == getExclusiveOwnerThread()) {
// 当前线程为占用线程,记录资源状态
int nextc = c + acquires
// 判断是否溢出
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置state为新的资源状态
setState(nextc);
return true;
}
return false;
}

因为线程B是第二个获取资源的线程,线程A已经将state从0改为了1,所以使用nonfairTryAcquire()方法占用失败。

3.4.4 线程B等待

线程B抢占失败,调用addWaiter()方法将当前线程加入等待队列:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node addWaiter(Node mode) {
// 将当前线程和传入的独占模式封装为节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 尾节点不为空,表示CLH队列已经初始化,CAS操作将当前节点设为尾节点
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点为空,表示CLH队列还未初始化,初始化队列
enq(node);
return node;
}

因为线程B是第一个进入等待的线程,尾节点为空,所以使用enq()方法初始化队列:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾节点为空,通过CAS设置头节点和尾节点为空节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾节点不为空,通过CAS将当前节点作为新的尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

初始化CLH队列后,头节点为空节点,尾节点为当前节点。

3.4.5 线程B阻塞

线程B得到当前节点后,作为参数传入acquireQueued()方法加入CLH队列:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
final boolean acquireQueued(final Node node, int arg) {
// 记录当前节点是否取消,默认为true,表示取消
boolean failed = true;
try {
// 标记当前节点是否中断,默认为false,表示当前节点没有中断
boolean interrupted = false
// 自旋
for (;;) {
// 获取当前节点的上一节点
final Node p = node.predecessor();
// 如果上一节点是头节点,表示当前节点即将被唤醒,尝试抢占资源
if (p == head && tryAcquire(arg)) {
// 将当前节点设为新头节点,置空当前节点的上一节点,并取消当前节点同当前线程的绑定
setHead(node);
// 将原头节点的下一节点置空,方便GC回收
p.next = null; // help GC
// 标记当前节点为false,表示没有取消
failed = false;
// 返回false,表示当前节点没有中断
return interrupted;
}
// 如果上一节点不是头节点,或者抢占资源失败,处理上一节点并阻塞当前节点
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 标记为true,表示当前节点中断
interrupted = true;
}
} finally {
// 当前节点如果被取消,执行取消操作
if (failed)
cancelAcquire(node);
}
}

因为线程B是第一个进入等待的线程,上一节点为头节点,调用tryAcquire()方法尝试获取资源。获取成功则将当前节点作为头节点并移除上一节点,获取失败则阻塞。

获取资源失败调用shouldParkAfterFailedAcquire()方法处理上一节点:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 记录上一节点的等待状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果上一节点的等待状态为-1,表示当前线程可以被阻塞,返回true,继续判断
return true;
if (ws > 0) {
// 如果上一节点的等待状态为1,表示上一节点被取消,循环移除被取消的上一节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 上述条件不满足,表示上一节点的等待状态为0或者-3,通过CAS将等待状态设置为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 返回false,重新进入自旋
return false;
}

因为线程B是第一个进入等待的线程,上一节点为头节点,头节点为空节点,等待状态为0,所以两次进入shouldParkAfterFailedAcquire()方法:

  • 第一次进入将上一节点的等待状态设置为-1后返回false,条件判断为false重新进入自旋。
  • 第二次进入检测到上一节点的等待状态为-1,返回true,调用parkAndCheckInterrupt()方法。

调用parkAndCheckInterrupt()方法阻塞当前节点:

java
1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
// 阻塞当前节点
LockSupport.park(this);
// 返回线程的中断状态
return Thread.interrupted();
}

3.4.6 线程C获取资源

线程C进入,调用lock()方法。

因为线程C是第三个获取资源的线程,线程A已经将state从0改为了1,所以使用compareAndSetState()方法设置失败。

3.4.7 线程C抢占资源

线程C获取失败,调用acquire()方法抢占资源,源码同上。

调用tryAcquire()方法抢占资源,源码同上。

调用非公平锁的nonfairTryAcquire()方法,返回false表示占用失败,源码同上。

因为线程C是第三个获取资源的线程,线程A已经将state从0改为了1,所以使用nonfairTryAcquire()方法占用失败。

3.4.8 线程C等待

线程C抢占失败,调用addWaiter()方法将当前线程加入等待队列,源码同上。

因为线程C是第二个进入等待的线程,线程B已经完成了队列初始化,尾节点不为空,使用CAS将当前节点作为新的尾节点。

3.4.9 线程C阻塞

线程C得到当前节点后,作为参数传入acquireQueued()方法加入CLH队列,源码同上。

因为线程C是第二个进入等待的线程,上一节点为B节点,B节点不是头节点,不能获取资源。

调用shouldParkAfterFailedAcquire()方法处理上一节点,源码同上。

因为线程C是第二个进入等待的线程,上一节点为B节点,等待状态为-1,返回true,调用parkAndCheckInterrupt()方法。

调用parkAndCheckInterrupt()方法阻塞当前节点,源码同上。

3.4.10 线程A执行完毕

线程A执行完毕,调用unlock()方法:

java
1
2
3
public void unlock() {
sync.release(1);
}

调用release()方法:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final boolean release(int arg) {
// 调用tryRelease()方法尝试释放资源
if (tryRelease(arg)) {
// 获取头节点
Node h = head;
// 如果头节点不为空,并且等待状态不为0,表示需要唤醒其他线程
if (h != null && h.waitStatus != 0)
// 唤醒线程
unparkSuccessor(h);
return true;
}
// 释放失败返回false
return false;
}

3.4.11 线程A释放资源

调用tryRelease()方法释放资源:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected final boolean tryRelease(int releases) {
// 记录资源状态
int c = getState() - releases;
// 如果当前线程不为占用线程则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 标记资源空闲,默认为false
boolean free = false
// 资源状态为0则标记资源空闲为true,并将占用线程置空
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设置资源状态
setState(c);
// 返回资源空闲
return free;
}

3.4.12 线程A唤醒线程

线程B和线程C已经进入等待队列,调用unparkSuccessor()方法唤醒线程:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void unparkSuccessor(Node node) {
// 记录头节点的等待状态
int ws = node.waitStatus;
// 如果头节点的等待状态小于0,则将头节点的等待状态设为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 记录头节点的下一节点
Node s = node.next;
// 判断下一节点是否为空或者下一节点的等待状态是否大于0
if (s == null || s.waitStatus > 0) {
s = null;
// 遍历下一节点,找到不为空并且等待状态小于等于0的节点,将其设为下一节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果下一节点不为空,则唤醒下一节点中的线程
if (s != null)
LockSupport.unpark(s.thread);
}

头节点的下一节点为B节点,线程B被唤醒。

3.4.13 线程B抢占资源

线程B在parkAndCheckInterrupt()方法中被唤醒后,返回中断状态为false,重新进入自旋。

线程B的上一节点为头节点,进入tryAcquire()方法抢占资源。

抢占资源成功后,将当前线程设为占用线程,将当前节点设为头节点,同时解除同线程B的绑定,执行业务。

3.4.14 线程B执行完毕

线程B执行完毕,调用unlock()方法释放资源并唤醒线程。

头节点的下一节点为C节点,线程C被唤醒。

3.4.15 线程C抢占资源

线程C在parkAndCheckInterrupt()方法中被唤醒后,返回中断状态为false,重新进入自旋。

线程C的上一节点为头节点,进入tryAcquire()方法抢占资源。

抢占资源成功后,将当前线程设为占用线程,将当前节点设为头节点,同时解除同线程C的绑定,执行业务。

3.4.16 线程C执行完毕

线程B执行完毕,调用unlock()方法释放资源并唤醒线程。

头节点的下一节点为空,不会有任何线程被唤醒。

4 公平锁与非公平锁

4.1 非公平锁

非公平锁在资源可用时不会判断当前队列是否有线程在等待,刚加入的线程可以与被唤醒的线程一起竞争资源。

源码:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
...
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
...
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

4.2 公平锁

公平锁在资源可用时会判断当前队列是否有线程在等待,刚加入的线程不可以与被唤醒的线程一起竞争资源。

源码:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final void lock() {
acquire(1);
}
...
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
...
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

tryAcquire()方法中使用hasQueuedPredecessors()方法判断等待队列中是否存在有效节点:

  • 返回false表示没有,取反后为true表示当前节点不需要排队,需要执行占用资源的操作。
  • 返回true表示有,取反后为false表示当前节点需要排队,需要执行加入等待队列的操作。

源码:

java
1
2
3
4
5
6
7
8
9
10
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

说明:

  • 判断头节点和尾节点是否相同,如果相同,说明队列未初始化或者当前队列只有一个空节点,返回false,取反后为true,尝试占用资源。
  • 判断头节点和尾节点是否相同,如果不相同,说明存在两个不同节点。
    • 继续判断下一节点是否为空节点,如果成立,说明下一节点为空,可能上个线程刚刚将头节点初始化,尚未给尾节点赋值,返回true,取反后为false,需要排队。
    • 继续判断下一节点是否为空节点,如果不成立,说明下一节点不为空。
      • 继续判断下一节点封装的线程是否不等于当前线程,如果成立,说明下一线程不为当前线程,返回true,取反后为false,需要排队。
      • 继续判断下一节点封装的线程是否不等于当前线程,如果不成立,说明下一线程为当前线程,返回false,取反后为true,尝试占用资源。

5 自定义同步器

5.1 实现方法

不同的自定义同步器争用共享资源的方式也不同,自定义同步器在实现时只需要实现共享资源state的获取与释放即可,至于具体线程等待队列的维护(如获取资源失败入队和唤醒出队等),AQS已经在底层实现好了。

自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease和tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

5.2 举例说明

以ReentrantLock为例,state初始化为0,表示未锁定状态。

线程A调用lock()方法获取资源,同时调用tryAcquire()占用资源,并将state的值加1。

其他线程再调用tryAcquire()方法占用资源就会失败,直到线程A调用unlock()方法释放资源,并将state的值减0,其它线程才有机会获取该锁。

在释放锁之前,A线程可以重复获取资源,state的值会累加,这就是可重入锁。获取多少次就要释放多少次,这样才能保证state最后的值是0。

评论