- 介绍
- 代码
- 源码分析
- ReentrantLock#newCondition()
- AbstractQueuedSynchronizer#ConditionObject#await()
- AbstractQueuedSynchronizer#ConditionObject#addConditionWaiter()
- AbstractQueuedSynchronizer#ConditionObject#fullyRelease()
- AbstractQueuedSynchronizer#isOnSyncQueue()
- AbstractQueuedSynchronizer#findNodeFromTail()
- AbstractQueuedSynchronizer#acquireQueued()
- AbstractQueuedSynchronizer#ConditionObject#signal()
- AbstractQueuedSynchronizer和Condition
介绍
Condition
是一个多线程间协调通信的工具类。使得某个或者某些线程一起等待某个条件(Condition
),只有当该条件具备(signal()
或者signalAll()
被带调用)时,这些等待线程才会被唤醒,从而重新争夺锁。Condition
将Object
监视器方法(wait()
、notify()
和notifyAll()
)分解成截然不同的对象,以便通过将这些对象与任意Lock
实现组合使用,为每个对象提供多个等待set()
(wait-set
)。其中,Lock
替代synchronized
和语句的使用,Condition
替代Object
监视器方法的使用。Lock
对应Synchronized
,使用之前都要先获取锁。
– | Object | Condition |
---|---|---|
休眠 | wait | await |
唤醒个线程 | notify | signal |
唤醒所有线程 | notifyAll | signalAll |
Condition
它更强大的地方在于。能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition
,就是多个监视器的意思。在不同的情况下使用不同的Condition
。
代码
例1
将sync的1个线程通信的例子替换成用Condition实现。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73package conditions;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadTest2 {
public static void main(String[] args) {
final Business business = new Business();
//子线程
new Thread(new Runnable() {
@Override
public void run() {
threadExecute(business, "sub");
}
}).start();
//主线程
threadExecute(business, "main");
}
public static void threadExecute(Business business, String threadType) {
for (int i = 0; i < 5; i++) {
try {
if ("main".equals(threadType)) {
business.main(i);
} else {
business.sub(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Business {
private boolean bool = true;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void main(int loop) throws InterruptedException {
lock.lock();
try {
while (bool) {
condition.await();// this.wait();
}
for (int i = 0; i < 3; i++) {
System.out.println("main thread seq of " + i + ", loop of " + loop);
}
bool = true;
condition.signal();// this.notify();
} finally {
lock.unlock();
}
}
public void sub(int loop) throws InterruptedException {
lock.lock();
try {
while (!bool) {
condition.await();// this.wait();
}
for (int i = 0; i < 3; i++) {
System.out.println("-- sub thread seq of " + i + ", loop of " + loop);
}
bool = false;
condition.signal();// this.notify();
} finally {
lock.unlock();
}
}
}
1 | -- sub thread seq of 0, loop of 0 |
lock
和condition
可以代替sync
实现同步功能。在Condition
中,用await()
替换wait()
,用signal()
替换notify()
,用signalAll()
替换notifyAll()
,传统线程的通信方式,Condition
都可以实现。
例2
Condition
的强大之处在于它可以为多个线程间建立不同的Condition
。
例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒”读线程”;当从缓冲区读出数据之后,唤醒”写线程”;如果采用Object
类中的wait()
, notify()
, notifyAll()
实现该缓冲区,当向缓冲区写入数据之后需要唤醒”读线程”时,不可能通过notify()
或notifyAll()
明确的指定唤醒”读线程”,而只能通过notifyAll()
唤醒所有线程(但是notifyAll()
无法区分唤醒的线程是读线程,还是写线程)。
通过Condition
,就能明确的指定唤醒读线程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51package conditions;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();// 锁对象
final Condition notFull = lock.newCondition();// 写线程条件
final Condition notEmpty = lock.newCondition();// 读线程条件
final Object[] items = new Object[100];// 缓存队列
int putptr/* 写索引 */, takeptr/* 读索引 */, count/* 队列中存在的数据个数 */;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length){
// 如果队列满了
notFull.await();// 阻塞写线程
}
items[putptr] = x;// 赋值
if (++putptr == items.length){
putptr = 0;// 如果写索引写到队列的最后一个位置了,那么置为0
}
++count;// 个数++
notEmpty.signal();// 唤醒读线程
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0){
// 如果队列为空
notEmpty.await();// 阻塞读线程
}
Object x = items[takeptr];// 取值
if (++takeptr == items.length){
takeptr = 0;// 如果读索引读到队列的最后一个位置了,那么置为0
}
--count;// 个数--
notFull.signal();// 唤醒写线程
return x;
} finally {
lock.unlock();
}
}
}
这是一个处于多线程工作环境下的缓存区,缓存区提供了两个方法,put()
和take()
,put()
是存数据,take()
是取数据,内部有个缓存队列,具体变量和方法说明见代码,这个缓存区类实现的功能:有多个线程往里面存数据和从里面取数据,其缓存队列(先进先出后进后出)能缓存的最大数值是100,多个线程间是互斥的,当缓存队列中存储的值达到100时,将写线程阻塞,并唤醒读线程,当缓存队列中存储的值为0时,将读线程阻塞,并唤醒写线程,这也是ArrayBlockingQueue
的内部实现。
例3
简单阻塞、唤醒。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46package conditions;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Test {
public static void main(String[] args) {
final ReentrantLock reentrantLock =new ReentrantLock();
final Condition condition=reentrantLock.newCondition();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
reentrantLock.lock();
System.out.println("我要等一个新信号!"+this);
condition.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("拿到一个信号!"+this);
reentrantLock.unlock();
}
});
thread1.start();
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
reentrantLock.lock();
System.out.println("我拿到锁!"+this);
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
condition.signalAll();
System.out.println("我发一个信号!"+this);
reentrantLock.unlock();
System.out.println(" ------------- thread 2 unlock 结束");
}
});
thread2.start();
}
}
1 | 我要等一个新信号!conditions.Test$1@51493995 |
signalall()
唤醒操作要等thread2释放锁之后,thread1才能获取到锁。
锁被释放后,线程1开始沉睡,这个时候线程因为线程1沉睡时,会唤醒AQS
队列中的头结点(AQS
队列存着等待唤醒的线程),所以线程2会开始竞争锁,并获取到,等待3秒后,线程2会调用signal()
,”发出”signal信号。
源码分析
ReentrantLock#newCondition()
1 | /** |
AbstractQueuedSynchronizer#ConditionObject#await()
condition
调用await()
,调用的是AQS
的内部类ConditionObject
。查看ConditionObject#await()
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 /**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with
* saved state as argument, throwing
* IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1
Node node = addConditionWaiter();
// 2
int savedState = fullyRelease(node);
int interruptMode = 0;
// 3
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
标注代码分析
- 添加当前线程到
Condition
的等待队列中,等待唤醒。 - 释放当前占有锁。在调用
await()
之前,当前这个线程是占有锁的。 - 遍历
AQS
的队列,看当前节点是否在队列中,如果不在队列中,说明还没有获取到锁(AQS
队列,是获取到锁的线程)。isOnSyncQueue
判断是否在AQS
同步队列中。findNodeFromTail
循环遍历,判断当前线程是不是等于AQS
尾节点线程。如果是尾节点线程,则退出当前while
。 - 被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。
AbstractQueuedSynchronizer#ConditionObject#addConditionWaiter()
1 | /** |
AbstractQueuedSynchronizer#ConditionObject#fullyRelease()
1 | /** |
AbstractQueuedSynchronizer#isOnSyncQueue()
1 | /** |
AbstractQueuedSynchronizer#findNodeFromTail()
1 | /** |
AbstractQueuedSynchronizer#acquireQueued()
1 | /* |
AbstractQueuedSynchronizer#ConditionObject#signal()
1 | /** |
标注代码分析
firstWaiter
是Condition
维护的队列的第一个线程,准备唤醒此线程。AbstractQueuedSynchronizer和Condition
AbstractQueuedSynchronizer
维护竞争到锁的线程,有锁的线程进入AQS
队列。Condition
维护等待队列的头节点和尾节点,该队列的作用是存放等待signal
信号的线程,该线程被封装为Node
节点后存放于此。AbstractQueuedSynchronizer#ConditionObject()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22/**
* Condition implementation for a {@link
* AbstractQueuedSynchronizer} serving as the basis of a {@link
* Lock} implementation.
*
* <p>Method documentation for this class describes mechanics,
* not behavioral specifications from the point of view of Lock
* and Condition users. Exported versions of this class will in
* general need to be accompanied by documentation describing
* condition semantics that rely on those of the associated
* <tt>AbstractQueuedSynchronizer</tt>.
*
* <p>This class is Serializable, but all fields are transient,
* so deserialized conditions have no waiters.
*/
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
....
AQS
和Condition
队列作用是不同的,实际上每个线程也仅仅存在2个队列中的一个。
参考例3
- 线程1调用
ReentrantLock#lock
时,线程被加入到AQS
的等待队列中。(此线程获取到锁,加入AQS
中) - 线程1调用
await()
被调用时,该线程从AQS
中移除,对应操作是锁的释放。接着马上被加入到Condition
的等待队列中,等待该线程唤醒信号。 - 线程2因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到
AQS
的等待队列中。 - 线程2调用
signal()
,这个时候Condition
的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS
的等待队列中。注意,这个时候,线程1 并没有被唤醒。(线程2执行unlock()
时才被唤醒) signal()
执行完毕,线程2调用ReentrantLock#unLock()
,释放锁。这个时候因为AQS
中只有线程1,于是,AQS
释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1恢复执行。直到释放所整个过程执行完毕。
doSignal
删除Condition
队列的尾部等待节点线程(设置为null
),头节点的下一个节点线程也是null
。
AbstractQueuedSynchronizer#ConditionObject#doSignal()
1 | /** |
AbstractQueuedSynchronizer#transferForSignal()
1 | /** |
标注代码分析
- 可以看到,正常情况
ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
这个判断是不会为true
的,所以,不会在这个时候唤醒该线程。 只有到发送signal
信号的线程调用ReentrantLock.unlock()
后因为它已经被加到AQS的等待队列中,所以才会被唤醒。