借助 ReentrantLock 来解释。

1. AQS概念

AQS是JUC提供的一个用于构建锁和同步容器的基础类。JUC包内的许多类都是基于AQS构建,例如ReentrantLock、

Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask等。AQS解决了在实现同步容器时设计的大

量细节问题。

AQS是CLH队列的一个变种,主要原理和CLH队列差不多,这也是前面对CLH队列进行长篇大论介绍的原因。AQS队列

内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的前驱节点和直接

的后驱节点。所以双向链表可以从任意一个节点开始很方便地访问前驱节点和后驱节点。每个节点其实是由线程封装

的,当线程争抢锁失败后会封装成Node加入到AQS队列中去;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞

的节点(线程)。

image-20250225141910899

1.1 核心成员

AQS出于“分离变与不变”的原则,基于模板模式实现。AQS为锁获取、锁释放的排队和出队过程提供了一系列的模板

方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取为钩子方法,供各种锁的子类(或者其内部

类)去实现。

1.1.1 状态标志位

AQS中维持了一个单一的volatile修饰的状态信息state,AQS使用int类型的state标示锁的状态,可以理解为锁的同步

状态。state因为使用volatile保证了操作的可见性,所以任何线程通过getState()获得状态都是可以得到最新值。

1.1.2 队列节点类

AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类Node定义,其核心的成员

如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    volatile int waitStatus;


    volatile Node prev;


    volatile Node next;


    volatile Thread thread;


    Node nextWaiter;
    
    ......
}
  1. waitStatus

    每个节点与等待线程关联,每个节点维护一个状态waitStatus,waitStatus的各种值以常量的形式进行定义。

    waitStatus的各常量值具体如下:

    • static final int CANCELLED = 1

      waitStatus值为1时表示该线程节点已释放(超时、中断),已取消的节点不会再阻塞。表示线程因为中断或

      者等待超时,需要从等待队列中取消等待。由于该节点线程等待超时或者被中断,需要从同步队列中取消等

      待,因此该线程被置1。节点进入了取消状态,该类型节点不会参与竞争,且会一直保持取消状态。

    • static final int SIGNAL = -1

      waitStatus为SIGNAL(‒1)时表示其后驱节点处于等待状态,当前节点对应的线程如果释放了同步状态或者被取消,将会通知后驱节点,使后驱节点的线程得以运行。

    • static final int CONDITION = -2

      waitStatus为‒2时,表示该线程在条件队列中阻塞(Condition有使用),表示节点在等待队列中(这里指的是等待在某个锁的CONDITION上,关于CONDITION的原理后面会讲到),当持有锁的线程调CONDITION的signal()方法之后,节点会从该CONDITION的等待队列转移到该锁的同步队列上,去竞争锁(注意:这里的同步队列就是我们说的AQS维护的FIFO队列,等待队列则是每个CONDITION关联的队列)。

    • static final int PROPAGATE = -3

      waitStatus为‒3时,表示下一个线程获取共享锁后,自己的共享状态会被无条件地传播下去,因为共享锁可能出现同时有N个锁可以用,这时直接让后面的N个节点都来工作。这种状态在CountDownLatch中使用到了。为什么当一个节点的线程获取共享锁后,要唤醒后继共享节点?共享锁是可以多个线程共有的,当一个节点的线程获取共享锁后,必然要通知后继共享节点的线程也可以获取锁了,这样就不会让其他等待的线程等很久,这种向后通知(传播)的目的也是尽快通知其他等待的线程尽快获取锁。

    waitStatus初始时为0,表示当前节点处于初始状态。

    Node节点的waitStatus状态为以上5种状态的一种。

  2. volatile Thread thread

    Node的thread成员用来存放进入AQS队列中的线程引用;Node的nextWaiter成员用来指向自己的后继等待节点,此成员只有线程处于条件等待队列中的时候使用。

  3. 抢占类型常量标识

    1
    2
    3
    4
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

SHARED表示线程是因为获取共享资源时阻塞而被添加到队列中的;EXCLUSIVE表示线程因为获取独占资源时阻 塞而被添加到队列中的。

1.1.3 模板流程

