了解Condition & LockSupport
1、Lock锁的协作类:Condition
1.1 什么是Condition?
任何一个Java对象都天然继承于Object类,在线程间实现通信的往往会应用到Object的几个方法,比如wait()、wait(long timeout)、wait(long timeout, int nanos)与notify()、notifyAll()几个方法实现等待/通知机制。
synchronized
可以使用继承自Object类的wait
和notify/notifyAll
实现线程在条件不满足时等待、条件满足时唤醒的功能。
使用ReentrantLock
比直接使用synchronized
更安全,可以替代synchronized
进行线程同步。
那么,用ReentrantLock
我们怎么编写wait
和notify
的功能呢?
答案是使用Condition
对象来实现wait
和notify
的功能。
从整体上来看Object的wait和notify/notifyAll是与对象监视器配合完成线程间的等待/通知机制,而Condition与Lock配合完成等待通知机制,前者是Java底层级别的实现,后者是语言级别的,具有更高的可控制性和扩展性。
两者除了在使用方式上不同外,在功能特性上也有很多的不同:
- Condition能支持不响应中断,而Object则不支持
- Condition能支持多个等待队列(new 多个Condition对象),而Object只能支持一个
- Condition能支持超时时间的设置,而Object不支持
参照Object的wait和notify/notifyAll方法,Condition也提供了同样的方法:
针对Object的wait方法
void await() throws InterruptedException:线程会释放当前锁、进入等待状态
long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时
boolean await(long time, TimeUnit unit)throws InterruptedException:同第二种,支持自定义时间单位
boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间
针对Object的notify/notifyAll方法
void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
void signalAll():与1的区别在于能够唤醒所有等待在condition上的线程
1.2 Condition底层原理
(1)等待队列
想要掌握Condition还是应该知道它的实现原理,现在我们一起来看看Condition的源码。
创建一个Condition对象是通过Lock.newCondition()
,而这个方法实际上就是new
出一个ConditionObject对象,该类是AQS的一个内部类,有兴趣可以去看看。前面我们说过,Condition要和Lock配合使用,也就是Condition和Lock是绑定在一起的,而Lock的实现原理又依赖于AQS,自然而然ConditionObject作为AQS的一个内部类无可厚非。
我们知道在锁机制的实现上,AQS内部维护了一个同步队列CLH,如果是独占式锁的话,所有获取锁失败的线程将采用尾插法插入到同步队列中。同样的,Condition内部也是使用同样的方式,内部维护了一个等待队列,所有调用Condition.await方法的线程会加入到这个等待队列中,并且线程状态转换为等待状态。另外注意到ConditionObject中有两个成员变量:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
这样我们就可以看出来ConditionObject通过持有等待队列的头、尾指针来管理等待队列。
主要注意的是Node类复用了在AQS中的Node类,其节点状态和相关属性可以去看AQS的实现原理的文章,如果你能仔细看完这篇文章定会对Condition的理解易如反掌,对Lock体系的实现也会有一个质的提升。
Node类有这样一个属性:
//后继节点
Node nextWaiter;
进一步说明,等待队列是一个单向队列,而在之前说AQS时知道同步队列是一个双向队列
。接下来我们用一个demo,通过debug进去看是不是符合我们的猜想:
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
});
thread.start();
}
}
这段代码没有任何实际意义,甚至很臭,只是想证明下我们刚才的所想是否正确。
新建了10个线程,每个线程先获取锁,然后调用condition.await方法释放锁将当前线程加入到等待队列中,通过debug控制当走到第10个线程的时候查看firstWaiter
即等待队列中的头结点,debug情景图如下:
从这个图我们可以很清楚的看到这样几点:
调用Condition.await方法后线程依次尾插入到等待队列中,如图队列中的线程引用依次为Thread-0,Thread-1,Thread-2....Thread-9
等待队列是一个单向队列
通过实验验证,我们可以得出等待队列的结构图如下所示:
同时还有一点需要注意的是:我们可以多次调用Lock.newCondition()
方法创建多个Condition对象,也就是一个Lock可以持有多个等待队列。而在之前利用Object的方式实际上是指在对象Object对象监视器上只能拥有一个同步队列和一个等待队列,而并发包中的Lock拥有一个同步队列和多个等待队列。
示意图如下:
(2)await实现原理
当调用Condition.await()方法后会使得当前已获取Lock的线程进入到等待队列,即如果该线程能够从await()方法返回的话一定是该线程获取了与Condition相关联的Lock。
接下来,我们还是从源码的角度去看,只有熟悉了源码的底层逻辑我们的理解才是最深的。await()
方法源码:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
代码的主要逻辑请看注释,我们都知道,当前线程调用Condition.await()方法后,会使得当前线程释放Lock,然后加入到等待队列中,直至被signal/signalAll后才会使得当前线程从等待队列中移至到同步队列CLH中去;直到再次获得了Lock后才会从await方法返回,或者在等待时被中断。
那么关于这个实现过程我们会有这样几个问题:
是怎样将当前线程添加到等待队列中去的?
释放锁的过程?
怎样才能从await方法退出?
而这段代码的逻辑就是告诉我们这三个问题的答案。
另外,在第1步中调用addConditionWaiter
将当前线程添加到等待队列中,该方法源码为:
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程包装成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
//尾插入
t.nextWaiter = node;
//更新lastWaiter
lastWaiter = node;
return node;
}
这段代码就很容易理解了,将当前节点包装成Node,如果等待队列的firstWaiter为null的话(等待队列为空队列),则将firstWaiter指向当前的Node;
否则,更新lastWaiter(尾节点)即可。
即通过尾插法的方式将当前线程封装的Node插入到等待队列中,同时可以看出等待队列是一个不带头结点的链式队列,之前我们学习AQS
时知道同步队列CLH是一个带头结点的链式队列,这是两者的一个区别。
将当前节点插入到等待对列
之后,会使当前线程释放Lock,由fullyRelease方法实现,fullyRelease源码为:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
//成功释放同步状态
failed = false;
return savedState;
} else {
//不成功释放同步状态抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
这段代码就很容易理解了,调用AQS的模板方法release方法释放AQS的同步状态并且唤醒在同步队列CLH中头结点的后继节点引用的线程。
如果释放成功则正常返回,若失败的话就抛出异常。
到目前为止,这两段代码已经解决了前面的两个问题的答案了,还剩下第3个问题,怎样从await方法退出?
现在回过头再来看await
方法有这样一段逻辑:
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
很显然,当线程第一次调用Condition.await()方法时,会进入到这个while()循环中,然后通过LockSupport.park(this)方法使得当前线程进入等待状态,那么要想退出这个await方法第一个前提条件,自然而然是要先退出这个while循环,出口就只剩下两个地方:
1. 逻辑走到break退出while循环
2. while循环中的逻辑判断为false
再看代码出现第1种情况的条件是:当前等待的线程被中断后代码会走到break退出;
第2种情况是:当前节点被移动到了同步队列CLH中(即另外线程调用的condition的signal或者signalAll方法),while中逻辑判断为false后结束while循环。
总结,就是当前线程被中断或者调用Condition.signal/Condition.signalAll方法,则当前节点移动到了同步队列CLH后 ,这是当前线程退出await方法的前提条件。当退出while循环后,就会调用acquireQueued(node, savedState)
,该方法的作用是在自旋过程中,线程不断尝试获取同步状态,直至成功(线程获取到Lock)。这样也说明了退出await方法必须是已经获得了Condition引用的Lock。
到目前为止,开头的三个问题我们通过阅读源码的方式已经完全找到了答案,也对await
方法的理解加深。await
方法示意图如下图:
如图,调用Condition.await方法的线程前提是必须已经获得了Lock,也就是当前线程是同步队列CLH中的头结点。
调用该方法后会使得当前线程所封装的Node采用尾插法插入到等待队列
中。
超时机制的支持
Condition还额外支持了超时机制,使用者可调用方法awaitNanos、awaitUtil
。这两个方法的实现原理,基本上与AQS中的tryAcquire方法如出一辙。
不响应中断的支持
要想不响应中断可以调用Condition.awaitUninterruptibly()
方法,该方法的源码为:
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
这段方法与上面的await方法基本一致,只不过减少了对中断的处理,并省略了reportInterruptAfterWait方法处理被中断的情况。
(3)signal/signalAll实现原理
signal
调用Condition的signal或者signalAll方法可以将等待队列中等待时间最长的节点移动到同步队列CLH中,使得该节点能够有机会获得Lock。
按照等待队列是先进先出(FIFO)的,所以等待队列的头节点必然是等待时间最长的节点,也就是每次调用Condition的signal方法是将头节点移动到同步队列CLH中。我们通过看源码的方式来看这样的猜想是不是对的,signal方法源码为:
public final void signal() {
//1. 先检测当前线程是否已经获取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
signal方法首先会检测当前线程是否已经获取Lock,如果没有获取Lock会直接抛出异常,如果已获取,则再拿到等待队列的头指针引用的节点,之后的操作的doSignal方法也是基于该节点。
下面我们来看看doSignal方法做了些什么事情,doSignal方法源码为:
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//1. 将头结点从等待队列中移除
first.nextWaiter = null;
//2. while中transferForSignal方法对头结点做真正的处理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
具体逻辑请看注释,真正对头节点做处理的逻辑在transferForSignal中,该方法源码为:
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//1. 更新状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//2.将该节点移入到同步队列中去
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
关键逻辑请看注释,这段代码主要做了两件事情
1.将头结点的状态更改为CONDITION
2.调用enq方法,将该节点尾插入到同步队列中
现在我们可以得出结论:调用Condition的signal的前提条件是当前线程已经获取了Lock,该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列CLH中,而移入到同步队列CLH后才有机会使得等待线程被唤醒,即从await方法中的LockSupport.park(this)方法中返回,从而才有机会使得调用await方法的线程成功退出。
signal执行示意图如下图:
signalAll
signalAll与signal方法的区别体现在doSignalAll方法上,前面我们已经知道**doSignal方法只会对等待队列的头节点进行操作,**而doSignalAll的源码为:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
该方法只不过是实现了等待队列中的每一个节点都移入到同步队列CLH
中,即“通知”当前调用Condition.await()方法的每一个线程。
1.2.4 await与signal/signalAll的结合思考
文章开篇提到等待/通知机制,通过使用Condition提供的await和signal/signalAll
方法就可以实现这种机制,而这种机制能够解决最经典的问题就是“生产者与消费者问题”,关于“生产者消费者问题”
之后会用单独的一篇文章进行讲解,这也是面试的高频考点。
await和signal和signalAll
方法就像一个开关控制着线程A(等待方)和线程B(通知方)。它们之间的关系可以用下面一个图来表现得更加贴切:
如图,
1、线程awaitThread先通过lock.lock()方法获取锁成功后,调用了condition.await方法进入等待队列;
2、而另一个线程signalThread通过lock.lock()方法获取锁成功后,调用了condition.signal或者signalAll方法,使得线程awaitThread能够有机会移入到同步队列CLH中;那么当其他已释放lock的线程awaitThread能够有机会再次获取lock,从而使得线程awaitThread能够从await方法中退出并执行后续操作。
3、如果awaitThread获取lock失败会直接进入到同步队列。
1.3 一个例子
我们用一个很简单的例子说说Condition的用法:
public class AwaitSignal {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
private static volatile boolean flag = false;
public static void main(String[] args) {
Thread waiter = new Thread(new waiter());
waiter.start();
Thread signaler = new Thread(new signaler());
signaler.start();
}
static class waiter implements Runnable {
@Override
public void run() {
lock.lock();
try {
while (!flag) {
System.out.println(Thread.currentThread().getName() + "当前线程条件不满足,等待通知");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "当前线程接收到通知,条件满足");
} finally {
lock.unlock();
}
}
}
static class signaler implements Runnable {
@Override
public void run() {
lock.lock();
try {
flag = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
}
}
输出结果:
Thread-0当前线程条件不满足,等待通知
Thread-0当前线程接收到通知,条件满足
开启了两个线程waiter和signaler,waiter线程开始执行的时候由于条件不满足,执行condition.await方法使该线程进入等待状态同时释放锁,signaler线程获取到锁之后更改条件,并通知所有的等待线程后,释放锁。这时,waiter线程再次获取到锁,并由于signaler线程更改了条件此时相对于waiter来说条件满足,继续执行。
1.4 总结
1、synchronized
可以使用继承自Object类的wait
和notify/notifyAll
实现线程在条件不满足时等待、条件满足时唤醒的功能。
2、Lock体系
使用Condition接口实现类的await和signal/signalAll
实现线程在条件不满足时等待、条件满足时唤醒的功能。
3、Condition的await和signal/signalAll
的实现离不开Condition内部的等待队列,以及AQS内部的同步队列CLH。
4、Condition可以替代wait和notify,Condition对象必须从Lock对象获取。
2、Lock锁的协作类:LockSupport
2.1 什么是LockSupport?
回顾一下,我们在之前介绍AQS的底层原理中、在介绍通过Condition实现线程间的等待/通知机制时,都会调用LockSupport.park()方法和LockSupport.unpark()方法,而这个在同步组件的底层实现中被频繁调用的LockSupport
到底是何方神圣,现在就来看看。
LockSupport
位于java.util.concurrent.locks包下,有兴趣的可以直接去看源码,该类的方法并不多。
LockSupprot
是线程的阻塞原语,用来阻塞线程和唤醒线程。
每个使用LockSupport
的线程都会与一个许可关联。
如果该许可可用,并且可在当前线程中使用,则调用 park()
时将会立即返回,否则调用 park()
时可能阻塞。
如果许可尚不可用,则可以调用 unpark()
使其可用。但是注意许可不可重入,也就是说只能调用一次park()
方法,否则会一直阻塞。
2.2 LockSupport中的方法
阻塞线程方法
- void park():阻塞当前线程,如果调用unpark()方法或者当前线程被中断,才能从park()方法中返回
- void park(Object blocker):功能同方法1,入参增加一个Object对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查
- void parkNanos(long nanos):阻塞当前线程,最长不超过nanos纳秒,增加了超时返回的特性
- void parkNanos(Object blocker, long nanos):功能同方法3,入参增加一个Object对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查
- void parkUntil(long deadline):阻塞当前线程,直到deadline
- void parkUntil(Object blocker, long deadline):功能同方法5,入参增加一个Object对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查
唤醒线程方法
void unpark(Thread thread):唤醒处于阻塞状态的指定线程
实际上LockSupport
阻塞和唤醒线程的功能是依赖于sun.misc.Unsafe
,这是一个很底层的类,有兴趣的可以去查阅资料,比如park()
方法的功能实现则是靠unsafe.park()
方法。
另外在阻塞线程这一系列方法中还有一个很有意思的现象是,每个方法都会新增一个带有Object的阻塞对象的重载方法。那么增加了一个Object对象的入参会有什么不同的地方呢?示例代码很简单就不说了,直接看dump线程的信息。
调用park()方法dump线程:
"main" #1 prio=5 os_prio=0 tid=0x02cdcc00 nid=0x2b48 waiting on condition [0x00d6f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
at learn.LockSupportDemo.main(LockSupportDemo.java:7)
调用park(Object blocker)方法dump线程:
"main" #1 prio=5 os_prio=0 tid=0x0069cc00 nid=0x6c0 waiting on condition [0x00dcf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x048c2d18> (a java.lang.String)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at learn.LockSupportDemo.main(LockSupportDemo.java:7)
通过分别调用这两个方法然后dump线程信息可以看出,带Object的park方法相较于无参的park方法会增加:
parking to wait for <0x048c2d18> (a java.lang.String)
这种信息就类似于记录“案发现场”,有助于工程人员能够迅速发现问题、解决问题。
其实有个有意思的事情,我们都知道如果使用synchronzed阻塞了线程,那么dump线程时都会有阻塞对象的描述,但是在Java 5推出LockSupport时遗漏了这一点,在Java 6时才进行了补充。
还有一点需要需要的是:synchronzed致使线程阻塞,线程会进入到BLOCKED状态;而调用LockSupprt方法阻塞线程会致使线程进入到WAITING状态!
2.3 一个例子
用一个很简单的例子来说明这些方法如何使用:
public class LockSupportDemo {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "被唤醒");
});
thread.start();
try {
System.out.println(Thread.currentThread().getName() + "急了吗");
Thread.sleep(2000);
LockSupport.unpark(thread);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出结果:
main急了吗
Thread-0被唤醒
thread线程调用LockSupport.park()致使thread被阻塞;
当mian线程睡眠2秒后,调用LockSupport.unpark(thread)方法唤醒thread线程,thread线程被唤醒才能执行后续操作;
有一点值得关注的是,LockSupport.unpark(thread)可以指定线程对象、唤醒指定的线程!
2.4 总结
LockSupprot
是线程的阻塞原语,用来阻塞线程和唤醒线程。
简而言之,当调用LockSupport.park()
时,表示当前线程将会等待,直至获得许可;
当调用LockSupport.unpark()
时,必须把等待获得许可的线程作为参数
进行传递,好让此线程继续运行。