Fork-Join源码分析

这篇源码分析基于JDK7。

Fork

通过分析一个Fork-Join任务的执行过程来分析Fork-Join的相关代码,主要侧重于分裂(Fork)/合并(Join)过程。

SumTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SumTask extends RecursiveTask<Long>{  
private static final int THRESHOLD = 10;

private long start;
private long end;

public SumTask(long n) {
this(1,n);
}

private SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if((end - start) <= THRESHOLD){
for(long l = start; l <= end; l++){
sum += l;
}
}else{
long mid = (start + end) >>> 1;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork();
right.fork();
sum = left.join() + right.join();
}
return sum;
}
private static final long serialVersionUID = 1L;
}

ForkJoinTask#fork()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Arranges to asynchronously execute this task. While it is not
* necessarily enforced, it is a usage error to fork a task more
* than once unless it has completed and been reinitialized.
* Subsequent modifications to the state of this task or any data
* it operates on are not necessarily consistently observable by
* any thread other than the one executing it unless preceded by a
* call to {@link #join} or related methods, or a call to {@link
* #isDone} returning {@code true}.
*
* <p>This method may be invoked only from within {@code
* ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}

ForkJoinWorkerThread#pushTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Pushes a task. Call only from this thread.
*
* @param t the task. Caller must ensure non-null.
*/
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
// 1
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
// 2
UNSAFE.putOrderedObject(q, u, t);
// 3
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork();
else if (s == m)
// 4
growQueue();
}
}

标注代码分析

  1. 这里首先根据当前的queueTop对队列(数组)长度取模来算出放置任务的下标,然后再通过下标算出偏移地址,提供给Unsafe使用。
  2. 设置任务。
  3. 修改queueTop
  4. 如果队列满了,扩展一下队列容量。

    ForkJoinWorkerThread#growQueue()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    /**
    * Creates or doubles queue array. Transfers elements by
    * emulating steals (deqs) from old array and placing, oldest
    * first, into new array.
    */
    private void growQueue() {
    ForkJoinTask<?>[] oldQ = queue;
    int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
    // 1
    if (size > MAXIMUM_QUEUE_CAPACITY)
    throw new RejectedExecutionException("Queue capacity exceeded");
    // 2
    if (size < INITIAL_QUEUE_CAPACITY)
    size = INITIAL_QUEUE_CAPACITY;
    ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
    int mask = size - 1;
    int top = queueTop;
    int oldMask;
    if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
    for (int b = queueBase; b != top; ++b) {
    long u = ((b & oldMask) << ASHIFT) + ABASE;
    Object x = UNSAFE.getObjectVolatile(oldQ, u);
    if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
    UNSAFE.putObjectVolatile
    (q, ((b & mask) << ASHIFT) + ABASE, x);
    }
    }
    }

标注代码分析

  1. 容量为原来的2倍,不超过MAXIMUM_QUEUE_CAPACITY(1 << 24)
  2. 最小为INITIAL_QUEUE_CAPACITY(1 << 13)

Join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Returns the result of the computation when it {@link #isDone is
* done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
* {@code Error}, not {@code ExecutionException}, and that
* interrupts of the calling thread do <em>not</em> cause the
* method to abruptly return by throwing {@code
* InterruptedException}.
*
* @return the computed result
*/
public final V join() {
// 1
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
}

标注代码分析

  1. 先调用doJoin(),如果doJoin()返回NORMAL,那么通过getRawResult()来获取结果;否则会调用reportResult()来处理和获取结果。

    ForkJoinTask#doJoin()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /**
    * Primary mechanics for join, get, quietlyJoin.
    * @return status upon completion
    */
    private int doJoin() {
    Thread t; ForkJoinWorkerThread w; int s; boolean completed;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
    // 1
    if ((s = status) < 0)
    return s;
    // 2
    if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
    try {
    completed = exec();
    } catch (Throwable rex) {
    return setExceptionalCompletion(rex);
    }
    if (completed)
    return setCompletion(NORMAL);
    }
    // 3
    return w.joinTask(this);
    }
    else
    // 4
    return externalAwaitDone();
    }

