JDK1.7 Condition

介绍

Condition是一个多线程间协调通信的工具类。使得某个或者某些线程一起等待某个条件(Condition),只有当该条件具备(signal()或者signalAll()被带调用)时,这些等待线程才会被唤醒,从而重新争夺锁。
ConditionObject监视器方法(wait()notify()notifyAll())分解成截然不同的对象,以便通过将这些对象与任意Lock实现组合使用,为每个对象提供多个等待set()wait-set)。其中,Lock替代synchronized和语句的使用,Condition替代Object监视器方法的使用。
Lock对应Synchronized,使用之前都要先获取锁。

Object Condition
休眠 wait await
唤醒个线程 notify signal
唤醒所有线程 notifyAll signalAll

Condition它更强大的地方在于。能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,就是多个监视器的意思。在不同的情况下使用不同的Condition

代码

例1

将sync的1个线程通信的例子替换成用Condition实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package conditions;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadTest2 {

public static void main(String[] args) {
final Business business = new Business();
//子线程
new Thread(new Runnable() {
@Override
public void run() {
threadExecute(business, "sub");
}
}).start();
//主线程
threadExecute(business, "main");
}

public static void threadExecute(Business business, String threadType) {
for (int i = 0; i < 5; i++) {
try {
if ("main".equals(threadType)) {
business.main(i);
} else {
business.sub(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Business {
private boolean bool = true;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void main(int loop) throws InterruptedException {
lock.lock();
try {
while (bool) {
condition.await();// this.wait();
}
for (int i = 0; i < 3; i++) {
System.out.println("main thread seq of " + i + ", loop of " + loop);
}
bool = true;
condition.signal();// this.notify();
} finally {
lock.unlock();
}
}

public void sub(int loop) throws InterruptedException {
lock.lock();
try {
while (!bool) {
condition.await();// this.wait();
}
for (int i = 0; i < 3; i++) {
System.out.println("-- sub thread seq of " + i + ", loop of " + loop);
}
bool = false;
condition.signal();// this.notify();
} finally {
lock.unlock();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
-- sub thread seq of 0, loop of 0
-- sub thread seq of 1, loop of 0
-- sub thread seq of 2, loop of 0
main thread seq of 0, loop of 0
main thread seq of 1, loop of 0
main thread seq of 2, loop of 0
-- sub thread seq of 0, loop of 1
-- sub thread seq of 1, loop of 1
-- sub thread seq of 2, loop of 1
main thread seq of 0, loop of 1
main thread seq of 1, loop of 1
main thread seq of 2, loop of 1
-- sub thread seq of 0, loop of 2
-- sub thread seq of 1, loop of 2
-- sub thread seq of 2, loop of 2
main thread seq of 0, loop of 2
main thread seq of 1, loop of 2
main thread seq of 2, loop of 2
-- sub thread seq of 0, loop of 3
-- sub thread seq of 1, loop of 3
-- sub thread seq of 2, loop of 3
main thread seq of 0, loop of 3
main thread seq of 1, loop of 3
main thread seq of 2, loop of 3
-- sub thread seq of 0, loop of 4
-- sub thread seq of 1, loop of 4
-- sub thread seq of 2, loop of 4
main thread seq of 0, loop of 4
main thread seq of 1, loop of 4
main thread seq of 2, loop of 4

lockcondition可以代替sync实现同步功能。在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现。

例2

Condition的强大之处在于它可以为多个线程间建立不同的Condition
例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒”读线程”;当从缓冲区读出数据之后,唤醒”写线程”;如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒”读线程”时,不可能通过notify()notifyAll()明确的指定唤醒”读线程”,而只能通过notifyAll()唤醒所有线程(但是notifyAll()无法区分唤醒的线程是读线程,还是写线程)。
通过Condition,就能明确的指定唤醒读线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package conditions;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
final Lock lock = new ReentrantLock();// 锁对象
final Condition notFull = lock.newCondition();// 写线程条件
final Condition notEmpty = lock.newCondition();// 读线程条件

final Object[] items = new Object[100];// 缓存队列
int putptr/* 写索引 */, takeptr/* 读索引 */, count/* 队列中存在的数据个数 */;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length){
// 如果队列满了
notFull.await();// 阻塞写线程
}
items[putptr] = x;// 赋值
if (++putptr == items.length){
putptr = 0;// 如果写索引写到队列的最后一个位置了,那么置为0
}
++count;// 个数++
notEmpty.signal();// 唤醒读线程
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0){
// 如果队列为空
notEmpty.await();// 阻塞读线程
}
Object x = items[takeptr];// 取值
if (++takeptr == items.length){
takeptr = 0;// 如果读索引读到队列的最后一个位置了,那么置为0
}
--count;// 个数--
notFull.signal();// 唤醒写线程
return x;
} finally {
lock.unlock();
}
}
}

这是一个处于多线程工作环境下的缓存区,缓存区提供了两个方法,put()take()put()是存数据,take()是取数据,内部有个缓存队列,具体变量和方法说明见代码,这个缓存区类实现的功能:有多个线程往里面存数据和从里面取数据,其缓存队列(先进先出后进后出)能缓存的最大数值是100,多个线程间是互斥的,当缓存队列中存储的值达到100时,将写线程阻塞,并唤醒读线程,当缓存队列中存储的值为0时,将读线程阻塞,并唤醒写线程,这也是ArrayBlockingQueue的内部实现。

例3

简单阻塞、唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package conditions;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Test {
public static void main(String[] args) {
final ReentrantLock reentrantLock =new ReentrantLock();
final Condition condition=reentrantLock.newCondition();

Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
reentrantLock.lock();
System.out.println("我要等一个新信号!"+this);
condition.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("拿到一个信号!"+this);
reentrantLock.unlock();
}
});

thread1.start();

Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
reentrantLock.lock();
System.out.println("我拿到锁!"+this);
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
condition.signalAll();
System.out.println("我发一个信号!"+this);
reentrantLock.unlock();
System.out.println(" ------------- thread 2 unlock 结束");
}
});
thread2.start();
}
}

