JDK1.6 CyclicBarrier

介绍

  1. CyclicBarrier是一种可重复使用的栅栏机制,可以让一组线程在某个点上相互等待,这个点就可以类比为栅栏。并且这个栅栏是可重复使用的,这点可以和前面分析过的CountDownLatch做对比,CountDownLatch只能用一次。
  2. CyclicBarrier还支持在所有线程到达栅栏之后,在所有线程从等待状态转到可运行状态之前,执行一个命令(或者说是动作)。当然,在某些情况下,栅栏可以被打破。比如某个线程无法在规定的时间内到达栅栏。

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* A synchronization aid that allows a set of threads to all wait for
* each other to reach a common barrier point. CyclicBarriers are
* useful in programs involving a fixed sized party of threads that
* must occasionally wait for each other. The barrier is called
* <em>cyclic</em> because it can be re-used after the waiting threads
* are released.
*
* ....
*
* @since 1.5
* @see CountDownLatch
*
* @author Doug Lea
*/
public class CyclicBarrier {
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which <tt>count</tt> applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
// 1
private static class Generation {
boolean broken = false;
}

/** The lock for guarding barrier entry */
// 2
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
// 3
private final Condition trip = lock.newCondition();
/** The number of parties */
// 4
private final int parties;
/* The command to run when tripped */
// 5
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();

/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
// 6
private int count;

标注代码分析

