AQS原理、Condition、ReentrantLock和ReentrantReadWriteLock

锁的前世今生#

先来一个镇楼图,来自美团技术:

锁的升级过程可以参见《多线程之volatile和synchronized》

image-20200615024301986

乐观锁与悲观锁#

  • 悲观锁:对于同一个数据的并发操作,悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。Java中,synchronized关键字和Lock的实现类都是悲观锁。(不能并发修改,只能排队顺序)-------适合写操作多的场景,先加锁可以保证写操作时数据正确
  • 乐观锁:而乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其他线程更新,则根据不同的实现方式执行不同的操作(例如报错或者自动重试)。 (可能并发修改,如果真并发修改了,再议)--------------适合读操作多(写少)的场景,不加锁的特点能够使其读操作的性能大幅提升。(没有加锁开销)
  • 乐观锁在Java中是通过使用无锁编程来实现,最常采用的是CAS算法,Java原子类中的递增操作就通过CAS自旋实现的。

AQS#

AQS是啥#

AbstractQueuedSynchronizer抽象队列同步器简称AQS,它是实现同步器的基础组件,juc下面Lock的实现以及一些并发工具类就是通过AQS来实现的,这里我们通过AQS的类图先看一下大概,下面我们总结一下AQS的实现原理。

先说一句话描述:

AQS是一个通过双向链表实现的FIFO队列,并通过CAS的方式变更一个volatile的state字段和队列头尾来维护状态的同步器,它通过模板方法的方式暴露了一些留给子类实现的方法,可以实现多种并发工具的构建。AQS是juc很多类库的基础设施。

下面是详细描述:

(1)AQS是一个通过内置的FIFO双向队列来完成线程的排队工作(内部通过结点head和tail记录队首和队尾元素,元素的结点类型为Node类型,后面我们会看到Node的具体构造)。

1
2
3
4
5
/*等待队列的队首结点(懒加载,这里体现为竞争失败的情况下,加入同步队列的线程执行到enq方法的时候会创
建一个Head结点)。该结点只能被setHead方法修改。并且结点的waitStatus不能为CANCELLED*/
private transient volatile Node head;
/**等待队列的尾节点,也是懒加载的。(enq方法)。只在加入新的阻塞结点的情况下修改*/
private transient volatile Node tail;

(2)其中Node中的thread用来存放进入AQS队列中的线程引用,Node结点内部的SHARED表示标记线程是因为获取共享资源失败被阻塞添加到队列中的;Node中的EXCLUSIVE表示线程因为获取独占资源失败被阻塞添加到队列中的。waitStatus表示当前线程的等待状态:

​ ①CANCELLED=1:表示线程因为中断或者等待超时,需要从等待队列中取消等待;

​ ②SIGNAL=-1:当前线程thread1占有锁,队列中的head(仅仅代表头结点,里面没有存放线程引用)的后继结点node1处于等待状态,如果已占有锁的线程thread1释放锁或被CANCEL之后就会通知这个结点node1去获取锁执行。

​ ③CONDITION=-2:表示结点在等待队列中(这里指的是等待在某个lock的condition上,关于Condition的原理下面会写到),当持有锁的线程调用了Condition的signal()方法之后,结点会从该condition的等待队列转移到该lock的同步队列上,去竞争lock。(注意:这里的同步队列就是我们说的AQS维护的FIFO队列,等待队列则是每个condition关联的队列)

​ ④PROPAGTE=-3:表示下一次共享状态获取将会传递给后继结点获取这个共享同步状态。

(3)AQS中维持了一个单一的volatile修饰的状态信息state(AQS通过Unsafe的相关方法,以原子性CAS的方式由线程去获取这个state)。AQS提供了getState()、setState()、compareAndSetState()函数修改值(实际上调用的是unsafe的compareAndSwapInt方法)。下面是AQS中的部分成员变量以及更新state的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//这就是我们刚刚说到的head结点,懒加载的(竞争失败需要构建同步队列的时候,才会创建这个head),如果头节点存在,它的waitStatus不能为CANCELLED
private transient volatile Node head;
//当前同步队列尾节点的引用,也是懒加载的,只有调用enq方法的时候会添加一个新的wait node
private transient volatile Node tail;
//AQS核心:同步状态
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

(4)AQS的设计师基于模板方法模式的。使用时候需要继承同步器并重写指定的方法,并且通常将子类推荐为定义同步组件的静态内部类,子类重写这些方法之后,AQS工作时使用的是提供的模板方法,在这些模板方法中调用子类重写的方法。其中子类可以重写的方法:

1
2
3
4
5
6
7
8
9
10
//独占式的获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
//独占式的释放同步状态,等待获取同步状态的线程可以有机会获取同步状态
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
//共享式的获取同步状态
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
//尝试将状态设置为以共享模式释放同步状态。 该方法总是由执行释放的线程调用。
protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
//当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
protected int isHeldExclusively(int arg) { throw new UnsupportedOperationException();}

(5)AQS的内部类ConditionObject是通过结合锁实现线程同步,ConditionObject可以直接访问AQS的变量(state、queue),ConditionObject是个条件变量 ,每个ConditionObject对应一个队列用来存放线程调用condition条件变量的await方法之后被阻塞的线程。

