Other Fork-Join Methods

ForkJoinTask

ForkJoinTask is itself an implementation of Future, so it also supports cancellation.

ForkJoinTask#cancel()

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed or could not be
* cancelled for some other reason. If successful, and this task
* has not started when {@code cancel} is called, execution of
* this task is suppressed. After this method returns
* successfully, unless there is an intervening call to {@link
* #reinitialize}, subsequent calls to {@link #isCancelled},
* {@link #isDone}, and {@code cancel} will return {@code true}
* and calls to {@link #join} and related methods will result in
* {@code CancellationException}.
*
* <p>This method may be overridden in subclasses, but if so, must
* still ensure that these properties hold. In particular, the
* {@code cancel} method itself must not throw exceptions.
*
* <p>This method is designed to be invoked by <em>other</em>
* tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}.
*
* @param mayInterruptIfRunning this value has no effect in the
* default implementation because interrupts are not used to
* control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
return setCompletion(CANCELLED) == CANCELLED;
}

ForkJoinTask#setCompletion()

/**
* Marks completion and wakes up threads waiting to join this task,
* also clearing signal request bits.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
*/
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (s != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}

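cancel() tries to set the task's status to CANCELLED through setCompletion(). If the task has already completed, the existing status is returned unchanged. Otherwise the CAS installs CANCELLED and, if a waiter had registered itself (s != 0), the threads waiting on the task are woken up.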
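
The observable effect of cancel() can be checked with the public API alone. The following is a minimal sketch, not part of the JDK source; the class name CancelSketch and the value 42 are made up for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.RecursiveTask;

final class CancelSketch {
    /** A trivial task; the returned value is arbitrary. */
    static RecursiveTask<Integer> newTask() {
        return new RecursiveTask<Integer>() {
            @Override
            protected Integer compute() {
                return 42;
            }
        };
    }

    /** Cancels a task that never started, then reports what join() does. */
    static String cancelBeforeRun() {
        RecursiveTask<Integer> task = newTask();
        // The boolean argument is ignored by the default implementation.
        boolean cancelled = task.cancel(true);
        try {
            task.join();
            return "joined normally";
        } catch (CancellationException expected) {
            return "cancel=" + cancelled
                    + ", isCancelled=" + task.isCancelled()
                    + ", isDone=" + task.isDone();
        }
    }

    /** cancel() on a task that already completed normally has no effect. */
    static boolean cancelAfterCompletion() {
        RecursiveTask<Integer> task = newTask();
        task.invoke();             // runs compute() in the calling thread
        return task.cancel(false); // the status is already NORMAL
    }

    public static void main(String[] args) {
        System.out.println(cancelBeforeRun());
        System.out.println(cancelAfterCompletion());
    }
}
```

The second method mirrors the first branch of setCompletion(): once the status is negative, a later cancel() cannot change it.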

ForkJoinPool#shutdown()

/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
* Tasks that are in the process of being submitted concurrently
* during the course of this method may or may not be rejected.
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public void shutdown() {
checkPermission();
shutdown = true;
tryTerminate(false);
}

ForkJoinPool#shutdownNow()

/**
* Attempts to cancel and/or stop all tasks, and reject all
* subsequently submitted tasks. Tasks that are in the process of
* being submitted or executed concurrently during the course of
* this method may or may not be rejected. This method cancels
* both existing and unexecuted tasks, in order to permit
* termination in the presence of task dependencies. So the method
* always returns an empty list (unlike the case for some other
* Executors).
*
* @return an empty list
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public List<Runnable> shutdownNow() {
checkPermission();
shutdown = true;
tryTerminate(true);
return Collections.emptyList();
}

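A pool can be closed by calling shutdown() or shutdownNow(). With shutdown(), tasks previously submitted to the pool are allowed to finish before the pool actually terminates, and no new tasks are accepted. shutdownNow() attempts to cancel the tasks that were submitted but have not yet completed and then closes the pool; it does not accept new tasks either.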
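
The orderly variant can be observed from the outside with a small experiment. This is a sketch under the assumption that only the public API is used; the class name ShutdownSketch and the constant 7 are invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {
    /** Returns {result of the accepted task, 1 if a late submission was rejected}. */
    static int[] run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        Callable<Integer> work = () -> 7;
        ForkJoinTask<Integer> accepted = pool.submit(work);

        pool.shutdown(); // orderly: already submitted tasks still run

        int rejected = 0;
        try {
            pool.submit(work);
        } catch (RejectedExecutionException e) {
            rejected = 1; // new submissions are refused after shutdown()
        }
        try {
            int result = accepted.get();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { result, rejected };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("result=" + r[0] + ", rejected=" + r[1]);
    }
}
```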

ForkJoinPool#tryTerminate()

/**
* Possibly initiates and/or completes termination.
*
* @param now if true, unconditionally terminate, else only
* if shutdown and empty queue and no active workers
* @return true if now terminating or terminated
*/
private boolean tryTerminate(boolean now) {
long c;
while (((c = ctl) & STOP_BIT) == 0) {
if (!now) {
if ((int)(c >> AC_SHIFT) != -parallelism)
return false;
if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
queueBase != queueTop) {
if (ctl == c) // staleness check
return false;
continue;
}
}
if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
startTerminating();
}
if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
termination.signalAll();
} finally {
lock.unlock();
}
}
return true;
}

now==true

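The method first tries to set the stop bit in the control word ctl. If that succeeds, it calls startTerminating() to begin shutting the pool down (a failed CAS is retried until the stop bit is set). It then checks whether the pool still has worker threads: if it has none, the threads waiting on the termination condition are signalled and the method returns true; if workers remain, it simply returns true.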

now==false

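If there are still active worker threads, worker threads blocked waiting in a join, or unfinished submissions, the method returns false. When shutdown() is called and its internal tryTerminate() returns false, the pool still terminates eventually: shutdown() has already set the shutdown flag to true, so once the worker threads have processed all tasks and become idle, they check the flag and call tryTerminate() again.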

ForkJoinPool#startTerminating()

/**
* Runs up to three passes through workers: (0) Setting
* termination status for each worker, followed by wakeups up to
* queued workers; (1) helping cancel tasks; (2) interrupting
* lagging threads (likely in external tasks, but possibly also
* blocked in joins). Each pass repeats previous steps because of
* potential lagging thread creation.
*/
private void startTerminating() {
cancelSubmissions();
for (int pass = 0; pass < 3; ++pass) {
ForkJoinWorkerThread[] ws = workers;
if (ws != null) {
for (ForkJoinWorkerThread w : ws) {
if (w != null) {
w.terminate = true;
if (pass > 0) {
w.cancelTasks();
if (pass > 1 && !w.isInterrupted()) {
try {
w.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
}
terminateWaiters();
}
}
}
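
1. Cancel the tasks in the pool's submissionQueue.
2. Set the terminate flag of every worker thread to true.
3. Cancel the unfinished tasks in every worker thread's task queue.
4. Interrupt every worker thread.
5. Terminate the worker threads that are still waiting.

ForkJoinPool#awaitTermination()
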
/**
* Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
for (;;) {
if (isTerminated())
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
lock.unlock();
}
}

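The Javadoc makes this easy to follow: the method checks whether the pool has really terminated. If not, the caller waits on the termination condition; once the pool has terminated, the method returns true. If the timeout elapses first, it returns false.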
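
Both return values can be provoked deliberately. In the sketch below (class and variable names are hypothetical), one task is held back by a latch so that the first timed wait must expire, and the second wait succeeds after the latch is released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

final class AwaitTerminationSketch {
    /** Returns {result of the first wait, result of the second wait}. */
    static boolean[] run() {
        ForkJoinPool pool = new ForkJoinPool(1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keeps one task unfinished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker);
        pool.shutdown();
        try {
            // The blocker has not finished, so this wait times out.
            boolean early = pool.awaitTermination(100, TimeUnit.MILLISECONDS);
            release.countDown();
            // Now the last task can finish and the pool can terminate.
            boolean late = pool.awaitTermination(10, TimeUnit.SECONDS);
            return new boolean[] { early, late };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("early=" + r[0] + ", late=" + r[1]);
    }
}
```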

ForkJoinPool#isTerminated()

/**
* Returns {@code true} if all tasks have completed following shut down.
*
* @return {@code true} if all tasks have completed following shut down
*/
public boolean isTerminated() {
long c = ctl;
return ((c & STOP_BIT) != 0L &&
(short)(c >>> TC_SHIFT) == -parallelism);
}

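The pool is terminated when two conditions hold at once: the stop bit is set in the pool's control word, and the total number of worker threads is 0.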

ForkJoinPool#isTerminating()

/**
* Returns {@code true} if the process of termination has
* commenced but not yet completed. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
* ignored or suppressed interruption, or are waiting for IO,
* causing this executor not to properly terminate. (See the
* advisory notes for class {@link ForkJoinTask} stating that
* tasks should not normally entail blocking operations. But if
* they do, they must abort them on interrupt.)
*
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
long c = ctl;
return ((c & STOP_BIT) != 0L &&
(short)(c >>> TC_SHIFT) != -parallelism);
}

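Determines whether the pool is in the process of terminating: the stop bit is set in the control word, but the total number of worker threads is not yet 0.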

ForkJoinPool#isAtLeastTerminating()

/**
* Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
*/
final boolean isAtLeastTerminating() {
return (ctl & STOP_BIT) != 0L;
}

Determines whether the pool is terminating or already terminated. It is enough for the stop bit to be set in the control word.
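
The three predicates differ only in how they combine the stop bit with the total-thread field of ctl. The toy model below makes that concrete. It is a single-threaded sketch, not the JDK class: the bit positions (active count in the top 16 bits, total count in the next 16, stop bit at bit 31) are assumptions chosen for the illustration, and plain assignments stand in for the CAS loops.

```java
final class CtlSketch {
    // Assumed field layout for this sketch only.
    static final int AC_SHIFT = 48;
    static final int TC_SHIFT = 32;
    static final long STOP_BIT = 1L << 31;
    static final long AC_MASK = 0xffffL << AC_SHIFT;
    static final long TC_MASK = 0xffffL << TC_SHIFT;
    static final long TC_UNIT = 1L << TC_SHIFT;

    final int parallelism;
    long ctl;

    CtlSketch(int parallelism) {
        this.parallelism = parallelism;
        long np = -parallelism; // both counts are stored as offsets from -parallelism
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    // Masked arithmetic keeps a carry from leaking into the neighbouring field.
    void addWorker()    { ctl = ((ctl + TC_UNIT) & TC_MASK) | (ctl & ~TC_MASK); }
    void removeWorker() { ctl = ((ctl - TC_UNIT) & TC_MASK) | (ctl & ~TC_MASK); }
    void setStopBit()   { ctl |= STOP_BIT; }

    int totalWorkers() { return parallelism + (short) (ctl >>> TC_SHIFT); }

    boolean isAtLeastTerminating() { return (ctl & STOP_BIT) != 0L; }

    boolean isTerminating() {
        return (ctl & STOP_BIT) != 0L && (short) (ctl >>> TC_SHIFT) != -parallelism;
    }

    boolean isTerminated() {
        return (ctl & STOP_BIT) != 0L && (short) (ctl >>> TC_SHIFT) == -parallelism;
    }

    public static void main(String[] args) {
        CtlSketch pool = new CtlSketch(4);
        pool.addWorker();
        pool.addWorker();
        pool.setStopBit();
        System.out.println("terminating=" + pool.isTerminating());
        pool.removeWorker();
        pool.removeWorker();
        System.out.println("terminated=" + pool.isTerminated());
    }
}
```

Storing the count as an offset from -parallelism is what lets the real code compare the field against -parallelism to mean "zero threads".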

ForkJoinPool#isShutdown()

/**
* Returns {@code true} if this pool has been shut down.
*
* @return {@code true} if this pool has been shut down
*/
public boolean isShutdown() {
return shutdown;
}

Determines whether the pool has been shut down. Once shutdown() or shutdownNow() has been called, this returns true.

ForkJoinWorkerThread

ForkJoinWorkerThread defines casSlotNull() and writeSlot(). For performance, both methods are implemented with UNSAFE.

ForkJoinWorkerThread#casSlotNull()

/**
* CASes slot i of array q from t to null. Caller must ensure q is
* non-null and index is in range.
*/
private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
ForkJoinTask<?> t) {
return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
}

ForkJoinWorkerThread#writeSlot()

/**
* Performs a volatile write of the given task at given slot of
* array q. Caller must ensure q is non-null and index is in
* range. This method is used only during resets and backouts.
*/
private static final void writeSlot(ForkJoinTask<?>[] q, int i,
ForkJoinTask<?> t) {
UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
}

ForkJoinWorkerThread#peekTask()

/**
* Returns next task, or null if empty or contended.
*/
final ForkJoinTask<?> peekTask() {
int m;
ForkJoinTask<?>[] q = queue;
if (q == null || (m = q.length - 1) < 0)
return null;
int i = locallyFifo ? queueBase : (queueTop - 1);
return q[i & m];
}

Returns the next (available) task in the current task queue without removing it.

ForkJoinTask#peekNextLocalTask()

/**
* Returns, but does not unschedule or execute, a task queued by
* the current thread but not yet executed, if one is immediately
* available. There is no guarantee that this task will actually
* be polled or executed next. Conversely, this method may return
* null even if a task exists but cannot be accessed without
* contention with other threads. This method is designed
* primarily to support extensions, and is unlikely to be useful
* otherwise.
*
* <p>This method may be invoked only from within {@code
* ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
*
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> peekNextLocalTask() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.peekTask();
}

ForkJoinTask#peekNextLocalTask() is implemented by ForkJoinWorkerThread#peekTask().

ForkJoinWorkerThread#drainTasksTo()

/**
* Drains tasks to given collection c.
*
* @return the number of tasks drained
*/
final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
int n = 0;
while (queueBase != queueTop) {
ForkJoinTask<?> t = deqTask();
if (t != null) {
c.add(t);
++n;
}
}
return n;
}

Removes all tasks from the current task queue, adds them to the given collection, and returns the number of tasks transferred.

ForkJoinPool#drainTasksTo()

/**
* Removes all available unexecuted submitted and forked tasks
* from scheduling queues and adds them to the given collection,
* without altering their execution status. These may include
* artificially generated or wrapped tasks. This method is
* designed to be invoked only when the pool is known to be
* quiescent. Invocations at other times may not remove all
* tasks. A failure encountered while attempting to add elements
* to collection {@code c} may result in elements being in
* neither, either or both collections when the associated
* exception is thrown. The behavior of this operation is
* undefined if the specified collection is modified while the
* operation is in progress.
*
* @param c the collection to transfer elements into
* @return the number of elements transferred
*/
protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
int count = 0;
while (queueBase != queueTop) {
ForkJoinTask<?> t = pollSubmission();
if (t != null) {
c.add(t);
++count;
}
}
ForkJoinWorkerThread[] ws;
if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
(ws = workers) != null) {
for (ForkJoinWorkerThread w : ws)
if (w != null)
count += w.drainTasksTo(c);
}
return count;
}

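This method relies on ForkJoinWorkerThread#drainTasksTo().
ForkJoinPool#drainTasksTo() takes the tasks in the pool's own submission queue and the tasks in every worker thread's queue, adds them to the given collection, and returns the number of tasks added.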

ForkJoinWorkerThread#getQueueSize()

/**
* Returns an estimate of the number of tasks in the queue.
*/
final int getQueueSize() {
return queueTop - queueBase;
}

Returns an estimate of the number of tasks in the current worker thread's task queue.

ForkJoinTask#getQueuedTaskCount()

/**
* Returns an estimate of the number of tasks that have been
* forked by the current worker thread but not yet executed. This
* value may be useful for heuristic decisions about whether to
* fork other tasks.
*
* <p>This method may be invoked only from within {@code
* ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
*
* @return the number of tasks
*/
public static int getQueuedTaskCount() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.getQueueSize();
}

ForkJoinTask#getQueuedTaskCount() is implemented by ForkJoinWorkerThread#getQueueSize().

ForkJoinWorkerThread#pollTask()

/**
* Gets and removes a local or stolen task.
*
* @return a task, if available
*/
final ForkJoinTask<?> pollTask() {
ForkJoinWorkerThread[] ws;
ForkJoinTask<?> t = pollLocalTask();
if (t != null || (ws = pool.workers) == null)
return t;
int n = ws.length; // cheap version of FJP.scan
int steps = n << 1;
int r = nextSeed();
int i = 0;
while (i < steps) {
ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
if (w != null && w.queueBase != w.queueTop && w.queue != null) {
if ((t = w.deqTask()) != null)
return t;
i = 0;
}
}
return null;
}

ForkJoinWorkerThread#pollLocalTask()

/**
* Gets and removes a local task.
*
* @return a task, if available
*/
final ForkJoinTask<?> pollLocalTask() {
return locallyFifo ? locallyDeqTask() : popTask();
}

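pollTask() first tries to get a local task through pollLocalTask(). If none is available, it scans the other worker threads and tries to steal a task from one of them. If the scan finds nothing, it returns null.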

ForkJoinTask#pollTask()

/**
* Unschedules and returns, without executing, the next task
* queued by the current thread but not yet executed, if one is
* available, or if not available, a task that was forked by some
* other thread, if available. Availability may be transient, so a
* {@code null} result does not necessarily imply quiescence
* of the pool this task is operating in. This method is designed
* primarily to support extensions, and is unlikely to be useful
* otherwise.
*
* <p>This method may be invoked only from within {@code
* ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
*
* @return a task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> pollTask() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.pollTask();
}

It is implemented by ForkJoinWorkerThread#pollTask().

ForkJoinWorkerThread#getEstimatedSurplusTaskCount()

final int getEstimatedSurplusTaskCount() {
return queueTop - queueBase - pool.idlePerActive();
}

Returns the estimated surplus task count.

ForkJoinPool#idlePerActive()

/**
* Returns the approximate (non-atomic) number of idle threads per
* active thread.
*/
final int idlePerActive() {
// Approximate at powers of two for small values, saturate past 4
int p = parallelism;
int a = p + (int)(ctl >> AC_SHIFT);
return (a > (p >>>= 1) ? 0 :
a > (p >>>= 1) ? 1 :
a > (p >>>= 1) ? 2 :
a > (p >>>= 1) ? 4 :
8);
}

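The fewer threads are currently active, the larger the returned value. Suppose p (the parallelism, which defaults to the number of CPU cores) is 32: a = 1 returns 8, a = 3 returns 4, a = 5 returns 2, a = 9 returns 1, and any a > 16 returns 0.
For ForkJoinWorkerThread#getEstimatedSurplusTaskCount() this means: the more worker threads are active, the closer the estimated surplus is to the number of tasks in the worker's own queue (everyone is busy, so the tasks are less likely to be stolen).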
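
The ladder of comparisons is easier to check once it is detached from the pool. The sketch below copies the expression from idlePerActive() into a pure function; the only change is that the active-thread count is passed in as a parameter instead of being decoded from ctl. The class name is made up.

```java
final class IdlePerActiveSketch {
    /** Same comparison ladder as idlePerActive(), with the active count given directly. */
    static int idlePerActive(int parallelism, int active) {
        int p = parallelism;
        int a = active;
        // Each step halves p, so the thresholds are p/2, p/4, p/8 and p/16.
        return (a > (p >>>= 1) ? 0 :
                a > (p >>>= 1) ? 1 :
                a > (p >>>= 1) ? 2 :
                a > (p >>>= 1) ? 4 :
                8);
    }

    public static void main(String[] args) {
        int[] samples = { 1, 3, 5, 9, 17 };
        for (int a : samples) {
            System.out.println("a=" + a + " -> " + idlePerActive(32, a));
        }
    }
}
```

For p = 32 the full ranges are: a <= 2 gives 8, 3 to 4 gives 4, 5 to 8 gives 2, 9 to 16 gives 1, and anything above 16 gives 0.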

ForkJoinTask#getSurplusQueuedTaskCount()

/**
* Returns an estimate of how many more locally queued tasks are
* held by the current worker thread than there are other worker
* threads that might steal them. This value may be useful for
* heuristic decisions about whether to fork other tasks. In many
* usages of ForkJoinTasks, at steady state, each worker should
* aim to maintain a small constant surplus (for example, 3) of
* tasks, and to process computations locally if this threshold is
* exceeded.
*
* <p>This method may be invoked only from within {@code
* ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
*
* @return the surplus number of tasks, which may be negative
*/
public static int getSurplusQueuedTaskCount() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.getEstimatedSurplusTaskCount();
}

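It is implemented by ForkJoinWorkerThread#getEstimatedSurplusTaskCount().
A task can consult this method to decide whether to fork further subtasks, which reduces the amount of stealing and improves overall performance.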
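
A typical use is to stop splitting once the current worker already holds more queued tasks than idle workers are likely to take. The following sketch sums an array that way. The class name, the cutoff of 1,000 elements and the surplus limit of 3 (the example value mentioned in the Javadoc) are illustrative choices, not tuned values.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class SurplusSumSketch extends RecursiveTask<Long> {
    private static final int SEQUENTIAL_CUTOFF = 1_000; // arbitrary for this sketch
    private static final int SURPLUS_LIMIT = 3;         // "small constant surplus"

    private final long[] data;
    private final int lo;
    private final int hi;

    SurplusSumSketch(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        // Work sequentially when the range is small, or when this worker
        // already has more queued tasks than others are likely to steal.
        if (hi - lo <= SEQUENTIAL_CUTOFF
                || getSurplusQueuedTaskCount() > SURPLUS_LIMIT) {
            long sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SurplusSumSketch right = new SurplusSumSketch(data, mid, hi);
        right.fork();
        long left = new SurplusSumSketch(data, lo, mid).compute();
        return left + right.join();
    }

    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SurplusSumSketch(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data));
    }
}
```

The result does not depend on how often the surplus check fires; the check only changes how much of the work is forked.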

ForkJoinWorkerThread#helpQuiescePool()

/**
* Runs tasks until {@code pool.isQuiescent()}. We piggyback on
* pool's active count ctl maintenance, but rather than blocking
* when tasks cannot be found, we rescan until all others cannot
* find tasks either. The bracketing by pool quiescerCounts
* updates suppresses pool auto-shutdown mechanics that could
* otherwise prematurely terminate the pool because all threads
* appear to be inactive.
*/
final void helpQuiescePool() {
boolean active = true;
ForkJoinTask<?> ps = currentSteal; // to restore below
ForkJoinPool p = pool;
// 1
p.addQuiescerCount(1);
for (;;) {
ForkJoinWorkerThread[] ws = p.workers;
ForkJoinWorkerThread v = null;
int n;
// 2
if (queueTop != queueBase)
// 3
v = this;
// 4
else if (ws != null && (n = ws.length) > 1) {
ForkJoinWorkerThread w;
// 5
int r = nextSeed(); // cheap version of FJP.scan
int steps = n << 1;
for (int i = 0; i < steps; ++i) {
if ((w = ws[(i + r) & (n - 1)]) != null &&
w.queueBase != w.queueTop) {
v = w;
break;
}
}
}
if (v != null) {
ForkJoinTask<?> t;
if (!active) {
active = true;
p.addActiveCount(1);
}
// 6
if ((t = (v != this) ? v.deqTask() :
locallyFifo ? locallyDeqTask() : popTask()) != null) {
currentSteal = t;
t.doExec();
currentSteal = ps;
}
}
else {
if (active) {
active = false;
p.addActiveCount(-1);
}
// 7
if (p.isQuiescent()) {
p.addActiveCount(1);
p.addQuiescerCount(-1);
break;
}
}
}
}

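Analysis of the annotated code

  1. Increment the pool's quiescerCount.
  2. Choose a victim to steal from.
  3. If the current thread's own queue has tasks, choose itself.
  4. Otherwise scan the worker array and choose a worker whose queue has tasks.
  5. nextSeed() is simply an xor-shift random number generator.
  6. Steal a task and execute it.
  7. Exit only once the pool is quiescent.

helpQuiescePool() keeps taking and running tasks (including tasks from its own queue) until the pool is quiescent. While it runs, it adjusts the pool's quiescerCount and active count, which prevents the pool from terminating prematurely. Compare ForkJoinPool#tryAwaitWork(), which may call tryTerminate() and thus provides a code path that shuts the pool down automatically.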
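
From user code this mechanism is reached through the public static method ForkJoinTask.helpQuiesce(). The sketch below forks a batch of small actions and lets the root task help until the pool is quiescent instead of joining them one by one. Names and the pool size are hypothetical; the quietlyJoin() loop afterwards is only a safety net so that the returned count does not rest on an assumption about quiescence timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

final class HelpQuiesceSketch {
    /** Forks taskCount trivial actions and returns how many of them ran. */
    static int runAll(int taskCount) {
        AtomicInteger done = new AtomicInteger();
        RecursiveTask<Integer> root = new RecursiveTask<Integer>() {
            @Override
            protected Integer compute() {
                List<ForkJoinTask<?>> forked = new ArrayList<>();
                for (int i = 0; i < taskCount; i++) {
                    RecursiveAction child = new RecursiveAction() {
                        @Override
                        protected void compute() {
                            done.incrementAndGet();
                        }
                    };
                    forked.add(child.fork());
                }
                // Run queued work in this thread until the pool is quiescent.
                ForkJoinTask.helpQuiesce();
                // Safety net: returns immediately for tasks that are complete.
                for (ForkJoinTask<?> t : forked) {
                    t.quietlyJoin();
                }
                return done.get();
            }
        };
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(root);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(100));
    }
}
```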

ForkJoinPool#isQuiescent()

/**
* Returns {@code true} if all worker threads are currently idle.
* An idle worker is one that cannot obtain a task to execute
* because none are available to steal from other threads, and
* there are no pending submissions to the pool. This method is
* conservative; it might not return {@code true} immediately upon
* idleness of all threads, but will eventually become true if
* threads remain inactive.
*
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0;
}

The pool is quiescent when the number of active threads is 0 and the number of worker threads blocked waiting in a join is also 0.

ForkJoinPool#addQuiescerCount()

/**
* Increment or decrement quiescerCount. Needed only to prevent
* triggering shutdown if a worker is transiently inactive while
* checking quiescence.
*
* @param delta 1 for increment, -1 for decrement
*/
final void addQuiescerCount(int delta) {
int c;
do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
c = quiescerCount, c + delta));
}

ForkJoinPool#addActiveCount()

/**
* Directly increment or decrement active count without
* queuing. This method is used to transiently assert inactivation
* while checking quiescence.
*
* @param delta 1 for increment, -1 for decrement
*/
final void addActiveCount(int delta) {
long d = delta < 0 ? -AC_UNIT : AC_UNIT;
long c;
do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
((c + d) & AC_MASK) |
(c & ~AC_MASK)));
}

ForkJoinTask#externalAwaitDone()

/**
* Blocks a non-worker-thread until completion.
* @return status upon completion
*/
private int externalAwaitDone() {
int s;
if ((s = status) >= 0) {
boolean interrupted = false;
synchronized (this) {
while ((s = status) >= 0) {
if (s == 0)
UNSAFE.compareAndSwapInt(this, statusOffset,
0, SIGNAL);
else {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
}
}
}
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}

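This method is called when a thread that is not a ForkJoin worker thread joins a ForkJoinTask.
It sets the SIGNAL status on the task (if not already set) and then waits on the task. Note that if the thread was interrupted while waiting, the interrupt status is restored before the method returns.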
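
The handshake between setCompletion() and externalAwaitDone() can be reproduced in a few lines. The class below is a toy model, not the JDK class: AtomicInteger replaces the UNSAFE CAS on the status field, and the status constants (negative for done, 0 for initial, 1 for SIGNAL) are assumptions made for the sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class StatusProtocolSketch {
    static final int NORMAL = -1;
    static final int CANCELLED = -2;
    static final int SIGNAL = 1;

    private final AtomicInteger status = new AtomicInteger();

    /** Counterpart of setCompletion(): the first completion wins. */
    int setCompletion(int completion) {
        for (;;) {
            int s = status.get();
            if (s < 0) {
                return s; // already completed, report the earlier outcome
            }
            if (status.compareAndSet(s, completion)) {
                if (s != 0) { // SIGNAL was set, so a thread may be in wait()
                    synchronized (this) {
                        notifyAll();
                    }
                }
                return completion;
            }
        }
    }

    /** Counterpart of externalAwaitDone(): block until the status is negative. */
    int awaitDone() {
        int s;
        boolean interrupted = false;
        synchronized (this) {
            while ((s = status.get()) >= 0) {
                if (s == 0) {
                    status.compareAndSet(0, SIGNAL); // request a wakeup
                } else {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        interrupted = true; // remember it and keep waiting
                    }
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        }
        return s;
    }

    /** Completes the task from a helper thread while the caller waits. */
    static int demo() {
        StatusProtocolSketch task = new StatusProtocolSketch();
        Thread completer = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                // complete anyway
            }
            task.setCompletion(NORMAL);
        });
        completer.start();
        return task.awaitDone();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

No wakeup is lost because the waiter holds the monitor from the moment it sees SIGNAL until wait() releases it, so a completer that wins the CAS in between has to block on the monitor before it can call notifyAll().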

ForkJoinTask#get()

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread is not a
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone(0L);
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
return getRawResult();
}

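If the current thread is a ForkJoin worker thread, get() calls doJoin() to wait for the result; otherwise it calls externalInterruptibleAwaitDone(). Afterwards it handles cancellation and exceptional completion. Most of the methods involved have been analyzed already, so only externalInterruptibleAwaitDone() is examined here.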

ForkJoinTask#externalInterruptibleAwaitDone()

/**
* Blocks a non-worker-thread until completion or interruption or timeout.
*/
private int externalInterruptibleAwaitDone(long millis)
throws InterruptedException {
int s;
if (Thread.interrupted())
throw new InterruptedException();
if ((s = status) >= 0) {
synchronized (this) {
while ((s = status) >= 0) {
if (s == 0)
UNSAFE.compareAndSwapInt(this, statusOffset,
0, SIGNAL);
else {
wait(millis);
if (millis > 0L)
break;
}
}
}
}
return s;
}

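This method runs when the caller is not a ForkJoin worker thread.
It sets the SIGNAL status on the current task (if not already set) and then waits on the task. The method supports both a timed wait and interruption.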

ForkJoinTask#get(timeout,unit)

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread is not a
* member of a ForkJoinPool and was interrupted while waiting
* @throws TimeoutException if the wait timed out
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
long nanos = unit.toNanos(timeout);
if (status >= 0) {
boolean completed = false;
if (w.unpushTask(this)) {
try {
completed = exec();
} catch (Throwable rex) {
setExceptionalCompletion(rex);
}
}
if (completed)
setCompletion(NORMAL);
else if (status >= 0 && nanos > 0)
w.pool.timedAwaitJoin(this, nanos);
}
}
else {
long millis = unit.toMillis(timeout);
if (millis > 0)
externalInterruptibleAwaitDone(millis);
}
int s = status;
if (s != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s != EXCEPTIONAL)
throw new TimeoutException();
if ((ex = getThrowableException()) != null)
throw new ExecutionException(ex);
}
return getRawResult();
}

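ForkJoin worker thread

The thread first tries to pop the given task from its own task queue (this succeeds only if the task happens to be at the top of the queue) and, if that works, executes it. If the task is still incomplete afterwards, it calls timedAwaitJoin(), a join that supports a timeout, and waits for the task to finish. Finally it returns the task's result, or throws TimeoutException if the task has not completed in time.

Non-ForkJoin thread

The thread calls externalInterruptibleAwaitDone() and waits for the task to finish. Finally it returns the task's result.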
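
The external-thread path is easy to exercise. In this sketch (all names are hypothetical) a task is held back by a latch, so the timed get() from the main thread must time out; after the latch is released, a plain get() returns the result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedGetSketch {
    /** Returns the outcome of a timed get() followed by an untimed get(). */
    static String run() {
        ForkJoinPool pool = new ForkJoinPool(1);
        CountDownLatch release = new CountDownLatch(1);
        Callable<String> slow = () -> {
            release.await(); // blocks until the caller lets the task finish
            return "done";
        };
        ForkJoinTask<String> task = pool.submit(slow);
        try {
            String first;
            try {
                first = task.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                first = "timeout"; // the task is still running, not cancelled
            }
            release.countDown();
            String second = task.get();
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A timed-out get() leaves the task untouched, which is why the second call can still obtain the result.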

ForkJoinPool#timedAwaitJoin()

/**
* Possibly blocks the given worker waiting for joinMe to
* complete or timeout
*
* @param joinMe the task
* @param millis the wait time for underlying Object.wait
*/
final void timedAwaitJoin(ForkJoinTask<?> joinMe, long nanos) {
// 1
while (joinMe.status >= 0) {
// 2
Thread.interrupted();
if ((ctl & STOP_BIT) != 0L) {
// 3
joinMe.cancelIgnoringExceptions();
break;
}
// 4
if (tryPreBlock()) {
long last = System.nanoTime();
while (joinMe.status >= 0) {
long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
if (millis <= 0)
break;
// 5
joinMe.tryAwaitDone(millis);
if (joinMe.status < 0)
break;
if ((ctl & STOP_BIT) != 0L) {
joinMe.cancelIgnoringExceptions();
break;
}
long now = System.nanoTime();
nanos -= now - last;
last = now;
}
// 6
postBlock();
break;
}
}
}

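Analysis of the annotated code

  1. Loop while the task has not completed.
  2. Clear the interrupt flag first.
  3. If the pool has been stopped, cancel the task.
  4. Do the bookkeeping required before blocking.
  5. Wait for the task to complete.
  6. Do the bookkeeping required after waking up.

ForkJoinTask#tryAwaitDone()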

    /**
    * Tries to block a worker thread until completed or timed out.
    * Uses Object.wait time argument conventions.
    * May fail on contention or interrupt.
    *
    * @param millis if > 0, wait time.
    */
    final void tryAwaitDone(long millis) {
    int s;
    try {
    if (((s = status) > 0 ||
    (s == 0 &&
    UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
    status > 0) {
    synchronized (this) {
    if (status > 0)
    wait(millis);
    }
    }
    } catch (InterruptedException ie) {
    // caller must check termination
    }
    }

ForkJoinTask#invoke()

/**
* Commences performing this task, awaits its completion if
* necessary, and returns its result, or throws an (unchecked)
* {@code RuntimeException} or {@code Error} if the underlying
* computation did so.
*
* @return the computed result
*/
public final V invoke() {
if (doInvoke() != NORMAL)
return reportResult();
else
return getRawResult();
}

ForkJoinTask#doInvoke()

/**
* Primary mechanics for invoke, quietlyInvoke.
* @return status upon completion
*/
private int doInvoke() {
int s; boolean completed;
if ((s = status) < 0)
return s;
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
return setCompletion(NORMAL);
else
return doJoin();
}

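doInvoke() first executes the task and then checks whether it has completed. If so, it sets the completion status; otherwise it joins the task and waits for the result.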

ForkJoinTask#invokeAll()

.....
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
t2.fork();
t1.invoke();
t2.join();
}

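Runs the two tasks together and returns once both have finished. Note that if one of the tasks throws an exception, the other one may be cancelled. The status of each task, including the cause of a failure, can then be checked with getException() and related methods.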
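
The two-argument form is the usual building block for divide-and-conquer tasks. The sketch below uses it for a naive Fibonacci computation; the class name is made up, and the algorithm is chosen only because it is short, not because it is efficient.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class FibSketch extends RecursiveTask<Integer> {
    private final int n;

    FibSketch(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        FibSketch a = new FibSketch(n - 1);
        FibSketch b = new FibSketch(n - 2);
        // Forks b, runs a in the current thread, then joins b.
        invokeAll(a, b);
        // Both tasks are complete here, so join() only reads the results.
        return a.join() + b.join();
    }

    static int fib(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new FibSketch(n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fib(20));
    }
}
```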

ForkJoinTask#invokeAll(ForkJoinTask<?>… tasks)

.....
public static void invokeAll(ForkJoinTask<?>... tasks) {
Throwable ex = null;
int last = tasks.length - 1;
for (int i = last; i >= 0; --i) {
ForkJoinTask<?> t = tasks[i];
if (t == null) {
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = tasks[i];
if (t != null) {
if (ex != null)
t.cancel(false);
else if (t.doJoin() < NORMAL && ex == null)
ex = t.getException();
}
}
if (ex != null)
UNSAFE.throwException(ex);
}

ForkJoinTask#invokeAll(Collection tasks)

.....
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
return tasks;
}
@SuppressWarnings("unchecked")
List<? extends ForkJoinTask<?>> ts =
(List<? extends ForkJoinTask<?>>) tasks;
Throwable ex = null;
int last = ts.size() - 1;
for (int i = last; i >= 0; --i) {
ForkJoinTask<?> t = ts.get(i);
if (t == null) {
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = ts.get(i);
if (t != null) {
if (ex != null)
t.cancel(false);
else if (t.doJoin() < NORMAL && ex == null)
ex = t.getException();
}
}
if (ex != null)
UNSAFE.throwException(ex);
return tasks;
}

ForkJoinTask#isCompletedAbnormally()

/**
* Returns {@code true} if this task threw an exception or was cancelled.
*
* @return {@code true} if this task threw an exception or was cancelled
*/
public final boolean isCompletedAbnormally() {
return status < NORMAL;
}

Determines whether the task completed abnormally, that is, whether it threw an exception or was cancelled.

ForkJoinTask#isCompletedNormally()

/**
* Returns {@code true} if this task completed without throwing an
* exception and was not cancelled.
*
* @return {@code true} if this task completed without throwing an
* exception and was not cancelled
*/
public final boolean isCompletedNormally() {
return status == NORMAL;
}

Determines whether the task completed normally.

ForkJoinTask#quietlyJoin()

/**
* Joins this task, without returning its result or throwing its
* exception. This method may be useful when processing
* collections of tasks when some have been cancelled or otherwise
* known to have aborted.
*/
public final void quietlyJoin() {
doJoin();
}

ForkJoinTask#quietlyInvoke()

/**
* Commences performing this task and awaits its completion if
* necessary, without returning its result or throwing its
* exception.
*/
public final void quietlyInvoke() {
doInvoke();
}

quietlyJoin() and quietlyInvoke() wait for the task to finish, but they neither return its result nor throw its exception.
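The following is a minimal sketch of how quietlyJoin() combines with the status queries above. The class names QuietJoinDemo and Failing are hypothetical and exist only for illustration; the sketch assumes a current JDK, whose public API matches the JDK 7 source shown here even though the internals differ.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class QuietJoinDemo {
    // Hypothetical task that always fails, used only for illustration.
    static class Failing extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    /** Runs a failing task and reports how it finished, without rethrowing. */
    static String run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Failing task = new Failing();
            pool.execute(task);
            task.quietlyJoin();               // waits; does not rethrow the exception
            if (task.isCompletedNormally())
                return "normal";
            // isCompletedAbnormally() is true here; inspect the cause explicitly.
            Throwable ex = task.getException();
            return "abnormal: " + ex.getClass().getSimpleName();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Calling join() instead of quietlyJoin() in this sketch would rethrow the IllegalStateException at the call site, which is the difference the two quiet methods are meant to avoid.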

ForkJoinTask#reinitialize()

.....
public void reinitialize() {
if (status == EXCEPTIONAL)
// 1
clearExceptionalCompletion();
else
status = 0;
}

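reinitialize() resets the task so that it can be run again.
Annotated code analysis

  1. If the task completed exceptionally, clear its entry from the exception table.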
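As a rough illustration of reinitialize(), the sketch below runs the same task object twice. ReinitDemo and Counter are hypothetical names introduced for this example, and the per-task run counter is an assumption made only to make the second execution observable.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

class ReinitDemo {
    // Hypothetical task whose result is the number of times compute() has run.
    static class Counter extends RecursiveTask<Integer> {
        final AtomicInteger runs = new AtomicInteger();

        @Override
        protected Integer compute() {
            return runs.incrementAndGet();
        }
    }

    /** Runs the task, resets it, and runs it again; returns the second result. */
    static int run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Counter task = new Counter();
            pool.invoke(task);        // first run completes the task
            task.reinitialize();      // status goes back to "not yet run"
            return pool.invoke(task); // compute() executes a second time
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The Javadoc for reinitialize() asks that it only be called when the task is not currently forked or running, which is why the sketch resets the task only after invoke() has returned.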

ForkJoinTask#tryUnfork()

    /**
    * Tries to unschedule this task for execution. This method will
    * typically succeed if this task is the most recently forked task
    * by the current thread, and has not commenced executing in
    * another thread. This method may be useful when arranging
    * alternative local processing of tasks that could have been, but
    * were not, stolen.
    *
    * <p>This method may be invoked only from within {@code
    * ForkJoinPool} computations (as may be determined using method
    * {@link #inForkJoinPool}). Attempts to invoke in other contexts
    * result in exceptions or errors, possibly including {@code
    * ClassCastException}.
    *
    * @return {@code true} if unforked
    */
    public boolean tryUnfork() {
    return ((ForkJoinWorkerThread) Thread.currentThread())
    .unpushTask(this);
    }

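Internally, the method tries to remove this task from the top of the current worker thread's queue, which amounts to unscheduling the task before it has been picked up for execution. Unlike cancel(), this does not change the task's status, so the caller can still run the task directly afterwards.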
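A small sketch of the usual tryUnfork() pattern follows: fork a subtask, then try to take it back and run it locally if no other thread has stolen it. UnforkDemo, Parent and Square are hypothetical names. Whether tryUnfork() succeeds depends on timing, so the sketch handles both outcomes.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class UnforkDemo {
    // Hypothetical leaf task used only for illustration.
    static class Square extends RecursiveTask<Integer> {
        final int n;
        Square(int n) { this.n = n; }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    static class Parent extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            Square child = new Square(7);
            child.fork();                // push onto this worker's queue
            if (child.tryUnfork())       // still on top and not stolen?
                return child.invoke();   // run it here, in the current thread
            return child.join();         // already taken elsewhere; wait for it
        }
    }

    static int run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // submit(...).join() keeps Parent.compute() on a pool worker thread.
            return pool.submit(new Parent()).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```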

ForkJoinTask#getPool()

/**
* Returns the pool hosting the current task execution, or null
* if this task is executing outside of any ForkJoinPool.
*
* @see #inForkJoinPool
* @return the pool, or {@code null} if none
*/
public static ForkJoinPool getPool() {
Thread t = Thread.currentThread();
return (t instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread) t).pool : null;
}

ForkJoinTask#inForkJoinPool()

/**
* Returns {@code true} if the current thread is a {@link
* ForkJoinWorkerThread} executing as a ForkJoinPool computation.
*
* @return {@code true} if the current thread is a {@link
* ForkJoinWorkerThread} executing as a ForkJoinPool computation,
* or {@code false} otherwise
*/
public static boolean inForkJoinPool() {
return Thread.currentThread() instanceof ForkJoinWorkerThread;
}
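To make the two static helpers concrete, here is a hedged sketch that calls getPool() and inForkJoinPool() once from inside a pool task and once from a plain thread. WhereAmIDemo and Probe are hypothetical names chosen for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class WhereAmIDemo {
    // Hypothetical task that checks which pool it is running in.
    static class Probe extends RecursiveTask<Boolean> {
        final ForkJoinPool expected;
        Probe(ForkJoinPool expected) { this.expected = expected; }

        @Override
        protected Boolean compute() {
            return ForkJoinTask.inForkJoinPool()
                && ForkJoinTask.getPool() == expected;
        }
    }

    /** True if a task run by a pool worker sees that pool. */
    static boolean insidePool() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            return pool.submit(new Probe(pool)).join();
        } finally {
            pool.shutdown();
        }
    }

    /** True if a plain (non-worker) thread sees no pool at all. */
    static boolean outsidePool() {
        final boolean[] result = new boolean[1];
        Thread plain = new Thread(new Runnable() {
            public void run() {
                result[0] = !ForkJoinTask.inForkJoinPool()
                         && ForkJoinTask.getPool() == null;
            }
        });
        plain.start();
        try {
            plain.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(insidePool() + " " + outsidePool());
    }
}
```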

ForkJoinPool

ForkJoinPool#managedBlock()

/**
* Blocks in accord with the given blocker. If the current thread
* is a {@link ForkJoinWorkerThread}, this method possibly
* arranges for a spare thread to be activated if necessary to
* ensure sufficient parallelism while the current thread is blocked.
*
* <p>If the caller is not a {@link ForkJoinTask}, this method is
* behaviorally equivalent to
* <pre> {@code
* while (!blocker.isReleasable())
* if (blocker.block())
* return;
* }</pre>
*
* If the caller is a {@code ForkJoinTask}, then the pool may
* first be expanded to ensure parallelism, and later adjusted.
*
* @param blocker the blocker
* @throws InterruptedException if blocker.block did so
*/
public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
w.pool.awaitBlocker(blocker);
}
else {
do {} while (!blocker.isReleasable() && !blocker.block());
}
}

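ForkJoin worker thread

Calls awaitBlocker() on the pool that owns the worker thread.

Non-ForkJoin worker thread

Equivalent to blocking on the blocker directly; the method returns once the blocker has been released.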

ForkJoinPool#awaitBlocker()

/**
* If necessary, compensates for blocker, and blocks
*/
private void awaitBlocker(ManagedBlocker blocker)
throws InterruptedException {
while (!blocker.isReleasable()) {
if (tryPreBlock()) {
try {
do {} while (!blocker.isReleasable() && !blocker.block());
} finally {
postBlock();
}
break;
}
}
}

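The logic is much the same as the non-worker-thread branch of managedBlock(); it only adds handling before and after blocking, via tryPreBlock() and postBlock().
It is worth reviewing what these two methods do.
The purpose of managedBlock() is to preserve the pool's parallelism. Suppose a ForkJoinTask blocks on something external while it is executing; in other words, the ForkJoin worker thread running the task blocks on a blocking object (the blocker) outside the pool. The pool still regards that worker thread as active, unlike internal blocking such as joining a task, which the pool records. If the blocking lasts a long time, the pool's effective parallelism suffers. If the external blocking is also reported to the pool, then when a worker thread is blocked externally for a long time, the pool may create a new worker thread to maintain parallelism. That is what managedBlock() is for: implementing the ManagedBlocker interface is how external blocking is reported to the pool.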

ForkJoinPool#ManagedBlocker

/**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
*
* <p>A {@code ManagedBlocker} provides two methods. Method
* {@code isReleasable} must return {@code true} if blocking is
* not necessary. Method {@code block} blocks the current thread
* if necessary (perhaps internally invoking {@code isReleasable}
* before actually blocking). These actions are performed by any
* thread invoking {@link ForkJoinPool#managedBlock}. The
* unusual methods in this API accommodate synchronizers that may,
* but don't usually, block for long periods. Similarly, they
* allow more efficient internal handling of cases in which
* additional workers may be, but usually are not, needed to
* ensure sufficient parallelism. Toward this end,
* implementations of method {@code isReleasable} must be amenable
* to repeated invocation.
*
* <p>For example, here is a ManagedBlocker based on a
* ReentrantLock:
* <pre> {@code
* class ManagedLocker implements ManagedBlocker {
* final ReentrantLock lock;
* boolean hasLock = false;
* ManagedLocker(ReentrantLock lock) { this.lock = lock; }
* public boolean block() {
* if (!hasLock)
* lock.lock();
* return true;
* }
* public boolean isReleasable() {
* return hasLock || (hasLock = lock.tryLock());
* }
* }}</pre>
*
* <p>Here is a class that possibly blocks waiting for an
* item on a given queue:
* <pre> {@code
* class QueueTaker<E> implements ManagedBlocker {
* final BlockingQueue<E> queue;
* volatile E item = null;
* QueueTaker(BlockingQueue<E> q) { this.queue = q; }
* public boolean block() throws InterruptedException {
* if (item == null)
* item = queue.take();
* return true;
* }
* public boolean isReleasable() {
* return item != null || (item = queue.poll()) != null;
* }
* public E getItem() { // call after pool.managedBlock completes
* return item;
* }
* }}</pre>
*/
public static interface ManagedBlocker {
/**
* Possibly blocks the current thread, for example waiting for
* a lock or condition.
*
* @return {@code true} if no additional blocking is necessary
* (i.e., if isReleasable would return true)
* @throws InterruptedException if interrupted while waiting
* (the method is not required to do so, but is allowed to)
*/
boolean block() throws InterruptedException;

/**
* Returns {@code true} if blocking is unnecessary.
*/
boolean isReleasable();
}

ManagedBlocker#doc#ManagedLocker

The ManagedBlocker Javadoc also provides examples.

class ManagedLocker implements ManagedBlocker {
final ReentrantLock lock;
boolean hasLock = false;
ManagedLocker(ReentrantLock lock) { this.lock = lock; }
public boolean block() {
if (!hasLock)
lock.lock();
return true;
}
public boolean isReleasable() {
return hasLock || (hasLock = lock.tryLock());
}
}
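The Javadoc examples only define blockers; they do not show the call site. The sketch below is one possible way to use a blocker from inside a task, based on the QueueTaker example from the same Javadoc. ManagedBlockDemo and Consumer are hypothetical names, and using a queue as the external blocking resource is an assumption made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RecursiveTask;

class ManagedBlockDemo {
    // Same shape as the QueueTaker example in the ManagedBlocker Javadoc.
    static class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        final BlockingQueue<E> queue;
        volatile E item = null;
        QueueTaker(BlockingQueue<E> q) { this.queue = q; }

        public boolean block() throws InterruptedException {
            if (item == null)
                item = queue.take();   // the potentially long external wait
            return true;
        }

        public boolean isReleasable() {
            return item != null || (item = queue.poll()) != null;
        }

        E getItem() { return item; }
    }

    // Hypothetical task that waits for one item from outside the pool.
    static class Consumer extends RecursiveTask<String> {
        final BlockingQueue<String> queue;
        Consumer(BlockingQueue<String> queue) { this.queue = queue; }

        @Override
        protected String compute() {
            QueueTaker<String> taker = new QueueTaker<>(queue);
            try {
                // Tells the pool this worker may block, so it can compensate.
                ForkJoinPool.managedBlock(taker);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
            return taker.getItem();
        }
    }

    static String run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            Consumer consumer = new Consumer(queue);
            pool.execute(consumer);   // the worker may block waiting for an item
            queue.offer("hello");     // the external producer releases it
            return consumer.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Calling queue.take() directly inside compute() would also work, but the pool would then have no way of knowing that its only worker is blocked.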

ForkJoinPool#invoke()

/**
* Performs the given task, returning its result upon completion.
* If the computation encounters an unchecked Exception or Error,
* it is rethrown as the outcome of this invocation. Rethrown
* exceptions behave in the same way as regular exceptions, but,
* when possible, contain stack traces (as displayed for example
* using {@code ex.printStackTrace()}) of both the current thread
* as well as the thread actually encountering the exception;
* minimally only the latter.
*
* @param task the task
* @return the task's result
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
Thread t = Thread.currentThread();
if (task == null)
throw new NullPointerException();
if (shutdown)
throw new RejectedExecutionException();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)t).pool == this)
return task.invoke(); // bypass submit if in same pool
else {
addSubmission(task);
return task.join();
}
}

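Runs the task and returns its result; the call is synchronous, not asynchronous.
If the current thread is a ForkJoin worker thread that belongs to this pool, the task's invoke() is called directly; otherwise the task is added to the pool's submission queue and the caller waits for it with join().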
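As a usage sketch for invoke(), the following sums a range of integers with a divide-and-conquer RecursiveTask and blocks until the result is available. InvokeDemo, Sum and the THRESHOLD value are hypothetical choices made for illustration, not taken from the source.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class InvokeDemo {
    // Hypothetical task: sums the integers in [lo, hi).
    static class Sum extends RecursiveTask<Long> {
        static final int THRESHOLD = 1_000;   // arbitrary cut-off for splitting
        final int lo, hi;
        Sum(int lo, int hi) { this.lo = lo; this.hi = hi; }

        @Override
        protected Long compute() {
            if (hi - lo <= THRESHOLD) {
                long s = 0;
                for (int i = lo; i < hi; i++)
                    s += i;
                return s;
            }
            int mid = (lo + hi) >>> 1;
            Sum left = new Sum(lo, mid);
            Sum right = new Sum(mid, hi);
            left.fork();                      // schedule the left half
            long r = right.compute();         // work on the right half here
            return left.join() + r;
        }
    }

    /** Blocks until the whole computation is done, then returns the sum. */
    static long run(int n) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new Sum(0, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(10_000));
    }
}
```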

ForkJoinPool#execute()

/**
* Arranges for (asynchronous) execution of the given task.
*
* @param task the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
}

// AbstractExecutorService methods

/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
forkOrSubmit(job);
}

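Both execute() overloads run the task asynchronously: they return as soon as the task has been scheduled, without waiting for it to finish.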
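The sketch below exercises both execute() overloads, one with a ForkJoinTask and one with a plain Runnable. Because neither call waits, the example shuts the pool down and awaits termination before reading the shared counter. ExecuteDemo and the counter are hypothetical and serve only to make the side effects observable.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecuteDemo {
    static int run() {
        final AtomicInteger counter = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(2);

        // Overload 1: execute(ForkJoinTask<?>)
        pool.execute(new RecursiveAction() {
            @Override
            protected void compute() {
                counter.addAndGet(1);
            }
        });

        // Overload 2: execute(Runnable), adapted to a ForkJoinTask internally
        pool.execute(new Runnable() {
            public void run() {
                counter.addAndGet(10);
            }
        });

        // Both calls above returned immediately; wait for the work to finish.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS))
                throw new IllegalStateException("tasks did not finish in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```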

ForkJoinPool#invokeAll()

/**
* @throws NullPointerException {@inheritDoc}
* @throws RejectedExecutionException {@inheritDoc}
*/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
ArrayList<ForkJoinTask<T>> forkJoinTasks =
new ArrayList<ForkJoinTask<T>>(tasks.size());
for (Callable<T> task : tasks)
forkJoinTasks.add(ForkJoinTask.adapt(task));
invoke(new InvokeAll<T>(forkJoinTasks));

@SuppressWarnings({"unchecked", "rawtypes"})
List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
return futures;
}

static final class InvokeAll<T> extends RecursiveAction {
final ArrayList<ForkJoinTask<T>> tasks;
InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
public void compute() {
try { invokeAll(tasks); }
catch (Exception ignore) {}
}
private static final long serialVersionUID = -7914297376763021607L;
}

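Internally this delegates to ForkJoinTask#invokeAll(): each Callable is adapted to a ForkJoinTask, and the wrapping InvokeAll action runs them all before the list of futures is returned.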
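A brief sketch of calling ForkJoinPool#invokeAll() with plain Callables is shown below. InvokeAllDemo is a hypothetical name and the squares are arbitrary sample work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class InvokeAllDemo {
    /** Submits five Callables computing n*n and sums their results. */
    static int run() {
        List<Callable<Integer>> calls = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            final int n = i;
            calls.add(new Callable<Integer>() {
                public Integer call() {
                    return n * n;
                }
            });
        }

        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            int sum = 0;
            // invokeAll returns only after every task has completed.
            for (Future<Integer> f : pool.invokeAll(calls))
                sum += f.get();
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because InvokeAll#compute() swallows exceptions, a failing Callable surfaces only when get() is called on its own future, as an ExecutionException.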

ForkJoinPool other methods

/**
* Returns the factory used for constructing new workers.
*
* @return the factory used for constructing new workers
*/
// 1
public ForkJoinWorkerThreadFactory getFactory() {
return factory;
}

/**
* Returns the handler for internal worker threads that terminate
* due to unrecoverable errors encountered while executing tasks.
*
* @return the handler, or {@code null} if none
*/
// 2
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
return ueh;
}

/**
* Returns the targeted parallelism level of this pool.
*
* @return the targeted parallelism level of this pool
*/
// 3
public int getParallelism() {
return parallelism;
}

/**
* Returns the number of worker threads that have started but not
* yet terminated. The result returned by this method may differ
* from {@link #getParallelism} when threads are created to
* maintain parallelism when others are cooperatively blocked.
*
* @return the number of worker threads
*/
// 4
public int getPoolSize() {
return parallelism + (short)(ctl >>> TC_SHIFT);
}

/**
* Returns {@code true} if this pool uses local first-in-first-out
* scheduling mode for forked tasks that are never joined.
*
* @return {@code true} if this pool uses async mode
*/
// 5
public boolean getAsyncMode() {
return locallyFifo;
}

/**
* Returns an estimate of the number of worker threads that are
* not blocked waiting to join tasks or for other managed
* synchronization. This method may overestimate the
* number of running threads.
*
* @return the number of worker threads
*/
// 6
public int getRunningThreadCount() {
int r = parallelism + (int)(ctl >> AC_SHIFT);
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}

/**
* Returns an estimate of the number of threads that are currently
* stealing or executing tasks. This method may overestimate the
* number of active threads.
*
* @return the number of active threads
*/
// 7
public int getActiveThreadCount() {
int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}

/**
* Returns an estimate of the total number of tasks stolen from
* one thread's work queue by another. The reported value
* underestimates the actual total number of steals when the pool
* is not quiescent. This value may be useful for monitoring and
* tuning fork/join programs: in general, steal counts should be
* high enough to keep threads busy, but low enough to avoid
* overhead and contention across threads.
*
* @return the number of steals
*/
// 8
public long getStealCount() {
return stealCount;
}

/**
* Returns an estimate of the total number of tasks currently held
* in queues by worker threads (but not including tasks submitted
* to the pool that have not begun executing). This value is only
* an approximation, obtained by iterating across all threads in
* the pool. This method may be useful for tuning task
* granularities.
*
* @return the number of queued tasks
*/
// 9
public long getQueuedTaskCount() {
long count = 0;
ForkJoinWorkerThread[] ws;
if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
(ws = workers) != null) {
for (ForkJoinWorkerThread w : ws)
if (w != null)
count -= w.queueBase - w.queueTop; // must read base first
}
return count;
}

/**
* Returns an estimate of the number of tasks submitted to this
* pool that have not yet begun executing. This method may take
* time proportional to the number of submissions.
*
* @return the number of queued submissions
*/
// 10
public int getQueuedSubmissionCount() {
return -queueBase + queueTop;
}

/**
* Returns {@code true} if there are any tasks submitted to this
* pool that have not yet begun executing.
*
* @return {@code true} if there are any queued submissions
*/
// 11
public boolean hasQueuedSubmissions() {
return queueBase != queueTop;
}

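Annotated code analysis

  1. Returns the worker thread factory.
  2. Returns the uncaught exception handler for internal worker threads.
  3. Returns the pool's target parallelism.
  4. Returns the total number of worker threads.
  5. Returns the scheduling mode (whether the pool is in async mode).
  6. Returns the number of running worker threads (an estimate).
  7. Returns the number of active worker threads, including blocked ones (an estimate).
  8. Returns the total number of tasks stolen by all worker threads in the pool (an estimate).
  9. Returns the total number of tasks held in all worker threads' task queues (an estimate).
  10. Returns the number of tasks in the pool's submission queue (an estimate).
  11. Checks whether the pool's submission queue contains any tasks.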
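The accessors listed above are mostly useful for monitoring. The sketch below gathers them into one log line. PoolStatsDemo and snapshot() are hypothetical names, and apart from the parallelism and the async mode, the values are estimates that vary from run to run.

```java
import java.util.concurrent.ForkJoinPool;

class PoolStatsDemo {
    /** Formats the pool's monitoring values as one line for logging. */
    static String snapshot(ForkJoinPool pool) {
        return "parallelism=" + pool.getParallelism()
             + " asyncMode=" + pool.getAsyncMode()
             + " poolSize=" + pool.getPoolSize()
             + " running=" + pool.getRunningThreadCount()
             + " active=" + pool.getActiveThreadCount()
             + " steals=" + pool.getStealCount()
             + " queuedTasks=" + pool.getQueuedTaskCount()
             + " queuedSubmissions=" + pool.getQueuedSubmissionCount()
             + " hasQueuedSubmissions=" + pool.hasQueuedSubmissions();
    }

    /** Creates a pool with parallelism 4 and takes one snapshot of it. */
    static String run() {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return snapshot(pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```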