  1. 每次对栅栏的使用可以表示为一个generation。栅栏每次开放或者重置,generation都会发生改变。使用栅栏的线程可以关联多个generations,由于等待线程可能会以多种方式请求锁,但是在特定的时间只有一个是可用的,其他的要么被打破,要么开放。如果一个栅栏已经被打破。且没有后续的重置动作,那么可以不存在可用的generation。
  2. 用于保护栅栏的锁。
  3. 栅栏开放的条件。
  4. 当前使用栅栏的使用方(线程)数量。
  5. 当栅栏开放时,要使用的命令。
  6. 处于等待状态的使用方(线程)的数量,在每一个generation上从parties递减为0。当新建generation(栅栏开放)或者栅栏被打破时,重置为parties。

CyclicBarrier内部使用ReentrantLock来实现,并包含一个trip条件,来作为栅栏模拟栅栏的行为。
具体使用栅栏时,各个线程会在要互相等待的地方调用一个await(),然后在这个方法处等待。当所有线程都到达次方法时,栅栏打开,所有线程从等待出继续执行。接下来就从这个await()入手开始分析。

CyclicBarrier#await()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Waits until all {@linkplain #getParties parties} have invoked
* <tt>await</tt> on this barrier.
*
* <p>If the current thread is not the last to arrive then it is
* disabled for thread scheduling purposes and lies dormant until
* one of the following things happens:
* <ul>
* <li>The last thread arrives; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* one of the other waiting threads; or
* <li>Some other thread times out while waiting for barrier; or
* <li>Some other thread invokes {@link #reset} on this barrier.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* ....
*
* @return the arrival index of the current thread, where index
* <tt>{@link #getParties()} - 1</tt> indicates the first
* to arrive and zero indicates the last to arrive
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws BrokenBarrierException if <em>another</em> thread was
* interrupted or timed out while the current thread was
* waiting, or the barrier was reset, or the barrier was
* broken when {@code await} was called, or the barrier
* action (if present) failed due an exception.
*/
// 1
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}

/**
* Waits until all {@linkplain #getParties parties} have invoked
* <tt>await</tt> on this barrier, or the specified waiting time elapses.
*
* .....
*
* @param timeout the time to wait for the barrier
* @param unit the time unit of the timeout parameter
* @return the arrival index of the current thread, where index
* <tt>{@link #getParties()} - 1</tt> indicates the first
* to arrive and zero indicates the last to arrive
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the specified timeout elapses
* @throws BrokenBarrierException if <em>another</em> thread was
* interrupted or timed out while the current thread was
* waiting, or the barrier was reset, or the barrier was broken
* when {@code await} was called, or the barrier action (if
* present) failed due an exception
*/
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

标注代码分析

  1. 线程调用此方法后等待,直到所有parties都调用当前barrier的这个方法。如果当前线程是不最后一个到达此方法的线程,那么会阻塞,直到下面的事情发生。
  • 最后的线程也到达此方法。
  • 其他线程中断了当前线程。
  • 其他线程中断了在栅栏处等待的某个线程。
  • 某个线程调用了栅栏的reset方法。

CyclicBarrier#dowait()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* Main barrier code, covering the various policies.
*/
// 1
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 2
if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
// 3
breakBarrier();
throw new InterruptedException();
}
// 4
int index = --count;
// 5
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 6
if (command != null)
// 7
command.run();
ranAction = true;
// 8
nextGeneration();
return 0;
} finally {
// 9
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
// 10
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 11
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
// 12
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();
// 13
if (g != generation)
return index;

if (timed && nanos <= 0L) {
// 14
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

标注代码分析

  1. 栅栏主体代码,涵盖了所有情况。
  2. 如果当前generation状态为broken,说明栅栏被打破,抛出BrokenBarrierException异常。
  3. 如果当前线程被中断,打破栅栏,然后抛出中断异常。
  4. 计算当前到达线程的下标。
  5. 下标为0表示当前线程为最后一个使用栅栏的线程。
  6. 如果有栅栏命令,执行栅栏命令。
  7. 看来栅栏的命令是由最后一个到达栅栏的线程执行。
  8. 产生新的generation。
  9. 如果栅栏命令未执行,打破栅栏。
  10. 等待中的主循环,直到栅栏开放、栅栏被打破、线程被打断或者超时时退出。
  11. 如果出于当前generation 且generation状态为未打破,那么打破栅栏。
  12. 如果没被中断的话,我们即将完成等待。所以这个中断被算作下一次执行的中断。
  13. 如果generation改变了,说明之前的栅栏已经开放,返回index。
  14. 如果超时,打破栅栏,并返回超时异常。

从dowait方法中可以看到,当所有使用者都到达时,栅栏开放,会调用nextGeneration方法;如果有其他情况(超时、中断等)发生,会调用breakBarrier方法。

CyclicBarrier#nextGeneration()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
// 1
private void nextGeneration() {
// signal completion of last generation
// 2
trip.signalAll();
// set up next generation
// 3
count = parties;
// 4
generation = new Generation();
}

标注代码分析

  1. 更新栅栏状态,唤醒所有在栅栏处等待的线程。这个方法只有在持有锁的情况下被调用。
  2. 唤醒所有在栅栏处等待的线程。
  3. 重置count。
  4. 生成新的generation。

CyclicBarrier#breakBarrier()

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
// 1
private void breakBarrier() {
// 2
generation.broken = true;
// 3
count = parties;
// 4
trip.signalAll();
}

标注代码分析

  1. 设置当前栅栏generation状态为运行状态,并唤醒栅栏处的等待线程。这个方法只有在持有锁的情况下被调用。
  2. 设置当前栅栏generation状态为运行状态。
  3. 重置count。
  4. 唤醒所有在栅栏处等待的线程。

CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* Creates a new <tt>CyclicBarrier</tt> that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

/**
* Creates a new <tt>CyclicBarrier</tt> that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties) {
this(parties, null);
}

/**
* Returns the number of parties required to trip this barrier.
*
* @return the number of parties required to trip this barrier
*/
public int getParties() {
return parties;
}

/**
* Queries if this barrier is in a broken state.
*
* @return {@code true} if one or more parties broke out of this
* barrier due to interruption or timeout since
* construction or the last reset, or a barrier action
* failed due to an exception; {@code false} otherwise.
*/
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}

/**
* Resets the barrier to its initial state. If any parties are
* currently waiting at the barrier, they will return with a
* {@link BrokenBarrierException}. Note that resets <em>after</em>
* a breakage has occurred for other reasons can be complicated to
* carry out; threads need to re-synchronize in some other way,
* and choose one to perform the reset. It may be preferable to
* instead create a new barrier for subsequent use.
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 1
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

/**
* Returns the number of parties currently waiting at the barrier.
* This method is primarily useful for debugging and assertions.
*
* @return the number of parties currently blocked in {@link #await}
*/
// 2
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}

标注代码分析

  1. 重置逻辑,先打破当前的栅栏,然后建立一个新的。
  2. 获取在栅栏处等待的使用方(线程)数量。

总结

  1. 当建立一个使用方数量为n的栅栏时,栅栏内部有一个为n的计数。当使用方调用await方法时,如果其他n-1个使用方没有全部到达await方法(内部计数减1后,不等于0),那么使用方(线程)阻塞等待。
  2. 当第n个使用方调用await时,栅栏开放(内部计数减1后等于0),会唤醒所有在await方法上等待着的使用方(线程),大家一起通过栅栏,然后重置栅栏(内部计数又变成n),栅栏变成新建后的状态,可以再次使用。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×