如果从来没有听说过AQS的同学要知道这个,可以先看下ReentrantLockReentrantLock实现了一个Lock接口,并持有一个私有抽象静态Sync对象,这个抽象Sync又有两个实现,分别是公平锁FairLock和非公平锁NonFiarLock。缺省是非公平锁。

  • 公平锁:多个线程按照申请锁的顺序去获得锁,后申请锁的线程需要排队,等它之前的线程获得锁并释放后,它才能获得锁;
  • 非公平锁:线程获得锁的顺序于申请锁的顺序无关,申请锁的线程可以直接尝试获得锁,谁抢到就是谁的;

image-20200614222312482

具体的结构代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ReentrantLock implements Lock, java.io.Serializable {

/** Synchronizer providing all implementation mechanics */
private final Sync sync; // 实际ReentrantLock到底是公平还是非公平是new的时候指定的实现类。--------模板方法的设计模式

/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();

而抽象类Sync继承的AbstractQueuedSynchronizer类(简称AQS,抽象队列同步器)是是一个可以用来实现线程同步的基础工具。也就是说我们常用的ReentrantLock底层是由AQS实现的。

也就是说:AQS使用模板方法的设计模式提供了同步功能的基础设施,可以用它来完成锁同步等功能。在Java的并发包中大量使用。

同步的抽象与各种实现:#

器AQS是公共逻辑,各种Lock的实现算是自定义的业务逻辑:

AQS和Sync、FairSync、NonfairSync都是公共的抽象逻辑,而Lock、ReadLock、ReentrantLock都算是业务逻辑。这些业务逻辑是有各个场景的特点,基于我们的公共抽象逻辑基础设施来实现的。

image-20200608153944872

比如CountdownLatch里面也是有AQS和实现的。

此外ReentrantLock底层的Lock接口还保证了ReentrantLock的行为具有以下方法实现:

image-20200607134100117

从ReentrantLock说起#

ReentrantLock作为JUC包提供的可重入锁,和Synchronized关键字的区别如下:

条件队列是指wait/notify或者await/signal

image-20200615023855502

简单对比二者的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// **************************Synchronized的使用方式**************************
// 1.用于代码块
synchronized (this) {}
// 2.用于对象
synchronized (object) {}
// 3.用于方法
public synchronized void test () {}
// 4.可重入
for (int i = 0; i < 100; i++) {
synchronized (this) {}
}
// **************************ReentrantLock的使用方式**************************
public void test () throw Exception {
// 1.初始化选择公平锁、非公平锁
ReentrantLock lock = new ReentrantLock(true);
// 2.可用于代码块
lock.lock();
try {
// 3.支持多种加锁方式,比较灵活; 具有可重入特性
if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
} finally {
// 4.在finally中手动释放锁
lock.unlock()
}
}

https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

ReentrantLock的源码#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync; // 一个抽象的Sync,有Fair和NonFair两个实现
// 继承AQS,有一些默认实现
abstract static class Sync extends AbstractQueuedSynchronizer {
//...... 一些AQS的抽象方法实现
}

// 一些Sync抽象方法的实现(主要是非公平加锁和公平加锁,这里以非公平为例)
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1)) // 1.上来就先CAS修改爷爷类AQS的state,去尝试加锁--------------非公平的体现,此处CAS保证原子性
setExclusiveOwnerThread(Thread.currentThread()); // 修改成功的话去设置AQS当前锁的独占线程是我
else
acquire(1); // 修改失败的就去AQS执行aquire尝试获得锁,AQS又会tryAcquire委派下来。具体看下面AQS
}

// 实现AQS源码获取一次锁,公平和非公平是两个实现,此处是非公平
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); // nonfairTryAcquire的实现:判断状态为0,独占是当前线程就重入+1返回true。不是重入的话就CAS抢占锁并设置AQS独占线程是我返回true,state=1或者没抢到返回false。
}
}

}
  • 概要极简总结:
    • ReentrantLock内部使用了一个AQS的模板实现Sync,这个Sync又针对公平锁和非公平锁有两种实现。
    • 如非公平锁一上来就先CAS去修改AQS的锁状态state(unsafe用内存偏移量去修改,保证原子性),再去acquire(1)
    • 公平的是上来就去acquire(1)
    • acquire(1)可以看下面AQS的部分

AQS的内部#

开门见山:

  • AQS内部有一个双向链表实现一个FIFO的同步队列来维护当前获取锁失败的线程。
  • 使用一个volatile的int类型的同步状态state和一系列方法实现同步。(state的各个值什么含义是给子类去实现的)
  • AQS内部还有一个当前独占线程的标识,来标识谁在占用同步状态

AbstractQueuedSynchronizer使用了模板方法的设计模式,把大部分的流程都实现了,但关键步骤使用抽象方法、抛异常的方式,交给子类去强制实现个性化定制。如:

1
2
3
4
5
6
7
8
9
10
// 独占地获取锁和释放锁(非共享读写锁)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

// 还有更多,见下图

image-20200607143948752

image-20200607144001939

AQS提供的模板方法主要分为三类:

  • 独占式地获取和释放锁;
  • 共享式地获取和释放锁;
  • 查询AQS的同步队列中正在等待的线程情况;