AQS定义了两种资源共享方式:

  • Exclusive(独享锁):只有一个线程能占有锁资源,如ReentrantLock。独享锁又可分为公平锁和非公平锁。

  • Share(共享锁):多个线程可同时占有锁资源,如SemaphoreCountDownLatchCyclicBarrierReadWriteLock的Read锁。

AQS为不同的资源共享方式提供了不同的模板流程,包括共享锁、独享锁模板流程。这些模板流程完成了具体线程进

出等待队列的基础(如获取资源失败入队/唤醒出队等)、通用逻辑。基于基础、通用逻辑,AQS提供一种实现阻塞锁

和依赖FIFO等待队列的同步器的框架,AQS模板为ReentrantLockCountDownLatchSemaphore提供了优秀的解

决方案。自定义的同步器只需要实现共享资源state的获取与释放方式即可,这些逻辑都编写在钩子方法中。无论是共

享锁还是独享锁,AQS在执行模板流程时会回调自定义的钩子方法。

自定义同步器时,AQS中需要重写的钩子方法大致如下:

  • **tryAcquire(int):**独占锁钩子,尝试获取资源。若成功则返回true,若失败则返回false。
  • **tryRelease(int):**独占锁钩子,尝试释放资源。若成功则返回true,若失败则返回false
  • **tryAcquireShared(int):**共享锁钩子,尝试获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • **tryReleaseShared(int):**共享锁钩子,尝试释放资源。若成功则返回true,若失败则返回false
  • **isHeldExclusively():**独占锁钩子,判断该线程是否正在独占资源。只有用到condition条件队列时才需要去实现它。

以上钩子方法的默认实现会抛出UnsupportedOperationException异常。除了这些钩子方法外,AQS类中的其他方法

都是final类型的方法,所以无法被其他类继承,只有这几个方法可以被其他类继承。

2. ReentrantLock 实现

1
public class ReentrantLock implements Lock, java.io.Serializable {}

经过观察,ReentrantLock把所有Lock接口的操作都委派到一个Sync类上,该类继承

AbstractQueuedSynchronizer

1
2
3
4
5
6
7
8
private final Sync sync;