标注代码分析

  1. 如果当前任务已经完成,直接返回状态。
  2. 如果当前任务恰好是当前工作线程的队列顶端的第一个任务,那么将该任务出队,然后执行。
  3. 否则调用当前工作线程的joinTask()
  4. 如果当前线程不是ForkJoin工作线程,那么调用externalAwaitDone()

    ForkJoinWorkerThread#unpushTask()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    /**
    * Specialized version of popTask to pop only if topmost element
    * is the given task. Called only by this thread.
    *
    * @param t the task. Caller must ensure non-null.
    */
    final boolean unpushTask(ForkJoinTask<?> t) {
    ForkJoinTask<?>[] q;
    int s;
    if ((q = queue) != null && (s = queueTop) != queueBase &&
    UNSAFE.compareAndSwapObject
    (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
    queueTop = s; // or putOrderedInt
    return true;
    }
    return false;
    }

这是另一个版本的popunpushTash()中仅当给定的t是队列顶端的任务,才会返回并移除t。

ForkJoinWorkerThread#joinTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Possibly runs some tasks and/or blocks, until joinMe is done.
*
* @param joinMe the task to join
* @return completion status on exit
*/
final int joinTask(ForkJoinTask<?> joinMe) {
// 1
ForkJoinTask<?> prevJoin = currentJoin;
// 2
currentJoin = joinMe;
for (int s, retries = MAX_HELP;;) {
if ((s = joinMe.status) < 0) {
// 3
currentJoin = prevJoin;
// 4
return s;
}
if (retries > 0) {
if (queueTop != queueBase) {
// 5
if (!localHelpJoinTask(joinMe))
retries = 0; // cannot help
}
else if (retries == MAX_HELP >>> 1) {
--retries; // check uncommon case
// 6
if (tryDeqAndExec(joinMe) >= 0)
// 7
Thread.yield(); // for politeness
}
else
retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
}
else {
retries = MAX_HELP; // restart if not done
pool.tryAwaitJoin(joinMe);
}
}
}

标注代码分析

  1. 记录之前的合并任务。
  2. 设置当前工作线程的合并任务。
  3. 如果合并任务已经完成,恢复之前的合并任务。
  4. 返回任务状态。
  5. 如果当前任务队列中有任务,尝试从当前队列顶端获取给定任务 (如果给定任务恰好在当前任务队列顶端的话)或者其他一个已经被取消的任务。
  6. 这里尝试一种特殊情况:如果给定的任务正好在其他工作线程的队列的底部,那么尝试窃取这个任务并执行。
  7. 如果没成功,这里出让一下CPU。

joinTask方法中主体是一个无限循环,里面会先尝试帮助合并的一些操作,失败的话会继续重试,最多尝试MAX_HELP次。超过了MAX_HELP无法继续尝试的话,就会调用tryAwaitJoin()等待合并任务。
尝试过程中,如果当前任务队列中有任务,会调用localHelpJoinTask(),如果方法调用失败会直接进入合并等待;否则会先进行helpJoinTask()的尝试,尝试MAX_HELP/2次,成功的话会一直尝试,直到给定的任务完成;如果helpJoinTask()尝试了MAX_HELP/2次都没有成功过,且本地队列一直没有任务,那么会进行一个特殊的尝试,会假设给定的任务在其他工作线程的任务队列的底部,然后去窃取这个任务,也会尝试MAX_HELP/2次。

ForkJoinWorkerThread#localHelpJoinTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* If present, pops and executes the given task, or any other
* cancelled task
*
* @return false if any other non-cancelled task exists in local queue
*/
private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
if ((s = queueTop) != queueBase && (q = queue) != null &&
(i = (q.length - 1) & --s) >= 0 &&
(t = q[i]) != null) {
if (t != joinMe && t.status >= 0)
// 1
return false;
if (UNSAFE.compareAndSwapObject
(q, (i << ASHIFT) + ABASE, t, null)) {
// 2
queueTop = s; // or putOrderedInt
t.doExec();
}
}
return true;
}

标注代码分析

  1. 如果当前工作线程的任务队列顶端的任务不是给定任务,且任务的状态是未取消(这里如果<0,一定是取消的任务),返回false。
  2. 取出给定任务或者一个被取消的任务。

如果当前任务队列顶端的任务是要合并的任务,或者是一个被取消的任务,那么尝试处理这个任务,返回成功;否则失败。

