Java 锁系列(五)——ReentrantReadWriteLock源码分析

Java 锁系列(五)——ReentrantReadWriteLock源码分析

一、ReentrantReadWriteLock概述

ReentrantReadWriteLock 是读写锁的实现,读锁可以在没有写锁的时候被多个线程同时持有,写锁就是和 ReentrantLock 类似的独占锁,它同样有包含公平锁和不公平锁两种实现方式,在实例化时将同时实例化 ReadLockWriteLock 实例,本文主要聚焦于 WriteLock 的实现分析,因为 ReentrantReadWriteLock 中的部分实现与 ReentrantLock 一样基于 AbstractQueuedSynchronized 的实现,对于一些相同的逻辑本文不再赘述。

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

1.1 锁状态码存储

ReentrantLock 中,通过 state 参数保存锁状态码,状态码存储着当前锁重入的次数。在 ReentrantReadWriteLock 中同时包含读和写两种锁类型,需要两个状态码来保存锁状态。ReentrantReadWriteLockstate 低 16 位用于存储写锁状态,高16位用于存储读锁状态。

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回以计数表示读锁的状态  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** 返回以计数表示写锁的状态  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

state 存储的读锁状态为所有读锁状态的总和,每个读锁都通过 HoldCounter 对象实例存储自己当前的锁状态。

二、Sync 的公平和非公平实现

**公平锁: ** 多个线程相互竞争时要排队,多个线程按照申请锁的顺序来获取锁。

**非公平锁: ** 多个线程相互竞争时,先尝试插队,插队失败再排队。

Sync 包含 FairSyncNonfairSync 两个子类,分别复写了如下两个方法:

  • boolean readerShouldBlock():如果当前线程在试图获得读锁或操作时时,方法返回 true 表示当前线程需要排队,返回 false 表示线程可以先插队获取锁,或者当前队列为空不需要排队。
  • boolean writerShouldBlock():如果当前线程在尝试获取写锁或其他操作时,方法返回 true 表示当前线程需要排队,返回 false 表示线程可以在排队前先插队获取锁,或者当前队列为空不需要排队。

2.1 FairSync 公平锁

FairSyncReentrantReadWriteLock 公平锁的实现。FairSync 通过 hasQueuedPredecessors 方法检查当前 AQS 队列,hasQueuedPredecessors 方法在有其他线程的节点排队在队列首部则返回 true ,否则返回 false。所以,在公平锁中获取读、写锁时,只要队列首部是其他线程都需要进行排队,除非队列为空或者队列首部为当期线程,以此保证锁竞争公平。

final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}

2.2 NonfairSync 非公平锁

NonfairSyncReentrantLock 非公平锁的实现。在获取独占锁相关操作时,恒定返回 false ,无论在哪种情况下都将尝试插队;在获取共享锁相关操作时,如果队列不为空,且队列首部节点在等待独占锁,则将返回 true 不进行插队,此操作是为了防止独占锁饥饿。

// 独占锁无论如何都尝试插队
final boolean writerShouldBlock() {
    return false;
}
// 如果队列不为空,且队列首部节点在等待独占锁,则返回true进行排队(避免独占锁饥饿)
final boolean readerShouldBlock() {
    /* As a heuristic to avoid indefinite writer starvation,
     * block if the thread that momentarily appears to be head
     * of queue, if one exists, is a waiting writer.  This is
     * only a probabilistic effect since a new reader will not
     * block if there is a waiting writer behind other enabled
     * readers that have not yet drained from the queue.
     */
    return apparentlyFirstQueuedIsExclusive();
}

问:独占锁是如何产生锁饥饿的?

独占锁饥饿原因在于独占锁仅能被一个线程获取,共享锁可以被多个线程获取。

如果共享锁允许无限插队,假如当前有10个共享锁,执行了一段时间释放了5个共享锁,但是又有5个共享锁插队进来了,如此循环往复,一直有线程持有着共享锁,导致共享锁一直没有被释放,此时独占锁就无法被持有。

apparentlyFirstQueuedIsExclusive 在队列不为空,且 head 后续节点不是共享锁状态时返回 true

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

三、WriteLock 实现

3.1 lock 实现流程

lock 用于获取锁,不响应中断。WriteLock 获取锁的流程整体上和 ReentrantLock 获取锁的流程相同,不同点体现在 tryAcquire 方法。