双向链表实现的同步队列#

  • AQS里面有一个volatile int类型的锁状态state,多线程CAS竞争修改。
  • AQS同步器内部有一个同步队列,每次线程获取锁失败就会增加一个Node到队尾,释放锁成功就会唤起队列最前面的Node中的线程,这个线程再去尝试。
  • 队列中的头节点head就是当前已经获取了锁,正在执行的线程对应的节点;而之后的这些节点,则对应着获取锁失败,正在排队的线程。
  • AQS持有链表的headtail节点,每个Node节点里面除了prenext还有当前的线程。
  • 当一个线程获取锁失败,它会被封装成一个Node,加入同步队列的尾部排队,同时线程会进入阻塞状态。
  • 而当头节点对应的线程释放锁时,它会唤醒它的下一个节点。
  • 被唤醒的节点对应的线程开始尝试获取锁,若获取成功,它就会将自己置为head,然后将原来的head移出队列。

image-20200607152436844

AQS加锁的源码#

acquire入口—核心方法#

ReentrantLocklock()方法调用了AQS的下面方法:

1
2
3
4
5
6
7
8
// 原文注释说:这是一个独占模式、忽略被打断。通过至少一次成功的tryAcquire就能拿到锁。
// 否则线程入队,反复调用tryAcquire被阻塞、解阻塞,直到返回true。
// 这个方法可以用来实现Lock.lock()
public final void acquire(int arg) { // FiarSync这里直接传了1进来调用
if (!tryAcquire(arg) && // tryAcquire成功则啥也没干地结束,失败继续,公平锁实现的时候如果有等的更久的会不去抢--------只有自己的时候不会排队
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 先包装Node,addWaiter续尾,再acquireQueued去阻塞
selfInterrupt(); // 如果里面在队列等待锁的过程中,被别人interrupt了是无法响应的。解锁后这里要继续处理响应interruput方法。
}
  • 首先调用tryAcquire尝试获取一次锁,若返回true,表示获取成功,则acquire方法将直接返回(没有构建同步队列);若返回false,则会继续向后执行acquireQueued方法;-------公平和非公平此处tryAcquire的实现有差异,非公平先去CAS竞争state了一次再去判断重入,公平的直接判断没有等待的(hasQueuedPredecessors)的线程才acquire。
  • tryAcquire返回false后,将执行acquireQueued,但是这个方法传入的参数调用了addWaiter方法;
  • addWaiter方法的作用是将当前线封装成同步队列的节点,然后加入到同步队列的尾部进行排队,并返回此节点;(CAS去设置head、tail,失败则for死循环再来一轮。第一个head是一个空节点)
  • addWaiter方法执行完成后,将它的返回值作为参数,调用acquireQueued方法。acquireQueued方法的作用是让当前线程在同步队列中阻塞,然后在被其他线程唤醒时去获取锁;-----这里会阻塞park
  • 若线程被唤醒并成功获取锁后,将从acquireQueued方法中退出,同时返回一个boolean值表示当前线程是否被中断,若被中断,则会执行下面的selfInterrupt方法,响应中断;

tryAcquire子类实现的模板#

tryAcquire是一个模板方法,留给子类的公平锁、非公平锁按场景去实现。不同的场景根据这个arg去修改state字段。

1
2
3
4
// 模板方法,留给子类去实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

公平锁实现的时候如果有等的更久的会不去抢,非公平锁上来就CAS抢state从0为1。成功就返回,没成功判断重入,重入返回true,否则false。

如果tryAcquire没竞争到锁,下面开始排队:

addWaiter-线程如何排队— 自旋添加到尾部,直到成功为止#

获取锁失败后到达addWaiter添加一个等待者:

将当前线程new 一个Node放到队列尾部,如果队列为空创建一个傀儡节点再添加尾部(傀儡节点就代表现在正在运行的那个线程)。如果加入失败就自旋(enq)直到添加成功,最后返回此节点。

(CAS去setTail一遍,失败的话后面enq循环一遍一遍CAS保证成功入队续上)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 拿出尾,作为前序节点,第一次是null
Node pred = tail;
if (pred != null) { // 如果有尾,说明不是第一次,否则是第一次队列未初始化执行下面enq循环再试
node.prev = pred; // 新节点的前指针,指向上一个尾
// CAS将新节点node设置为新尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node; // 设置成功,前尾的后指针指向当前新尾
return node;
} // 尾为空,执行下面enq初始化队列
}
enq(node); // 初始化并CAS续尾(CAS设置head、tail)
return node; // 返回
}

没初始化则初始化,然后入队尾,注意初始化的时候new了一个空的傀儡节点作为header,然后再来一轮for循环CAS续上的。(如果没抢到就再来for循环,直到CAS抢到再返回)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Inserts node into queue, initializing if necessary 把node进行一次入队,需要的时候进行初始化
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 拿出尾
if (t == null) { // Must initialize 没有尾,说明是第一次,需要初始化一个空的head再往后排
if (compareAndSetHead(new Node())) // head使用CAS初始化了一个虚拟的傀儡节点!这里是重点!!!!此时多线程的锁会竞争
tail = head; // 初始化首尾都是同一个,然后重新来for一次添加尾
} else { //第二轮以上for,初始化过,往最后排
node.prev = t; // 当前节点的前一个设置为前尾
if (compareAndSetTail(t, node)) { // CAS去看前尾变化没,没变化就变更新尾,此时多线程的锁会竞争
t.next = node; // 前尾的next指向新尾,尾巴成功修改
return t; // CAS续成功才会返回,否则永远来续-----------------------唯一出口
}// CAS失败,说明前尾变更,被别的线程抢先续尾了,重新来入队续尾
}
}
}