ForkJoinWorkerThread#tryDeqAndExec()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Performs an uncommon case for joinTask: If task t is at base of
* some workers queue, steals and executes it.
*
* @param t the task
* @return t's status
*/
private int tryDeqAndExec(ForkJoinTask<?> t) {
int m = pool.scanGuard & SMASK;
ForkJoinWorkerThread[] ws = pool.workers;
// 1
if (ws != null && ws.length > m && t.status >= 0) {
for (int j = 0; j <= m; ++j) {
ForkJoinTask<?>[] q; int b, i;
ForkJoinWorkerThread v = ws[j];
if (v != null &&
(b = v.queueBase) != v.queueTop &&
(q = v.queue) != null &&
(i = (q.length - 1) & b) >= 0 &&
q[i] == t) {
// 2
long u = (i << ASHIFT) + ABASE;
if (v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
v.queueBase = b + 1;
v.stealHint = poolIndex;
ForkJoinTask<?> ps = currentSteal;
currentSteal = t;
t.doExec();
currentSteal = ps;
}
break;
}
}
}
return t.status;
}

标注代码分析

  1. 扫描所有工作线程。
  2. 如果有工作线程的任务队列的底部正好是给定任务t。尝试窃取t后执行。

如果有工作线程的队列底部的任务正好是要合并的任务,那么窃取该任务然后处理之。

ForkJoinWorkerThread#helpJoinTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* Tries to locate and execute tasks for a stealer of the given
* task, or in turn one of its stealers, Traces
* currentSteal->currentJoin links looking for a thread working on
* a descendant of the given task and with a non-empty queue to
* steal back and execute tasks from. The implementation is very
* branchy to cope with potential inconsistencies or loops
* encountering chains that are stale, unknown, or of length
* greater than MAX_HELP links. All of these cases are dealt with
* by just retrying by caller.
*
* @param joinMe the task to join
* @param canSteal true if local queue is empty
* @return true if ran a task
*/
private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
boolean helped = false;
int m = pool.scanGuard & SMASK;
ForkJoinWorkerThread[] ws = pool.workers;
if (ws != null && ws.length > m && joinMe.status >= 0) {
int levels = MAX_HELP; // remaining chain length
ForkJoinTask<?> task = joinMe; // base of chain
outer:for (ForkJoinWorkerThread thread = this;;) {
// Try to find v, the stealer of task, by first using hint
// 1
ForkJoinWorkerThread v = ws[thread.stealHint & m];
if (v == null || v.currentSteal != task) {
// 2
for (int j = 0; ;) { // search array
if ((v = ws[j]) != null && v.currentSteal == task) {
// 3
thread.stealHint = j;
break; // save hint for next time
}
if (++j > m)
// 4
break outer; // can't find stealer
}
}
// Try to help v, using specialized form of deqTask
// 5
for (;;) {
ForkJoinTask<?>[] q; int b, i;
if (joinMe.status < 0)
// 6
break outer;
if ((b = v.queueBase) == v.queueTop ||
(q = v.queue) == null ||
(i = (q.length-1) & b) < 0)
// 7
break; // empty
long u = (i << ASHIFT) + ABASE;
ForkJoinTask<?> t = q[i];
if (task.status < 0)
// 8
break outer; // stale
// 9
if (t != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
// 10
v.queueBase = b + 1;
v.stealHint = poolIndex;
ForkJoinTask<?> ps = currentSteal;
currentSteal = t;
t.doExec();
currentSteal = ps;
helped = true;
}
}
// Try to descend to find v's stealer
// 11
ForkJoinTask<?> next = v.currentJoin;
if (--levels > 0 && task.status >= 0 &&
next != null && next != task) {
task = next;
thread = v;
}
else
// 12
break; // max levels, stale, dead-end, or cyclic
}
}
return helped;
}

标注代码分析

  1. 找到线程thread的窃取者v。
  2. 如果thread没有窃取者或者v当前窃取的任务不是task,扫描工作线程数组。
  3. 如果找到了窃取线程,将其设置为thread的窃取线程。
  4. 没找到的话,直接跳出outer循环。
  5. 找到了窃取者v。
  6. 如果joinMe任务已经完成,跳出outer循环。
  7. 如果v的队列是空的,跳出当前循环。
  8. 如果task任务已经完成,跳出outer循环。
  9. 尝试窃取v的任务队列底部的任务。
  10. 窃取成功后,执行任务。
  11. 再去找v的窃取者,注意这里是一个链。
  12. 如果超过最大深度(MAX_HELP)或者task已经执行完成 或者 找到了头(next==null)或者出现循环退出。

