- Fork
- Join
- ForkJoinTask#doJoin()
- ForkJoinWorkerThread#unpushTask()
- ForkJoinWorkerThread#joinTask()
- ForkJoinWorkerThread#localHelpJoinTask()
- ForkJoinWorkerThread#tryDeqAndExec()
- ForkJoinWorkerThread#helpJoinTask()
- ForkJoinPool#tryAwaitJoin()
- ForkJoinPool#tryPreBlock()
- ForkJoinTask#tryAwaitDone()
- ForkJoinPool#postBlock()
- ForkJoinTask#cancelIgnoringExceptions()
- ForkJoinTask#getRawResult()
- RecursiveTask#getRawResult()
- ForkJoinTask#reportResult()
- 总结
这篇源码分析基于JDK7。
Fork
通过分析一个Fork-Join任务的执行过程来分析Fork-Join的相关代码,主要侧重于分裂(Fork)/合并(Join)过程。
SumTask1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public class SumTask extends RecursiveTask<Long>{
private static final int THRESHOLD = 10;
private long start;
private long end;
public SumTask(long n) {
this(1,n);
}
private SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if((end - start) <= THRESHOLD){
for(long l = start; l <= end; l++){
sum += l;
}
}else{
long mid = (start + end) >>> 1;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork();
right.fork();
sum = left.join() + right.join();
}
return sum;
}
private static final long serialVersionUID = 1L;
}
ForkJoinTask#fork()
1 | /** |
ForkJoinWorkerThread#pushTask()
1 | /** |
标注代码分析
- 这里首先根据当前的
queueTop
对队列(数组)长度取模来算出放置任务的下标,然后再通过下标算出偏移地址,提供给Unsafe
使用。 - 设置任务。
- 修改
queueTop
。 - 如果队列满了,扩展一下队列容量。
ForkJoinWorkerThread#growQueue()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28/**
* Creates or doubles queue array. Transfers elements by
* emulating steals (deqs) from old array and placing, oldest
* first, into new array.
*/
private void growQueue() {
ForkJoinTask<?>[] oldQ = queue;
int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
// 1
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
// 2
if (size < INITIAL_QUEUE_CAPACITY)
size = INITIAL_QUEUE_CAPACITY;
ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
int mask = size - 1;
int top = queueTop;
int oldMask;
if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
for (int b = queueBase; b != top; ++b) {
long u = ((b & oldMask) << ASHIFT) + ABASE;
Object x = UNSAFE.getObjectVolatile(oldQ, u);
if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
UNSAFE.putObjectVolatile
(q, ((b & mask) << ASHIFT) + ABASE, x);
}
}
}
标注代码分析
- 容量为原来的2倍,不超过
MAXIMUM_QUEUE_CAPACITY(1 << 24)
。 - 最小为
INITIAL_QUEUE_CAPACITY(1 << 13)
。
Join
1 | /** |
标注代码分析
- 先调用
doJoin()
,如果doJoin()
返回NORMAL
,那么通过getRawResult()
来获取结果;否则会调用reportResult()
来处理和获取结果。ForkJoinTask#doJoin()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27/**
* Primary mechanics for join, get, quietlyJoin.
* @return status upon completion
*/
private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
// 1
if ((s = status) < 0)
return s;
// 2
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
return setCompletion(NORMAL);
}
// 3
return w.joinTask(this);
}
else
// 4
return externalAwaitDone();
}
标注代码分析
- 如果当前任务已经完成,直接返回状态。
- 如果当前任务恰好是当前工作线程的队列顶端的第一个任务,那么将该任务出队,然后执行。
- 否则调用当前工作线程的
joinTask()
。 - 如果当前线程不是ForkJoin工作线程,那么调用
externalAwaitDone()
。ForkJoinWorkerThread#unpushTask()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/**
* Specialized version of popTask to pop only if topmost element
* is the given task. Called only by this thread.
*
* @param t the task. Caller must ensure non-null.
*/
final boolean unpushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q;
int s;
if ((q = queue) != null && (s = queueTop) != queueBase &&
UNSAFE.compareAndSwapObject
(q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
queueTop = s; // or putOrderedInt
return true;
}
return false;
}
这是另一个版本的pop
。unpushTash()
中仅当给定的t是队列顶端的任务,才会返回并移除t。
ForkJoinWorkerThread#joinTask()
1 | /** |
标注代码分析
- 记录之前的合并任务。
- 设置当前工作线程的合并任务。
- 如果合并任务已经完成,恢复之前的合并任务。
- 返回任务状态。
- 如果当前任务队列中有任务,尝试从当前队列顶端获取给定任务 (如果给定任务恰好在当前任务队列顶端的话)或者其他一个已经被取消的任务。
- 这里尝试一种特殊情况:如果给定的任务正好在其他工作线程的队列的底部,那么尝试窃取这个任务并执行。
- 如果没成功,这里出让一下CPU。
joinTask
方法中主体是一个无限循环,里面会先尝试帮助合并的一些操作,失败的话会继续重试,最多尝试MAX_HELP
次。超过了MAX_HELP
无法继续尝试的话,就会调用tryAwaitJoin()
等待合并任务。
尝试过程中,如果当前任务队列中有任务,会调用localHelpJoinTask()
,如果方法调用失败会直接进入合并等待;否则会先进行helpJoinTask()
的尝试,尝试MAX_HELP/2
次,成功的话会一直尝试,直到给定的任务完成;如果helpJoinTask()
尝试了MAX_HELP/2
次都没有成功过,且本地队列一直没有任务,那么会进行一个特殊的尝试,会假设给定的任务在其他工作线程的任务队列的底部,然后去窃取这个任务,也会尝试MAX_HELP/2
次。
ForkJoinWorkerThread#localHelpJoinTask()
1 | /** |
标注代码分析
- 如果当前工作线程的任务队列顶端的任务不是给定任务,且任务的状态是未取消(这里如果<0,一定是取消的任务),返回false。
- 取出给定任务或者一个被取消的任务。
如果当前任务队列顶端的任务是要合并的任务,或者是一个被取消的任务,那么尝试处理这个任务,返回成功;否则失败。
ForkJoinWorkerThread#tryDeqAndExec()
1 | /** |
标注代码分析
- 扫描所有工作线程。
- 如果有工作线程的任务队列的底部正好是给定任务t。尝试窃取t后执行。
如果有工作线程的队列底部的任务正好是要合并的任务,那么窃取该任务然后处理之。
ForkJoinWorkerThread#helpJoinTask()
1 | /** |
标注代码分析
- 找到线程
thread
的窃取者v。 - 如果
thread
没有窃取者或者v当前窃取的任务不是task
,扫描工作线程数组。 - 如果找到了窃取线程,将其设置为
thread
的窃取线程。 - 没找到的话,直接跳出
outer
循环。 - 找到了窃取者v。
- 如果
joinMe
任务已经完成,跳出outer
循环。 - 如果v的队列是空的,跳出当前循环。
- 如果
task
任务已经完成,跳出outer
循环。 - 尝试窃取v的任务队列底部的任务。
- 窃取成功后,执行任务。
- 再去找v的窃取者,注意这里是一个链。
- 如果超过最大深度
(MAX_HELP)
或者task
已经执行完成 或者 找到了头(next==null)
或者出现循环退出。
这个方法的前提是当前线程需要join给定的任务joinMe
,但是这个任务被其他线程(窃取者)窃取了。所以方法中首先找到窃取joinMe
任务的工作线程v,如果找到了窃取者v,就会从v的任务队列中窃取任务来完成(帮助v完成任务)。但也有可能v也在join其他的任务(比如当前线程执行任务过程中,分裂出一个子任务A,工作线程v窃取了A,然后执行,执行过程中A由分裂出子任务A1,A1又被另一个工作线程v1给窃取了…是一个链),所以方法中要顺着这个链一直找下去,目的就是能尽快的合并joinMe
任务。为了避免一些情况,这里尝试的最大链深度限定为MAX_HELP
。
ForkJoinPool#tryAwaitJoin()
1 | /** |
标注代码分析
- 如果
joinMe
未完成。 - 尝试阻塞等待之前的预操作。
- 在
joinMe
任务上阻塞等待。 - 被唤醒后的操作。
- 如果
Pool
关闭了,取消任务。调用其cancelIgnoringExceptions()
。ForkJoinPool#tryPreBlock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58/**
* Tries to increment blockedCount, decrement active count
* (sometimes implicitly) and possibly release or create a
* compensating worker in preparation for blocking. Fails
* on contention or termination.
*
* @return true if the caller can block, else should recheck and retry
*/
private boolean tryPreBlock() {
int b = blockedCount;
// 1
if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) {
int pc = parallelism;
do {
ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
int e, ac, tc, rc, i;
long c = ctl;
int u = (int)(c >>> 32);
// 2
if ((e = (int)c) < 0) {
// skip -- terminating
}
else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 &&
(ws = workers) != null &&
(i = ~e & SMASK) < ws.length &&
(w = ws[i]) != null) {
// 3
long nc = ((long)(w.nextWait & E_MASK) |
(c & (AC_MASK|TC_MASK)));
if (w.eventCount == e &&
UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
w.eventCount = (e + EC_UNIT) & E_MASK;
if (w.parked)
UNSAFE.unpark(w);
return true; // release an idle worker
}
}
else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) {
// 4
long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
return true; // no compensation needed
}
else if (tc + pc < MAX_ID) {
// 5
long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
addWorker();
return true; // create a replacement
}
}
// try to back out on any failure and let caller retry
// 6
} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
b = blockedCount, b - 1));
}
return false;
}
标注代码分析
- 累加等待join任务的计数。
- 如果
Pool
关闭了,跳过。 - 如果当前活动的工作线程不大于cpu核数,且有线程在等待任务(处于空闲状态)。那么唤醒这个工作线程。
- 如果总的工作线程数量不少于cpu核心数量,且至少有一个活动的工作线程。尝试在总控信息上将AC递减。
- 如果不满足上面条件,这里会增加一个工作线程。
- 如果失败,这里会把刚才对b增加的1给减回去。
ForkJoinTask#tryAwaitDone()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/**
* Tries to block a worker thread until completed or timed out.
* Uses Object.wait time argument conventions.
* May fail on contention or interrupt.
*
* @param millis if > 0, wait time.
*/
final void tryAwaitDone(long millis) {
int s;
try {
if (((s = status) > 0 ||
(s == 0 &&
UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
status > 0) {
synchronized (this) {
if (status > 0)
wait(millis);
}
}
} catch (InterruptedException ie) {
// caller must check termination
}
}
阻塞等待(join任务)时,首先会检查join任务的状态,如果join任务未完成的话,才可以在join任务上等待。也就是说,join的运行状态(再回顾一下task的运行状态定义)必须大于等于0。如果join#status
大于0,说明join任务上已经有其他工作线程等待了,当前线程直接等待就可以了;如果join#status
等于0,说明当前线程是第一个要在join任务上阻塞等待的线程,那么会尝试将join的status
改为SIGNAL(1)
,然后进行阻塞等待工作。注意方法中不会处理中断异常,需要外部来处理。
ForkJoinPool#postBlock()
1 | /** |
标注代码分析
- 累加活动线程计数。
- 递减等待join任务的计数。
ForkJoinTask#cancelIgnoringExceptions()
1
2
3
4
5
6
7
8
9
10
11
12/**
* Cancels, ignoring any exceptions thrown by cancel. Used during
* worker and pool shutdown. Cancel is spec'ed not to throw any
* exceptions, but if it does anyway, we have no recourse during
* shutdown, so guard against this case.
*/
final void cancelIgnoringExceptions() {
try {
cancel(false);
} catch (Throwable ignore) {
}
}
1 | /** |
1 | /** |
1 | private static final int CANCELLED = -2; |
cancelIgnoringExceptions()
cancelIgnoringExceptions()
中的逻辑就是将任务运行状态设置为CANCELLED
,然后唤醒在任务上等待的线程。
ForkJoinTask#getRawResult()
回到ForkJoinTask#join()
,如果正常完成会调用getRawResult()
。1
2
3
4
5
6
7
8
9
10/**
* Returns the result that would be returned by {@link #join}, even
* if this task completed abnormally, or {@code null} if this task
* is not known to have been completed. This method is designed
* to aid debugging, as well as to support extensions. Its use in
* any other context is discouraged.
*
* @return the result, or {@code null} if not completed
*/
public abstract V getRawResult();
ForkJoinTask#getRawResult()
未实现,交由子类去实现,比如在RecursiveTask
。
RecursiveTask#getRawResult()
1 | public final V getRawResult() { |
ForkJoinTask#reportResult()
1 | /** |
如果ForkJoinTask#join()
中,任务非正常结束,会调用reportResult()
。
标注代码分析
fork()
会将任务添加到当前工作线程的任务队列的里面。join()
某个任务后,当前线程要做的首先是想办法完成这个任务,或者帮助加快这个任务的完成,如果这些尝试失败,当前线程就会在要join的任务(等待队列)上进行阻塞等待,等任务完成后被唤醒。