Java 锁系列(七)——StampedLock源码分析

Java 锁系列(七)——StampedLock源码分析

一、StampedLock概述

StampedLock 是读写锁的实现,对比 ReentrantReadWriteLock 主要不同是该锁不允许重入,多了乐观读的功能,使用上会更加复杂一些,且仅支持非公平锁,但是具有更好的性能表现。StampedLock 的状态由版本和模式组成。 获取锁方法返回一个邮戳,表示和控制与锁状态相关的访问; 这些方法的“尝试”邮戳可能会返回特殊值 0 来表示获取锁失败。 锁释放和转换方法需要标记作为参数,如果它们与锁的状态不匹配则失败。本文对 StampedLock 的实现源码进行分析。

1.1 属性分析

state 属性的作用类似于 ReentrantLockReentrantWriteReadLock 锁的 state 属性,在 StampedLock 中按位将 state 分为了3个区域,其中低7位为共享锁的持有计数,第8位为独占锁的持有计数,高57位(包含独占锁持有计数位)表示当前锁的版本状态,用于乐观锁的校验。state 作为获取锁时响应的邮戳。state 是一个线程安全的属性,成功对该值进行修改则意味着获取锁的成功,类中的常量大都用于辅助对 state 进行位运算。

PS:整个 state 都作为获取锁时的邮戳返回,但是只有前 57 位作为锁版本,共享锁持有计数不参与锁版本计算,因为共享锁的计数对数据不会有影响。

// cpu核心数
private static final int NCPU = Runtime.getRuntime().availableProcessors();
// 在当前节点加入队列前,如果当前持有的是独占锁且队列头尾节点(whead和wtail)相等,先让其自旋一定的次数
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
// 在阻塞线程前,如果队列头尾节点(whead和wtail)相等,先让其自旋一定的次数
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
// 阻塞线程前,最大的自旋次数
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
/** The period for yielding when waiting for overflow spinlock */
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
// 用于共享锁持有计数的位数,读锁占低7位
private static final int LG_READERS = 7;
// 共享锁获取时增加的持有计数
private static final long RUNIT = 1L;
// 独占锁持有计数的掩码,第7位,不允许重入,独占锁计数最大只能为1
private static final long WBIT  = 1L << LG_READERS;
// 共享锁持有计数的掩码
private static final long RBITS = WBIT - 1L;
//共享锁持有计数最大值,大于等于当前值进行溢出处理
private static final long RFULL = RBITS - 1L;
// 共享和独占锁持有计数的掩码,用于进行与操作判断是否持有锁
private static final long ABITS = RBITS | WBIT;
// 锁版本的掩码(包括了独占锁计数位)
private static final long SBITS = ~RBITS; // note overlap with ABITS
// state 的初始值,防止初始tryOptimisticRead的值为0
private static final long ORIGIN = WBIT << 1;

// 中断信号
private static final long INTERRUPTED = 1L;
// 节点等待状态
private static final int WAITING   = -1;
// 节点取消状态
private static final int CANCELLED =  1;
// 共享锁模式
private static final int RMODE = 0;
// 独占锁模式
private static final int WMODE = 1;
// 队列节点实体类
static final class WNode {
    volatile WNode prev;
    volatile WNode next;
    volatile WNode cowait;    // list of linked readers
    volatile Thread thread;   // non-null while possibly parked
    volatile int status;      // 0, WAITING, or CANCELLED
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}
// 队列头结点
private transient volatile WNode whead;
// 队列尾节点
private transient volatile WNode wtail;
// views
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;
// 锁状态
private transient volatile long state;
// 当共享锁持有计数达到饱和状态时,额外的共享锁计数
private transient int readerOverflow;

1.2 基础方法

根据前面参数分析结果,state 分段存储锁的状态信息,据此我们可以理解以下基础方法的逻辑。

boolean isWriteLocked(): 如果当前持有独占锁,则返回true 。

通过和 WBIT 进行与操作获取共享锁的持有计数,计数不为 0 表示当前持有独占锁。

public boolean isWriteLocked() {
    return (state & WBIT) != 0L;
}

boolean isReadLocked(): 如果当前持有共享锁,则返回true 。

通过和 RBITS 进行与操作获取共享锁的持有计数,计数不为 0 表示当前持有共享锁。

public boolean isReadLocked() {
    return (state & RBITS) != 0L;
}

int getReadLockCount(): 查询当前持有的共享锁计数。 该方法设计用于监视系统状态,而不是用于同步控制。

通过和 RBITS 进行与操作获取共享锁的持有计数,如果值大于或等于 RFULL 表示当前锁计数可能已经溢出了,需要和 readerOverflow 相加计算总的共享锁持有计数。

private int getReadLockCount(long s) {
    long readers;
    // 共享锁计数大于或等于最大读计数RFULL
    if ((readers = s & RBITS) >= RFULL)
        // 读计数加上溢出的值
        readers = RFULL + readerOverflow;
    return (int) readers;
}

二、获取独占锁

2.1 获取锁的接口方法

long writeLock(): 获取独占锁,如果该锁被另一个线程保持,则阻塞线程,直到拿到锁并返回邮戳。

writeLock 方法中先判断了当前锁是否已被持有,如果未被持有则 CAS 直接修改 state 的值,修改成功表示获取锁成功,返回当前 state 的值作为当前锁版本的邮戳。如果当前已经持有锁,或者CAS获取锁失败,则进入 acquireWrite 方法进行锁获取。