这个方法的前提是当前线程需要join给定的任务joinMe,但是这个任务被其他线程(窃取者)窃取了。所以方法中首先找到窃取joinMe任务的工作线程v,如果找到了窃取者v,就会从v的任务队列中窃取任务来完成(帮助v完成任务)。但也有可能v也在join其他的任务(比如当前线程执行任务过程中,分裂出一个子任务A,工作线程v窃取了A,然后执行,执行过程中A由分裂出子任务A1,A1又被另一个工作线程v1给窃取了…是一个链),所以方法中要顺着这个链一直找下去,目的就是能尽快的合并joinMe任务。为了避免一些情况,这里尝试的最大链深度限定为MAX_HELP

ForkJoinPool#tryAwaitJoin()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Possibly blocks waiting for the given task to complete, or
* cancels the task if terminating. Fails to wait if contended.
*
* @param joinMe the task
*/
final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
int s;
Thread.interrupted(); // clear interrupts before checking termination
// 1
if (joinMe.status >= 0) {
// 2
if (tryPreBlock()) {
// 3
joinMe.tryAwaitDone(0L);
// 4
postBlock();
}
else if ((ctl & STOP_BIT) != 0L)
// 5
joinMe.cancelIgnoringExceptions();
}
}

标注代码分析

  1. 如果joinMe未完成。
  2. 尝试阻塞等待之前的预操作。
  3. joinMe任务上阻塞等待。
  4. 被唤醒后的操作。
  5. 如果Pool关闭了,取消任务。调用其cancelIgnoringExceptions()

    ForkJoinPool#tryPreBlock()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    /**
    * Tries to increment blockedCount, decrement active count
    * (sometimes implicitly) and possibly release or create a
    * compensating worker in preparation for blocking. Fails
    * on contention or termination.
    *
    * @return true if the caller can block, else should recheck and retry
    */
    private boolean tryPreBlock() {
    int b = blockedCount;
    // 1
    if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) {
    int pc = parallelism;
    do {
    ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
    int e, ac, tc, rc, i;
    long c = ctl;
    int u = (int)(c >>> 32);
    // 2
    if ((e = (int)c) < 0) {
    // skip -- terminating
    }
    else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 &&
    (ws = workers) != null &&
    (i = ~e & SMASK) < ws.length &&
    (w = ws[i]) != null) {
    // 3
    long nc = ((long)(w.nextWait & E_MASK) |
    (c & (AC_MASK|TC_MASK)));
    if (w.eventCount == e &&
    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
    w.eventCount = (e + EC_UNIT) & E_MASK;
    if (w.parked)
    UNSAFE.unpark(w);
    return true; // release an idle worker
    }
    }
    else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) {
    // 4
    long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
    if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
    return true; // no compensation needed
    }
    else if (tc + pc < MAX_ID) {
    // 5
    long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
    if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
    addWorker();
    return true; // create a replacement
    }
    }
    // try to back out on any failure and let caller retry
    // 6
    } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
    b = blockedCount, b - 1));
    }
    return false;
    }

标注代码分析

  1. 累加等待join任务的计数。
  2. 如果Pool关闭了,跳过。
  3. 如果当前活动的工作线程不大于cpu核数,且有线程在等待任务(处于空闲状态)。那么唤醒这个工作线程。
  4. 如果总的工作线程数量不少于cpu核心数量,且至少有一个活动的工作线程。尝试在总控信息上将AC递减。
  5. 如果不满足上面条件,这里会增加一个工作线程。
  6. 如果失败,这里会把刚才对b增加的1给减回去。

    ForkJoinTask#tryAwaitDone()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * Tries to block a worker thread until completed or timed out.
    * Uses Object.wait time argument conventions.
    * May fail on contention or interrupt.
    *
    * @param millis if > 0, wait time.
    */
    final void tryAwaitDone(long millis) {
    int s;
    try {
    if (((s = status) > 0 ||
    (s == 0 &&
    UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
    status > 0) {
    synchronized (this) {
    if (status > 0)
    wait(millis);
    }
    }
    } catch (InterruptedException ie) {
    // caller must check termination
    }
    }

阻塞等待(join任务)时,首先会检查join任务的状态,如果join任务未完成的话,才可以在join任务上等待。也就是说,join的运行状态(再回顾一下task的运行状态定义)必须大于等于0。如果join#status大于0,说明join任务上已经有其他工作线程等待了,当前线程直接等待就可以了;如果join#status等于0,说明当前线程是第一个要在join任务上阻塞等待的线程,那么会尝试将join的status改为SIGNAL(1),然后进行阻塞等待工作。注意方法中不会处理中断异常,需要外部来处理。

ForkJoinPool#postBlock()

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Decrements blockedCount and increments active count
*/
private void postBlock() {
long c;
// 1
do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, // no mask
c = ctl, c + AC_UNIT));
int b;
// 2
do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
b = blockedCount, b - 1));
}