lock 获取锁时,首先执行的是 tryAcquire 方法, tryAcquire 将直接尝试获取锁,此时不通过 AQS 队列获取锁,线程在 AQS 队列中并不存在对应的 node 节点。如果未获取到锁,此时将进入 AQS 队列排队,addWaiter 步骤中可同时进行队列初始化,队列排队的核心逻辑在 acquireQueued 方法,在该方法中将循环判断当前线程是否时候 AQS 队列的第二个 node 线程(第一个 node 为持有锁线程),如果是的话执行 tryAcquire 进获取锁尝试,如果获取锁失败,将进入 shouldParkAfterFailedAcquire 修改前一个节点的 node 节点状态,然后再次进入循环根据前一个节点状态阻塞当前节点线程,直到被唤醒。

3.1.1 tryAcquire 方法实现

tryAcquire 用于以独占的方式获取锁,返回 boolean 表示获取锁是否成功。tryAcquire 中先获取了当前锁的状态,如果当前锁已被持有,将判断当前持有的锁是不是独占锁、当前线程是否是持有锁的线程和锁状态是否超过最大值,都满足则进行重入;如果当前锁未被持有,则通过 writerShouldBlock 判断是否跳过排队直接尝试获取锁,如果 writerShouldBlock 返回 true 则进行 CAS 获取锁尝试。

protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    // 取得当前独占锁状态码
    int w = exclusiveCount(c);
    // 当前已经持有了独占锁或者共享锁
    if (c != 0) {
        // 当前未持有独占锁,或者持有锁的不是当前线程,则获取锁失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 独占锁值超过了最大值
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 修改锁状态
        setState(c + acquires);
        return true;
    }
    // writerShouldBlock 判断是否非公平,是否需要插队
    if (writerShouldBlock() ||
        // CAS 修改锁状态
        !compareAndSetState(c, c + acquires))
        return false;
    // CAS获取锁成功,修改持有锁的线程
    setExclusiveOwnerThread(current);
    return true;
}

3.2 lockInterruptibly 实现流程

lockInterruptibly 用于获取锁,如果收到中断信号则中止获取。除了 tryAcquire 所有实现流程和 ReentrantLocklockInterruptibly 实现流程相同。整体的实现和 lock 方法也是大同小异,只是在 lockInterruptibly 中 AQS 队列接收到中断信号直接终止获取锁,并抛出 InterruptedException 异常,而 lock 仅仅保留了中断标志。

3.3 tryLock 实现流程

不携带时间参数的 tryLock 方法用于尝试获取锁,非公平,不阻塞线程。方法中通过 tryWriteLock 方法来尝试获取锁,整体上获取锁流程和 tryAcquire 方法相同,不同的是去除了判断是否通过 CAS 插队获取锁的判断,无论如何都进行获取锁尝试,所以这是一个非公平的获取锁尝试。

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    // 锁已被持有
    if (c != 0) {
        // 独占锁状态
        int w = exclusiveCount(c);
        // 被持有的不是独占锁,或者持有锁的不是当前线程
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 锁状态已经是最大值了
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    // 尝试CAS修改锁状态
    if (!compareAndSetState(c, c + 1))
        return false;
    // 修改成功,修改持有锁线程
    setExclusiveOwnerThread(current);
    return true;
}

携带时间参数的 tryLock 用于在给定的等待时间内尝试获取锁,获取成功则立即返回 true。超过等待时间获取失败或者接收到中断信号则返回 false 。整体执行流程和 ReentrantLock 相同,只是 tryAcquire 方法的逻辑有些不同。

带时间参数的 tryLock 将时间转为纳秒级时间单位,然后传给 tryAcquireNanostryAcquireNanos 方法中先直接调用 tryAcquire 尝试获取锁,获取锁失败再调用 doAcquireNanos 方法进行排队。doAcquireNanos 将创建 node 节点加入同步队列,然后以独占定时模式获取锁,尝试获取锁失败后将阻塞指定时间的获取锁线程。当阻塞时间到之后再次尝试获取锁,阻塞时间完成或者在阻塞时接收到中断信号时停止获取锁尝试,最终方法返回值将指定是否成功获取到锁。

3.4 unlock 实现流程

unlock 通过 tryRelease 方法进行锁状态修改和释放锁,如果完全释放了锁,将进行 AQS 队列判断。如果队列已经初始化,将更新当前节点的节点状态为 0,并尝试唤醒 head 后继的第一个等待获取锁的节点。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

3.4.1 tryRelease 方法实现

