Java 锁系列(四)——ReentrantLock源码Condition实现分析

Java 锁系列(四)——ReentrantLock源码Condition实现分析

一、Condition 概述

Condition 是个接口,依赖于 Lock 接口的实现,基本的方法就是 await()signal() 方法,是在 java 1.5 中才出现的,用于替代 Objectwait()notify() 实现线程间的协作,相比使用 Objectwait()notify(),使用 Conditionawait()signal() 这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用 Conditon中的await()对应Object的wait();

Condition 接口共包含以下这些方法:

方法功能
void await() throws InterruptedException使当前线程等待,直到它收到信号或被中断。
void awaitUninterruptibly()导致当前线程等待,直到它收到信号,不响应中断。
long awaitNanos(long nanosTimeout) throws InterruptedException使当前线程等待,直到它被发出信号或被中断,或者指定的等待时间过去。
boolean await(long time, TimeUnit unit) throws InterruptedException使当前线程等待,直到它被发出信号或被中断,或者指定的等待时间过去。
boolean awaitUntil(Date deadline) throws InterruptedException导致当前线程等待,直到它被发出信号或被中断,或者指定的截止日期过去。
void signal()唤醒一个等待线程。
void signalAll()唤醒所有等待的线程。

1.1 newCondition 方法实现

newCondition 执行链路为 newCondition -> sync.newCondition() -> new ConditionObject()

创建的是 ConditionObject 类的对象,ConditionObjectReentrantLock 的内部类。

二、await 等待实现

2.1 条件等待流程

使当前线程等待,直到它收到信号或被中断。

先创建一个条件等待节点,然后在 fullyRelease 方法中释放当前线程获取锁的状态,并存储锁的状态码。存储状态码完成,通过 fullyRelease 判断当前 node 是否在同步队列中,如果仅在等待队列中则阻塞线程。signal 方法的调用或者中断将唤醒线程,线程唤醒后进入同步队列从新排队等待获取锁。

boolean await(long time, TimeUnit unit) 方法将等待指定时间,程序执行流程和 四、awaitNanos 定时等待实现 相同,不多加赘述。

2.2 await线程等待

public final void await() throws InterruptedException {
    // 当前线程为中断状态
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加一个条件等待节点
    Node node = addConditionWaiter();
    // 释放锁,返回获得节点状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果等待节点不在同步队列中,阻塞线程(等待队列进行唤醒)
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 检查中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 进行获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        // 清理队列中取消的节点
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 根据中断状态判断判断是否抛出异常或者中断
        reportInterruptAfterWait(interruptMode);
}

2.3 addConditionWaiter 等待队列添加节点

为当前线程创建 CONDITION 状态的节点,并将节点插入队列。

private Node addConditionWaiter() {
    // 等待队列的末尾节点
    Node t = lastWaiter;
    // 如果节点已被取消,则清除节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 整理队列,清除已被取消的节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创建节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 节点插入队列
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

2.4 unlinkCancelledWaiters 清除已取消节点

更新等待队列,从条件队列中取消链接已取消的等待节点。 仅在持有锁时调用。

private void unlinkCancelledWaiters() {
    // 取得首个节点
    Node t = firstWaiter;
    // 缓存上个节点
    Node trail = null;
    while (t != null) {
        // 下一个等待节点
        Node next = t.nextWaiter;
        // 当前节点已被取消
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            // 未缓存上个节点,表示当前节点为首节点
            if (trail == null)
                // 下一个节点做为首个节点
                firstWaiter = next;
            else
                // 跳过当前节点
                trail.nextWaiter = next;
            // 如果没有下一个节点,表示被取消的节点是last节点
            if (next == null)
                lastWaiter = trail;
        }
        else
            // 节点未被取消,更新上个节点的引用
            trail = t;
        t = next;
    }
}

2.5 fullyRelease 释放锁

使用当前状态值调用 release; 返回保存状态。 取消节点并在失败时抛出异常。

final long fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取锁状态码
        long savedState = getState();
        // 释放锁,直接释放所有重入
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 释放锁失败
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
        // 未释放锁,取消当前节点状态
            node.waitStatus = Node.CANCELLED;
    }
}

2.6 checkInterruptWhileWaiting 检查中断

如果是 signal 方法执行前中断则为 THROW_IE 否则为 REINTERRUP

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        // 如果是signal前中断则为 THROW_IE 否则为 REINTERRUPT
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