/**
 * Base of synchronization control for this lock. Subclassed
 * into fair and nonfair versions below. Uses AQS state to
 * represent the number of holds on the lock.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {}

ReentrantLock为了支持公平锁和非公平锁两种模式,为Sync又定义了两个子类。

image-20250225144709035

ReentrantLock的默认构造器(无参数构造器)被初始化为一个NonfairSync对象,即使用非公平同步器,所以,默

认情况下ReentrantLock为非公平锁。带参数的构造器可以根据fair参数的值具体指定ReentrantLock的内部同步

器使用FairSync还是NonfairSync

2.1 加锁(非公平锁)

首先,创建 ReentrantLock 其实是创建了一个同步器,这个同步器是 ReentrantLock 的内部类,继承自 AQS

1
2
3
public ReentrantLock() {
    sync = new NonfairSync();
}

调用 lock,会使用 cas 将 state(默认为 0)修改为 1,如果修改成功则加锁成功,并把当前线程设为锁的持有者,如果修改失败,则进入 acquire 方法

1
2
3
4
5
6
7
8
9
10
11
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire 方法首先会调用 aqs 的 tryAcquire,被 ReentrantLock 重写,最终调用了 ReentrantLocknonfairTryAcquire 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

这个方法首先判断 state 是否等于 0,等于 0 则使用 cas 修改为 acquires,逻辑和之前一样,成功就说明加锁成功,

else if 中判断是不是本来就是当前线程持有锁,是的话就代表锁重入了。如果都不是,就返回 false。

返回false说明获取锁失败,此时进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

对于 addWaiter 方法,是在当前线程后面创建并添加一个等待节点到队列中,节点的类型由参数 mode(这里是

Node.EXCLUSIVE) 指定。如果尾节点不为空,则尝试使用 CAS 操作将当前节点设置为尾节点,如果成功则返回当前

节点,否则调用 enq()方法将节点添加到队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

enq()方法通过CAS自旋将节点的添加到队列尾部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

如果此时队尾为空,则进行队列的初始化,new一个空的Node对象作为头节点,此时tail和head都指向一个空Node,

然后进入第二次循环,此时tail不为空,让node的前驱指针指向tail,并通过CAS让tail指向node,再将旧的尾节点的

后驱指针指向node。

注意:

这里就说明AQS的抽象的队列结构的头节点是个虚结点。

然后是 acquireQueued 方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  1. 他设置两个标志位,第一个是failed,默认为 true,如果加锁成功,设为 false,还有个是 interrupted,表示循环过程中是否被打断过,默认为 false,然后进入了一个死循环,获得当前节点的前驱节点p,如果 p 是头节点(这里的头节点是个虚结点),说明当前节点是第一个节点,则调用tryAcquire获取一次锁,如果成功,则把当前节点设为虚结点,也就是头节点,并返回 interrupted 标志位。

  2. 如果获取失败,则首先进入shouldParkAfterFailedAcquire方法,他首先判断前驱节点 p 的状态位是否为 SIGNAL(-1),是就返回 true,如果 p 的状态位大于 0,则表示 p 已经被取消,并循环向前查找取消节点,把取消节点从队列中剔除,否则就把前驱节点的状态为设为 SIGNAL(-1),返回 false,所以对于返回值来说,如果前驱节点的状态位是-1,返回 true,不是就返回 false,

  3. 如果返回 true,则调用parkAndCheckInterrupt,阻塞当前线程,当线程被唤醒时,返回其的中断状态。

  4. 如果线程被唤醒,并且中断标志位为true,那么就会将interrupted设为true,表示线程在此过程中被中断过,最终会返回这个标记,如果为true,则会调用selfInterrupt方法进行自我中断。

    1
    2
    3
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

对于 finally 块中的内容,如果获取锁失败,进入cancelAcquire方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36


private void cancelAcquire(Node node) {
  // 将无效节点过滤
	if (node == null)
		return;
  // 设置该节点不关联任何线程,也就是虚节点
	node.thread = null;
	Node pred = node.prev;
  // 通过前驱节点,跳过取消状态的node
	while (pred.waitStatus > 0)
		node.prev = pred = pred.prev;
  // 获取过滤后的前驱节点的后继节点
	Node predNext = pred.next;
  // 把当前node的状态设置为CANCELLED
	node.waitStatus = Node.CANCELLED;
  // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
  // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
	if (node == tail && compareAndSetTail(node, pred)) {
		compareAndSetNext(pred, predNext, null);
	} else {
		int ws;
    // 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
    // 如果1和2中有一个为true,再判断当前节点的线程是否为null
    // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
		if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
			Node next = node.next;
			if (next != null && next.waitStatus <= 0)
				compareAndSetNext(pred, predNext, next);
		} else {
      // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
			unparkSuccessor(node);
		}
		node.next = node; // help GC
	}
}

当前的流程:

  • 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。

  • 根据当前节点的位置,考虑以下三种情况:

    1. 当前节点是尾节点。

      img

    2. 当前节点是Head的后继节点。

      img

    3. 当前节点不是Head的后继节点,也不是尾节点。

      img

2.1.1 加公平锁

1
2
3
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

与非公平锁的区别在于tryAcquire方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在能获取锁的时候,即c == 0,会先调用hasQueuedPredecessors()判断队列里面是否有其他正在等待获取锁的线程,如果有就加入队列进行等待,否则就直接尝试获取锁。

2.1.2 lockInterruptibly()

1
2
3
4
5
6
7
8
9
10
11
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

可以看到这个方法在调用时会先判断一次当前线程的中断标志位,如果被中断则抛出中断异常,否则尝试获取锁。

如果没有获取到锁就进入doAcquireInterruptibly方法,这个方法和acquireQueued很像

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

区别就在于他没有定义变量来记录线程是否被中断,而是当shouldParkAfterFailedAcquireparkAndCheckInterrupt同时返回true的时候抛出中断异常,而在acquireQueued方法中此时是将interrupted变量设为true。

2.1.3 tryLock()

1
2
3
public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

这样看来就像非公平锁重写的tryAcquire钩子方法,只进行一次锁获取,失败直接返回false。

2.1.4 tryLock(long timeout, TimeUnit unit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这里可以看到这个方法能响应中断,具体流程是先尝试获取一次锁,失败则进入doAcquireNanos方法,这个方法和acquireQueued类似,在规定时间内不断获取锁,同时也会调用LockSupport.parkNanos方法暂停一段时间。

LockSupport类:

它的park开头的一系列方法,会让线程进入waiting状态,并且不会释放锁,但是waiting状态中可以响应中断,中断线程能直接返回且不会抛出异常,并可以通过isInterrupted()方法获取中断标记

2.2 解锁

调用ReentrantLockunlock()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

首先是tryRelease方法

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

返回true代表解锁成功,反正失败。

回到release方法中,进入if判断是否能调用unparkSuccessor方法。

这里的判断条件为什么是h != null && h.waitStatus != 0?

  • h == null Head表示还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
  • h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
  • h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。

这里的入参是head节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void unparkSuccessor(Node node) {
	// 获取头结点waitStatus
	int ws = node.waitStatus;
	if (ws < 0)
		compareAndSetWaitStatus(node, ws, 0);
	// 获取当前节点的下一个节点
	Node s = node.next;
	// 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
	if (s == null || s.waitStatus > 0) {
		s = null;
		// 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
		for (Node t = tail; t != null && t != node; t = t.prev)
			if (t.waitStatus <= 0)
				s = t;
	}
	// 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
	if (s != null)
		LockSupport.unpark(s.thread);
}

为什么要从后往前找第一个非Cancelled的节点呢?原因如下。

addWaiter方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	if (pred != null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}

可以看到这里添加节点的时候,将尾指针指向新节点和将旧尾节点的后驱指针指向新节点这两步操作不是原子的,所以可能发生修改了尾指针但是还未执行pred.next = node;,就会导致遍历不到新加入的节点。

还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node。

当找到需要唤醒的线程后,调用LockSupport.unpark(s.thread);唤醒线程。

2.3 Condition 实现

ConditionObject类是实现条件队列的关键,每个ConditionObject对象都维护一个单独的条件等待对列。每个ConditionObject对应一个条件队列,它记录该队列的队首节点和尾节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final ConditionObject newCondition() {
    return new ConditionObject();
}

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    /**
     * Creates a new {@code ConditionObject} instance.
     */
    public ConditionObject() { }
    .....
}