tryRelease 用于进行释放锁操作,返回值表示是否完全释放了锁。方法首先判断当前线程是否是持有锁线程,如果未持有锁将抛出 IllegalMonitorStateException 异常,然后计算锁状态,如果修改后的锁状态值为 0,表示锁被完全释放,此时将清除持有锁的线程。

protected final boolean tryRelease(int releases) {
    // 当前线程是否持有锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 计算锁状态
    int nextc = getState() - releases;
    // 锁状态为0表示锁被完全释放,不为0表示只是减扣除重入,线程依旧持有锁
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        // 清除持有锁线程
        setExclusiveOwnerThread(null);
    // 修改锁状态
    setState(nextc);
    return free;
}

3.5 newCondition 实现流程

WriteLockReentrantLock 都是实例化的 ConditionObject 类,除了部分方法有差别,整体工作流程和原理相同,本文不做赘述。

四、ReadLock 实现

ReadLock 一些全局变量的介绍:

  • firstReader :第一个持有共享锁的线程,释放锁后为 null
  • firstReaderHoldCount :第一个持有共享锁的线程的持有锁计数,即锁状态;
  • readHolds :当前线程持有的 ReadLock 锁的数量, 仅在构造函数和 readObject 中初始化,每当线程的共享锁状态下降到 0 时删除;
  • cachedHoldCounter :最后一个成功获取共享锁线程的缓存;

HoldCounter 对象用于记录持有共享锁的线程,记录了持有锁的线程的线程id和锁计数。

4.1 lock 实现流程

共享锁通过 acquireShared 方法获取锁。获取锁时,先进行获取锁尝试,获取锁失败进入 AQS 队列排队。在获取锁的流程中,创建节点和加入 AQS 进行排队的流程和 WriteLock 相同,区别体现在 tryAcquireShared 获取锁的方法中。

获取锁时,如果其他线程持有了独占锁则直接获取失败,否则将调用 readerShouldBlock 方法判断是否尝试插队获取锁。获取锁时通过 CAS 模式更新 state 写状态的值,更新成功则表示获取锁成功。除此之外还需更新当前线程对应的 ReadLock 锁状态,即锁计数,该值通过 HoldCounter 存储,并通过 cachedHoldCounter 缓存最后一个获取共享锁线程的 HoldCounter。首个持有共享锁的线程较为特殊,通过 firstReaderfirstReaderHoldCount 存储持有锁的线程和锁状态计数。

4.1.1 tryAcquireShared 方法实现

在当前方法中仅初步的进行共享锁获取,获取锁失败将进入 fullTryAcquireShared 方法进行进一步的锁状态获取。

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 已获取独占锁,且持有锁线程的不是当前线程
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        // 获取锁失败,需要排队
        return -1;
    // 取得共享锁状态
    int r = sharedCount(c);
    // readerShouldBlock:判断是否需要插队获取锁,公平和非公平锁实现不同
    if (!readerShouldBlock() &&
        // 锁状态小于最大值
        r < MAX_COUNT &&
        // CAS修改锁状态
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            // 当前线程为第一个持有共享锁的线程
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 当前线程为第一个持有共享锁的线程,直接增加共享锁状态
            firstReaderHoldCount++;
        } else {
            // 取得最后一次获取readLock锁线程的缓存
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                // 缓存的不是当前线程,更新缓存,HoldCounter如果不存在将创建 
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // 缓存的readLock锁已被释放,重新添加
                readHolds.set(rh);
            // 增加锁计数
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

4.1.2 fullTryAcquireShared 方法实现

完整版本的获取共享锁流程,处理 CAS 未命中和未在 tryAcquireShared 中处理的可重入读取。这段代码与 tryacquirered 中的代码在一定程度上是冗余的。

在当前方法中进行循环获取锁,获取锁时将会有以下三种情况不同的情况:

  1. 有线程已经持有了独占锁,如果持有独占锁的线程不是当前线程,直接返回 -1 获取锁失败,持有独占锁的线程是当前线程则正常进行共享锁获取。
  2. 未被获取独占锁,但是 readerShouldBlocktrue,不允许插队获取锁。如果当前线程为第一个持有共享锁的线程,则正常进入获取锁流程。如果非第一次持有锁线程,则需要先判断当前线程是否已经持有了共享锁,未持有共享锁时将清除 readHolds 中的引用,并直接返回 -1 后续通过排队进行锁获取。
  3. 未被获取独占锁,但是 readerShouldBlockfalse,允许插队获取锁。此时将直接进入获取锁流程。

获取独占锁时通过 CAS 直接尝试修改 state 的值,如果修改成功更新当前持有共享锁线程的状态和缓存,修改失败则进入下一次循环。

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
    	// 已获取独占锁,且持有锁线程的不是当前线程
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                // 获取锁失败,需要排队
                return -1;
        } else if (readerShouldBlock()) {
            // 不允许插队获取锁
            // 当前为第一个持有共享锁的线程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    // 取得当前线程的 HoldCounter,先从缓存获取,再从readHolds获取
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        // 锁状态为0删除readHolds的锁引用
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 锁状态为0则返回获取锁失败
                if (rh.count == 0)
                    return -1;
            }
        }
        // 共享锁计数为最大值
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // CAS 修改锁状态
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 修改前共享锁状态为0,当前为第一个持有锁的线程
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 当前为持有锁线程,当前共享锁的状态加1
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                // 当前缓存的最后一个获取共享锁的线程不是当前线程,更新缓存
                if (rh == null || rh.tid != getThreadId(current))
                    // HoldCounter如果不存在将创建
                    rh = readHolds.get();
                // 获取到的缓存中锁状态为0,需要重新添加到readHolds
                else if (rh.count == 0)
                    readHolds.set(rh);
                // 增加锁计数
                rh.count++;
                cachedHoldCounter = rh;
            }
            return 1;
        }
    }
}

