一、ReentrantReadWriteLock概述
ReentrantReadWriteLock
是读写锁的实现,读锁可以在没有写锁的时候被多个线程同时持有,写锁就是和 ReentrantLock
类似的独占锁,它同样有包含公平锁和不公平锁两种实现方式,在实例化时将同时实例化 ReadLock
和 WriteLock
实例,本文主要聚焦于 WriteLock
的实现分析,因为 ReentrantReadWriteLock
中的部分实现与 ReentrantLock
一样基于 AbstractQueuedSynchronized
的实现,对于一些相同的逻辑本文不再赘述。
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
1.1 锁状态码存储
在 ReentrantLock
中,通过 state
参数保存锁状态码,状态码存储着当前锁重入的次数。在 ReentrantReadWriteLock
中同时包含读和写两种锁类型,需要两个状态码来保存锁状态。ReentrantReadWriteLock
将 state
低 16 位用于存储写锁状态,高16位用于存储读锁状态。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回以计数表示读锁的状态 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回以计数表示写锁的状态 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
state
存储的读锁状态为所有读锁状态的总和,每个读锁都通过 HoldCounter
对象实例存储自己当前的锁状态。
二、Sync 的公平和非公平实现
**公平锁: ** 多个线程相互竞争时要排队,多个线程按照申请锁的顺序来获取锁。
**非公平锁: ** 多个线程相互竞争时,先尝试插队,插队失败再排队。
Sync
包含 FairSync
和 NonfairSync
两个子类,分别复写了如下两个方法:
boolean readerShouldBlock()
:如果当前线程在试图获得读锁或操作时时,方法返回true
表示当前线程需要排队,返回false
表示线程可以先插队获取锁,或者当前队列为空不需要排队。boolean writerShouldBlock()
:如果当前线程在尝试获取写锁或其他操作时,方法返回true
表示当前线程需要排队,返回false
表示线程可以在排队前先插队获取锁,或者当前队列为空不需要排队。
2.1 FairSync 公平锁
FairSync
是 ReentrantReadWriteLock
公平锁的实现。FairSync
通过 hasQueuedPredecessors
方法检查当前 AQS 队列,hasQueuedPredecessors
方法在有其他线程的节点排队在队列首部则返回 true
,否则返回 false
。所以,在公平锁中获取读、写锁时,只要队列首部是其他线程都需要进行排队,除非队列为空或者队列首部为当期线程,以此保证锁竞争公平。
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
2.2 NonfairSync 非公平锁
NonfairSync
是 ReentrantLock
非公平锁的实现。在获取独占锁相关操作时,恒定返回 false
,无论在哪种情况下都将尝试插队;在获取共享锁相关操作时,如果队列不为空,且队列首部节点在等待独占锁,则将返回 true
不进行插队,此操作是为了防止独占锁饥饿。
// 独占锁无论如何都尝试插队
final boolean writerShouldBlock() {
return false;
}
// 如果队列不为空,且队列首部节点在等待独占锁,则返回true进行排队(避免独占锁饥饿)
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
问:独占锁是如何产生锁饥饿的?
独占锁饥饿原因在于独占锁仅能被一个线程获取,共享锁可以被多个线程获取。
如果共享锁允许无限插队,假如当前有10个共享锁,执行了一段时间释放了5个共享锁,但是又有5个共享锁插队进来了,如此循环往复,一直有线程持有着共享锁,导致共享锁一直没有被释放,此时独占锁就无法被持有。
apparentlyFirstQueuedIsExclusive
在队列不为空,且 head
后续节点不是共享锁状态时返回 true
。
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
三、WriteLock 实现
3.1 lock 实现流程
lock
用于获取锁,不响应中断。WriteLock
获取锁的流程整体上和 ReentrantLock
获取锁的流程相同,不同点体现在 tryAcquire
方法。
lock
获取锁时,首先执行的是 tryAcquire
方法, tryAcquire
将直接尝试获取锁,此时不通过 AQS 队列获取锁,线程在 AQS 队列中并不存在对应的 node
节点。如果未获取到锁,此时将进入 AQS 队列排队,addWaiter
步骤中可同时进行队列初始化,队列排队的核心逻辑在 acquireQueued
方法,在该方法中将循环判断当前线程是否时候 AQS 队列的第二个 node 线程(第一个 node
为持有锁线程),如果是的话执行 tryAcquire
进获取锁尝试,如果获取锁失败,将进入 shouldParkAfterFailedAcquire
修改前一个节点的 node
节点状态,然后再次进入循环根据前一个节点状态阻塞当前节点线程,直到被唤醒。
3.1.1 tryAcquire 方法实现
tryAcquire
用于以独占的方式获取锁,返回 boolean 表示获取锁是否成功。tryAcquire
中先获取了当前锁的状态,如果当前锁已被持有,将判断当前持有的锁是不是独占锁、当前线程是否是持有锁的线程和锁状态是否超过最大值,都满足则进行重入;如果当前锁未被持有,则通过 writerShouldBlock
判断是否跳过排队直接尝试获取锁,如果 writerShouldBlock
返回 true
则进行 CAS 获取锁尝试。
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
// 取得当前独占锁状态码
int w = exclusiveCount(c);
// 当前已经持有了独占锁或者共享锁
if (c != 0) {
// 当前未持有独占锁,或者持有锁的不是当前线程,则获取锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 独占锁值超过了最大值
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 修改锁状态
setState(c + acquires);
return true;
}
// writerShouldBlock 判断是否非公平,是否需要插队
if (writerShouldBlock() ||
// CAS 修改锁状态
!compareAndSetState(c, c + acquires))
return false;
// CAS获取锁成功,修改持有锁的线程
setExclusiveOwnerThread(current);
return true;
}
3.2 lockInterruptibly 实现流程
lockInterruptibly
用于获取锁,如果收到中断信号则中止获取。除了 tryAcquire
所有实现流程和 ReentrantLock
的 lockInterruptibly
实现流程相同。整体的实现和 lock
方法也是大同小异,只是在 lockInterruptibly
中 AQS 队列接收到中断信号直接终止获取锁,并抛出 InterruptedException
异常,而 lock
仅仅保留了中断标志。
3.3 tryLock 实现流程
不携带时间参数的 tryLock
方法用于尝试获取锁,非公平,不阻塞线程。方法中通过 tryWriteLock
方法来尝试获取锁,整体上获取锁流程和 tryAcquire
方法相同,不同的是去除了判断是否通过 CAS 插队获取锁的判断,无论如何都进行获取锁尝试,所以这是一个非公平的获取锁尝试。
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
// 锁已被持有
if (c != 0) {
// 独占锁状态
int w = exclusiveCount(c);
// 被持有的不是独占锁,或者持有锁的不是当前线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 锁状态已经是最大值了
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
// 尝试CAS修改锁状态
if (!compareAndSetState(c, c + 1))
return false;
// 修改成功,修改持有锁线程
setExclusiveOwnerThread(current);
return true;
}
携带时间参数的 tryLock
用于在给定的等待时间内尝试获取锁,获取成功则立即返回 true
。超过等待时间获取失败或者接收到中断信号则返回 false
。整体执行流程和 ReentrantLock
相同,只是 tryAcquire
方法的逻辑有些不同。
带时间参数的 tryLock
将时间转为纳秒级时间单位,然后传给 tryAcquireNanos
。tryAcquireNanos
方法中先直接调用 tryAcquire
尝试获取锁,获取锁失败再调用 doAcquireNanos
方法进行排队。doAcquireNanos
将创建 node
节点加入同步队列,然后以独占定时模式获取锁,尝试获取锁失败后将阻塞指定时间的获取锁线程。当阻塞时间到之后再次尝试获取锁,阻塞时间完成或者在阻塞时接收到中断信号时停止获取锁尝试,最终方法返回值将指定是否成功获取到锁。
3.4 unlock 实现流程
unlock
通过 tryRelease
方法进行锁状态修改和释放锁,如果完全释放了锁,将进行 AQS 队列判断。如果队列已经初始化,将更新当前节点的节点状态为 0,并尝试唤醒 head
后继的第一个等待获取锁的节点。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
3.4.1 tryRelease 方法实现
tryRelease
用于进行释放锁操作,返回值表示是否完全释放了锁。方法首先判断当前线程是否是持有锁线程,如果未持有锁将抛出 IllegalMonitorStateException
异常,然后计算锁状态,如果修改后的锁状态值为 0,表示锁被完全释放,此时将清除持有锁的线程。
protected final boolean tryRelease(int releases) {
// 当前线程是否持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计算锁状态
int nextc = getState() - releases;
// 锁状态为0表示锁被完全释放,不为0表示只是减扣除重入,线程依旧持有锁
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 清除持有锁线程
setExclusiveOwnerThread(null);
// 修改锁状态
setState(nextc);
return free;
}
3.5 newCondition 实现流程
WriteLock
和 ReentrantLock
都是实例化的 ConditionObject
类,除了部分方法有差别,整体工作流程和原理相同,本文不做赘述。
四、ReadLock 实现
ReadLock
一些全局变量的介绍:
firstReader
:第一个持有共享锁的线程,释放锁后为null
;firstReaderHoldCount
:第一个持有共享锁的线程的持有锁计数,即锁状态;readHolds
:当前线程持有的ReadLock
锁的数量, 仅在构造函数和 readObject 中初始化,每当线程的共享锁状态下降到 0 时删除;cachedHoldCounter
:最后一个成功获取共享锁线程的缓存;
HoldCounter
对象用于记录持有共享锁的线程,记录了持有锁的线程的线程id和锁计数。
4.1 lock 实现流程
共享锁通过 acquireShared
方法获取锁。获取锁时,先进行获取锁尝试,获取锁失败进入 AQS 队列排队。在获取锁的流程中,创建节点和加入 AQS 进行排队的流程和 WriteLock
相同,区别体现在 tryAcquireShared
获取锁的方法中。
获取锁时,如果其他线程持有了独占锁则直接获取失败,否则将调用 readerShouldBlock
方法判断是否尝试插队获取锁。获取锁时通过 CAS 模式更新 state
写状态的值,更新成功则表示获取锁成功。除此之外还需更新当前线程对应的 ReadLock
锁状态,即锁计数,该值通过 HoldCounter
存储,并通过 cachedHoldCounter
缓存最后一个获取共享锁线程的 HoldCounter
。首个持有共享锁的线程较为特殊,通过 firstReader
和 firstReaderHoldCount
存储持有锁的线程和锁状态计数。
4.1.1 tryAcquireShared 方法实现
在当前方法中仅初步的进行共享锁获取,获取锁失败将进入 fullTryAcquireShared
方法进行进一步的锁状态获取。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 已获取独占锁,且持有锁线程的不是当前线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 获取锁失败,需要排队
return -1;
// 取得共享锁状态
int r = sharedCount(c);
// readerShouldBlock:判断是否需要插队获取锁,公平和非公平锁实现不同
if (!readerShouldBlock() &&
// 锁状态小于最大值
r < MAX_COUNT &&
// CAS修改锁状态
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 当前线程为第一个持有共享锁的线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 当前线程为第一个持有共享锁的线程,直接增加共享锁状态
firstReaderHoldCount++;
} else {
// 取得最后一次获取readLock锁线程的缓存
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// 缓存的不是当前线程,更新缓存,HoldCounter如果不存在将创建
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 缓存的readLock锁已被释放,重新添加
readHolds.set(rh);
// 增加锁计数
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
4.1.2 fullTryAcquireShared 方法实现
完整版本的获取共享锁流程,处理 CAS 未命中和未在 tryAcquireShared
中处理的可重入读取。这段代码与 tryacquirered
中的代码在一定程度上是冗余的。
在当前方法中进行循环获取锁,获取锁时将会有以下三种情况不同的情况:
- 有线程已经持有了独占锁,如果持有独占锁的线程不是当前线程,直接返回
-1
获取锁失败,持有独占锁的线程是当前线程则正常进行共享锁获取。 - 未被获取独占锁,但是
readerShouldBlock
为true
,不允许插队获取锁。如果当前线程为第一个持有共享锁的线程,则正常进入获取锁流程。如果非第一次持有锁线程,则需要先判断当前线程是否已经持有了共享锁,未持有共享锁时将清除readHolds
中的引用,并直接返回-1
后续通过排队进行锁获取。 - 未被获取独占锁,但是
readerShouldBlock
为false
,允许插队获取锁。此时将直接进入获取锁流程。
获取独占锁时通过 CAS 直接尝试修改 state
的值,如果修改成功更新当前持有共享锁线程的状态和缓存,修改失败则进入下一次循环。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// 已获取独占锁,且持有锁线程的不是当前线程
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
// 获取锁失败,需要排队
return -1;
} else if (readerShouldBlock()) {
// 不允许插队获取锁
// 当前为第一个持有共享锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
// 取得当前线程的 HoldCounter,先从缓存获取,再从readHolds获取
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
// 锁状态为0删除readHolds的锁引用
if (rh.count == 0)
readHolds.remove();
}
}
// 锁状态为0则返回获取锁失败
if (rh.count == 0)
return -1;
}
}
// 共享锁计数为最大值
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS 修改锁状态
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 修改前共享锁状态为0,当前为第一个持有锁的线程
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 当前为持有锁线程,当前共享锁的状态加1
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
// 当前缓存的最后一个获取共享锁的线程不是当前线程,更新缓存
if (rh == null || rh.tid != getThreadId(current))
// HoldCounter如果不存在将创建
rh = readHolds.get();
// 获取到的缓存中锁状态为0,需要重新添加到readHolds
else if (rh.count == 0)
readHolds.set(rh);
// 增加锁计数
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
问:为什么需要循环进行锁获取?
与独占锁不同,共享锁允许多个线程同时持有。独占锁通过 CAS 修改锁状态失败,则表示其他线程修改了锁状态,锁已被其他线程获取,需要进入排队。但是共享锁通过 CAS 修改锁状态失败,并不一定导致当前线程不能获取共享锁,除非其他线程获取了独占锁,所以获取锁失败需要通过循环再次尝试获取,确定锁的状态。
问:ReentrantReadWriteLock如何进行锁降级?
从上述源码分析,如果当前线程持有 WriteLock,是有权再获取ReadLock锁的,所以可以进行锁降级。
降级流程为:获取WriteLock->获取ReadLock->释放->WriteLock
4.1.3 doAcquireShared 方法实现
为当前线程创建 node
节点并加入同步队列尾部进行排队。排队获取锁逻辑与获取独占锁的 acquireQueued
方法相同 。
private void doAcquireShared(int arg) {
// 创建共享节点,并插入队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 当前线程是否被中断
boolean interrupted = false;
for (;;) {
// 取得前一个节点
final Node p = node.predecessor();
// 前一个节点为head,表示当前节点可以尝试获取锁
if (p == head) {
// 尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 更新head节点和引用,如果next为共享节点进行唤醒
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 设置中断状态
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire:检查和更新未能获取锁的节点的状态, 如果线程应该阻塞返回true
// parkAndCheckInterrupt:挂起线程,被唤醒后检查线程是否中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 保存中断状态
interrupted = true;
}
} finally {
if (failed)
// 取消获取锁尝试
cancelAcquire(node);
}
}
4.1.4 setHeadAndPropagate 方法实现
当前方法在获取共享锁成功后被调用,用于更新 head
引用,并判断下一个等待节点是否是共享锁节点,如果是共享锁需要进行唤醒。
在创建锁成功后进行调用,propagate值恒大于0
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 更新head节点,删除node的prev和thread引用
setHead(node);
// propagate大于0、原/新头节点为null或未被取消
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 下一个节点为共享锁节点
if (s == null || s.isShared())
// 进行节点唤醒
doReleaseShared();
}
}
4.2 lockInterruptibly 实现流程
lockInterruptibly
通过 acquireSharedInterruptibly
方法获取共享锁,整体流程和 ReentrantLock
的 lockInterruptibly
方法相同。
doAcquireSharedInterruptibly
方法为当前线程创建 node
节点并加入同步队列尾部进行排队,该方法响应中断。排队获取锁逻辑与获取独占锁的 doAcquireInterruptibly
方法相同 。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 创建共享节点,并插入队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 取得前一个节点
final Node p = node.predecessor();
// 前一个节点为head,表示当前节点可以尝试获取锁
if (p == head) {
// 尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 更新head节点和引用,如果next为共享节点进行唤醒
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire:检查和更新未能获取锁的节点的状态,如果线程应该阻塞返回true
// parkAndCheckInterrupt:挂起线程,被唤醒后检查线程是否中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 中断时抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
// 取消获取锁尝试
cancelAcquire(node);
}
}
4.3 tryLock 实现流程
不携带时间参数的 tryLock
方法用于尝试获取锁,非公平,不阻塞线程。与 fullTryAcquireShared
方法相同,为了避免由于并发修改时 CAS 失败,需要循环进行锁获取,除非可以确定无法获取锁(当前已经获取了独占锁,且持有锁线程不是当前线程)。
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
// 已获取独占锁,且持有锁线程的不是当前线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
// 取得共享锁状态
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 尝试获取共享锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 获取共享锁成功
if (r == 0) {
//当前线程为第一个持有共享锁的线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 当前线程为第一个持有共享锁的线程,直接增加共享锁状态
firstReaderHoldCount++;
} else {
// 从缓存中获取当前线程的HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// 未获取到,从readHolds取值或者创建
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 状态为0,表示已被从readHolds删除,重新加入
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
带时间参数的 tryLock
将时间转为纳秒级时间单位,然后传给 tryAcquireSharedNanos
,整体获取锁流程和获取独占锁的流程相同。tryAcquireSharedNanos
方法中先直接调用 tryAcquireShared
尝试获取锁,获取锁失败再调用 doAcquireSharedNanos
方法进行排队。doAcquireSharedNanos
将创建 node
节点加入同步队列,然后以独占定时模式获取锁,尝试获取锁失败后将阻塞指定时间的获取锁线程。当阻塞时间到之后再次尝试获取锁,阻塞时间完成或者在阻塞时接收到中断信号时停止获取锁尝试,最终方法返回值将指定是否成功获取到锁。
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 时间错误
if (nanosTimeout <= 0L)
return false;
// 计算到期时间
final long deadline = System.nanoTime() + nanosTimeout;
//创建共享节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 取得当前节点前一个节点
final Node p = node.predecessor();
if (p == head) {
// 尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 更新head节点和引用,如果next为共享节点进行唤醒
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
// 超时了
if (nanosTimeout <= 0L)
return false;
// shouldParkAfterFailedAcquire:检查和更新未能获取锁的节点的状态,如果线程应该阻塞返回true
// parkAndCheckInterrupt:挂起线程,被唤醒后检查线程是否中断
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 中断抛出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
// 取消获取锁尝试
cancelAcquire(node);
}
}
4.4 unlock 实现流程
unlock
通过 releaseShared
方法进行释放锁。先调用 tryReleaseShared
方法进行锁状态修改,如果所有共享锁和独占锁都被完全释放,再调用 doReleaseShared
修改 head
节点状态并唤醒后续节点。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
4.4.1 tryReleaseShared 方法实现
进行共享锁状态和当前共享锁计数修改,如果共享锁状态为0,表示所有共享锁都被完全释放。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 第一个持有锁的线程为当前线程,删除锁引用
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 取得当前线程对应的HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
//当前锁已被完全释放删除HoldCounter
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 锁状态-1
--rh.count;
}
// 可能存在多个共享锁同时unlock,将造成CAS失败,需要重试
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// CAS修改status
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
4.4.2 doReleaseShared 方法实现
当所有共享锁都被完全释放时执行,修改 head
节点状态,如果 head
状态为 SIGNAL
将唤醒下一个 node
进行获取锁尝试。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//修改head的状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// head为SIGNAL状态,需要唤醒后续线程
unparkSuccessor(h);
}
// 修改head的状态为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果head节点改变则需要从新进行状态修改
if (h == head) // loop if head changed
break;
}
}
即使有其他线程正在进行获取/释放锁,也要确保
unlock
状态得到传播,这是源码注释的内容,暂时未理解为什么要这么做。逻辑实现上就是当
head
节点状态为 0 时设置状态为PROPAGATE
,判断h == head
这两个步骤,保证状态传播。如果head
被修改表示有其他线程通过AQS
队列获取锁,此时按理不再需要进行锁状态修改了,不是很明白。
4.5 newCondition 实现流程
ReadLock
不包含 Condition
功能,共享锁允许多个线程同时获取,没有线程等待和礼让的需求。调用当前方法将抛出 UnsupportedOperationException
异常。