2.7 transferAfterCancelledWait 检查中断

transferAfterCancelledWait 是实际检查中断的方法。中断可以分为两种,如果修改 waitStatus 值成功,表示 signal 还未被执行,如果修改失败,表示 signal 已经执行。但是有可能 signal 中刚修改了 waitStatus ,还未将节点加入同步队列,线程就失去了CPU,所以需要进行判断,如果还未加入队列,执行 Thread.yield() 暂时让出CPU。

final boolean transferAfterCancelledWait(Node node) {
    // 值修改成功,表示还未执行 signal
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 加入同步队列
        enq(node);
        return true;
    }
    // 判断是否在同步队列中
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

三、awaitUninterruptibly 等待实现

导致当前线程等待,直到它收到信号,不响应中断。

整体执行流程和 await 相同,但是 awaitUninterruptibly 不需要包含检查中断和将节点加入同步队列的逻辑,因为当前方法不响应中断,仅收到 signal 信号才会停止等待,signal 中会将节点加入到同步队列。

public final void awaitUninterruptibly() {
    // 添加一个条件等待节点
    Node node = addConditionWaiter();
    // 释放锁,返回获得节点状态
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 保存中断状态,不响应中断
        if (Thread.interrupted())
            interrupted = true;
    }
    // 等待获取锁
    if (acquireQueued(node, savedState) || interrupted)
        // 中断当前线程
        selfInterrupt();
}

四、awaitNanos 定时等待实现

使当前线程等待,直到它被发出信号或被中断,或者指定的等待时间过去。

此方法的实现和 boolean awaitUntil(Date deadline)boolean await(long time, TimeUnit unit) 相同。

public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    // 当前线程为中断状态
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加一个条件等待节点
    Node node = addConditionWaiter();
    // 释放锁,返回获得节点状态
    int savedState = fullyRelease(node);
    // 取得等待到时的时间
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    // 如果等待节点不在同步队列中,阻塞线程(等待队列进行唤醒)
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            // 定时到,将节点加入同步队列
            transferAfterCancelledWait(node);
            break;
        }
        // 阻塞指定时间
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        // 检查中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 更新等待时间
        nanosTimeout = deadline - System.nanoTime();
    }
    // 进行获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        // 清理队列中取消的节点
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 根据中断状态判断判断是否抛出异常或者中断
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

五、signal 信号实现

5.1 信号实现流程

signal 用于给 await 中阻塞的一个线程一个取消等待的信号,此时 doSignal 方法将节点从等待队列剔除,transferForSignal 方法中将节点加入同步队列。接收到信号的线程并不能立即获取到锁,还需要等待占有锁的线程释放锁。

5.2 signal 取消等待

public final void signal() {
    // 当前线程不是持有锁的线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // 取消第一个节点的线程等待
        doSignal(first);
}

5.3 doSignal 取消等待

从等待队列中删除当前节点的引用。如果当前节点已被取消等待,则对当前节点唤醒失败,继续唤醒下一个等待节点。

private void doSignal(Node first) {
    do {
        // 当前节点已经是最后一个节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&	// 转换节点到同步队列,节点已被取消时返回false
             (first = firstWaiter) != null);
}

5.4 transferForSignal 转换节点

修改当前节点的状态,将当前节点插入到同步队列尾部。并修改前面一个节点的状态为 SIGNAL,等待排队唤醒,如果修改状态失败则直接唤醒当前节点的线程。

final boolean transferForSignal(Node node) {
    // 如果不能更改waitStatus,则该节点已被取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 节点加入同步队列尾部,并获得前面的一个节点(原tail节点)
    Node p = enq(node);
    int ws = p.waitStatus;
    //前面一个节点是被取消的等待队列节点或者修改状态为 SIGNAL 失败
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 唤醒线程
        LockSupport.unpark(node.thread);
    return true;
}

六、signalAll 信号实现

signalAll 通过 doSignalAll 方法进行批量唤醒,传入首个节点。

private void doSignalAll(Node first) {
    // 引用设置为空
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        // 清除引用
        first.nextWaiter = null;
        // 转换节点到同步队列,节点已被取消时返回false
        transferForSignal(first);
        first = next;
    } while (first != null);
}
---------本文结束感谢您的阅读---------

评论

 热烈欢迎各位大佬专家莅临玖涯博客指导检查!

 交换友链的朋友请前往友情链接

12 : 111
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×