1
2
3
4
5
我要等一个新信号!conditions.Test$1@51493995
我拿到锁!conditions.Test$2@892b7c2
我发一个信号!conditions.Test$2@892b7c2
------------- thread 2 unlock 结束
拿到一个信号!conditions.Test$1@51493995

signalall()唤醒操作要等thread2释放锁之后,thread1才能获取到锁。
锁被释放后,线程1开始沉睡,这个时候线程因为线程1沉睡时,会唤醒AQS队列中的头结点(AQS队列存着等待唤醒的线程),所以线程2会开始竞争锁,并获取到,等待3秒后,线程2会调用signal(),”发出”signal信号。

源码分析

ReentrantLock#newCondition()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Returns a {@link Condition} instance for use with this
* {@link Lock} instance.
*
* <p>The returned {@link Condition} instance supports the same
* usages as do the {@link Object} monitor methods ({@link
* Object#wait() wait}, {@link Object#notify notify}, and {@link
* Object#notifyAll notifyAll}) when used with the built-in
* monitor lock.
*
* <ul>
*
* <li>If this lock is not held when any of the {@link Condition}
* {@linkplain Condition#await() waiting} or {@linkplain
* Condition#signal signalling} methods are called, then an {@link
* IllegalMonitorStateException} is thrown.
*
* <li>When the condition {@linkplain Condition#await() waiting}
* methods are called the lock is released and, before they
* return, the lock is reacquired and the lock hold count restored
* to what it was when the method was called.
*
* <li>If a thread is {@linkplain Thread#interrupt interrupted}
* while waiting then the wait will terminate, an {@link
* InterruptedException} will be thrown, and the thread's
* interrupted status will be cleared.
*
* <li> Waiting threads are signalled in FIFO order.
*
* <li>The ordering of lock reacquisition for threads returning
* from waiting methods is the same as for threads initially
* acquiring the lock, which is in the default case not specified,
* but for <em>fair</em> locks favors those threads that have been
* waiting the longest.
*
* </ul>
*
* @return the Condition object
*/
public Condition newCondition() {
return sync.newCondition();
}

AbstractQueuedSynchronizer#ConditionObject#await()

condition调用await(),调用的是AQS的内部类ConditionObject。查看ConditionObject#await()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
     /**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with
* saved state as argument, throwing
* IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1
Node node = addConditionWaiter();
// 2
int savedState = fullyRelease(node);
int interruptMode = 0;
// 3
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

标注代码分析

  1. 添加当前线程到Condition的等待队列中,等待唤醒。
  2. 释放当前占有锁。在调用await()之前,当前这个线程是占有锁的。
  3. 遍历AQS的队列,看当前节点是否在队列中,如果不在队列中,说明还没有获取到锁(AQS队列,是获取到锁的线程)。isOnSyncQueue判断是否在AQS同步队列中。findNodeFromTail循环遍历,判断当前线程是不是等于AQS尾节点线程。如果是尾节点线程,则退出当前while
  4. 被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。

AbstractQueuedSynchronizer#ConditionObject#addConditionWaiter()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

AbstractQueuedSynchronizer#ConditionObject#fullyRelease()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

AbstractQueuedSynchronizer#isOnSyncQueue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}

AbstractQueuedSynchronizer#findNodeFromTail()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

AbstractQueuedSynchronizer#acquireQueued()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
* Various flavors of acquire, varying in exclusive/shared and
* control modes. Each is mostly the same, but annoyingly
* different. Only a little bit of factoring is possible due to
* interactions of exception mechanics (including ensuring that we
* cancel if tryAcquire throws exception) and other control, at
* least not without hurting performance too much.
*/

/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

AbstractQueuedSynchronizer#ConditionObject#signal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 1
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

标注代码分析

  1. firstWaiterCondition维护的队列的第一个线程,准备唤醒此线程。

    AbstractQueuedSynchronizer和Condition

    AbstractQueuedSynchronizer

    维护竞争到锁的线程,有锁的线程进入AQS队列。

    Condition

    维护等待队列的头节点和尾节点,该队列的作用是存放等待signal信号的线程,该线程被封装为Node节点后存放于此。

    AbstractQueuedSynchronizer#ConditionObject()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    /**
    * Condition implementation for a {@link
    * AbstractQueuedSynchronizer} serving as the basis of a {@link
    * Lock} implementation.
    *
    * <p>Method documentation for this class describes mechanics,
    * not behavioral specifications from the point of view of Lock
    * and Condition users. Exported versions of this class will in
    * general need to be accompanied by documentation describing
    * condition semantics that rely on those of the associated
    * <tt>AbstractQueuedSynchronizer</tt>.
    *
    * <p>This class is Serializable, but all fields are transient,
    * so deserialized conditions have no waiters.
    */
    public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
    ....

AQSCondition队列作用是不同的,实际上每个线程也仅仅存在2个队列中的一个。
参考例3

  1. 线程1调用ReentrantLock#lock时,线程被加入到AQS的等待队列中。(此线程获取到锁,加入AQS中)
  2. 线程1调用await()被调用时,该线程从AQS中移除,对应操作是锁的释放。接着马上被加入到Condition的等待队列中,等待该线程唤醒信号。
  3. 线程2因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到AQS的等待队列中。
  4. 线程2调用signal(),这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1 并没有被唤醒。(线程2执行unlock()时才被唤醒)
  5. signal()执行完毕,线程2调用ReentrantLock#unLock(),释放锁。这个时候因为AQS中只有线程1,于是,AQS释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1恢复执行。直到释放所整个过程执行完毕。

doSignal删除Condition队列的尾部等待节点线程(设置为null),头节点的下一个节点线程也是null

AbstractQueuedSynchronizer#ConditionObject#doSignal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

AbstractQueuedSynchronizer#transferForSignal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  /**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal).
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
// 1
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

标注代码分析

  1. 可以看到,正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程。 只有到发送signal信号的线程调用ReentrantLock.unlock()后因为它已经被加到AQS的等待队列中,所以才会被唤醒。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×