一、Condition 概述
Condition
是个接口,依赖于 Lock
接口的实现,基本的方法就是 await()
和 signal()
方法,是在 java 1.5
中才出现的,用于替代 Object
的 wait()
、notify()
实现线程间的协作,相比使用 Object
的 wait()
、notify()
,使用 Condition
的 await()
、signal()
这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用 Conditon中的await()对应Object的wait();
Condition
接口共包含以下这些方法:
方法 | 功能 |
---|---|
void await() throws InterruptedException | 使当前线程等待,直到它收到信号或被中断。 |
void awaitUninterruptibly() | 导致当前线程等待,直到它收到信号,不响应中断。 |
long awaitNanos(long nanosTimeout) throws InterruptedException | 使当前线程等待,直到它被发出信号或被中断,或者指定的等待时间过去。 |
boolean await(long time, TimeUnit unit) throws InterruptedException | 使当前线程等待,直到它被发出信号或被中断,或者指定的等待时间过去。 |
boolean awaitUntil(Date deadline) throws InterruptedException | 导致当前线程等待,直到它被发出信号或被中断,或者指定的截止日期过去。 |
void signal() | 唤醒一个等待线程。 |
void signalAll() | 唤醒所有等待的线程。 |
1.1 newCondition 方法实现
newCondition
执行链路为 newCondition
-> sync.newCondition()
-> new ConditionObject()
。
创建的是 ConditionObject
类的对象,ConditionObject
是 ReentrantLock
的内部类。
二、await 等待实现
2.1 条件等待流程
使当前线程等待,直到它收到信号或被中断。
先创建一个条件等待节点,然后在 fullyRelease
方法中释放当前线程获取锁的状态,并存储锁的状态码。存储状态码完成,通过 fullyRelease
判断当前 node
是否在同步队列中,如果仅在等待队列中则阻塞线程。signal
方法的调用或者中断将唤醒线程,线程唤醒后进入同步队列从新排队等待获取锁。
boolean await(long time, TimeUnit unit)
方法将等待指定时间,程序执行流程和 四、awaitNanos 定时等待实现 相同,不多加赘述。
2.2 await线程等待
public final void await() throws InterruptedException {
// 当前线程为中断状态
if (Thread.interrupted())
throw new InterruptedException();
// 添加一个条件等待节点
Node node = addConditionWaiter();
// 释放锁,返回获得节点状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果等待节点不在同步队列中,阻塞线程(等待队列进行唤醒)
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 检查中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 进行获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 清理队列中取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 根据中断状态判断判断是否抛出异常或者中断
reportInterruptAfterWait(interruptMode);
}
2.3 addConditionWaiter 等待队列添加节点
为当前线程创建 CONDITION
状态的节点,并将节点插入队列。
private Node addConditionWaiter() {
// 等待队列的末尾节点
Node t = lastWaiter;
// 如果节点已被取消,则清除节点
if (t != null && t.waitStatus != Node.CONDITION) {
// 整理队列,清除已被取消的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 节点插入队列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
2.4 unlinkCancelledWaiters 清除已取消节点
更新等待队列,从条件队列中取消链接已取消的等待节点。 仅在持有锁时调用。
private void unlinkCancelledWaiters() {
// 取得首个节点
Node t = firstWaiter;
// 缓存上个节点
Node trail = null;
while (t != null) {
// 下一个等待节点
Node next = t.nextWaiter;
// 当前节点已被取消
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// 未缓存上个节点,表示当前节点为首节点
if (trail == null)
// 下一个节点做为首个节点
firstWaiter = next;
else
// 跳过当前节点
trail.nextWaiter = next;
// 如果没有下一个节点,表示被取消的节点是last节点
if (next == null)
lastWaiter = trail;
}
else
// 节点未被取消,更新上个节点的引用
trail = t;
t = next;
}
}
2.5 fullyRelease 释放锁
使用当前状态值调用 release; 返回保存状态。 取消节点并在失败时抛出异常。
final long fullyRelease(Node node) {
boolean failed = true;
try {
// 获取锁状态码
long savedState = getState();
// 释放锁,直接释放所有重入
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 释放锁失败
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 未释放锁,取消当前节点状态
node.waitStatus = Node.CANCELLED;
}
}
2.6 checkInterruptWhileWaiting 检查中断
如果是 signal
方法执行前中断则为 THROW_IE
否则为 REINTERRUP
。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
// 如果是signal前中断则为 THROW_IE 否则为 REINTERRUPT
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
2.7 transferAfterCancelledWait 检查中断
transferAfterCancelledWait
是实际检查中断的方法。中断可以分为两种,如果修改 waitStatus
值成功,表示 signal
还未被执行,如果修改失败,表示 signal
已经执行。但是有可能 signal
中刚修改了 waitStatus
,还未将节点加入同步队列,线程就失去了CPU,所以需要进行判断,如果还未加入队列,执行 Thread.yield()
暂时让出CPU。
final boolean transferAfterCancelledWait(Node node) {
// 值修改成功,表示还未执行 signal
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 加入同步队列
enq(node);
return true;
}
// 判断是否在同步队列中
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
三、awaitUninterruptibly 等待实现
导致当前线程等待,直到它收到信号,不响应中断。
整体执行流程和 await
相同,但是 awaitUninterruptibly
不需要包含检查中断和将节点加入同步队列的逻辑,因为当前方法不响应中断,仅收到 signal
信号才会停止等待,signal
中会将节点加入到同步队列。
public final void awaitUninterruptibly() {
// 添加一个条件等待节点
Node node = addConditionWaiter();
// 释放锁,返回获得节点状态
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 保存中断状态,不响应中断
if (Thread.interrupted())
interrupted = true;
}
// 等待获取锁
if (acquireQueued(node, savedState) || interrupted)
// 中断当前线程
selfInterrupt();
}
四、awaitNanos 定时等待实现
使当前线程等待,直到它被发出信号或被中断,或者指定的等待时间过去。
此方法的实现和 boolean awaitUntil(Date deadline)
、boolean await(long time, TimeUnit unit)
相同。
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
// 当前线程为中断状态
if (Thread.interrupted())
throw new InterruptedException();
// 添加一个条件等待节点
Node node = addConditionWaiter();
// 释放锁,返回获得节点状态
int savedState = fullyRelease(node);
// 取得等待到时的时间
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 如果等待节点不在同步队列中,阻塞线程(等待队列进行唤醒)
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
// 定时到,将节点加入同步队列
transferAfterCancelledWait(node);
break;
}
// 阻塞指定时间
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 检查中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 更新等待时间
nanosTimeout = deadline - System.nanoTime();
}
// 进行获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// 清理队列中取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 根据中断状态判断判断是否抛出异常或者中断
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
五、signal 信号实现
5.1 信号实现流程
signal
用于给 await
中阻塞的一个线程一个取消等待的信号,此时 doSignal
方法将节点从等待队列剔除,transferForSignal
方法中将节点加入同步队列。接收到信号的线程并不能立即获取到锁,还需要等待占有锁的线程释放锁。
5.2 signal 取消等待
public final void signal() {
// 当前线程不是持有锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 取消第一个节点的线程等待
doSignal(first);
}
5.3 doSignal 取消等待
从等待队列中删除当前节点的引用。如果当前节点已被取消等待,则对当前节点唤醒失败,继续唤醒下一个等待节点。
private void doSignal(Node first) {
do {
// 当前节点已经是最后一个节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 转换节点到同步队列,节点已被取消时返回false
(first = firstWaiter) != null);
}
5.4 transferForSignal 转换节点
修改当前节点的状态,将当前节点插入到同步队列尾部。并修改前面一个节点的状态为 SIGNAL,等待排队唤醒,如果修改状态失败则直接唤醒当前节点的线程。
final boolean transferForSignal(Node node) {
// 如果不能更改waitStatus,则该节点已被取消
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 节点加入同步队列尾部,并获得前面的一个节点(原tail节点)
Node p = enq(node);
int ws = p.waitStatus;
//前面一个节点是被取消的等待队列节点或者修改状态为 SIGNAL 失败
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒线程
LockSupport.unpark(node.thread);
return true;
}
六、signalAll 信号实现
signalAll
通过 doSignalAll
方法进行批量唤醒,传入首个节点。
private void doSignalAll(Node first) {
// 引用设置为空
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
// 清除引用
first.nextWaiter = null;
// 转换节点到同步队列,节点已被取消时返回false
transferForSignal(first);
first = next;
} while (first != null);
}