问:为什么需要循环进行锁获取?

与独占锁不同,共享锁允许多个线程同时持有。独占锁通过 CAS 修改锁状态失败,则表示其他线程修改了锁状态,锁已被其他线程获取,需要进入排队。但是共享锁通过 CAS 修改锁状态失败,并不一定导致当前线程不能获取共享锁,除非其他线程获取了独占锁,所以获取锁失败需要通过循环再次尝试获取,确定锁的状态。

问:ReentrantReadWriteLock如何进行锁降级?

从上述源码分析,如果当前线程持有 WriteLock,是有权再获取ReadLock锁的,所以可以进行锁降级。

降级流程为:获取WriteLock->获取ReadLock->释放->WriteLock

4.1.3 doAcquireShared 方法实现

为当前线程创建 node 节点并加入同步队列尾部进行排队。排队获取锁逻辑与获取独占锁的 acquireQueued 方法相同 。

private void doAcquireShared(int arg) {
    // 创建共享节点,并插入队列尾部
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 当前线程是否被中断
        boolean interrupted = false;
        for (;;) {
            // 取得前一个节点
            final Node p = node.predecessor();
            // 前一个节点为head,表示当前节点可以尝试获取锁
            if (p == head) {
                // 尝试获取锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 更新head节点和引用,如果next为共享节点进行唤醒
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // 设置中断状态
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // shouldParkAfterFailedAcquire:检查和更新未能获取锁的节点的状态, 如果线程应该阻塞返回true
            // parkAndCheckInterrupt:挂起线程,被唤醒后检查线程是否中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 保存中断状态
                interrupted = true;
        }
    } finally {
        if (failed)
            // 取消获取锁尝试
            cancelAcquire(node);
    }
}

4.1.4 setHeadAndPropagate 方法实现

当前方法在获取共享锁成功后被调用,用于更新 head 引用,并判断下一个等待节点是否是共享锁节点,如果是共享锁需要进行唤醒。

在创建锁成功后进行调用,propagate值恒大于0

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // 更新head节点,删除node的prev和thread引用
    setHead(node);
    // propagate大于0、原/新头节点为null或未被取消
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 下一个节点为共享锁节点
        if (s == null || s.isShared())
            // 进行节点唤醒
            doReleaseShared();
    }
}

4.2 lockInterruptibly 实现流程

lockInterruptibly 通过 acquireSharedInterruptibly 方法获取共享锁,整体流程和 ReentrantLocklockInterruptibly 方法相同。

doAcquireSharedInterruptibly 方法为当前线程创建 node 节点并加入同步队列尾部进行排队,该方法响应中断。排队获取锁逻辑与获取独占锁的 doAcquireInterruptibly 方法相同 。

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 创建共享节点,并插入队列尾部
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 取得前一个节点
            final Node p = node.predecessor();
            // 前一个节点为head,表示当前节点可以尝试获取锁
            if (p == head) {
                // 尝试获取锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 更新head节点和引用,如果next为共享节点进行唤醒
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // shouldParkAfterFailedAcquire:检查和更新未能获取锁的节点的状态,如果线程应该阻塞返回true
            // parkAndCheckInterrupt:挂起线程,被唤醒后检查线程是否中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 中断时抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 取消获取锁尝试
            cancelAcquire(node);
    }
}