所以ReentrantLock排队的过程是:#

  • AQS使用一个双向链表来代表线程的先后顺序,head永远是空。AQS内部有status,0-free,>0-lock。Node内部有线程信息和waitStatus(CANCLE/SIGNAL/CONDITION/和共享锁的PROPAGATE)。
  • 初始状态(也就是锁未被任何线程占用的时候)线程A申请锁此时,成功获取到锁,无排队线程(tryAcquire直接拿到true并返回)
  • 线程B申请该锁,且上一个线程未释放:enq方法的for循环先创建一个空的Head(只有next指向)初始化队列再来一个for循环添加当前Node(包含自身线程信息,和pre指向,模式独占或者共享)。
  • 再来一个线程C申请该锁,且占有该锁的线程未释放:CAS来续尾成功就返回,失败的话去enq里面for的CAS续尾到成功为止。(成功后修改pre和自己的互相指向)

上面加入到队尾后,返回节点,传入下面的方法去获取锁。

acquireQueued—尾入队获取锁#

acquireQueued方法就是把获取锁失败的Node放入队列中,让这个线程不断进行“获锁”,直到它**“成功获锁”**或者“不再需要锁(如被中断)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 对队列里的节点node进行独占的、不可打断的获取锁。
* acquire()中和condition对象的各种await方法中被使用。
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) { // acquire方法传入了刚刚加入队尾的一个节点,await方法传入的是一个节点
boolean failed = true;
try {
boolean interrupted = false; // 是否被打断的标识
for (;;) {
final Node p = node.predecessor(); // 拿到当前Node的前序节点
// 如果前序是head才会去CAS抢占锁。(现在这个线程解锁后也成为新的head了)
if (p == head && tryAcquire(arg)) { //(tryAcquire是模板方法,子类FairSync等实现的,去尝试CAS修改state获取锁后,下面再来把自己变成head)
// 到这里就是已经获取到锁了,旧头换新头(上面是CAS的,这里没有线程安全问题)
setHead(node); // 这个是把当前节点设置为傀儡节点头,删除里面的thread等信息(已经拿到锁了,放thread没有意义,无助于GC)
p.next = null; // help GC 释放之前的傀儡头引用,帮助回收
failed = false;
return interrupted; //----------------------------唯一出口。 返回打断标识
}
// 自己不是老二(前序不是head)或者老二获取锁tryAcquire失败了,踏踏实实的先看是否应该park
// 如果可以的话park等待,然后阻塞检查是否打断。打断了进入if内,没打断就结束这一轮获取。(不可能让一直死循环for来判断,CPU该爆炸了)
// 不应该park的话就继续下一轮for再来抢占

// shouldParkAfterFailedAcquire方法判断当前线程是否能够进入等待状态,
// 若当前线程的节点不是头节点的下一个节点,则需要进入等待状态,
// 在此方法内部,当前线程会找到它的前驱节点中,第一个还在正常等待或执行的节点,
// 让其作为自己的直接前驱,然后在需要时将自己唤醒(因为其中有些线程可能被中断),
// 若找到,则返回true,表示自己可以进入等待状态了;
// 则继续调用parkAndCheckInterrupt方法,当前线程在这个方法中等待,
// 直到被其他线程唤醒,或者被中断后返回,返回时将返回一个boolean值,
// 表示这个线程是否被中断,若为true,则将执行下面一行代码,将中断标志置为true

if (shouldParkAfterFailedAcquire(p, node) && // 前面节点是SIGNAL,返回true可以阻塞自己
parkAndCheckInterrupt()) // 阻塞等待被唤醒,使用LockSupport.park() ------------------------------------此时外面lock.lock()会阻塞,业务代码等待。
interrupted = true;
}
} finally {
// 上面代码中只有一个return语句,且return的前一句就是failed = false;
// 所以只有当异常发生时,failed才会保持true的状态运行到此处;
// 异常可能是线程被中断,也可能是其他方法中的异常,
// 比如我们自己实现的tryAcquire方法
// 此时将取消线程获取锁的动作,将它从同步队列中移除
if (failed)
cancelAcquire(node);
}
}

方法的主流程如下:

image-20200615014513513

shouldParkAfterFailedAcquire 判断获取锁失败后是否应该park阻断#

首先记着这个方法是在一个死循环中,获取锁失败就来执行一遍。为了方式for死循环大量占用CPU,可以想象绝大部分节点必然是返回true,然后park的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 检查并更新无法获取锁的节点的状态。在所有循环里返回true就阻塞。要求传入的pred 就是node的前序Node.pre指向的那个
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 前序节点等待状态
// Node的WaitStatus,这里的SIGNAL是release方法写入的,下面再说
if (ws == Node.SIGNAL) // 前序节点已经release,发出信号了,返回true标识可以park阻塞---------------这是唯一一个可以阻塞的true返回
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // 前序节点是取消状态,一直往前反向找到一个非取消的,这些Cancelled都被删掉了
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node; //--------------其实是删掉了所有中途取消的节点,继续外面的for死循环来进下一轮
} else { // 前一节点是0或者传播状态,我们需要一个信号,但是还没park阻塞,此轮CAS设置把前一节点为SIGNAL状态,让调用者重新再来一遍就会阻塞他。
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // pre设置为SIGNAL,下一轮来入口返回true,会阻塞
}
return false; //-----------删掉了cancel节点,或者SIGNAL了0或者传播节点,下一轮来阻塞
}