public long writeLock() {
    long s, next;  // bypass acquireWrite in fully unlocked case only
    // 当前未持有锁
    return ((((s = state) & ABITS) == 0L &&
             // CAS修改state获取锁
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            // 当前持有锁或CAS获取失败,进入acquireWrite方法获取锁
            next : acquireWrite(false, 0L));
}

long tryWriteLock(): 尝试获取独占锁,非公平,不阻塞线程。如果获取锁成功则返回邮戳,否则返回 0。

如果当前未持有任何锁,进行 CAS 获取锁,否则直接返回获取锁失败。

public long tryWriteLock() {
    long s, next;
    // 当前未持有锁
    return ((((s = state) & ABITS) == 0L &&
             // CAS修改state获取锁
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : 0L);
}

long tryWriteLock(long time, TimeUnit unit) throws InterruptedException: 在给定的等待时间内尝试获取独占锁,获取成功则立即返回邮戳。超过等待时间获取失败或者接收到中断信号则返回 0。

当前未处于中断状态,先通过 tryWriteLock 方法进行一次获取锁尝试,获取锁失败计算等待时间,通过 acquireWrite 方法进行锁获取。

public long tryWriteLock(long time, TimeUnit unit)
    throws InterruptedException {
    // 时间转换为纳秒
    long nanos = unit.toNanos(time);
    // 当前未处于中断状态
    if (!Thread.interrupted()) {
        long next, deadline;
        // 先尝试直接获取锁
        if ((next = tryWriteLock()) != 0L)
            return next;
        // 等待时间小于0直接返回获取锁失败
        if (nanos <= 0L)
            return 0L;
        if ((deadline = System.nanoTime() + nanos) == 0L)
            deadline = 1L;
        // acquireWrite 进行定时获取锁,true表示响应中断
        if ((next = acquireWrite(true, deadline)) != INTERRUPTED)
            return next;
    }
    throw new InterruptedException();
}

long writeLockInterruptibly() throws InterruptedException: 调用后一直阻塞到获得独占锁,但是接受中断信号。

当未处于中断状态时通过 acquireWrite 方法进行获取锁,如果发生中断则抛出异常。

public long writeLockInterruptibly() throws InterruptedException {
    long next;
    // 当前未处于中断状态时通过acquireWrite方法进行获取锁
    if (!Thread.interrupted() &&
        (next = acquireWrite(true, 0L)) != INTERRUPTED)
        return next;
    throw new InterruptedException();
}

2.2 acquireWrite 方法实现

acquireWrite 方法用于获取独占锁,主要包含创建 node 插入同步队列尾部、阻塞线程进行排队两个步骤,在进行这两个步骤之前都将进行自旋获取锁操作。

在创建 node 节点插插入队列前,如果当前持有的是独占锁,且 wtailwhead 为同一个节点,则自旋 SPINS 次尝试获取锁。自旋完成之后再进行队列初始化(如果还没初始化的话)、为当前线程创建 node 节点、检查 wtail 节点是否改变和 CAS 设置当前节点为 wtail 节点4个小步骤,在每个小步骤进行前都会先CAS进行一次获取锁尝试。

阻塞线程部分逻辑包含两个嵌套的循环,外部循环用于进行操作,内部循环用于自旋获取 CAS 尝试获取锁。在阻塞线程前,如果 node 的前驱节点(p) 为 whead 节点,则自旋 HEAD_SPINS 次进行 CAS 尝试获取锁,只要 node 的前驱节点(p) 为 whead 节点的条件满足,将重复进入自旋,每一次进入自旋都会将自旋次数增大一倍,直到自旋次数大于或等于 MAX_HEAD_SPINS,条件不满足时如果 wheadcowait 栈不为空,将唤醒 cowait 栈。

完成以上操作后,再次判断 head 节点是否变化,当前节点的前驱节点是否变化。如果当前节点的前驱节点状态未设置,将状态设置为 WAITING,状态为 CANCELLED 则将前驱节点移出队列。都校验通过后,计算阻塞时间是否超时,再次判断前驱节是否等待状态、当前是否持有锁,whead 和前驱节点是否改变,都检查通过后将阻塞线程。

线程被唤醒后判断是否是接受到中断信号,如果 interruptibletrue,接受到中断信号将取消当前节点,并返回 INTERRUPTED 中断标志。

private long acquireWrite(boolean interruptible, long deadline) {
    // node为新增节点,p为尾节点(即将成为node的前置节点)
    WNode node = null, p;
    // 为当前线程创建node节点,并加入到同步队列尾部
    // 如果当前持有的是独占锁,且wtail和whead为同一个节点,先自旋尝试直接获取锁
    // 每执行一步操作前都判断当前是否持有锁,直到当前线程成功加入队列进行排队
    for (int spins = -1;;) {
        long m, s, ns;
        // 低8为为0,表示还未持有锁,直接CAS修改state获取锁
        if ((m = (s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;
        }
        else if (spins < 0)
            // 当前持有的是独占锁,且wtail和whead为同一个节点,则自旋SPINS次获取锁
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            // 自旋计数-1
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        }
        // wtail为null表示队列还未初始化,进行初始化队列
        else if ((p = wtail) == null) { // initialize queue
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        // 创建当前线程的node节点
        else if (node == null)
            node = new WNode(WMODE, p);
        // wtail节点改变,为node重新指定前驱节点
        else if (node.prev != p)
            node.prev = p;
        // 将当前线程的node节点设置为wtail
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            p.next = node;
            break;
        }
    }
    // 阻塞当前线程进行排队
    for (int spins = -1;;) {
        // h为头节点,np为新增节点的前置节点,pp为前前置节点,ps为前置节点的状态
        WNode h, np, pp; int ps;
        // p = node.prev,如果当前节点的前驱节点是whead节点
        if ((h = whead) == p) {
            // 设置自旋的值
            if (spins < 0)
                spins = HEAD_SPINS;
            // 自旋次数小于MAX_HEAD_SPINS时自旋次数x2
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            // 自旋获取锁
            for (int k = spins;;) { // spin at head
                long s, ns;
                // 当前未获取锁
                if (((s = state) & ABITS) == 0L) {
                    // CAS修改state,修改成功表示获取锁成功
                    if (U.compareAndSwapLong(this, STATE, s,
                                             ns = s + WBIT)) {
                        // 获取锁成功,将node设置为whead
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                }
                else if (LockSupport.nextSecondarySeed() >= 0 &&
                         --k <= 0)
                    break;
            }
        }
        // 头结点不为空
        else if (h != null) { // help release stale waiters
            WNode c; Thread w;
            // 循环唤醒whead的cowait栈
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        // whead结点未改变
        if (whead == h) {
        	// 前驱节点改变,为前驱节点重新指定next为当前节点
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            // 如果当前节点的前驱节点状态为0,将其前驱节点设置为等待状态
            else if ((ps = p.status) == 0)
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            //如果当前节点的前驱节点状态为取消 
            else if (ps == CANCELLED) {
                // 重新设置前驱节点,移除原前驱节点
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                long time; // 0 argument to park means no timeout
                // 传入0,表示阻塞直到UnSafe.unpark唤醒
                if (deadline == 0L)
                    time = 0L;
                // 已经等待超时,取消当前等待节点
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                // 设置线程Thread的parkblocker属性,表示当前线程被谁阻塞,用于监控线程使用
                U.putObject(wt, PARKBLOCKER, this);
                // 将其当前线程设置为当前节点
                node.thread = wt;
                // 当前节点的前驱节点为等待状态,并且队列的头节点不是前驱节点或者当前状态为有锁状态
                // 队列头节点和当前节点的前驱节点未改变,则阻塞当前线程
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)
                    U.park(false, time);  // emulate LockSupport.park
                // 将当前node的线程设置为null
                node.thread = null;
                //当前线程的监控对象也置为空
                U.putObject(wt, PARKBLOCKER, null);
                //如果传入的参数interruptible为true,并且当前线程中断,取消当前节点
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

2.3 cancelWaiter 方法实现

cancelWaiter 方法如果节点非空,则强制取消状态并在可能的情况下将其从队列中解开并唤醒任何 cowaiters(节点或组,如适用),并且在任何情况下如果锁空闲,则有助于释放当前的第一个 waiter。 (使用空参数调用作为释放的条件形式,目前不需要,但在未来可能的取消政策下可能需要)。 这是 AbstractQueuedSynchronizer 中取消方法的一种变体(请参阅 AQS 内部文档中的详细说明)。

private long cancelWaiter(WNode node, WNode group, boolean interrupted) {
    // node和group不为空
    if (node != null && group != null) {
        Thread w;
        //将其当前节点的状态设置为取消状态
        node.status = CANCELLED;
        // 遍历group的cowait栈,清除CANCELLED节点
        for (WNode p = group, q; (q = p.cowait) != null;) {
            if (q.status == CANCELLED) {
                U.compareAndSwapObject(p, WCOWAIT, q, q.cowait);
                // 从头开始遍历
                p = group; // restart
            }
            else
                p = q;
        }
        // node和group相同
        if (group == node) {
            // 唤醒group的cowait栈
            for (WNode r = group.cowait; r != null; r = r.cowait) {
                if ((w = r.thread) != null)
                    U.unpark(w);       // wake up uncancelled co-waiters
            }
            // 将当前取消接节点的前驱节点的下一个节点设置为当前取消节点的next节点
            for (WNode pred = node.prev; pred != null; ) { // unsplice
                WNode succ, pp;        // find valid successor
                // 如果当前取消节点的下一个节点为空或者是取消状态,从尾节点开始寻找有效的节点
                // 并重新指定下一个节点
                while ((succ = node.next) == null ||
                       succ.status == CANCELLED) {
                    WNode q = null;    // find successor the slow way
                    // 从wtail向前遍历到node,找到离node最近一个未被取消的节点
                    for (WNode t = wtail; t != null && t != node; t = t.prev)
                        if (t.status != CANCELLED)
                            q = t;     // don't link if succ cancelled
                    if (succ == q ||   // ensure accurate successor
                        // 修改node的后继节点为最近一个未被取消的节点
                        U.compareAndSwapObject(node, WNEXT,
                                               succ, succ = q)) {
                        // 当前节点后继节点为空,且当前节点为wtail节点
                        if (succ == null && node == wtail)
                            // 将当前节点的前置节点设置为wtail
                            U.compareAndSwapObject(this, WTAIL, node, pred);
                        // 退出循环
                        break;
                    }
                }
                if (pred.next == node) // unsplice pred link
                    // 修改node前置节点的next为node后最近一个未被取消的节点
                    U.compareAndSwapObject(pred, WNEXT, node, succ);
                // 后继节点存在,且在线程阻塞状态
                if (succ != null && (w = succ.thread) != null) {
                    succ.thread = null;
                    // 唤醒后继节点
                    U.unpark(w);       // wake up succ to observe new pred
                }
                // 当前节点的前驱节点未被取消,或者前驱接点的前驱节点为空,退出循环
                if (pred.status != CANCELLED || (pp = pred.prev) == null)
                    break;
                // 重新设置当前取消节点的前驱节点
                node.prev = pp;        // repeat if new pred wrong/cancelled
                // 重新设置当前取消节点的前驱节点
                U.compareAndSwapObject(pp, WNEXT, pred, succ);
                // 将其前驱节点设置为pp,重新循环
                pred = pp;
            }
        }
    }
    WNode h; // Possibly release first waiter
    // 头节点不为空,尝试唤醒头结点的后继节点
    while ((h = whead) != null) {
        long s; WNode q; // similar to release() but check eligibility
        // 头节点的下一节点为空或者是取消状态,从尾节点开始寻找有效的节点(包括等待状态,和运行状态)
        if ((q = h.next) == null || q.status == CANCELLED) {
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 如果头节点没有改变
        if (h == whead) {
            // 头节点的下一有效节点不为空,并且头节点的状态为0,并且当前StampedLock的不为写锁状态
            // 并且头节点的下一节点为读模式,唤醒头结点的下一节点
            if (q != null && h.status == 0 &&
                ((s = state) & ABITS) != WBIT && // waiter is eligible
                (s == 0L || q.mode == RMODE))
                // 唤醒头结点的下一有效节点,下面会介绍release方法
                release(h);
            break;
        }
    }
    //如果当前线程被中断或者传入进来的interrupted为true,直接返回中断标志位,否则返回0
    return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L;
}

2.4 release 方法实现

release 方法用于唤醒传入节点的后继节点。方法传入一个节点 h,如果传入节点的状态为 WAITING 首先修改传入节点的 state 状态为 0,如果传入节点的后继节点为空或者被取消,从后往前遍历找到该节点后继第一个未被取消的节点 q,如果 q 在阻塞状态则进行唤醒。

private void release(WNode h) {
    // 传入的节点不为空
    if (h != null) {
        WNode q; Thread w;
        // 取消等待状态
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 传入节点不存在后继节点,或后继节点被取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 从后往前查找到离h最近的一个未被取消的节点
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 后继节点存在且在阻塞中
        if (q != null && (w = q.thread) != null)
            // 唤醒后继节点
            U.unpark(w);
    }
}

三、释放独占锁

void unlockWrite(long stamp): 如果锁状态与给定的邮戳匹配,则释放独占锁。

释放锁前先判断传入的邮戳是否与当前锁状态相等,且当前为持有写锁的状态。通过锁状态增加 WBIT 使表示独占锁持有锁的位(第8位)进位到锁版本位,如果 state 值溢出则设置为 ORIGINstate 修改完成表示独占锁已经释放,如果同步队列已经初始化,并且 head 节点的状态不为0,则唤醒下一个节点。

public void unlockWrite(long stamp) {
    WNode h;
    // 确定传入的锁状态是正确的,且当前持有独占锁
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    // 增加锁版本的值,如果位溢出则重置为ORIGIN
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    // 队列已经初始化,并且还有head节点状态不为0
    if ((h = whead) != null && h.status != 0)
        // 唤醒它的下一个节点
        release(h);
}

boolean tryUnlockWrite(): 如果持有独占锁则释放锁,不需要邮戳。 此方法对于错误后的恢复可能很有用。

相比于 unlockWrite 少了检查邮戳是否正确的步骤,其他流程相同。

public boolean tryUnlockWrite() {
    long s; WNode h;
    // 确定当前持有独占锁
    if (((s = state) & WBIT) != 0L) {
    	// 增加锁版本的值,如果位溢出则重置为ORIGIN
        state = (s += WBIT) == 0L ? ORIGIN : s;
    	// 队列已经初始化,并且还有head节点状态不为0
        if ((h = whead) != null && h.status != 0)
            // 唤醒它的下一个节点
            release(h);
        return true;
    }
    return false;
}

四、获取共享锁

4.1 获取锁的接口方法

long readLock(): 获取共享锁,如果该锁被另一个线程保持,则阻塞线程,直到拿到锁并返回邮戳。

如果当前没有在在排队等待的线程,且未持有独占锁,共享锁计数未溢出,则通过 CAS 获取共享锁,否则通过 acquireRead 方法获取锁。

如果当前有独占锁线程正在排队是无法立即获得共享锁的,只能在后面进行排队,为防止独占锁饥饿。

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    // 当前不存在等待节点、共享锁计数未溢出,或者未持有锁
    return ((whead == wtail && (s & ABITS) < RFULL &&
            // CAS修改state获取锁
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            // 当前有节点在等待或CAS获取失败,进入acquireRead方法获取锁
            next : acquireRead(false, 0L));
}

long tryReadLock(): 尝试获取共享锁,非公平,不阻塞线程。如果获取锁成功则返回邮戳,否则返回 0。

如果当前持有独占锁则直接获取锁失败,如果共享锁计数未溢出直接通过 CAS 获取共享锁,否则通过 tryIncReaderOverflow 方法获取锁,这是一个循环的过程,因为有 CAS 失败导致获取锁失败的可能性,所以需要确定获取锁成功或者目前已经持有了独占锁。

即使当前有独占锁线程在排队,也是可以立即获取到共享锁的。

public long tryReadLock() {
    for (;;) {
        long s, m, next;
        // 当前持有独占锁,直接获取锁失败
        if ((m = (s = state) & ABITS) == WBIT)
            return 0L;
        // 共享锁计数未溢出,或者未持有锁
        else if (m < RFULL) {
            // CAS 获取锁
            if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                return next;
        }
        // 共享锁计数溢出,通过tryIncReaderOverflow方法获取锁,
        // 并使用readerOverflow存储溢出的计数
        else if ((next = tryIncReaderOverflow(s)) != 0L)
            return next;
    }
}

tryReadLock(long time, TimeUnit unit) throws InterruptedException: 在给定的等待时间内尝试获取共享锁,获取成功则立即返回邮戳。超过等待时间获取失败或者接收到中断信号则返回 0。

当前未处于中断状态,判断当前是否持有独占锁,未持有独占锁则先进行一次获取锁尝试,获取锁失败计算等待时间,通过 acquireRead 方法进行锁获取。

public long tryReadLock(long time, TimeUnit unit)
    throws InterruptedException {
    long s, m, next, deadline;
    // 时间转换为纳秒
    long nanos = unit.toNanos(time);
    // 当前处于未中断状态
    if (!Thread.interrupted()) {
        // 当前未持有独占锁
        if ((m = (s = state) & ABITS) != WBIT) {
            // 共享锁计数未溢出,或者未持有锁
            if (m < RFULL) {
                // CAS 直接获取锁
                if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                    return next;
            }
            // 锁状态溢出,通过tryIncReaderOverflow方法获取锁
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        // 等待时间小于0直接返回获取锁失败
        if (nanos <= 0L)
            return 0L;
        if ((deadline = System.nanoTime() + nanos) == 0L)
            deadline = 1L;
        // acquireRead进行定时获取锁,true表示响应中断
        if ((next = acquireRead(true, deadline)) != INTERRUPTED)
            return next;
    }
    throw new InterruptedException();
}

long readLockInterruptibly() throws InterruptedException: 调用后一直阻塞到获得共享锁,但是接受中断信号。

当未处于中断状态时通过 acquireRead 方法进行获取锁,如果发生中断则抛出异常。

public long readLockInterruptibly() throws InterruptedException {
    long next;
    // 当前未处于中断状态时通过acquireRead方法进行获取锁
    if (!Thread.interrupted() &&
        (next = acquireRead(true, 0L)) != INTERRUPTED)
        return next;
    throw new InterruptedException();
}

4.2 tryIncReaderOverflow 方法实现

tryIncReaderOverflow 方法首先尝试 CAS 将 state 值设置为 RBITS 确保其他持有共享锁的线程无法对 state 进行操作,修改成功表示获取锁成功,进行 readerOverflow 增加和 state 恢复,如果获取锁失败返回 0。

private long tryIncReaderOverflow(long s) {
    // 确定当前state的共享锁计数已经饱和
    if ((s & ABITS) == RFULL) {
        // CAS修改state为RBITS
        if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
            // 共享锁溢出计数+1
            ++readerOverflow;
            // 修改锁状态
            state = s;
            // 返回当前锁状态
            return s;
        }
    }
    else if ((LockSupport.nextSecondarySeed() &
              OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    // 获取锁失败
    return 0L;
}

问:RFULL = RBITS - 1为什么?

前面也许会有疑问,为什么共享锁的最大计数 RFULL = RBITS - 1,在此处应该明白,是为了当共享锁计数溢出时,还可以使用 CAS 对 state 进行 +1 操作,因为修改 state 成功表示获取锁成功。

问:++readerOverflow操作线程安全吗?

线程安全,因为此时 state 的共享锁计数等于 RBITS,此时没有其他任何一个线程可以进行 readerOverflow 值的修改。

4.3 acquireRead 方法实现

private long acquireRead(boolean interruptible, long deadline) {
    WNode node = null, p;
    for (int spins = -1;;) {
        WNode h;
        // 如果头尾节点相等,让其自旋
        if ((h = whead) == (p = wtail)) {
            for (long m, s, ns;;) {
                // 当前持有共享锁,并且持有计数没有溢出,或者未持有锁
                if ((m = (s = state) & ABITS) < RFULL ?
                    // CAS修改state获取锁
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    // 当前持有共享锁,并且持有计数溢出,通过tryIncReaderOverflow获取锁
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                    return ns;
                // 当前持有独占锁
                else if (m >= WBIT) {
                    // 计数自减操作
                    if (spins > 0) {
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    }
                    else {
                        // 自旋结束,判断头尾节点是否相等
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            // 如果头尾节点没有改变或者头尾节点不相等,退出自旋
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        // 赋值进行轮训
                        spins = SPINS;
                    }
                }
            }
        }
        // 如果尾节点为null,进行队列初始化
        if (p == null) { // initialize queue
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        // 如果当前节点为空,创建当前节点
        else if (node == null)
            node = new WNode(RMODE, p);
        // 头尾节点相等或者尾节点不是共享锁节点
        else if (h == p || p.mode != RMODE) {
            // 如果当前节点的前驱节点不是尾节点,重新设置当前节点的前驱节点
            if (node.prev != p)
                node.prev = p;
            // 将当前节点加入队列中,并且当前节点做为尾节点,如果成功退出循环
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }
        // 将当前节点加入尾节点的cowait栈
        else if (!U.compareAndSwapObject(p, WCOWAIT,
                                         node.cowait = p.cowait, node))
            node.cowait = null;
        else {
            for (;;) {
                WNode pp, c; Thread w;
                // 如果头节点的cowait栈不为空,并且其线程不为null,将cowait栈第一个节点出栈并唤醒
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    U.unpark(w);
                // 如果当前头节点为尾节点的前驱节点,或者头尾接地那相同,或者尾节点前驱节点为空
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    do {
                        // 判断当前state是否未持有锁,或处于共享锁状态,且没有溢出
                        if ((m = (s = state) & ABITS) < RFULL ?
                            // CAS修改state获取锁
                            U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + RUNIT) :
                            // 当前持有共享锁,并且持有计数溢出,通过tryIncReaderOverflow获取锁
                            (m < WBIT &&
                             (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    // 当前未持有独占锁,进入循环
                    } while (m < WBIT);
                }
                // 如果头结点没有改变,并且尾节点的前驱节点没有改变
                if (whead == h && p.prev == pp) {
                    long time;
                    //如果前前节点为空,或者头节点为前置节点,或者前置节点已经取消
                    // 外部循环重新开始创建node节点
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    // 未设置超时时间
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        // 已经超时了,取消当前节点
                        return cancelWaiter(node, p, false);
                    // 当期线程
                    Thread wt = Thread.currentThread();
                    // 设置线程Thread的parkblocker属性,表示当前线程被谁阻塞,用于监控线程使用
                    U.putObject(wt, PARKBLOCKER, this);
                    // 将其当前线程设置为当前节点
                    node.thread = wt;
                   	// 检查之前的条件
                    if ((h != pp || (state & ABITS) == WBIT) &&
                        whead == h && p.prev == pp)
                        // 阻塞当前线程
                        U.park(false, time);
                    // 当前节点的线程置空
                    node.thread = null;
                    // 当前线程的监控对象也置为空
                    U.putObject(wt, PARKBLOCKER, null);
                    // 如果interruptible为true,线程中断则取消当前节点
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }
    // 阻塞当前线程,在阻塞当前线程之前,如果头节点和尾节点相等,让其自旋一段时间获取写锁。
    // 如果头结点不为空,释放头节点的cowait队列
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 如果头节点和尾节点相等
        if ((h = whead) == p) {
            // 设置自旋的初始值
            if (spins < 0)
                spins = HEAD_SPINS;
            // 每次进入自旋,自旋次数*2,直到大于等于MAX_HEAD_SPINS
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            // 自旋获取读锁
            for (int k = spins;;) { // spin at head
                long m, s, ns;
                // 当前未持有锁或者持有共享锁且持有计数未溢出
                if ((m = (s = state) & ABITS) < RFULL ?
                    // CAS修改state获取锁
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    // 通过tryIncReaderOverflow方法获取锁
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    WNode c; Thread w;
                    // 将当前节点设置为whead
                    whead = node;
                    // 前置节点设置为空
                    node.prev = null;
                    // 唤醒node节点的cowait栈
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT,
                                                   c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                    return ns;
                }
                // 当前持有写锁,自旋次数-1
                else if (m >= WBIT &&
                         LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        }
        // 如果whead不为空
        else if (h != null) {
            WNode c; Thread w;
            // whead的cowait栈不为空
            while ((c = h.cowait) != null) {
                // 唤醒node节点的cowait栈
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        // 如果头结点没有改变
        if (whead == h) {
            // 当前节点的前置节点改变,更新前置节点的next
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            // 当前节点的前置节点状态为0,更新为WAITING
            else if ((ps = p.status) == 0)
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            // 当前节点的前置节点已被取消,从同步队列中删除
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                long time;
                if (deadline == 0L)
                    time = 0L;
                // 已经超时,取消当前节点
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                // 将其当前线程的监控对象设置为当前StampedLock,监控此线程被那个对象阻塞
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                // 如果当前节点的前驱节点为等待状态,并且头尾节点不相等或者当前状态为独占锁状态
                // 并且头结点不变,当前节点的前驱节点不变
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p)
                    //调用UnSafe的park方法阻塞当前线程
                    U.park(false, time);
                // 将当前节点对应的线程置为空
                node.thread = null;
                // 将当前线程的监控对象置为空
                U.putObject(wt, PARKBLOCKER, null);
                // 如果传入进来的参数interruptible为true,并且当前线程被中断
                if (interruptible && Thread.interrupted())
                    // 取消当前节点
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

五、释放共享锁

5.1 释放锁的接口方法

void unlockRead(long stamp): 如果锁状态与给定的邮戳匹配,则释放共享锁。

先判断传入邮戳的锁版本和当前 state 表示的版本是否一致,并确定当前邮戳和 state 为持有锁的状态,并且持有的是锁不是独占锁,否则将抛出 IllegalMonitorStateException 异常。

如果当前共享锁计数未溢出,则直接通过 CAS 修改 state 释放锁,否则通过 tryDecReaderOverflow 进行释放锁操作。释放锁之后,如果当前释放的是最后一个共享锁,且有同步队列包含节点,则进行节点唤醒。因为共享锁可能包含多个锁同时修改 state,CAS 存在失败的可能,所以需要循环的去获取,直到获取成功或者确定无法获取锁,并抛出 IllegalMonitorStateException 异常。

public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        // 传入邮戳和当前state的版本不一致
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            // 邮戳为未持有锁状态,或当前状态为未持有锁或持有的是独占锁
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        // 共享锁状态未溢出
        if (m < RFULL) {
            // CAS修改state
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                // 如果当前释放的是最后一个共享锁,且whead节点不为空,状态不为0
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    // 唤醒它的下一个节点
                    release(h);
                break;
            }
        }
        // 共享锁计数溢出,通过tryDecReaderOverflow方法释放
        else if (tryDecReaderOverflow(s) != 0L)
            break;
    }
}

boolean tryUnlockRead(): 如果持有共享锁则进行一次锁释放,不需要邮戳。 此方法对于错误后的恢复可能很有用。

先判断当前 state 是否为持有共享锁的状态,未持有共享锁则直接返回 false,如果持有共享锁进行锁释放。释放锁分共享锁计数是否溢出两种状况,未溢出时直接通过 CAS 修改 state 进行锁释放,如果计数溢出则通过 tryDecReaderOverflow 进行释放锁操作。释放锁之后,如果当前释放的是最后一个共享锁,且有同步队列包含节点,则进行节点唤醒。同理,CAS 可能失败,需要循环进行锁释放。

public boolean tryUnlockRead() {
    long s, m; WNode h;
    // 当前state为持有锁状态,并且持有的不是独占锁
    while ((m = (s = state) & ABITS) != 0L && m < WBIT) {
        // 共享锁计数未溢出
        if (m < RFULL) {
            // CAS修改state获取锁
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                // 如果当前释放的是最后一个共享锁,且whead节点不为空,状态不为0
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                // 唤醒它的下一个节点
                return true;
            }
        }
        // 共享锁计数溢出,通过tryDecReaderOverflow方法释放
        else if (tryDecReaderOverflow(s) != 0L)
            return true;
    }
    return false;
}

5.2 tryDecReaderOverflow 方法实现

tryDecReaderOverflow 方法首先尝试 CAS 将 state 值设置为 RBITS 来确保其他持有共享锁的线程无法对 state 进行操作,修改成功表示获取锁成功,根据溢出值是否大于 0 对 readerOverflowstate 的共享锁计数 -1,如果获取锁失败返回 0。

private long tryDecReaderOverflow(long s) {
    // assert (s & ABITS) >= RFULL;
    // 当前共享锁计数为溢出状态
    if ((s & ABITS) == RFULL) {
        // 将共享锁计数修改为 RBITS,即计数+1
        if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
            int r; long next;
            // 共享锁计数溢出值大于0
            if ((r = readerOverflow) > 0) {
                // 溢出值-1
                readerOverflow = r - 1;
                next = s;
            }
            else
                // state值-1
                next = s - RUNIT;
             state = next;
             return next;
        }
    }
    else if ((LockSupport.nextSecondarySeed() &
              OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    return 0L;
}

六、释放共享或独占锁

void unlock(long stamp): 如果锁状态与给定的邮戳匹配,则释放锁的相应模式。

如果当前持有的是共享锁,存在 CAS 失败的可能,所以需要循环进行锁释放,除非当前锁版本和邮戳的锁版本不同,或者当前未持有锁。检查通过先判断当前 state 持有的是否是独占锁,如果持有独占锁则直接修改 state 进行锁释放,否则再次对传入的邮戳进行检查。释放共享锁时区分溢出和未溢出,如果未溢出则通过 CAS 进行锁释放,如果溢出了通过 tryDecReaderOverflow 方法进行锁释放。

public void unlock(long stamp) {
    long a = stamp & ABITS, m, s; WNode h;
    // 当前锁版本和邮戳锁版本相同
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        // 当前未持有锁
        if ((m = s & ABITS) == 0L)
            break;
        // 当前持有独占锁,进行独占锁释放
        else if (m == WBIT) {
            if (a != m)
                break;
            // 直接修改state,如果溢出则重置为ORIGIN
            // 只有一个线程可以持有独占锁,所以直接修改时线程安全的
            state = (s += WBIT) == 0L ? ORIGIN : s;
            // whead节点不为空,状态不为0
            if ((h = whead) != null && h.status != 0)
                // 唤醒whead后续的节点
                release(h);
            return;
        }
        // 传入邮戳错误,为未持有锁或者同时持有共享锁和独占锁的状态
        else if (a == 0L || a >= WBIT)
            break;
        // 当前state表示共享锁计数未溢出,或者未持有锁
        else if (m < RFULL) {
            // CAS修改state释放锁
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                // 如果当前释放的是最后一个共享锁,且whead节点不为空,状态不为0
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    // 唤醒whead后续的节点
                    release(h);
                return;
            }
        }
        // 排除以上情况,剩余情况是当前持有共享锁但是共享锁计数已经溢出
        else if (tryDecReaderOverflow(s) != 0L)
            return;
    }
    // 释放锁失败,抛出异常
    throw new IllegalMonitorStateException();
}

七、乐观锁实现

long tryOptimisticRead(): 如果当前未持有独占锁则返回当前锁版本作为邮戳,用于在以后验证状态,如果已持有独占锁则返回0,也就表示获取乐观锁失败,否则就返回 state 前 57 位的值,作为锁版本。只要获取或者释放了独占锁,都会导致锁版本的改变。

public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

boolean validate(long stamp): 如果自给定邮戳获取后未获取过独占锁(独占锁状态码未改变),则返回 true。 如果状态为 0,则始终返回 false。

public boolean validate(long stamp) {
    // 定义内存屏障,避免代码重排序
    U.loadFence();
    // 比较邮储与state的锁版本是否相同
    return (stamp & SBITS) == (state & SBITS);
}

八、锁升降级

long tryConvertToWriteLock(long stamp): 验证当前锁版本和锁持有状态和给定的邮戳是否匹配,如果不匹配、邮戳的锁状态有误或当前持有多个共享锁则返回 0。匹配时则分三种情况,当前未持有锁则获取独占锁,当前持有独占锁则不进行操作,当前仅持有一个共享锁则释放共享锁获取独占锁,最终返回独占锁的邮戳。

public long tryConvertToWriteLock(long stamp) {
    long a = stamp & ABITS, m, s, next;
    // 当前锁版本和邮戳锁版本相同
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        // 当前未持有锁
        if ((m = s & ABITS) == 0L) {
            // 邮戳为持有锁的状态
            if (a != 0L)
                break;
            // CAS修改state获取独占锁
            if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                return next;
        }
        // 当前已经持有独占锁
        else if (m == WBIT) {
            // 传入邮戳的锁状态有错
            if (a != m)
                break;
            // 直接返回邮戳
            return stamp;
        }
        // 当前仅持有一个共享锁
        else if (m == RUNIT && a != 0L) {
            // CAS修改state释放共享锁获取独占锁
            if (U.compareAndSwapLong(this, STATE, s,
                                     next = s - RUNIT + WBIT))
                return next;
        }
        // 否则则失败
        else
            break;
    }
    return 0L;
}

long tryConvertToReadLock(long stamp): 验证当前锁版本和锁持有状态和给定的邮戳是否匹配,如果不匹配或者邮戳的锁状态有误则返回 0。匹配时则分三种情况,当前未持有锁则获取共享锁,当前持有独占锁则释放独占锁获取共享锁,当前持有共享锁则不进行操作,最终返回共享锁的邮戳。

public long tryConvertToReadLock(long stamp) {
    long a = stamp & ABITS, m, s, next; WNode h;
    // 当前锁版本和邮戳锁版本相同
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        // 当前未持有锁
        if ((m = s & ABITS) == 0L) {
            // 邮戳为持有锁的状态
            if (a != 0L)
                break;
            // 当前state表示共享锁计数未溢出,或者未持有锁,CAS修改state获取锁
            else if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                    return next;
            }
            // 当前共享锁计数溢出,通过tryIncReaderOverflow获取锁
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        // 当前持有独占锁
        else if (m == WBIT) {
            // 传入邮戳的锁状态有错
            if (a != m)
                break;
            // 持有独占锁只有一个线程,直接修改state释放独占锁获取共享锁
            state = next = s + (WBIT + RUNIT);
            // 独占锁被释放,如果有排队线程则进行唤醒
            if ((h = whead) != null && h.status != 0)
                // 唤醒线程
                release(h);
            return next;
        }
        // 当前邮戳为持有共享锁的邮戳,不做操作直接返回邮戳
        else if (a != 0L && a < WBIT)
            return stamp;
        else
            break;
    }
    return 0L;
}

long tryConvertToOptimisticRead(long stamp): 验证当前锁版本和锁持有状态和给定的邮戳是否匹配,如果匹配则进行一次锁释放,如果不匹配或者邮戳的锁状态有误则返回 0。该方法的逻辑和 unlock 方法的逻辑相似,如果当前未持有锁就直接返回锁版本,如果持有锁则进行一次锁释放,再返回锁版本。

其实这个接口基本上等于,unlock + tryOptimisticRead 两个方法结合,个人认为这个接口用处不大。释放锁成功,返回了当前锁版本,如果有独占锁在排队,将唤醒该线程获取锁,导致锁版本变化,最终乐观锁也是无法通过校验的。

如果在 release 唤醒时可以判断一下排队的是否是独占锁,如果是独占锁返回 0 使用上可能会有更好的效果,但是这样也许不是本意想要的,毕竟锁实实在在是释放成功了,还返回 0 就不符合锁降级的规则了。

public long tryConvertToOptimisticRead(long stamp) {
    long a = stamp & ABITS, m, s, next; WNode h;
    // 定义内存屏障,避免代码重排序
    U.loadFence();
    for (;;) {
        // 当前锁版本和邮戳锁版本相同
        if (((s = state) & SBITS) != (stamp & SBITS))
            break;
        // 当前state未持有锁
        if ((m = s & ABITS) == 0L) {
            // 邮戳为持有锁的状态
            if (a != 0L)
                break;
            // 直接返回当前锁版本作为乐观锁邮戳
            return s;
        }
        // 当前state持有写锁
        else if (m == WBIT) {
            // 邮戳为未持有锁的状态
            if (a != m)
                break;
            // 直接修改state释放独占锁
            state = next = (s += WBIT) == 0L ? ORIGIN : s;
            // whead节点不为空,状态不为0
            if ((h = whead) != null && h.status != 0)
                // 唤醒whead后续的节点
                release(h);
            // 返回释放锁后的state作为乐观锁邮戳
            // 共享锁持有计数
            return next;
        }
        // 邮戳为未持有锁状态,或者邮戳状态错误
        else if (a == 0L || a >= WBIT)
            break;
        // 当前state表示共享锁计数未溢出,或者未持有锁
        else if (m < RFULL) {
            // CAS释放锁
            if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) {
                // 如果当前释放的是最后一个共享锁,且whead节点不为空,状态不为0
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    // 唤醒whead后续的节点
                    release(h);
                // 重新计算锁版本
                return next & SBITS;
            }
        }
        // 共享锁计数溢出,通过tryDecReaderOverflow方法释放锁
        else if ((next = tryDecReaderOverflow(s)) != 0L)
            return next & SBITS;
    }
    return 0L;
}

九、LOCK视图

Lock asReadLock(): 返回此 StampedLock 的普通 Lock 视图,其中 Lock.lock 方法映射到 readLock ,其他方法也类似。 返回的 Lock 不支持 Condition ; 方法 Lock.newCondition() 抛出 UnsupportedOperationException

public Lock asReadLock() {
    ReadLockView v;
    return ((v = readLockView) != null ? v :
            (readLockView = new ReadLockView()));
}
// view classes
final class ReadLockView implements Lock {
    public void lock() { readLock(); }
    public void lockInterruptibly() throws InterruptedException {
        readLockInterruptibly();
    }
    public boolean tryLock() { return tryReadLock() != 0L; }
    public boolean tryLock(long time, TimeUnit unit)
        throws InterruptedException {
        return tryReadLock(time, unit) != 0L;
    }
    // unstampedUnlockRead逻辑和tryUnlockRead相似,只是释放锁失败抛出异常
    public void unlock() { unstampedUnlockRead(); }
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}

Lock asWriteLock(): 返回此 StampedLock 的普通 Lock 视图,其中 Lock.lock 方法映射到 writeLock ,其他方法也类似。 返回的 Lock 不支持 Condition ; 方法 Lock.newCondition() 抛出 UnsupportedOperationException

public Lock asWriteLock() {
    WriteLockView v;
    return ((v = writeLockView) != null ? v :
            (writeLockView = new WriteLockView()));
}

final class WriteLockView implements Lock {
    public void lock() { writeLock(); }
    public void lockInterruptibly() throws InterruptedException {
        writeLockInterruptibly();
    }
    public boolean tryLock() { return tryWriteLock() != 0L; }
    public boolean tryLock(long time, TimeUnit unit)
        throws InterruptedException {
        return tryWriteLock(time, unit) != 0L;
    }
    // unstampedUnlockWrite逻辑和tryUnlockWrite相似,只是释放锁失败抛出异常
    public void unlock() { unstampedUnlockWrite(); }
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}

ReadWriteLock asReadWriteLock(): 返回此 StampedLockReadWriteLock 视图,其中 ReadWriteLock.readLock() 方法映射到 asReadLock()ReadWriteLock.writeLock() 映射到 asWriteLock()

public ReadWriteLock asReadWriteLock() {
    ReadWriteLockView v;
    return ((v = readWriteLockView) != null ? v :
            (readWriteLockView = new ReadWriteLockView()));
}

final class ReadWriteLockView implements ReadWriteLock {
    public Lock readLock() { return asReadLock(); }
    public Lock writeLock() { return asWriteLock(); }
}
---------本文结束感谢您的阅读---------

评论

 热烈欢迎各位大佬专家莅临玖涯博客指导检查!

 交换友链的朋友请前往友情链接

12 : 111
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×