一、StampedLock概述
StampedLock
是读写锁的实现,对比 ReentrantReadWriteLock
主要不同是该锁不允许重入,多了乐观读的功能,使用上会更加复杂一些,且仅支持非公平锁,但是具有更好的性能表现。StampedLock
的状态由版本和模式组成。 获取锁方法返回一个邮戳,表示和控制与锁状态相关的访问; 这些方法的“尝试”邮戳可能会返回特殊值 0 来表示获取锁失败。 锁释放和转换方法需要标记作为参数,如果它们与锁的状态不匹配则失败。本文对 StampedLock
的实现源码进行分析。
1.1 属性分析
state
属性的作用类似于 ReentrantLock
和 ReentrantWriteReadLock
锁的 state
属性,在 StampedLock
中按位将 state
分为了3个区域,其中低7位为共享锁的持有计数,第8位为独占锁的持有计数,高57位(包含独占锁持有计数位)表示当前锁的版本状态,用于乐观锁的校验。state
作为获取锁时响应的邮戳。state
是一个线程安全的属性,成功对该值进行修改则意味着获取锁的成功,类中的常量大都用于辅助对 state
进行位运算。
PS:整个
state
都作为获取锁时的邮戳返回,但是只有前 57 位作为锁版本,共享锁持有计数不参与锁版本计算,因为共享锁的计数对数据不会有影响。
// cpu核心数
private static final int NCPU = Runtime.getRuntime().availableProcessors();
// 在当前节点加入队列前,如果当前持有的是独占锁且队列头尾节点(whead和wtail)相等,先让其自旋一定的次数
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
// 在阻塞线程前,如果队列头尾节点(whead和wtail)相等,先让其自旋一定的次数
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
// 阻塞线程前,最大的自旋次数
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
/** The period for yielding when waiting for overflow spinlock */
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
// 用于共享锁持有计数的位数,读锁占低7位
private static final int LG_READERS = 7;
// 共享锁获取时增加的持有计数
private static final long RUNIT = 1L;
// 独占锁持有计数的掩码,第7位,不允许重入,独占锁计数最大只能为1
private static final long WBIT = 1L << LG_READERS;
// 共享锁持有计数的掩码
private static final long RBITS = WBIT - 1L;
//共享锁持有计数最大值,大于等于当前值进行溢出处理
private static final long RFULL = RBITS - 1L;
// 共享和独占锁持有计数的掩码,用于进行与操作判断是否持有锁
private static final long ABITS = RBITS | WBIT;
// 锁版本的掩码(包括了独占锁计数位)
private static final long SBITS = ~RBITS; // note overlap with ABITS
// state 的初始值,防止初始tryOptimisticRead的值为0
private static final long ORIGIN = WBIT << 1;
// 中断信号
private static final long INTERRUPTED = 1L;
// 节点等待状态
private static final int WAITING = -1;
// 节点取消状态
private static final int CANCELLED = 1;
// 共享锁模式
private static final int RMODE = 0;
// 独占锁模式
private static final int WMODE = 1;
// 队列节点实体类
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // list of linked readers
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
// 队列头结点
private transient volatile WNode whead;
// 队列尾节点
private transient volatile WNode wtail;
// views
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;
// 锁状态
private transient volatile long state;
// 当共享锁持有计数达到饱和状态时,额外的共享锁计数
private transient int readerOverflow;
1.2 基础方法
根据前面参数分析结果,state
分段存储锁的状态信息,据此我们可以理解以下基础方法的逻辑。
boolean isWriteLocked(): 如果当前持有独占锁,则返回true 。
通过和 WBIT
进行与操作获取共享锁的持有计数,计数不为 0 表示当前持有独占锁。
public boolean isWriteLocked() {
return (state & WBIT) != 0L;
}
boolean isReadLocked(): 如果当前持有共享锁,则返回true 。
通过和 RBITS
进行与操作获取共享锁的持有计数,计数不为 0 表示当前持有共享锁。
public boolean isReadLocked() {
return (state & RBITS) != 0L;
}
int getReadLockCount(): 查询当前持有的共享锁计数。 该方法设计用于监视系统状态,而不是用于同步控制。
通过和 RBITS
进行与操作获取共享锁的持有计数,如果值大于或等于 RFULL
表示当前锁计数可能已经溢出了,需要和 readerOverflow
相加计算总的共享锁持有计数。
private int getReadLockCount(long s) {
long readers;
// 共享锁计数大于或等于最大读计数RFULL
if ((readers = s & RBITS) >= RFULL)
// 读计数加上溢出的值
readers = RFULL + readerOverflow;
return (int) readers;
}
二、获取独占锁
2.1 获取锁的接口方法
long writeLock(): 获取独占锁,如果该锁被另一个线程保持,则阻塞线程,直到拿到锁并返回邮戳。
writeLock
方法中先判断了当前锁是否已被持有,如果未被持有则 CAS 直接修改 state
的值,修改成功表示获取锁成功,返回当前 state
的值作为当前锁版本的邮戳。如果当前已经持有锁,或者CAS获取锁失败,则进入 acquireWrite
方法进行锁获取。
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
// 当前未持有锁
return ((((s = state) & ABITS) == 0L &&
// CAS修改state获取锁
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
// 当前持有锁或CAS获取失败,进入acquireWrite方法获取锁
next : acquireWrite(false, 0L));
}
long tryWriteLock(): 尝试获取独占锁,非公平,不阻塞线程。如果获取锁成功则返回邮戳,否则返回 0。
如果当前未持有任何锁,进行 CAS 获取锁,否则直接返回获取锁失败。
public long tryWriteLock() {
long s, next;
// 当前未持有锁
return ((((s = state) & ABITS) == 0L &&
// CAS修改state获取锁
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : 0L);
}
long tryWriteLock(long time, TimeUnit unit) throws InterruptedException: 在给定的等待时间内尝试获取独占锁,获取成功则立即返回邮戳。超过等待时间获取失败或者接收到中断信号则返回 0。
当前未处于中断状态,先通过 tryWriteLock
方法进行一次获取锁尝试,获取锁失败计算等待时间,通过 acquireWrite
方法进行锁获取。
public long tryWriteLock(long time, TimeUnit unit)
throws InterruptedException {
// 时间转换为纳秒
long nanos = unit.toNanos(time);
// 当前未处于中断状态
if (!Thread.interrupted()) {
long next, deadline;
// 先尝试直接获取锁
if ((next = tryWriteLock()) != 0L)
return next;
// 等待时间小于0直接返回获取锁失败
if (nanos <= 0L)
return 0L;
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
// acquireWrite 进行定时获取锁,true表示响应中断
if ((next = acquireWrite(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}
long writeLockInterruptibly() throws InterruptedException: 调用后一直阻塞到获得独占锁,但是接受中断信号。
当未处于中断状态时通过 acquireWrite
方法进行获取锁,如果发生中断则抛出异常。
public long writeLockInterruptibly() throws InterruptedException {
long next;
// 当前未处于中断状态时通过acquireWrite方法进行获取锁
if (!Thread.interrupted() &&
(next = acquireWrite(true, 0L)) != INTERRUPTED)
return next;
throw new InterruptedException();
}
2.2 acquireWrite 方法实现
acquireWrite
方法用于获取独占锁,主要包含创建 node
插入同步队列尾部、阻塞线程进行排队两个步骤,在进行这两个步骤之前都将进行自旋获取锁操作。
在创建 node
节点插插入队列前,如果当前持有的是独占锁,且 wtail
和 whead
为同一个节点,则自旋 SPINS
次尝试获取锁。自旋完成之后再进行队列初始化(如果还没初始化的话)、为当前线程创建 node
节点、检查 wtail
节点是否改变和 CAS 设置当前节点为 wtail
节点4个小步骤,在每个小步骤进行前都会先CAS进行一次获取锁尝试。
阻塞线程部分逻辑包含两个嵌套的循环,外部循环用于进行操作,内部循环用于自旋获取 CAS 尝试获取锁。在阻塞线程前,如果 node
的前驱节点(p) 为 whead
节点,则自旋 HEAD_SPINS
次进行 CAS 尝试获取锁,只要 node
的前驱节点(p) 为 whead
节点的条件满足,将重复进入自旋,每一次进入自旋都会将自旋次数增大一倍,直到自旋次数大于或等于 MAX_HEAD_SPINS
,条件不满足时如果 whead
的 cowait
栈不为空,将唤醒 cowait
栈。
完成以上操作后,再次判断 head
节点是否变化,当前节点的前驱节点是否变化。如果当前节点的前驱节点状态未设置,将状态设置为 WAITING
,状态为 CANCELLED
则将前驱节点移出队列。都校验通过后,计算阻塞时间是否超时,再次判断前驱节是否等待状态、当前是否持有锁,whead
和前驱节点是否改变,都检查通过后将阻塞线程。
线程被唤醒后判断是否是接受到中断信号,如果 interruptible
为 true
,接受到中断信号将取消当前节点,并返回 INTERRUPTED
中断标志。
private long acquireWrite(boolean interruptible, long deadline) {
// node为新增节点,p为尾节点(即将成为node的前置节点)
WNode node = null, p;
// 为当前线程创建node节点,并加入到同步队列尾部
// 如果当前持有的是独占锁,且wtail和whead为同一个节点,先自旋尝试直接获取锁
// 每执行一步操作前都判断当前是否持有锁,直到当前线程成功加入队列进行排队
for (int spins = -1;;) {
long m, s, ns;
// 低8为为0,表示还未持有锁,直接CAS修改state获取锁
if ((m = (s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns;
}
else if (spins < 0)
// 当前持有的是独占锁,且wtail和whead为同一个节点,则自旋SPINS次获取锁
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
// 自旋计数-1
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
// wtail为null表示队列还未初始化,进行初始化队列
else if ((p = wtail) == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
// 创建当前线程的node节点
else if (node == null)
node = new WNode(WMODE, p);
// wtail节点改变,为node重新指定前驱节点
else if (node.prev != p)
node.prev = p;
// 将当前线程的node节点设置为wtail
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
// 阻塞当前线程进行排队
for (int spins = -1;;) {
// h为头节点,np为新增节点的前置节点,pp为前前置节点,ps为前置节点的状态
WNode h, np, pp; int ps;
// p = node.prev,如果当前节点的前驱节点是whead节点
if ((h = whead) == p) {
// 设置自旋的值
if (spins < 0)
spins = HEAD_SPINS;
// 自旋次数小于MAX_HEAD_SPINS时自旋次数x2
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
// 自旋获取锁
for (int k = spins;;) { // spin at head
long s, ns;
// 当前未获取锁
if (((s = state) & ABITS) == 0L) {
// CAS修改state,修改成功表示获取锁成功
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
// 获取锁成功,将node设置为whead
whead = node;
node.prev = null;
return ns;
}
}
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break;
}
}
// 头结点不为空
else if (h != null) { // help release stale waiters
WNode c; Thread w;
// 循环唤醒whead的cowait栈
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// whead结点未改变
if (whead == h) {
// 前驱节点改变,为前驱节点重新指定next为当前节点
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
// 如果当前节点的前驱节点状态为0,将其前驱节点设置为等待状态
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
//如果当前节点的前驱节点状态为取消
else if (ps == CANCELLED) {
// 重新设置前驱节点,移除原前驱节点
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time; // 0 argument to park means no timeout
// 传入0,表示阻塞直到UnSafe.unpark唤醒
if (deadline == 0L)
time = 0L;
// 已经等待超时,取消当前等待节点
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
// 设置线程Thread的parkblocker属性,表示当前线程被谁阻塞,用于监控线程使用
U.putObject(wt, PARKBLOCKER, this);
// 将其当前线程设置为当前节点
node.thread = wt;
// 当前节点的前驱节点为等待状态,并且队列的头节点不是前驱节点或者当前状态为有锁状态
// 队列头节点和当前节点的前驱节点未改变,则阻塞当前线程
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
U.park(false, time); // emulate LockSupport.park
// 将当前node的线程设置为null
node.thread = null;
//当前线程的监控对象也置为空
U.putObject(wt, PARKBLOCKER, null);
//如果传入的参数interruptible为true,并且当前线程中断,取消当前节点
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
2.3 cancelWaiter 方法实现
cancelWaiter
方法如果节点非空,则强制取消状态并在可能的情况下将其从队列中解开并唤醒任何 cowaiters(节点或组,如适用),并且在任何情况下如果锁空闲,则有助于释放当前的第一个 waiter。 (使用空参数调用作为释放的条件形式,目前不需要,但在未来可能的取消政策下可能需要)。 这是 AbstractQueuedSynchronizer 中取消方法的一种变体(请参阅 AQS 内部文档中的详细说明)。
private long cancelWaiter(WNode node, WNode group, boolean interrupted) {
// node和group不为空
if (node != null && group != null) {
Thread w;
//将其当前节点的状态设置为取消状态
node.status = CANCELLED;
// 遍历group的cowait栈,清除CANCELLED节点
for (WNode p = group, q; (q = p.cowait) != null;) {
if (q.status == CANCELLED) {
U.compareAndSwapObject(p, WCOWAIT, q, q.cowait);
// 从头开始遍历
p = group; // restart
}
else
p = q;
}
// node和group相同
if (group == node) {
// 唤醒group的cowait栈
for (WNode r = group.cowait; r != null; r = r.cowait) {
if ((w = r.thread) != null)
U.unpark(w); // wake up uncancelled co-waiters
}
// 将当前取消接节点的前驱节点的下一个节点设置为当前取消节点的next节点
for (WNode pred = node.prev; pred != null; ) { // unsplice
WNode succ, pp; // find valid successor
// 如果当前取消节点的下一个节点为空或者是取消状态,从尾节点开始寻找有效的节点
// 并重新指定下一个节点
while ((succ = node.next) == null ||
succ.status == CANCELLED) {
WNode q = null; // find successor the slow way
// 从wtail向前遍历到node,找到离node最近一个未被取消的节点
for (WNode t = wtail; t != null && t != node; t = t.prev)
if (t.status != CANCELLED)
q = t; // don't link if succ cancelled
if (succ == q || // ensure accurate successor
// 修改node的后继节点为最近一个未被取消的节点
U.compareAndSwapObject(node, WNEXT,
succ, succ = q)) {
// 当前节点后继节点为空,且当前节点为wtail节点
if (succ == null && node == wtail)
// 将当前节点的前置节点设置为wtail
U.compareAndSwapObject(this, WTAIL, node, pred);
// 退出循环
break;
}
}
if (pred.next == node) // unsplice pred link
// 修改node前置节点的next为node后最近一个未被取消的节点
U.compareAndSwapObject(pred, WNEXT, node, succ);
// 后继节点存在,且在线程阻塞状态
if (succ != null && (w = succ.thread) != null) {
succ.thread = null;
// 唤醒后继节点
U.unpark(w); // wake up succ to observe new pred
}
// 当前节点的前驱节点未被取消,或者前驱接点的前驱节点为空,退出循环
if (pred.status != CANCELLED || (pp = pred.prev) == null)
break;
// 重新设置当前取消节点的前驱节点
node.prev = pp; // repeat if new pred wrong/cancelled
// 重新设置当前取消节点的前驱节点
U.compareAndSwapObject(pp, WNEXT, pred, succ);
// 将其前驱节点设置为pp,重新循环
pred = pp;
}
}
}
WNode h; // Possibly release first waiter
// 头节点不为空,尝试唤醒头结点的后继节点
while ((h = whead) != null) {
long s; WNode q; // similar to release() but check eligibility
// 头节点的下一节点为空或者是取消状态,从尾节点开始寻找有效的节点(包括等待状态,和运行状态)
if ((q = h.next) == null || q.status == CANCELLED) {
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
// 如果头节点没有改变
if (h == whead) {
// 头节点的下一有效节点不为空,并且头节点的状态为0,并且当前StampedLock的不为写锁状态
// 并且头节点的下一节点为读模式,唤醒头结点的下一节点
if (q != null && h.status == 0 &&
((s = state) & ABITS) != WBIT && // waiter is eligible
(s == 0L || q.mode == RMODE))
// 唤醒头结点的下一有效节点,下面会介绍release方法
release(h);
break;
}
}
//如果当前线程被中断或者传入进来的interrupted为true,直接返回中断标志位,否则返回0
return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L;
}
2.4 release 方法实现
release
方法用于唤醒传入节点的后继节点。方法传入一个节点 h
,如果传入节点的状态为 WAITING
首先修改传入节点的 state
状态为 0,如果传入节点的后继节点为空或者被取消,从后往前遍历找到该节点后继第一个未被取消的节点 q
,如果 q
在阻塞状态则进行唤醒。
private void release(WNode h) {
// 传入的节点不为空
if (h != null) {
WNode q; Thread w;
// 取消等待状态
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
// 传入节点不存在后继节点,或后继节点被取消
if ((q = h.next) == null || q.status == CANCELLED) {
// 从后往前查找到离h最近的一个未被取消的节点
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
// 后继节点存在且在阻塞中
if (q != null && (w = q.thread) != null)
// 唤醒后继节点
U.unpark(w);
}
}
三、释放独占锁
void unlockWrite(long stamp): 如果锁状态与给定的邮戳匹配,则释放独占锁。
释放锁前先判断传入的邮戳是否与当前锁状态相等,且当前为持有写锁的状态。通过锁状态增加 WBIT
使表示独占锁持有锁的位(第8位)进位到锁版本位,如果 state
值溢出则设置为 ORIGIN
。state
修改完成表示独占锁已经释放,如果同步队列已经初始化,并且 head 节点的状态不为0,则唤醒下一个节点。
public void unlockWrite(long stamp) {
WNode h;
// 确定传入的锁状态是正确的,且当前持有独占锁
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
// 增加锁版本的值,如果位溢出则重置为ORIGIN
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
// 队列已经初始化,并且还有head节点状态不为0
if ((h = whead) != null && h.status != 0)
// 唤醒它的下一个节点
release(h);
}
boolean tryUnlockWrite(): 如果持有独占锁则释放锁,不需要邮戳。 此方法对于错误后的恢复可能很有用。
相比于 unlockWrite
少了检查邮戳是否正确的步骤,其他流程相同。
public boolean tryUnlockWrite() {
long s; WNode h;
// 确定当前持有独占锁
if (((s = state) & WBIT) != 0L) {
// 增加锁版本的值,如果位溢出则重置为ORIGIN
state = (s += WBIT) == 0L ? ORIGIN : s;
// 队列已经初始化,并且还有head节点状态不为0
if ((h = whead) != null && h.status != 0)
// 唤醒它的下一个节点
release(h);
return true;
}
return false;
}
四、获取共享锁
4.1 获取锁的接口方法
long readLock(): 获取共享锁,如果该锁被另一个线程保持,则阻塞线程,直到拿到锁并返回邮戳。
如果当前没有在在排队等待的线程,且未持有独占锁,共享锁计数未溢出,则通过 CAS 获取共享锁,否则通过 acquireRead
方法获取锁。
如果当前有独占锁线程正在排队是无法立即获得共享锁的,只能在后面进行排队,为防止独占锁饥饿。
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
// 当前不存在等待节点、共享锁计数未溢出,或者未持有锁
return ((whead == wtail && (s & ABITS) < RFULL &&
// CAS修改state获取锁
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
// 当前有节点在等待或CAS获取失败,进入acquireRead方法获取锁
next : acquireRead(false, 0L));
}
long tryReadLock(): 尝试获取共享锁,非公平,不阻塞线程。如果获取锁成功则返回邮戳,否则返回 0。
如果当前持有独占锁则直接获取锁失败,如果共享锁计数未溢出直接通过 CAS 获取共享锁,否则通过 tryIncReaderOverflow
方法获取锁,这是一个循环的过程,因为有 CAS 失败导致获取锁失败的可能性,所以需要确定获取锁成功或者目前已经持有了独占锁。
即使当前有独占锁线程在排队,也是可以立即获取到共享锁的。
public long tryReadLock() {
for (;;) {
long s, m, next;
// 当前持有独占锁,直接获取锁失败
if ((m = (s = state) & ABITS) == WBIT)
return 0L;
// 共享锁计数未溢出,或者未持有锁
else if (m < RFULL) {
// CAS 获取锁
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
// 共享锁计数溢出,通过tryIncReaderOverflow方法获取锁,
// 并使用readerOverflow存储溢出的计数
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
}
tryReadLock(long time, TimeUnit unit) throws InterruptedException: 在给定的等待时间内尝试获取共享锁,获取成功则立即返回邮戳。超过等待时间获取失败或者接收到中断信号则返回 0。
当前未处于中断状态,判断当前是否持有独占锁,未持有独占锁则先进行一次获取锁尝试,获取锁失败计算等待时间,通过 acquireRead
方法进行锁获取。
public long tryReadLock(long time, TimeUnit unit)
throws InterruptedException {
long s, m, next, deadline;
// 时间转换为纳秒
long nanos = unit.toNanos(time);
// 当前处于未中断状态
if (!Thread.interrupted()) {
// 当前未持有独占锁
if ((m = (s = state) & ABITS) != WBIT) {
// 共享锁计数未溢出,或者未持有锁
if (m < RFULL) {
// CAS 直接获取锁
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
// 锁状态溢出,通过tryIncReaderOverflow方法获取锁
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
// 等待时间小于0直接返回获取锁失败
if (nanos <= 0L)
return 0L;
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
// acquireRead进行定时获取锁,true表示响应中断
if ((next = acquireRead(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}
long readLockInterruptibly() throws InterruptedException: 调用后一直阻塞到获得共享锁,但是接受中断信号。
当未处于中断状态时通过 acquireRead
方法进行获取锁,如果发生中断则抛出异常。
public long readLockInterruptibly() throws InterruptedException {
long next;
// 当前未处于中断状态时通过acquireRead方法进行获取锁
if (!Thread.interrupted() &&
(next = acquireRead(true, 0L)) != INTERRUPTED)
return next;
throw new InterruptedException();
}
4.2 tryIncReaderOverflow 方法实现
tryIncReaderOverflow
方法首先尝试 CAS 将 state
值设置为 RBITS
确保其他持有共享锁的线程无法对 state
进行操作,修改成功表示获取锁成功,进行 readerOverflow
增加和 state
恢复,如果获取锁失败返回 0。
private long tryIncReaderOverflow(long s) {
// 确定当前state的共享锁计数已经饱和
if ((s & ABITS) == RFULL) {
// CAS修改state为RBITS
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
// 共享锁溢出计数+1
++readerOverflow;
// 修改锁状态
state = s;
// 返回当前锁状态
return s;
}
}
else if ((LockSupport.nextSecondarySeed() &
OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
// 获取锁失败
return 0L;
}
问:RFULL = RBITS - 1为什么?
前面也许会有疑问,为什么共享锁的最大计数 RFULL = RBITS - 1,在此处应该明白,是为了当共享锁计数溢出时,还可以使用 CAS 对
state
进行 +1 操作,因为修改state
成功表示获取锁成功。
问:++readerOverflow操作线程安全吗?
线程安全,因为此时
state
的共享锁计数等于RBITS
,此时没有其他任何一个线程可以进行readerOverflow
值的修改。
4.3 acquireRead 方法实现
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) {
WNode h;
// 如果头尾节点相等,让其自旋
if ((h = whead) == (p = wtail)) {
for (long m, s, ns;;) {
// 当前持有共享锁,并且持有计数没有溢出,或者未持有锁
if ((m = (s = state) & ABITS) < RFULL ?
// CAS修改state获取锁
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
// 当前持有共享锁,并且持有计数溢出,通过tryIncReaderOverflow获取锁
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns;
// 当前持有独占锁
else if (m >= WBIT) {
// 计数自减操作
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
// 自旋结束,判断头尾节点是否相等
if (spins == 0) {
WNode nh = whead, np = wtail;
// 如果头尾节点没有改变或者头尾节点不相等,退出自旋
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
// 赋值进行轮训
spins = SPINS;
}
}
}
}
// 如果尾节点为null,进行队列初始化
if (p == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
// 如果当前节点为空,创建当前节点
else if (node == null)
node = new WNode(RMODE, p);
// 头尾节点相等或者尾节点不是共享锁节点
else if (h == p || p.mode != RMODE) {
// 如果当前节点的前驱节点不是尾节点,重新设置当前节点的前驱节点
if (node.prev != p)
node.prev = p;
// 将当前节点加入队列中,并且当前节点做为尾节点,如果成功退出循环
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
// 将当前节点加入尾节点的cowait栈
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))
node.cowait = null;
else {
for (;;) {
WNode pp, c; Thread w;
// 如果头节点的cowait栈不为空,并且其线程不为null,将cowait栈第一个节点出栈并唤醒
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
// 如果当前头节点为尾节点的前驱节点,或者头尾接地那相同,或者尾节点前驱节点为空
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
// 判断当前state是否未持有锁,或处于共享锁状态,且没有溢出
if ((m = (s = state) & ABITS) < RFULL ?
// CAS修改state获取锁
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
// 当前持有共享锁,并且持有计数溢出,通过tryIncReaderOverflow获取锁
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
// 当前未持有独占锁,进入循环
} while (m < WBIT);
}
// 如果头结点没有改变,并且尾节点的前驱节点没有改变
if (whead == h && p.prev == pp) {
long time;
//如果前前节点为空,或者头节点为前置节点,或者前置节点已经取消
// 外部循环重新开始创建node节点
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
// 未设置超时时间
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
// 已经超时了,取消当前节点
return cancelWaiter(node, p, false);
// 当期线程
Thread wt = Thread.currentThread();
// 设置线程Thread的parkblocker属性,表示当前线程被谁阻塞,用于监控线程使用
U.putObject(wt, PARKBLOCKER, this);
// 将其当前线程设置为当前节点
node.thread = wt;
// 检查之前的条件
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
// 阻塞当前线程
U.park(false, time);
// 当前节点的线程置空
node.thread = null;
// 当前线程的监控对象也置为空
U.putObject(wt, PARKBLOCKER, null);
// 如果interruptible为true,线程中断则取消当前节点
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
// 阻塞当前线程,在阻塞当前线程之前,如果头节点和尾节点相等,让其自旋一段时间获取写锁。
// 如果头结点不为空,释放头节点的cowait队列
for (int spins = -1;;) {
WNode h, np, pp; int ps;
// 如果头节点和尾节点相等
if ((h = whead) == p) {
// 设置自旋的初始值
if (spins < 0)
spins = HEAD_SPINS;
// 每次进入自旋,自旋次数*2,直到大于等于MAX_HEAD_SPINS
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
// 自旋获取读锁
for (int k = spins;;) { // spin at head
long m, s, ns;
// 当前未持有锁或者持有共享锁且持有计数未溢出
if ((m = (s = state) & ABITS) < RFULL ?
// CAS修改state获取锁
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
// 通过tryIncReaderOverflow方法获取锁
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
WNode c; Thread w;
// 将当前节点设置为whead
whead = node;
// 前置节点设置为空
node.prev = null;
// 唤醒node节点的cowait栈
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
// 当前持有写锁,自旋次数-1
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
}
// 如果whead不为空
else if (h != null) {
WNode c; Thread w;
// whead的cowait栈不为空
while ((c = h.cowait) != null) {
// 唤醒node节点的cowait栈
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// 如果头结点没有改变
if (whead == h) {
// 当前节点的前置节点改变,更新前置节点的next
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
// 当前节点的前置节点状态为0,更新为WAITING
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
// 当前节点的前置节点已被取消,从同步队列中删除
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time;
if (deadline == 0L)
time = 0L;
// 已经超时,取消当前节点
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
// 将其当前线程的监控对象设置为当前StampedLock,监控此线程被那个对象阻塞
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
// 如果当前节点的前驱节点为等待状态,并且头尾节点不相等或者当前状态为独占锁状态
// 并且头结点不变,当前节点的前驱节点不变
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
//调用UnSafe的park方法阻塞当前线程
U.park(false, time);
// 将当前节点对应的线程置为空
node.thread = null;
// 将当前线程的监控对象置为空
U.putObject(wt, PARKBLOCKER, null);
// 如果传入进来的参数interruptible为true,并且当前线程被中断
if (interruptible && Thread.interrupted())
// 取消当前节点
return cancelWaiter(node, node, true);
}
}
}
}
五、释放共享锁
5.1 释放锁的接口方法
void unlockRead(long stamp): 如果锁状态与给定的邮戳匹配,则释放共享锁。
先判断传入邮戳的锁版本和当前 state
表示的版本是否一致,并确定当前邮戳和 state
为持有锁的状态,并且持有的是锁不是独占锁,否则将抛出 IllegalMonitorStateException
异常。
如果当前共享锁计数未溢出,则直接通过 CAS
修改 state
释放锁,否则通过 tryDecReaderOverflow
进行释放锁操作。释放锁之后,如果当前释放的是最后一个共享锁,且有同步队列包含节点,则进行节点唤醒。因为共享锁可能包含多个锁同时修改 state
,CAS 存在失败的可能,所以需要循环的去获取,直到获取成功或者确定无法获取锁,并抛出 IllegalMonitorStateException
异常。
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
// 传入邮戳和当前state的版本不一致
if (((s = state) & SBITS) != (stamp & SBITS) ||
// 邮戳为未持有锁状态,或当前状态为未持有锁或持有的是独占锁
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
// 共享锁状态未溢出
if (m < RFULL) {
// CAS修改state
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
// 如果当前释放的是最后一个共享锁,且whead节点不为空,状态不为0
if (m == RUNIT && (h = whead) != null && h.status != 0)
// 唤醒它的下一个节点
release(h);
break;
}
}
// 共享锁计数溢出,通过tryDecReaderOverflow方法释放
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}
boolean tryUnlockRead(): 如果持有共享锁则进行一次锁释放,不需要邮戳。 此方法对于错误后的恢复可能很有用。
先判断当前 state
是否为持有共享锁的状态,未持有共享锁则直接返回 false
,如果持有共享锁进行锁释放。释放锁分共享锁计数是否溢出两种状况,未溢出时直接通过 CAS 修改 state
进行锁释放,如果计数溢出则通过 tryDecReaderOverflow
进行释放锁操作。释放锁之后,如果当前释放的是最后一个共享锁,且有同步队列包含节点,则进行节点唤醒。同理,CAS 可能失败,需要循环进行锁释放。
public boolean tryUnlockRead() {
long s, m; WNode h;
// 当前state为持有锁状态,并且持有的不是独占锁
while ((m = (s = state) & ABITS) != 0L && m < WBIT) {
// 共享锁计数未溢出
if (m < RFULL) {
// CAS修改state获取锁
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
// 如果当前释放的是最后一个共享锁,且whead节点不为空,状态不为0
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
// 唤醒它的下一个节点
return true;
}
}
// 共享锁计数溢出,通过tryDecReaderOverflow方法释放
else if (tryDecReaderOverflow(s) != 0L)
return true;
}
return false;
}
5.2 tryDecReaderOverflow 方法实现
tryDecReaderOverflow
方法首先尝试 CAS 将 state
值设置为 RBITS
来确保其他持有共享锁的线程无法对 state
进行操作,修改成功表示获取锁成功,根据溢出值是否大于 0 对 readerOverflow
或 state
的共享锁计数 -1,如果获取锁失败返回 0。
private long tryDecReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL;
// 当前共享锁计数为溢出状态
if ((s & ABITS) == RFULL) {
// 将共享锁计数修改为 RBITS,即计数+1
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
int r; long next;
// 共享锁计数溢出值大于0
if ((r = readerOverflow) > 0) {
// 溢出值-1
readerOverflow = r - 1;
next = s;
}
else
// state值-1
next = s - RUNIT;
state = next;
return next;
}
}
else if ((LockSupport.nextSecondarySeed() &
OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
return 0L;
}
六、释放共享或独占锁
void unlock(long stamp): 如果锁状态与给定的邮戳匹配,则释放锁的相应模式。
如果当前持有的是共享锁,存在 CAS 失败的可能,所以需要循环进行锁释放,除非当前锁版本和邮戳的锁版本不同,或者当前未持有锁。检查通过先判断当前 state
持有的是否是独占锁,如果持有独占锁则直接修改 state
进行锁释放,否则再次对传入的邮戳进行检查。释放共享锁时区分溢出和未溢出,如果未溢出则通过 CAS 进行锁释放,如果溢出了通过 tryDecReaderOverflow
方法进行锁释放。
public void unlock(long stamp) {
long a = stamp & ABITS, m, s; WNode h;
// 当前锁版本和邮戳锁版本相同
while (((s = state) & SBITS) == (stamp & SBITS)) {
// 当前未持有锁
if ((m = s & ABITS) == 0L)
break;
// 当前持有独占锁,进行独占锁释放
else if (m == WBIT) {
if (a != m)
break;
// 直接修改state,如果溢出则重置为ORIGIN
// 只有一个线程可以持有独占锁,所以直接修改时线程安全的
state = (s += WBIT) == 0L ? ORIGIN : s;
// whead节点不为空,状态不为0
if ((h = whead) != null && h.status != 0)
// 唤醒whead后续的节点
release(h);
return;
}
// 传入邮戳错误,为未持有锁或者同时持有共享锁和独占锁的状态
else if (a == 0L || a >= WBIT)
break;
// 当前state表示共享锁计数未溢出,或者未持有锁
else if (m < RFULL) {
// CAS修改state释放锁
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
// 如果当前释放的是最后一个共享锁,且whead节点不为空,状态不为0
if (m == RUNIT && (h = whead) != null && h.status != 0)
// 唤醒whead后续的节点
release(h);
return;
}
}
// 排除以上情况,剩余情况是当前持有共享锁但是共享锁计数已经溢出
else if (tryDecReaderOverflow(s) != 0L)
return;
}
// 释放锁失败,抛出异常
throw new IllegalMonitorStateException();
}
七、乐观锁实现
long tryOptimisticRead(): 如果当前未持有独占锁则返回当前锁版本作为邮戳,用于在以后验证状态,如果已持有独占锁则返回0,也就表示获取乐观锁失败,否则就返回 state
前 57 位的值,作为锁版本。只要获取或者释放了独占锁,都会导致锁版本的改变。
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
boolean validate(long stamp): 如果自给定邮戳获取后未获取过独占锁(独占锁状态码未改变),则返回 true。 如果状态为 0,则始终返回 false。
public boolean validate(long stamp) {
// 定义内存屏障,避免代码重排序
U.loadFence();
// 比较邮储与state的锁版本是否相同
return (stamp & SBITS) == (state & SBITS);
}
八、锁升降级
long tryConvertToWriteLock(long stamp): 验证当前锁版本和锁持有状态和给定的邮戳是否匹配,如果不匹配、邮戳的锁状态有误或当前持有多个共享锁则返回 0。匹配时则分三种情况,当前未持有锁则获取独占锁,当前持有独占锁则不进行操作,当前仅持有一个共享锁则释放共享锁获取独占锁,最终返回独占锁的邮戳。
public long tryConvertToWriteLock(long stamp) {
long a = stamp & ABITS, m, s, next;
// 当前锁版本和邮戳锁版本相同
while (((s = state) & SBITS) == (stamp & SBITS)) {
// 当前未持有锁
if ((m = s & ABITS) == 0L) {
// 邮戳为持有锁的状态
if (a != 0L)
break;
// CAS修改state获取独占锁
if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
return next;
}
// 当前已经持有独占锁
else if (m == WBIT) {
// 传入邮戳的锁状态有错
if (a != m)
break;
// 直接返回邮戳
return stamp;
}
// 当前仅持有一个共享锁
else if (m == RUNIT && a != 0L) {
// CAS修改state释放共享锁获取独占锁
if (U.compareAndSwapLong(this, STATE, s,
next = s - RUNIT + WBIT))
return next;
}
// 否则则失败
else
break;
}
return 0L;
}
long tryConvertToReadLock(long stamp): 验证当前锁版本和锁持有状态和给定的邮戳是否匹配,如果不匹配或者邮戳的锁状态有误则返回 0。匹配时则分三种情况,当前未持有锁则获取共享锁,当前持有独占锁则释放独占锁获取共享锁,当前持有共享锁则不进行操作,最终返回共享锁的邮戳。
public long tryConvertToReadLock(long stamp) {
long a = stamp & ABITS, m, s, next; WNode h;
// 当前锁版本和邮戳锁版本相同
while (((s = state) & SBITS) == (stamp & SBITS)) {
// 当前未持有锁
if ((m = s & ABITS) == 0L) {
// 邮戳为持有锁的状态
if (a != 0L)
break;
// 当前state表示共享锁计数未溢出,或者未持有锁,CAS修改state获取锁
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
// 当前共享锁计数溢出,通过tryIncReaderOverflow获取锁
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
// 当前持有独占锁
else if (m == WBIT) {
// 传入邮戳的锁状态有错
if (a != m)
break;
// 持有独占锁只有一个线程,直接修改state释放独占锁获取共享锁
state = next = s + (WBIT + RUNIT);
// 独占锁被释放,如果有排队线程则进行唤醒
if ((h = whead) != null && h.status != 0)
// 唤醒线程
release(h);
return next;
}
// 当前邮戳为持有共享锁的邮戳,不做操作直接返回邮戳
else if (a != 0L && a < WBIT)
return stamp;
else
break;
}
return 0L;
}
long tryConvertToOptimisticRead(long stamp): 验证当前锁版本和锁持有状态和给定的邮戳是否匹配,如果匹配则进行一次锁释放,如果不匹配或者邮戳的锁状态有误则返回 0。该方法的逻辑和 unlock
方法的逻辑相似,如果当前未持有锁就直接返回锁版本,如果持有锁则进行一次锁释放,再返回锁版本。
其实这个接口基本上等于,unlock + tryOptimisticRead 两个方法结合,个人认为这个接口用处不大。释放锁成功,返回了当前锁版本,如果有独占锁在排队,将唤醒该线程获取锁,导致锁版本变化,最终乐观锁也是无法通过校验的。
如果在
release
唤醒时可以判断一下排队的是否是独占锁,如果是独占锁返回 0 使用上可能会有更好的效果,但是这样也许不是本意想要的,毕竟锁实实在在是释放成功了,还返回 0 就不符合锁降级的规则了。
public long tryConvertToOptimisticRead(long stamp) {
long a = stamp & ABITS, m, s, next; WNode h;
// 定义内存屏障,避免代码重排序
U.loadFence();
for (;;) {
// 当前锁版本和邮戳锁版本相同
if (((s = state) & SBITS) != (stamp & SBITS))
break;
// 当前state未持有锁
if ((m = s & ABITS) == 0L) {
// 邮戳为持有锁的状态
if (a != 0L)
break;
// 直接返回当前锁版本作为乐观锁邮戳
return s;
}
// 当前state持有写锁
else if (m == WBIT) {
// 邮戳为未持有锁的状态
if (a != m)
break;
// 直接修改state释放独占锁
state = next = (s += WBIT) == 0L ? ORIGIN : s;
// whead节点不为空,状态不为0
if ((h = whead) != null && h.status != 0)
// 唤醒whead后续的节点
release(h);
// 返回释放锁后的state作为乐观锁邮戳
// 共享锁持有计数
return next;
}
// 邮戳为未持有锁状态,或者邮戳状态错误
else if (a == 0L || a >= WBIT)
break;
// 当前state表示共享锁计数未溢出,或者未持有锁
else if (m < RFULL) {
// CAS释放锁
if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) {
// 如果当前释放的是最后一个共享锁,且whead节点不为空,状态不为0
if (m == RUNIT && (h = whead) != null && h.status != 0)
// 唤醒whead后续的节点
release(h);
// 重新计算锁版本
return next & SBITS;
}
}
// 共享锁计数溢出,通过tryDecReaderOverflow方法释放锁
else if ((next = tryDecReaderOverflow(s)) != 0L)
return next & SBITS;
}
return 0L;
}
九、LOCK视图
Lock asReadLock(): 返回此 StampedLock
的普通 Lock
视图,其中 Lock.lock
方法映射到 readLock
,其他方法也类似。 返回的 Lock
不支持 Condition
; 方法 Lock.newCondition()
抛出 UnsupportedOperationException
。
public Lock asReadLock() {
ReadLockView v;
return ((v = readLockView) != null ? v :
(readLockView = new ReadLockView()));
}
// view classes
final class ReadLockView implements Lock {
public void lock() { readLock(); }
public void lockInterruptibly() throws InterruptedException {
readLockInterruptibly();
}
public boolean tryLock() { return tryReadLock() != 0L; }
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryReadLock(time, unit) != 0L;
}
// unstampedUnlockRead逻辑和tryUnlockRead相似,只是释放锁失败抛出异常
public void unlock() { unstampedUnlockRead(); }
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
Lock asWriteLock(): 返回此 StampedLock
的普通 Lock
视图,其中 Lock.lock
方法映射到 writeLock
,其他方法也类似。 返回的 Lock
不支持 Condition
; 方法 Lock.newCondition()
抛出 UnsupportedOperationException
。
public Lock asWriteLock() {
WriteLockView v;
return ((v = writeLockView) != null ? v :
(writeLockView = new WriteLockView()));
}
final class WriteLockView implements Lock {
public void lock() { writeLock(); }
public void lockInterruptibly() throws InterruptedException {
writeLockInterruptibly();
}
public boolean tryLock() { return tryWriteLock() != 0L; }
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return tryWriteLock(time, unit) != 0L;
}
// unstampedUnlockWrite逻辑和tryUnlockWrite相似,只是释放锁失败抛出异常
public void unlock() { unstampedUnlockWrite(); }
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
ReadWriteLock asReadWriteLock(): 返回此 StampedLock
的 ReadWriteLock
视图,其中 ReadWriteLock.readLock()
方法映射到 asReadLock()
, ReadWriteLock.writeLock()
映射到 asWriteLock()
。
public ReadWriteLock asReadWriteLock() {
ReadWriteLockView v;
return ((v = readWriteLockView) != null ? v :
(readWriteLockView = new ReadWriteLockView()));
}
final class ReadWriteLockView implements ReadWriteLock {
public Lock readLock() { return asReadLock(); }
public Lock writeLock() { return asWriteLock(); }
}