上面方法的流程图如下:

https://zhuanlan.zhihu.com/p/54297968

preview

image-20200608160739853

公平锁和非公平锁的体现#

“不管公平还是非公平模式下,ReentrantLock对于排队中的线程都能保证,排在前面的一定比排在后面的线程优先获得锁”但是,非公平模式不保证“队列中的第一个线程一定就比新来的(未加入到队列)的线程优先获锁”因为队列中的第一个线程尝试获得锁时,可能刚好来了一个线程也要获取锁,而这个刚来的线程都还未加入到等待队列,此时两个线程同时随机竞争,很有可能,队列中的第一个线程竞争失败(而该线程等待的时间其实比这个刚来的线程等待时间要久)。

因为非公平锁上来就CAS开抢:

1
2
3
4
5
6
7
8
9
10
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //AQS,会调用下面的方法,直接开枪,可能和队列最前面那个一起竞争。
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

而公平锁hasQueuedPredecessors()返回false,没有排在自己前面的才能去抢:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && // 没有排在自己前面的,才能抢
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

// 线程的等待被取消
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
// SIGNAL后继节点即将被唤醒(表示该线程一切都准备好了,就等待锁空闲出来给我)
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
// 线程在等待某一个条件(Condition)被满足
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
//下一个共享获取应该是 无条件传播(当线程处在“SHARED”模式时,该字段才会被使用上,EXCLUSIVE独占模式不会)
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