4.3 tryLock 实现流程

不携带时间参数的 tryLock 方法用于尝试获取锁,非公平,不阻塞线程。与 fullTryAcquireShared 方法相同,为了避免由于并发修改时 CAS 失败,需要循环进行锁获取,除非可以确定无法获取锁(当前已经获取了独占锁,且持有锁线程不是当前线程)。

final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
    	// 已获取独占锁,且持有锁线程的不是当前线程
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        // 取得共享锁状态
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 尝试获取共享锁
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 获取共享锁成功
            if (r == 0) {
            	//当前线程为第一个持有共享锁的线程
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 当前线程为第一个持有共享锁的线程,直接增加共享锁状态
                firstReaderHoldCount++;
            } else {
                // 从缓存中获取当前线程的HoldCounter
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    // 未获取到,从readHolds取值或者创建
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    // 状态为0,表示已被从readHolds删除,重新加入
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

带时间参数的 tryLock 将时间转为纳秒级时间单位,然后传给 tryAcquireSharedNanos,整体获取锁流程和获取独占锁的流程相同。tryAcquireSharedNanos 方法中先直接调用 tryAcquireShared 尝试获取锁,获取锁失败再调用 doAcquireSharedNanos 方法进行排队。doAcquireSharedNanos 将创建 node 节点加入同步队列,然后以独占定时模式获取锁,尝试获取锁失败后将阻塞指定时间的获取锁线程。当阻塞时间到之后再次尝试获取锁,阻塞时间完成或者在阻塞时接收到中断信号时停止获取锁尝试,最终方法返回值将指定是否成功获取到锁。

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 时间错误
    if (nanosTimeout <= 0L)
        return false;
    // 计算到期时间
    final long deadline = System.nanoTime() + nanosTimeout;
    //创建共享节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 取得当前节点前一个节点
            final Node p = node.predecessor();
            if (p == head) {
                // 尝试获取锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 更新head节点和引用,如果next为共享节点进行唤醒
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            // 超时了
            if (nanosTimeout <= 0L)
                return false;
            // shouldParkAfterFailedAcquire:检查和更新未能获取锁的节点的状态,如果线程应该阻塞返回true
            // parkAndCheckInterrupt:挂起线程,被唤醒后检查线程是否中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 中断抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 取消获取锁尝试
            cancelAcquire(node);
    }
}

4.4 unlock 实现流程

unlock 通过 releaseShared 方法进行释放锁。先调用 tryReleaseShared 方法进行锁状态修改,如果所有共享锁和独占锁都被完全释放,再调用 doReleaseShared 修改 head 节点状态并唤醒后续节点。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

4.4.1 tryReleaseShared 方法实现

进行共享锁状态和当前共享锁计数修改,如果共享锁状态为0,表示所有共享锁都被完全释放。

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 第一个持有锁的线程为当前线程,删除锁引用
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 取得当前线程对应的HoldCounter
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
        	//当前锁已被完全释放删除HoldCounter
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        // 锁状态-1
        --rh.count;
    }
    // 可能存在多个共享锁同时unlock,将造成CAS失败,需要重试
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        // CAS修改status
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

4.4.2 doReleaseShared 方法实现

当所有共享锁都被完全释放时执行,修改 head 节点状态,如果 head 状态为 SIGNAL 将唤醒下一个 node 进行获取锁尝试。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                //修改head的状态
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
            	// head为SIGNAL状态,需要唤醒后续线程
                unparkSuccessor(h);
            }
            // 修改head的状态为PROPAGATE
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果head节点改变则需要从新进行状态修改
        if (h == head)                   // loop if head changed
            break;
    }
}

即使有其他线程正在进行获取/释放锁,也要确保 unlock 状态得到传播,这是源码注释的内容,暂时未理解为什么要这么做。

逻辑实现上就是当 head 节点状态为 0 时设置状态为 PROPAGATE ,判断 h == head 这两个步骤,保证状态传播。如果 head 被修改表示有其他线程通过 AQS 队列获取锁,此时按理不再需要进行锁状态修改了,不是很明白。

4.5 newCondition 实现流程

ReadLock 不包含 Condition 功能,共享锁允许多个线程同时获取,没有线程等待和礼让的需求。调用当前方法将抛出 UnsupportedOperationException 异常。

---------本文结束感谢您的阅读---------

评论

 热烈欢迎各位大佬专家莅临玖涯博客指导检查!

 交换友链的朋友请前往友情链接

12 : 111
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×