标注代码分析

  1. 累加活动线程计数。
  2. 递减等待join任务的计数。

    ForkJoinTask#cancelIgnoringExceptions()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
    * Cancels, ignoring any exceptions thrown by cancel. Used during
    * worker and pool shutdown. Cancel is spec'ed not to throw any
    * exceptions, but if it does anyway, we have no recourse during
    * shutdown, so guard against this case.
    */
    final void cancelIgnoringExceptions() {
    try {
    cancel(false);
    } catch (Throwable ignore) {
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed or could not be
* cancelled for some other reason. If successful, and this task
* has not started when {@code cancel} is called, execution of
* this task is suppressed. After this method returns
* successfully, unless there is an intervening call to {@link
* #reinitialize}, subsequent calls to {@link #isCancelled},
* {@link #isDone}, and {@code cancel} will return {@code true}
* and calls to {@link #join} and related methods will result in
* {@code CancellationException}.
*
* <p>This method may be overridden in subclasses, but if so, must
* still ensure that these properties hold. In particular, the
* {@code cancel} method itself must not throw exceptions.
*
* <p>This method is designed to be invoked by <em>other</em>
* tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}.
*
* @param mayInterruptIfRunning this value has no effect in the
* default implementation because interrupts are not used to
* control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning){
return setCompletion(CANCELLED) == CANCELLED;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Marks completion and wakes up threads waiting to join this task,
* also clearing signal request bits.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
*/
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (s != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}
1
private static final int CANCELLED   = -2;

cancelIgnoringExceptions()

cancelIgnoringExceptions()中的逻辑就是将任务运行状态设置为CANCELLED,然后唤醒在任务上等待的线程。

ForkJoinTask#getRawResult()

回到ForkJoinTask#join(),如果正常完成会调用getRawResult()

1
2
3
4
5
6
7
8
9
10
/**
* Returns the result that would be returned by {@link #join}, even
* if this task completed abnormally, or {@code null} if this task
* is not known to have been completed. This method is designed
* to aid debugging, as well as to support extensions. Its use in
* any other context is discouraged.
*
* @return the result, or {@code null} if not completed
*/
public abstract V getRawResult();

ForkJoinTask#getRawResult()未实现,交由子类去实现,比如在RecursiveTask

RecursiveTask#getRawResult()

1
2
3
4
5
6
7
public final V getRawResult() {
return result;
}

protected final void setRawResult(V value) {
result = value;
}

ForkJoinTask#reportResult()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Report the result of invoke or join; called only upon
* non-normal return of internal versions.
*/
private V reportResult() {
int s; Throwable ex;
// 1
if ((s = status) == CANCELLED)
throw new CancellationException();
// 2
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
UNSAFE.throwException(ex);
return getRawResult();
}

如果ForkJoinTask#join()中,任务非正常结束,会调用reportResult()
标注代码分析

  1. 如果任务状态为取消,抛出取消异常;
  2. 如果任务状态是异常结束,会从异常表中获取异常,获取到的话,抛出异常。

    总结

  • fork()会将任务添加到当前工作线程的任务队列的里面。
  • join()某个任务后,当前线程要做的首先是想办法完成这个任务,或者帮助加快这个任务的完成,如果这些尝试失败,当前线程就会在要join的任务(等待队列)上进行阻塞等待,等任务完成后被唤醒。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×