/**
* Status field, taking on only the values
后继节点正在被park block,所以当前节点释放锁或者取消的时候必须要unpark唤醒后继。
为了避免竞争,获取方法必须首先表明他们需需要一个信号,
然后会尝试原子性的获取锁,如果失败才会block。
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
这个节点因为超时或者打断被取消,节点再也不会离开这个状态
特别是一个线程取消后再也不会block
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
条件:这个节点正在一个条件队列中,他不会用作同步队列的节点,直到被transfer,这时候状态会被设置为0

* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
传播:一个释放锁应该被传播到其他节点,在doReleaseShared中被设置来保证连续传播,除非其他操作介入。
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
waitStatus以数值型排列以简化使用,非负数代表节点不需要signal
大多数代码不想要检查特定的数值(只用判断符号)
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
正常同步代码初始化为0,contidion节点的时候设置为CONDITION(-2),使用CAS修改。
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;

Node 的类注释机翻

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<p>
等待队列是“ CLH”(Craig,Landin和Hagersten)锁定队列的变体。
CLH锁通常用于自旋锁。相反,我们将它们用于阻塞同步器,但是使用相同的基本策略,将有关线程的某些控制信息保存在其节点的前身中。
每个节点中的“状态”字段将跟踪线程是否应阻塞。
节点的前任释放时会发出信号。
否则,队列的每个节点都充当一个特定通知样式的监视器,其中包含一个等待线程。虽然状态字段不控制是否授予线程锁等。
线程可能会尝试获取它是否在队列中的第一位。
但是先行并不能保证成功。它只赋予了抗辩的权利。
因此,当前发布的竞争者线程可能需要重新等待。

<p>要加入CLH锁,您可以自动将其作为新尾部拼接。
要出队,您只需设置头字段。

<p>插入到CLH队列中,只需要对“ tail”执行一次原子操作,因此有一个从非排队到排队的简单原子分界点。
同样,出队仅涉及更新“头”。但是,节点需要花费更多的精力来确定其后继者是谁,部分原因是要处理由于超时和中断而可能导致的取消。
<p>“ prev”链接(在原始CLH锁中未使用)主要用于处理取消。
如果取消某个节点,则其后继节点(通常)会重新链接到未取消的前任节点。有关自旋锁情况下类似机制的解释,请参见Scott和Scherer的论文,网址为http://www.cs.rochester.edu/u/scott/synchronization/。<p>

我们还使用“下一个”链接来实现阻止机制。
每个节点的线程ID保留在其自己的节点中,因此,前任通过遍历下一个next以确定它是哪个线程,来发信号给下一个节点唤醒。
确定后继者必须避免与新排队的节点竞争以设置其前任节点的“ next”字段。----? Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors.

在必要时,可以通过在节点的后继者似乎为空时从原子更新的“尾部”反向检查来解决此问题。
(或者换句话说,next是一种优化,因此我们通常不需要反向扫描。)
<p>Cancellation 对基本算法引入了一些保守性。由于我们必须轮询其他节点的取消,因此我们可能会遗漏没有注意到已取消的节点在我们前面还是后面。要解决此问题,必须始终在取消时取消后继者,使他们能够稳定在新的前任者身上,除非我们能确定一个未取消的前任者将承担这一责任。 <p> CLH队列需要一个虚拟标头节点才能开始。但是,我们不会在构建过程中创建它们,因为如果没有争执,那将是浪费时间。取而代之的是,构造节点,并在第一次争用时设置头和尾指针。 <p>等待条件的线程使用相同的节点,但是使用附加的链接。条件只需要在简单(非并行)链接队列中链接节点,因为仅当它们专用时才可以访问它们。等待时,将节点插入条件队列。收到信号后,该节点将转移到主队列。状态字段的特殊值用于标记节点所在的队列。 <p>感谢Dave Dice,Mark Moir,Victor Luchangco,Bill Scherer和Michael Scott以及JSR-166专家组的成员,对本课程的设计提供了有益的想法,讨论和批评。

AQS锁释放的源码#

AbstractQueuedSynchronizer#release() 释放锁的方法,是底层释放锁的实现。

1
2
3
4
5
6
7
8
9
10
// unlock的实现,就一行代码调用这个AQS的release
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁,子类ReentrantLock实现了,如见下面(就是看独占线程ok,state-1,清除独占线程,返回free==0。)
Node h = head; // 拿到head,head不为空,而且不是刚初始化
if (h != null && h.waitStatus != 0) // waitStatus != 0 说明刚刚释放的过程中,又有一个新的来了,被后代把自己的waitStatus改成了SIGNAL
unparkSuccessor(h); // 解除后继的阻塞
return true;
}
return false;
}

ReentrantLock#tryRelease(1)释放锁,不用CAS,因为线程不一样释放不了。

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 锁的总数-释放数量 0的话就释放完了,>0的话说明有重入没释放完
if (Thread.currentThread() != getExclusiveOwnerThread()) // 解锁线程不是当前正在独占的线程就抛异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 释放完了,清空独占线程
free = true;
setExclusiveOwnerThread(null);
}
setState(c); // 给stat设置剩余量,返回是否释放完了
return free;
}

解锁时唤醒后代:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 解锁时唤醒后代
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0) // SIGNAL的,修改自己的wait状态为0
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 后代为空(可能是队列从前往后唤醒到最后一个了,或者后代被取消)
if (s == null || s.waitStatus > 0) {
s = null; // 从尾巴反向往前走,如果队列是右边这种,就一直往前捋到一个正常的为止(head--cancle---cancle--...---node---tail)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 后代不为空,解除park
if (s != null)
LockSupport.unpark(s.thread);
}

后继节点唤醒之后,又回到上面的acquireQueued()方法,去进行下一轮死循环,判断pre是不是head,是不是已经解锁了,CAS抢占锁。

抢到后:head的next置为null,GC回去干掉head。把唤起的自己这个node变成新head,清理新head里面的thread信息和head里的pre信息。

然后当前唤起节点获取到锁的线程开始继续执行后面的业务方法。。。

ReentrantReadWriteLock读写锁(共享锁)#

独享锁:也叫排他锁,是指该锁一次只能被一个线程所持有。如果线程T对数据A加上排它锁后,则其他线程不能再对A加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。JDK中的synchronized和JUC中Lock的实现类就是互斥锁。

共享锁:是指该锁可被多个线程所持有。如果线程T对数据A加上共享锁后,则其他线程只能对A再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。

下面是JDK的读写分离锁代码实现:

读写分离锁,在读比较多(耗时)的场合比常规的重入锁更加有效率。

  • 读-读线程互不阻塞,多少各线程都可以并行一起读。
  • 但是当读-写或者写-写线程相互竞争的时候会阻塞获取锁才可以操作

ReentrantReadWriteLock内部:

  • 持有一个int类型的status锁状态。

  • 一个Sync同步器,同步器继承自AQS,并有公平非公平两种实现。

  • 持有一个共享的ReadLock,一个独占的WriteLock。

image-20200615143328743

可以看到ReentrantReadWriteLock有两把锁:ReadLock和WriteLock,由词知意,一个读锁一个写锁,合称“读写锁”。再进一步观察可以发现ReadLock和WriteLock是靠内部类Sync实现的锁。Sync是AQS的一个子类,这种结构在CountDownLatch、ReentrantLock、Semaphore里面也都存在。

在ReentrantReadWriteLock里面,读锁和写锁的锁主体都是Sync,但读锁和写锁的加锁方式不一样。读锁是共享锁写锁是独享锁读锁的共享锁可保证并发读非常高效而读写、写读、写写的过程互斥,因为读锁和写锁是分离的。所以ReentrantReadWriteLock的并发性相比一般的互斥锁有了很大提升。

类图如下:

image-20200615150705528

读写锁的status设计#

AQS的时候我们也提到了state字段(int类型,32位),该字段用来描述有多少线程获持有锁。

  • 独享锁中这个值通常是0或者1(如果是重入锁的话state值就是重入的次数),在共享锁中state就是持有锁的数量
  • 但是在ReentrantReadWriteLock中有读、写两把锁,所以需要在一个整型变量state上分别描述读锁和写锁的数量(或者也可以叫状态)。
  • 于是将state变量“按位切割”切分成了两个部分,高16位表示读锁状态(读锁个数),低16位表示写锁状态(写锁个数)。如下图所示:

image-20200615151414084

代码中搞了一个EXCLUSIVE_MASK,是2^16-1。

通过和state求&可以拿到低16位的数据。

通过和state>>16位拿到高16位的读锁数据。

写锁的代码#

了解了概念之后我们再来看代码,先看写锁的加锁源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero and owner is a different thread, fail. //读锁非0,写锁非0,或线程非当前,返回失败
* 2. If count would saturate, fail. (This can only happen if count is already nonzero.) // 数据超max,返回失败(max是2^16-1,因为status的int劈成两半一半给读,一半给写了)
* 3. Otherwise, this thread is eligible for lock if // 其他情况则可以获取锁并设置占有线程
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState(); // 获得锁的个数
int w = exclusiveCount(c); // 获取写锁的个数
if (c != 0) { // 已经有线程获取锁了
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 写锁为0,即读锁不为0. 或 非当前线程持有写锁,就返回失败
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 超过2^16-1个写线程最大数量,失败
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// w=0,且当前线程需要阻塞,返回失败false(非公平writerShouldBlock永远返回false不用阻塞,公平锁看队列有没有排队,有就true排队)
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires)) // 否则CAS增加写线程数量失败也返回false
return false;
setExclusiveOwnerThread(current); // c=0,w=0 或者 c>0,w>0重入,设置当前线程占有,返回true
return true;
}
  • 这段代码首先取到当前锁的个数c(这个数可能很大,是由高16位和低16位组合起来的),然后再通过c来获取写锁的个数w。:
    • 因为写锁是低16位,所以取低16位的最大值与当前的c做与运算( int w = exclusiveCount©; return c & EXCLUSIVE_MASK;这个EXCLUSIVE_MASK是2^16-1,是全1 ),高16位和0与运算后是0,剩下的就是低位运算的值,同时也是持有写锁的线程数目。
  • 在取到写锁线程的数目后,首先判断是否已经有线程持有了锁。如果已经有线程持有了任何锁(c!=0),则查看当前写锁线程的数目,如果写线程数为0(即此时存在读锁)或者持有写锁的线程不是当前线程就返回失败(涉及到公平锁和非公平锁的实现)。
  • 如果本轮写后,写入锁的地位数量大于最大数(65535,2的16次方-1)就抛出一个Error。
  • 如果当前写线程数为0(那么读线程也应该为0,因为上面已经处理c!=0的情况),并且当前线程需要阻塞那么就返回失败;(非公平writerShouldBlock永远返回false,公平锁看队列有没有排队,有就true)如果通过CAS增加写线程数失败也返回失败。
  • 如果c=0,w=0或者c>0,w>0(重入),则设置当前线程或锁的拥有者,返回成功!

后面的添加队列阻塞和唤醒与ReentrantLock相同。

上面可以看到tryAcquire()除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:必须确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。

因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,然后等待的读写线程才能够继续访问读写锁,同时前次写线程的修改对后续的读写线程可见。

接着是读锁的代码:#

读锁的lock()方法调用了这个

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 上来先去try获取共享锁一下
doAcquireShared(arg);
}

下面是获取一次共享锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail. 写锁再用,fail
* 2. Otherwise, this thread is eligible for 写锁没用,就直接去看队列是否应该排队阻塞。不用的话直接CAS改读锁数量。
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant 没有检查重入,推迟到后面的全版本doAcquireShared中
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread 如果写锁占有,或者CAS失败,就去doAcquireShared里面轮循加锁
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1; // 其他线程占用写锁,返回-1
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

可以看到在tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是“1<<16”。所以读写锁才能实现读读的过程共享,而读写、写读、写写的过程互斥。

获取一次读锁失败,进入下面进入队列阻塞,循环获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 添加节点
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); // 前一个是头,获取一次
if (r >= 0) { // 成功了
setHeadAndPropagate(node, r); // 从前往后叫醒所有的节点
p.next = null; // help GC // 删掉头
if (interrupted) // 响应park时候的interrupt
selfInterrupt();
failed = false;
return; // 返回
}
}
// 前一个SIGNAL了才true阻塞,否则删掉前面所有CANCLE的,CAS修改pre的状态为SIGNAL,返回false再来一轮阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 阻塞
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

此时,我们再回头看一下互斥锁ReentrantLock中公平锁和非公平锁的加锁源码:

image-20200615164128985

我们发现在ReentrantLock虽然有公平锁和非公平锁两种,但是它们添加的都是独享锁。

根据源码所示,当某一个线程调用lock方法获取锁时,如果同步资源没有被其他线程锁住,那么当前线程在使用CAS更新state成功后就会成功抢占该资源。

而如果公共资源被占用且不是被当前线程占用,那么就会加锁失败。所以可以确定ReentrantLock无论读操作还是写操作,添加的锁都是都是独享锁。

AQS的ConditionObject、await/signal#

ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,它实现了java.util.concurrent.locks.Condition接口。(下文的Condition都指的是ConditionObject)

Condition的实现,主要包括:等待队列、等待和通知

Condition使用了一个等待队列来记录wait的节点们(和之前的同步队列不是一个)

API示例#

lock.newCondition(); 给一个锁创建一个条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 使用lock构造condition
ReentrantLock lock = new ReentrantLock();
Condition fullCondition = lock.newCondition();
Condition emptyCondition = lock.newCondition();

public T take(){
lock.lock(); // 先获取锁
if(queue.isEmpty()){
try {
// 通知因队列满而阻塞到fullCondition上的线程唤起
fullCondition.signalAll(); // 唤醒等待在Condition上所有的线程。
// 队列空的condition开始挂起
emptyCondition.await(); // 当前线程进入等待状态,直到被通知(signal)或者被中断时,当前线程进入运行状态,从await()返回;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// take元素
T obj = queue.remove();
//通知因队列满而阻塞到fullCondition上的线程唤起
fullCondition.signalAll();
lock.unlock(); // 解锁
return obj;
}

Condition的等待队列#

FIFO的队列,每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程。并不复杂。

如果一个线程调用了**Condition.await()**方法,那么该线程将会:

  • 构造节点加入等待队列(没有CAS设置尾巴,因为前面必然获取到锁了)
  • 释放当前线程的同步状态,唤醒后继节点,且当前线程进入WATING等待状态
  • 当从await()方法返回时,当前线程一定获取了Condition相关联的锁。

image-20200615174451189

节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter; // 头
private transient Node lastWaiter; // 尾

// 添加waiter到队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// 整理尾巴,删掉CANCLE的
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 构造一个节点,续到tail后面即可
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

await等待#

需要先lock.lock() 然后 condition.await(),await会释放掉锁,线程进入等待队列,状态变成Wating。

下次唤醒后再重新获取锁(不一定能获取到,获取不到再次for死循环获取进入同步队列,park)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

// await核心API
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加一个尾巴
Node node = addConditionWaiter();
// 释放当前线程节点的同步状态,唤醒后继节点;
int savedState = fullyRelease(node);
int interruptMode = 0;
// 线程进入等待状态;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break; // 线程被【唤醒后】,从while循环中退出,继续执行后面的方法去获取锁
}
// 调用acquireQueued尝试获取同步状态;(ReentrantLock也使用了这个方法,就是获取一次,然后for死循环获取,或者park等待)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。

当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。

await具体执行流程如下:

  • 调用addConditionWaiter将当前线程加入等待队列;
  • 调用fullRelease释放当前线程节点的同步状态,唤醒后继节点;
  • 线程进入等待状态;
  • 线程被唤醒后,从while循环中退出,调用acquireQueued尝试获取同步状态;
  • 同步状态获取成功后,线程从await方法返回。

如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。(获取锁的时候,同步队列的一个header必然释放了。然后等待队列尾巴新增加了一个Node,线程是当前线程。但从内容上讲几乎相当于移动。)

image-20200615184534526

signal唤醒#

调用Condition的signal()方法将会唤醒再等待队列中的首节点,该节点也是到目前为止等待时间最长的节点。

1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively()) // 当前运行线程t1是否是获取了锁的线程,如果不是抛出异常IllegalMonitorStateException
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) // 取得等待队列的头结点,头结点不为空执行doSignal
doSignal(first);
}

step1:前置检查,判断当前线程是否是获取了锁的线程,如果不是抛出异常IllegalMonitorStateException,否则,执行step2;

step2:取得等待队列的头结点,头结点不为空执行doSignal,否则,signal结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 调用transferForSignal将节点从等待队列的first移动到同步队列.将该节点从等待队列删除。
(first = firstWaiter) != null);
}

整个doSignal完成了这两个操作:调用transferForSignal将节点从等待队列移动到同步队列,并且,将该节点从等待队列删除。

怎么transfer的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //CAS修改CONDITION
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); // 这里和上面ReentrantLock的enq方法是一样的,初始化队列,或者加入同步队列尾巴,for死循环去续尾巴成功为止,没有park
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 到达这里unpark去唤醒这个节点里面的线程t2。
return true;
}
  • step1:将节点waitStatus设置为0,设置成功执行step2,否则(CANCLE)返回false;
  • step2:调用enq方法将该节点加入同步队列;
  • step3:使用LockSuppor.unpark()方法唤醒该节点的线程。

再次回顾我们AQS的enq方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Inserts node into queue, initializing if necessary node节点进行一次入队,需要的时候进行初始化
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 拿出尾
if (t == null) { // Must initialize 没有尾,说明是第一次,需要初始化一个空的head再往后排
if (compareAndSetHead(new Node())) // head使用CAS初始化了一个虚拟的傀儡节点!这里是重点!!!!此时多线程的锁会竞争
tail = head; // 初始化首尾都是同一个,然后重新来for一次添加尾
} else { //第二轮以上for,初始化过,往最后排
node.prev = t; // 当前节点的前一个设置为前尾
if (compareAndSetTail(t, node)) { // CAS去看前尾变化没,没变化就变更新尾,此时多线程的锁会竞争
t.next = node; // 前尾的next指向新尾,尾巴成功修改
return t; // CAS续成功才会返回,否则永远来续-----------------------唯一出口
}// CAS失败,说明前尾变更,被别的线程抢先续尾了,重新来入队续尾
}
}
}

整个signal系列方法将线程从等待队列移动到同步队列可以总结为下图:

就是把等待队列(wait队列)的第一个节点transfer,通过enq方法丢到同步队列的tail上,更新tail。然后再unpark这个tail里面的线程。

被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。
成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。

image-20200615184552315

Condition的signalAll()方法,将等待队列中的所有节点全部唤醒,相当于将等待队列中的每一个节点都执行一次signal()。

参考#

《Java并发编程的艺术》

https://www.cnblogs.com/tuyang1129/p/12670014.html

美团技术公众号的共享锁部分

https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

https://www.cnblogs.com/fsmly/p/11274572.html