image-20250225163706202

在一个显式锁上,我们可以创建多个等待任务队列,这点和内置锁不同,Java内置锁上只有唯一的一个等待队列。比如,我们可以使用newCondition()创建两个等待队列

image-20250225163736707

2.3.1 await()

当线程调用await()方法时,说明当前线程的节点为当前AQS队列的队首节点,正好处于占有锁的状态,await()方法需要把该线程从AQS队列挪到Condition等待队列里

image-20250225163830720

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); //第一步
    int savedState = fullyRelease(node);// 第二步
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 第三步
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //第四步
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled  ,第五步
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  1. 执行await()时,会新创建一个节点并放入到Condition队列尾部。

  2. 然后释放锁,并唤醒AQS同步队列中的队首节点的后一个节点。

  3. 然后执行while循环,将该节点的线程阻塞,直到该节点离开等待队列,重新回到同步队列成为同步节点后,线程才退出while循环。

  4. 退出循环后,开始调用acquireQueued()不断尝试拿锁。

  5. 拿到锁后,会清空Condition队列中被取消的节点。

创建一个新节点并放入Condition队列尾部的工作由addConditionWaiter()方法完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node addConditionWaiter() {
    Node t = lastWaiter;
    //如果尾节点取消,重新定位尾节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //创建一个新Node,作为等待节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //将新Node加入等待队列
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

2.3.2 signal()

线程在某个ConditionObject对象上调用signal()方法后,等待队列中的firstWaiter会被加入到同步队列中,等待节点被唤醒

image-20250225164431472

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}


private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

signal()方法的整体流程如下:

  1. 通过enq()方法自旋(该方法已经介绍过),将条件队列中的队首节点放入到AQS同步队列尾部,并获取它在AQS队列中的前驱节点。
  2. 如果前驱节点的状态是取消状态,或者设置前驱节点为Signal状态失败,就唤醒当前节点的线程;否则节点在同步队列的尾部,参与排队。
  3. 同步队列中的线程被唤醒后,表示重新获取了显式锁,然后继续执行condition.await()语句后面的临界区代码。