- ForkJoinPool属性
- ForkJoinWorkerThread属性
- ForkJoinTask属性
- 非ForkJoin任务的执行过程
- 例
- ForkJoinPool
- ForkJoinPool
- ForkJoinPool默认的工作线程工厂
- ForkJoinPool#submit()
- ForkJoinTask#adapt()
- ForkJoinTask#AdaptedCallable()
- ForkJoinTask#AdaptedRunnable()
- ForkJoinPool#forkOrSubmit()
- ForkJoinPool#addSubmission()
- ForkJoinPool#growSubmissionQueue()
- ForkJoinPool#signalWork()
- ForkJoinPool#addWorker()
- ForkJoinPool#ForkJoinWorkerThreadFactory#newThread()
- ForkJoinPool#nextWorkerName()
- ForkJoinPool#registerWorker()
- ForkJoinWorkerThread#run()
- ForkJoinWorkerThread#onStart()
- ForkJoinWorkerThread#onTermination()
- ForkJoinPool#work
- ForkJoinPool#scan()
- ForkJoinWorkerThread#execTask()
- ForJoinTask#doExec()
- RecursiveAction#exec()
- RecursiveAction#compute()
- RecursiveTask#exec()
- RecursiveTask#compute()
- ForkJoinTask#setCompletion()
- ForkJoinTask#setExceptionalCompletion()
- Exception table
- ForkJoinPool#tryAwaitWork()
- ForkJoinPool#idleAwaitWork()
- ForkJoinWorkerThread#onTermination()
- ForkJoinWorkerThread#cancelTasks()
- ForkJoinTask#cancel()
- ForkJoinPool#deregisterWorker()
- ForkJoinPool#tryTerminate()
- ForkJoinPool#startTerminating()
- ForkJoinPool#cancelSubmissions()
- ForkJoinPool#pollSubmission()
- ForkJoinPool#terminateWaiters()
- ForkJoinPool#tryReleaseWaiter()
- ForkJoinWorkerThread#execTask()
- ForkJoinWorkerThread#popTask()
- ForkJoinWorkerThread#locallyDeqTask()
- 总结
ForkJoinPool属性
部分参数1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55/**
* Main pool control -- a long packed with:
* AC: Number of active running workers minus target parallelism (16 bits)
* TC: Number of total workers minus target parallelism (16bits)
* ST: true if pool is terminating (1 bit)
* EC: the wait count of top waiting thread (15 bits)
* ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
*
* When convenient, we can extract the upper 32 bits of counts and
* the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
* (int)ctl. The ec field is never accessed alone, but always
* together with id and st. The offsets of counts by the target
* parallelism and the positionings of fields makes it possible to
* perform the most common checks via sign tests of fields: When
* ac is negative, there are not enough active workers, when tc is
* negative, there are not enough total workers, when id is
* negative, there is at least one waiting worker, and when e is
* negative, the pool is terminating. To deal with these possibly
* negative fields, we use casts in and out of "short" and/or
* signed shifts to maintain signedness.
*/
volatile long ctl;
// bit positions/shifts for fields
private static final int AC_SHIFT = 48;
private static final int TC_SHIFT = 32;
private static final int ST_SHIFT = 31;
private static final int EC_SHIFT = 16;
// bounds
private static final int MAX_ID = 0x7fff; // max poolIndex
private static final int SMASK = 0xffff; // mask short bits
private static final int SHORT_SIGN = 1 << 15;
private static final int INT_SIGN = 1 << 31;
// masks
private static final long STOP_BIT = 0x0001L << ST_SHIFT;
private static final long AC_MASK = ((long)SMASK) << AC_SHIFT;
private static final long TC_MASK = ((long)SMASK) << TC_SHIFT;
// units for incrementing and decrementing
private static final long TC_UNIT = 1L << TC_SHIFT;
private static final long AC_UNIT = 1L << AC_SHIFT;
// masks and units for dealing with u = (int)(ctl >>> 32)
private static final int UAC_SHIFT = AC_SHIFT - 32;
private static final int UTC_SHIFT = TC_SHIFT - 32;
private static final int UAC_MASK = SMASK << UAC_SHIFT;
private static final int UTC_MASK = SMASK << UTC_SHIFT;
private static final int UAC_UNIT = 1 << UAC_SHIFT;
private static final int UTC_UNIT = 1 << UTC_SHIFT;
// masks and units for dealing with e = (int)ctl
private static final int E_MASK = 0x7fffffff; // no STOP_BIT
private static final int EC_UNIT = 1 << EC_SHIFT;
ForkJoinPool
的总控制信息,包含在一个long(64bit)
里面。
- AC: 表示当前活动的工作线程的数量减去并行度得到的数值。(16 bits)
- TC: 表示全部工作线程的数量减去并行度得到的数值。(16bits)
- ST: 表示当前
ForkJoinPool
是否正在关闭。(1 bit) - EC: 表示
Treiber stack
顶端的等待工作线程的等待次数。(15 bits) - ID:
Treiber stack
顶端的等待工作线程的下标取反。(16 bits)
1111111111111111 | 1111111111111111 | 1 | 111111111111111 | 1111111111111111 |
---|---|---|---|---|
AC | TC | ST | EC | ID |
- 如果
AC
为负数,说明没有足够的活动工作线程。 - 如果
TC
为负数,说明工作线程数量没达到最大工作线程数量。 - 如果
ID
为负数,说明至少有一个等待的工作线程。 - 如果
(int)ctl
为负数,说明ForkJoinPool
正在关闭。ctl
是ForkJoinPool
中最重要的,也是设计最精密的域,它是整个ForkJoinPool
的总控信息。所有信息包含在一个long(64bit)
中,这些信息包括:当前活动的工作线程数量、当前总的工作线程数量、ForkJoinPool
的关闭标志、在Treiber stack
(由全部等待工作线程组成的一个链)顶端等待的工作线程的等待次数、Treiber stack
(由全部等待工作线程组成的一个链)顶端等待的工作线程的ID
信息(工作线程的下标取反)。ctl
还有一个相对不重要的作用就是,某些非volatile
域会依赖ctl
来保证可见性。1
2
3
4
5
6
7
8
9
10
11/**
* Array holding all worker threads in the pool. Initialized upon
* construction. Array size must be a power of two. Updates and
* replacements are protected by scanGuard, but the array is
* always kept in a consistent enough state to be randomly
* accessed without locking by workers performing work-stealing,
* as well as other traversal-based methods in this class, so long
* as reads memory-acquire by first reading ctl. All readers must
* tolerate that some array slots may be null.
*/
ForkJoinWorkerThread[] workers;
workers
是ForkJoinPool
中保存工作线程的数组,它的更新会由一个锁(scanGuard
)来保护。1
2
3
4
5
6
7
8
9
10
11/**
* SeqLock and index masking for updates to workers array. Locked
* when SG_UNIT is set. Unlocking clears bit by adding
* SG_UNIT. Staleness of read-only operations can be checked by
* comparing scanGuard to value before the reads. The low 16 bits
* (i.e, anding with SMASK) hold (the smallest power of two
* covering all worker indices, minus one, and is used to avoid
* dealing with large numbers of null slots when the workers array
* is overallocated.
*/
volatile int scanGuard;
scanGuard
是另外一个比较重要的域,它有两个作用。
- 作为更新工作线程数组是使用的(顺序)锁。
- 作为扫描工作线程数组时使用的边界值来避免一些没用的扫描(当数组过大时)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24/**
* Initial size for submission queue array. Must be a power of
* two. In many applications, these always stay small so we use a
* small initial cap.
*/
private static final int INITIAL_QUEUE_CAPACITY = 8;
/**
* Maximum size for submission queue array. Must be a power of two
* less than or equal to 1 << (31 - width of array entry) to
* ensure lack of index wraparound, but is capped at a lower
* value to help users trap runaway computations.
*/
private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
/**
* Array serving as submission queue. Initialized upon construction.
*/
private ForkJoinTask<?>[] submissionQueue;
/**
* Lock protecting submissions array for addSubmission
*/
private final ReentrantLock submissionLock;
ForkJoinPool
中也有一个队列submissionQueue
,这个队列里存放的是有外部(非ForkJoin工作线程)提交到ForkJoinPool
中的任务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* Index (mod submission queue length) of next element to take
* from submission queue. Usage is identical to that for
* per-worker queues -- see ForkJoinWorkerThread internal
* documentation.
*/
volatile int queueBase;
/**
* Index (mod submission queue length) of next element to add
* in submission queue. Usage is identical to that for
* per-worker queues -- see ForkJoinWorkerThread internal
* documentation.
*/
int queueTop;
这两个域分别表示submissionQueue
的底部和顶部。1
2
3
4
5/**
* True if use local fifo, not default lifo, for local polling
* Read by, and replicated by ForkJoinWorkerThreads
*/
final boolean locallyFifo;
locallyFifo
域也比较重要,它有ForkJoinPool
的构造方法的参数asyncMode
来指定。如果locallyFifo
为true,表示内部将才用FIFO的方式来调度任务队列中的任务,而且这些任务可以分裂(fork),但最好不要合并(join),这种模式很适合来处理事件形式(event-style
)的异步任务。默认locallyFifo
为false。
ForkJoinWorkerThread属性
ForkJoinWorkerThread
是ForkJoin框架中负责执行具体任务的工作线程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41/**
* Capacity of work-stealing queue array upon initialization.
* Must be a power of two. Initial size must be at least 4, but is
* padded to minimize cache effects.
*/
private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
/**
* Maximum size for queue array. Must be a power of two
* less than or equal to 1 << (31 - width of array entry) to
* ensure lack of index wraparound, but is capped at a lower
* value to help users trap runaway computations.
*/
private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
/**
* The work-stealing queue array. Size must be a power of two.
* Initialized when started (as oposed to when constructed), to
* improve memory locality.
*/
ForkJoinTask<?>[] queue;
/**
* The pool this thread works in. Accessed directly by ForkJoinTask.
*/
final ForkJoinPool pool;
/**
* Index (mod queue.length) of next queue slot to push to or pop
* from. It is written only by owner thread, and accessed by other
* threads only after reading (volatile) queueBase. Both queueTop
* and queueBase are allowed to wrap around on overflow, but
* (queueTop - queueBase) still estimates size.
*/
int queueTop;
/**
* Index (mod queue.length) of least valid queue slot, which is
* always the next position to steal from if nonempty.
*/
volatile int queueBase;
queue
就是ForkJoinWorkerThread
中的任务队列,当从其他工作线程中窃取任务时,就是从这个队列中进行窃取。1
2
3
4
5
6/**
* Index of this worker in pool array. Set once by pool before
* running, and accessed directly by pool to locate this worker in
* its workers array.
*/
final int poolIndex;
工作线程在ForkJoinPool
中工作线程数组中的下标。1
2
3
4
5
6
7
8/**
* The index of most recent stealer, used as a hint to avoid
* traversal in method helpJoinTask. This is only a hint because a
* worker might have had multiple steals and this only holds one
* of them (usually the most current). Declared non-volatile,
* relying on other prevailing sync to keep reasonably current.
*/
int stealHint;
stealHint
保存了最近的窃取者(来窃取任务的工作线程)的下标(poolIndex
)。注意这个值不准确,因为可能同时有很多窃取者来窃取任务,这个值只能记录其中之一。1
2
3
4
5/**
* Encoded record for pool task waits. Usages are always
* surrounded by volatile reads/writes
*/
int nextWait;
nextWait
算是比较难理解的一个域。首先所有的等待工作线程组成了一个隐式的单链(代码中也叫Treiber stack,由于行为类似于栈),链顶端的等待工作线程的信息保存在Pool
的ctl
中,新来的等待工作线程会将ctl
中之前的等待工作线程信息保存到nextWait
上,然后将自己的信息设置到ctl
上。1
2
3
4
5/**
* True if use local fifo, not default lifo, for local polling.
* Shadows value from ForkJoinPool.
*/
final boolean locallyFifo;
这个和ForkJoinPool#locallyFifo
一致。1
2
3
4
5
6/**
* The task most recently stolen from another worker (or
* submission queue). All uses are surrounded by enough volatile
* reads/writes to maintain as non-volatile.
*/
ForkJoinTask<?> currentSteal;
当前工作线程最新窃取的任务。注意可以从其他工作线程的任务队列或者从Pool
中的提交任务队列(submissionQueue
)中窃取任务。1
2
3
4
5
6/**
* The task currently being joined, set only when actively trying
* to help other stealers in helpJoinTask. All uses are surrounded
* by enough volatile reads/writes to maintain as non-volatile.
*/
ForkJoinTask<?> currentJoin;
当前工作线程正在合并的任务。
ForkJoinTask属性
1 | /* |
ForkJoinTask
中只有一个表示运行状态的域。初始为0;1表示在等待被唤醒;负数都表示执行完毕。
- -1表示正常完成。
- -2表示被取消。
- -3表示异常结束。
非ForkJoin任务的执行过程
非ForkJoin(Runnable或者Callable)任务的执行过程来分析ForkJoin的相关代码,注意这里说的非ForkJoin任务实际上也是ForkJoinTask
,只是没有分裂(fork)/合并(join)过程。例
非ForkJoin任务的代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<?> task = forkJoinPool.submit(new Runnable() {
@Override
public void run() {
System.out.println("AAA");
}
});
try {
task.get();
forkJoinPool.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
代码中提交了一个Runnable
的任务到ForkJoinPool
,任务的执行就是打印一句话。
下面就从提交一个Runnable
的任务到ForkJoinPool
,直到任务被执行的过程来分析源码。
ForkJoinPool
1 | ..... |
标注代码分析
- 设置并行度。
- 设置工作线程工厂。
- 设置线程未捕获异常处理器。
- 设置是否为异步模式。
- 参考我们之前介绍的
ctl
的内容,由于ctl
中的AC
表示当前活动。工作线程数量减去并行度,所以这里要将这个信息加到ctl
上。 - 初始化提交任务队列。
- 这里需要根据并行度来算出工作线程数组的大小。由于数组大小必须的2的幂,这里的算法是算出比。
parallelism
的2倍大的最小的2的幂,但不能超过。MAX_ID + 1(1 << 16)
的数作为工作线程数组大小。 - 创建存放工作线程的数组。
- 生成工作线程名称前缀。
ForkJoinPool
- 并行度如果未提供,默认就是当前处理器核数。
- 初始化控制信息的部分,注意在
AC
和TC
的信息上减掉了并行度,比如如果并行度为4,那么初始的AC
和TC
就都是-4,那么如果后续发现AC
等于0,就说明当前活动的线程数正好等于处理器核心数量。 - 确定工作线程数组大小的过程是这样的,首先取一个数n,默认是并行度的2倍。然后会使用来自
HD
的一个位操作技巧,就是将n的位模式的前导1后面所有的位都变成1,其实就是一个比n大的2的幂减1的数。当然n最大不能超过MAX_ID
,最终数组的大小是n+1
,是一个2的幂,能简化后续的取模操作。ForkJoinPool默认的工作线程工厂
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32static {
poolNumberGenerator = new AtomicInteger();
workerSeedGenerator = new Random();
modifyThreadPermission = new RuntimePermission("modifyThread");
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
int s;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = ForkJoinPool.class;
ctlOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("ctl"));
stealCountOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("stealCount"));
blockedCountOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("blockedCount"));
quiescerCountOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("quiescerCount"));
scanGuardOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("scanGuard"));
nextWorkerNumberOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("nextWorkerNumber"));
Class a = ForkJoinTask[].class;
ABASE = UNSAFE.arrayBaseOffset(a);
s = UNSAFE.arrayIndexScale(a);
} catch (Exception e) {
throw new Error(e);
}
if ((s & (s-1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
}
1 | /** |
1 | /** |
ForkJoinPool#submit()
1 | /** |
ForkJoinPool
中定义了一些列重载的submit()
,这些submit()
在ForkJoinTask
内部会先将Callable
、Runnable
包装(适配)成ForkJoinTask
,然后再进行提交。
ForkJoinTask#adapt()
1 | /** |
ForkJoinTask#AdaptedCallable()
Callable
会包装成一个AdaptedCallable
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28/**
* Adaptor for Callables
*/
static final class AdaptedCallable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
final Callable<? extends T> callable;
T result;
AdaptedCallable(Callable<? extends T> callable) {
if (callable == null) throw new NullPointerException();
this.callable = callable;
}
public T getRawResult() { return result; }
public void setRawResult(T v) { result = v; }
public boolean exec() {
try {
result = callable.call();
return true;
} catch (Error err) {
throw err;
} catch (RuntimeException rex) {
throw rex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public void run() { invoke(); }
private static final long serialVersionUID = 2838392045355241008L;
}
ForkJoinTask#AdaptedRunnable()
Runnable
会包装成一个AdaptedRunnable
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25/**
* Adaptor for Runnables. This implements RunnableFuture
* to be compliant with AbstractExecutorService constraints
* when used in ForkJoinPool.
*/
static final class AdaptedRunnable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
final Runnable runnable;
final T resultOnCompletion;
T result;
AdaptedRunnable(Runnable runnable, T result) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
this.resultOnCompletion = result;
}
public T getRawResult() { return result; }
public void setRawResult(T v) { result = v; }
public boolean exec() {
runnable.run();
result = resultOnCompletion;
return true;
}
public void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;
}
ForkJoinPool#forkOrSubmit()
1 | /** |
如果当前线程是ForkJoin工作线程,说明是在ForkJoinTask
内部提交的任务(比如分裂出子任务然后提交执行),这种情况下会将任务添加到工作线程的任务队列中;如果当前线程不是ForkJoin工作线程,说明是初始提交ForkJoin任务(外部将ForkJoinTask
初次提交给ForkJoinPool
),这种情况下会将任务添加到ForkJoinPool
的任务队列中。
ForkJoinPool#addSubmission()
1 | /** |
标注代码分析
- 这步计算内存偏移地址。
- 将t设置到q的对应位置。(LazySet)
- 如果队列满了,扩展队列。
- 唤醒工作线程。
addSubmission()
添加一个任务到提交任务队列(过程要加锁),如果队列满了,扩展一下。然后唤醒工作线程。
这时候是非fork-join提交任务。在forkOrSubmit()
中一定会走addSubmission()
的分支。
代码可以看到通过Unsafe
设置Task
到数组的方式,之后所有的设置任务到数组都会采取这种方式,之所以这样做是为了提高性能:这种方式其实和数组原子量(AtomicReferenceArray
)中一致,但减少了2方面的性能损耗。
- 不用像
AtomicReferenceArray
内部一样再做边界检测(由外部保证)。 - 由于队列最大容量的限制(工作线程中的任务队列也一样),不用像
AtomicReferenceArray
一样在计算偏移量过程中不会进行从int
到long
的提升。ForkJoinPool#growSubmissionQueue()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25/**
* Creates or doubles submissionQueue array.
* Basically identical to ForkJoinWorkerThread version.
*/
private void growSubmissionQueue() {
ForkJoinTask<?>[] oldQ = submissionQueue;
int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
if (size < INITIAL_QUEUE_CAPACITY)
size = INITIAL_QUEUE_CAPACITY;
ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
int mask = size - 1;
int top = queueTop;
int oldMask;
if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
for (int b = queueBase; b != top; ++b) {
long u = ((b & oldMask) << ASHIFT) + ABASE;
Object x = UNSAFE.getObjectVolatile(oldQ, u);
if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
UNSAFE.putObjectVolatile
(q, ((b & mask) << ASHIFT) + ABASE, x);
}
}
}
很容易看懂,最小容量为INITIAL_QUEUE_CAPACITY = 8
,每次扩展为原来的2倍,最大不能超过MAXIMUM_QUEUE_CAPACITY = 1 << 24(16777216)
。
现在任务已经提交到ForkJoinPool#submissionQueue()
。还会执行一个唤醒工作线程的动作,这样就会有工作线程来执行我们提交的任务了。
ForkJoinPool#signalWork()
1 | /** |
标注代码分析
- 唤醒一个等待的工作线程。
- 等待工作线程的下标比
workers
的size
大||获取不到等待线程。 - 将Treiber stack中下一个等待线程的信息(下标)放到控制信息上||控制信息上添加一个
AC
计数。 - 累加w的等待次数。
- 唤醒w。
- 添加一个工作线程。CAS操作累加控制信息上
AC
和TC
。 - 添加工作线程。
signalWork
标注0
while循环内部由e来区分,当e==0
,表示当前没有等待的工作线程,这种情况下要创建一个工作线程;当e>0
,说明当前有等待线程,这种情况下唤醒一下等待的工作线程。e>=0前代码
- 当e的第32位
bit
为1或者u的第32位bit
为1 并且 e的第16位bit
为1或者u的第16位bit
为1,结合上一篇我们了解到的ForkJoinPool
中的控制信息可知。 - e的第32位
bit
为1,说明ST
为1,表示当前ForkJoinPool
正在关闭。 - u的第32位
bit
为1,说明AC
为负数,表示没有足够多的活动的工作线程。 - e的第16位
bit
为1,说明ID
为负数,表示至少有一个等待线程。 - u的第16位
bit
为1,说明TC
为负数,表示没有足够多的(总的)工作线程。e>=0代码
- 排除了当前
ForkJoinPool
正在关闭的情况。ForkJoinPool#addWorker()
新创建的Pool
,里面还没有工作线程,所以signalWork()
中一定会走添加工作线程的流程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29/**
* Tries to create and start a worker; minimally rolls back counts
* on failure.
*/
private void addWorker() {
Throwable ex = null;
ForkJoinWorkerThread t = null;
try {
t = factory.newThread(this);
} catch (Throwable e) {
ex = e;
}
if (t == null) { // null or exceptional factory return
// 1
long c; // adjust counts
do {} while (!UNSAFE.compareAndSwapLong
(this, ctlOffset, c = ctl,
(((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))));
// Propagate exception if originating from an external caller
// 2
if (!tryTerminate(false) && ex != null &&
!(Thread.currentThread() instanceof ForkJoinWorkerThread))
UNSAFE.throwException(ex);
}
else // 3
t.start();
}
标注代码分析
- 如果发生了异常导致工作线程创建失败,需要把之前累加的到控制信息的
AC
和TC
计数减回去。 - 如果调用来之外部,需要将异常传递出去。
- 创建成功的话,启动工作线程。
ForkJoinPool#ForkJoinWorkerThreadFactory#newThread()
工作线程的创建过程。调用ForkJoinWorkerThread
的构造方法。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26/**
* Factory for creating new {@link ForkJoinWorkerThread}s.
* A {@code ForkJoinWorkerThreadFactory} must be defined and used
* for {@code ForkJoinWorkerThread} subclasses that extend base
* functionality or initialize threads with different contexts.
*/
public static interface ForkJoinWorkerThreadFactory {
/**
* Returns a new worker thread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if the pool is null
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
/**
* Default ForkJoinWorkerThreadFactory implementation; creates a
* new ForkJoinWorkerThread.
*/
static class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
ForkJoinPool#nextWorkerName()
设置线程名称1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* Counter for worker Thread names (unrelated to their poolIndex)
*/
private volatile int nextWorkerNumber;
/**
* Callback from ForkJoinWorkerThread constructor to assign a
* public name
*/
final String nextWorkerName() {
for (int n;;) {
if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset,
n = nextWorkerNumber, ++n))
return workerNamePrefix + n;
}
}
ForkJoinPool#registerWorker()
ForkJoinWorkerThread
注册到ForkJoinPool
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52/**
* Callback from ForkJoinWorkerThread constructor to
* determine its poolIndex and record in workers array.
*
* @param w the worker
* @return the worker's pool index
*/
final int registerWorker(ForkJoinWorkerThread w) {
/*
* In the typical case, a new worker acquires the lock, uses
* next available index and returns quickly. Since we should
* not block callers (ultimately from signalWork or
* tryPreBlock) waiting for the lock needed to do this, we
* instead help release other workers while waiting for the
* lock.
*/
for (int g;;) {
ForkJoinWorkerThread[] ws;
// 1
if (((g = scanGuard) & SG_UNIT) == 0 &&
UNSAFE.compareAndSwapInt(this, scanGuardOffset,
g, g | SG_UNIT)) {
int k = nextWorkerIndex;
try {
if ((ws = workers) != null) { // ignore on shutdown
int n = ws.length;
if (k < 0 || k >= n || ws[k] != null) {
for (k = 0; k < n && ws[k] != null; ++k)
;
if (k == n)
ws = workers = Arrays.copyOf(ws, n << 1);
}
ws[k] = w;
nextWorkerIndex = k + 1;
int m = g & SMASK;
g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
}
} finally {
scanGuard = g;
}
return k;
}
else if ((ws = workers) != null) { // help release others
for (ForkJoinWorkerThread u : ws) {
if (u != null && u.queueBase != u.queueTop) {
if (tryReleaseWaiter())
break;
}
}
}
}
}
标注代码分析
- 如果当前
scanGuard
中没有SG_UNIT
标记,尝试设置SG_UNIT
标记。这是一个加锁的过程。registerWorker()
registerWorker()
的主要逻辑就是要注册一个工作线程到ForkJoinPool
,然后返回工作线程在Pool
内部工作线程数组的下标。说明一下。
- 方法内部的主流程(无限循环)中,首先尝试获取一个顺序锁。如果获取失败,会遍历下所有的工作线程,如果发现有工作线程的任务队列里还有未处理的任务,就会尝试唤醒等待的工作线程,然后再尝试去获取顺序锁。
- 如果获取顺序锁成功,内部会将传入的工作线程设置到相应的位置(必要的时候工作线程数组需要扩容),然后返回下标。过程中会调整
scanGuard
的低16比特。
这里再次分析一下scanGuard
这个神奇的域。
- 它的高16位用来做顺序锁,我们看到在主流程中首先会查看
scanGuard
中是否含有SG_UNIT
对应的bit
信息,如果有说明已经有其他线程持有这个锁了;如果没有,就可以通过一个CAS操作来获取这个锁,获取动作就是给scanGuard
上添加上SG_UNIT
对应的bit
信息,在内部完成逻辑后会清除scanGuard
上的SG_UNIT
信息。 - 它的低16位就像它的命名一样,表示扫描的边界。上面的代码中可以看到,在注册工作线程时候会调整这个边界值,规律是这样,边界值的大小=比当前工作线程最大下标大的最小的2的幂减1(有点绕,不要晕),举几个栗子:
maxIndex=5,guard=7、maxIndex=10,guard=15
,看出规律了吧。这个值的作用是为了避免不必要的扫描,因为Pool
内部的工作线程数组size
可能比较大(初始化的时候就比并行度要大很多,回去看下Pool
的构造方法。而且还有可能扩展),但实际的工作线程数量可能比较小(比如可能数组size
是16,但里面只有4个工作线程,14个空位),如果扫描的时候扫描范围是全部数组,那一定会在空位上浪费很多时间,有了这个guard
作为边界(而不是数组的length-1
),会避免这些时间的浪费。ForkJoinWorkerThread#run()
ForkJoinPool#addWorker()
的t.start()
,实际上是执行ForkJoinWorkerThread#run()
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
* {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
try {
onStart();
pool.work(this);
} catch (Throwable ex) {
exception = ex;
} finally {
onTermination(exception);
}
}
启动前回调下onStart()
,然后是主流程(pool#work
),如果方法结束,还会回调下onTermination()
。
ForkJoinWorkerThread#onStart()
1 | /** |
标注代码分析
- 初始化工作线程的任务队列。
- 生成工作线程的种子。
- 确保种子不为0。
ForkJoinWorkerThread#onTermination()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24/**
* Performs cleanup associated with termination of this worker
* thread. If you override this method, you must invoke
* {@code super.onTermination} at the end of the overridden method.
*
* @param exception the exception causing this thread to abort due
* to an unrecoverable error, or {@code null} if completed normally
*/
protected void onTermination(Throwable exception) {
try {
// 1
terminate = true;
// 2
cancelTasks();
// 3
pool.deregisterWorker(this, exception);
} catch (Throwable ex) { // Shouldn't ever happen
if (exception == null) // but if so, at least rethrown
exception = ex;
} finally {
if (exception != null)
UNSAFE.throwException(exception);
}
}
标注代码分析
- 设置关闭标志。
- 取消任务。
- 从
ForkJoinPool
上注销当前工作线程。ForkJoinPool#work
ForkJoinWorkerThread#run()
中pool#work(this)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22/**
* Top-level loop for worker threads: On each step: if the
* previous step swept through all queues and found no tasks, or
* there are excess threads, then possibly blocks. Otherwise,
* scans for and, if found, executes a task. Returns when pool
* and/or worker terminate.
*
* @param w the worker
*/
final void work(ForkJoinWorkerThread w) {
boolean swept = false; // true on empty scans
long c;
// 1
while (!w.terminate && (int)(c = ctl) >= 0) {
int a; // active count
// 2
if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
swept = scan(w, a);
else if (tryAwaitWork(w, c))// 3
swept = false;
}
}
标注代码分析
- while条件就是当前工作线程未结束且
pool
未关闭。while循环中,会不断的扫描(scan
)或等待(tryAwaitWork
)。 - 如果扫描标志(起始为false)表示上一次扫描不是空扫描(false),并且当前活动线程数量小于处理器核数(这里要注意下,返回去在看下控制信息中
AC
的定义),那么执行scan(w, a)
。 - 当前没有任务需要执行的任务 或者 当前活动的线程数量已经大于处理器核心数。进行等待
(tryAwaitWork(w, c))
。ForkJoinPool#scan()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77/**
* Scans for and, if found, executes one task. Scans start at a
* random index of workers array, and randomly select the first
* (2*#workers)-1 probes, and then, if all empty, resort to 2
* circular sweeps, which is necessary to check quiescence. and
* taking a submission only if no stealable tasks were found. The
* steal code inside the loop is a specialized form of
* ForkJoinWorkerThread.deqTask, followed bookkeeping to support
* helpJoinTask and signal propagation. The code for submission
* queues is almost identical. On each steal, the worker completes
* not only the task, but also all local tasks that this task may
* have generated. On detecting staleness or contention when
* trying to take a task, this method returns without finishing
* sweep, which allows global state rechecks before retry.
*
* @param w the worker
* @param a the number of active workers
* @return true if swept all queues without finding a task
*/
private boolean scan(ForkJoinWorkerThread w, int a) {
int g = scanGuard; // mask 0 avoids useless scans if only one active
// 1
int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws == null || ws.length <= m) // staleness check
return false;
for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
// 2
ForkJoinWorkerThread v = ws[k & m];
if (v != null && (b = v.queueBase) != v.queueTop &&
(q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
// 3
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
// 4
int d = (v.queueBase = b + 1) - v.queueTop;
// 5
v.stealHint = w.poolIndex;
if (d != 0)
// 6
signalWork(); // propagate if nonempty
w.execTask(t);
}
// 7
r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
// 不是一个空扫描
return false; // store next seed
}
else if (j < 0) { // xorshift
// 8
r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
}
else // 9
++k;
}
if (scanGuard != g) // staleness check
return false;
else { // try to take submission
// 10
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
if ((b = queueBase) != queueTop &&
(q = submissionQueue) != null &&
(i = (q.length - 1) & b) >= 0) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueBase = b + 1;
w.execTask(t);
}
return false;
}
// 11
return true; // all queues empty
}
}
标注代码分析
- 如果当前只有一个工作线程,将m设置为0,避免没用的扫描。否则获取
guard
值。 - 随机选出一个牺牲者(工作线程)。
- 如果这个牺牲者的任务队列中还有任务,尝试窃取这个任务。
- 窃取成功后,调整
queueBase
。 - 将牺牲者的
stealHint
设置为当前工作线程在pool
中的下标。 - 如果牺牲者的任务队列还有任务,继续唤醒(或创建)线程。
- 计算出下一个随机种子。
- 前
2*m
次,随机扫描。 - 后
2*m
次,顺序扫描。 - 如果扫描完毕后没找到可窃取的任务,那么从
Pool
的提交任务队列中取一个任务来执行。 - 如果所有的队列(工作线程的任务队列和
pool
的任务队列)都是空的,返回true。scan()
- 需要确定扫描边界值m,如果当前只有一个工作线程,那么m就为0,避免多余的扫描;如果当前有多个工作线程,那么m就是
scanGuard
的低16位表示的值(去前面看看scanGuard
)。 - 确定m以后,开始一个for循环进行扫描。扫描的目的就是要通过工作线程的
seed
(这个域之前没提,使用来选择一个窃取牺牲者的)算出一个牺牲者(victim)的下标,牺牲者也是一个工作线程,然后当前工作线程便会从牺牲者的任务队列里面窃取一个任务来执行。当然如果算出的下标对应的位置上没有牺牲者(工作线程),或者牺牲者的任务队列里没有任务,就会进行下一次尝试。整个for循环最多会尝试4*m
次,前2*m
次是随机算下标,每次会通过xorshift算法
来生成新的k。后2*m
次是顺序递增k来算下标。 - 如果for循环结束了还没有扫描到任务,那么就会从
Pool
的submissionQueue
中窃取一个任务来执行了。ForkJoinWorkerThread#execTask()
由于是初始提交,scan()
中一定是从ForkJoinPool#submissionQueue()
中获取提交的任务,然后执行execTask()
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19/**
* Runs the given task, plus any local tasks until queue is empty
*/
final void execTask(ForkJoinTask<?> t) {
currentSteal = t;
for (;;) {
if (t != null)
// 1
t.doExec();
// 2
if (queueTop == queueBase)
break;
// 3
t = locallyFifo ? locallyDeqTask() : popTask();
}
// 4
++stealCount;
currentSteal = null;
}
标注代码分析
- 执行任务。
- 如果当前工作线程的任务队列里没有任务了,退出循环。
- 根据模式来获取任务。如果
Pool
中指定为异步模式,这里从当前任务队列的尾部获取任务;否则,从任务队列头部获取任务。 - 最后累加窃取任务计数。
ForJoinTask#doExec()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21/**
* Primary execution method for stolen tasks. Unless done, calls
* exec and records status if completed, but doesn't wait for
* completion otherwise.
*/
final void doExec() {
if (status >= 0) {
boolean completed;
try {
// 1
completed = exec();
} catch (Throwable rex) {
// 2
setExceptionalCompletion(rex);
return;
}
if (completed)
// 3
setCompletion(NORMAL); // must be outside try block
}
}
标注代码分析
- 调用
exec()
执行任务。 - 设置异常完成结果。
- 设置正常完成结果。
ForJoinTask#exec()
是一个抽象方法,具体执行逻辑交给子类去实现,前面看到的适配类AdaptedRunnable
和AdaptedCallable
里面,会在exec()
里面分别调用runnable#run()
和callable#call()
;而在ForJoinTask
的两个子类RecursiveAction
和RecursiveTask
里面,exec()
里面调用的是compute()
。
RecursiveAction#exec()
1 | /** |
RecursiveAction#compute()
1 | /** |
RecursiveTask#exec()
1 | /** |
RecursiveTask#compute()
1 | /** |
ForJoinTask#exec()
有返回值,表示任务是否执行完毕。doExec()
中会根据这个返回值来设置任务的完成状态,如果任务正常完成,会调用setCompletion(NORMAL)
。
ForkJoinTask#setCompletion()
1 | volatile int status; // accessed directly by pool and workers |
标注代码分析
- 尝试将任务状态设置为正常完成。
- 同时唤醒合并当前任务的等待线程。
ForkJoinTask#setExceptionalCompletion()
doExec()
中如果执行exec()
发生异常,会调用setExceptionalCompletion()
来设置异常完成状态。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30/**
* Records exception and sets exceptional completion.
*
* @return status on exit
*/
private int setExceptionalCompletion(Throwable ex) {
// 1
int h = System.identityHashCode(this);
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
// 2
expungeStaleExceptions();
// 3
ExceptionNode[] t = exceptionTable;
// 4
int i = h & (t.length - 1);
for (ExceptionNode e = t[i]; ; e = e.next) {
if (e == null) {
t[i] = new ExceptionNode(this, ex, t[i]);
break;
}
if (e.get() == this) // already present
break;
}
} finally {
lock.unlock();
}
return setCompletion(EXCEPTIONAL);
}
标注代码分析
- 计算当前对象的原生
hashCode
。 - 删除异常表中过期的异常。
- 获取异常数组。
- 通过当前对象
hashCode
获取在异常表中的下标。
注意到setExceptionalCompletion()
中最后还是调用setCompletion
,但之前会做一下将异常放入一个异常表的工作。
看到这里可能会有疑问,我们通过2个问题来了解下这个异常表。
这里为什么会有一个异常表呢?
因为异常很少发生,所以没有将异常保存在任务对象内,而是放在一个弱引用异常表里(异常表里不会保存取消异常)。
异常表的结构是怎么样的呢?
这里的异常表结构上是一个哈希表,每个桶里是单链,弱引用Key。
Exception table
1 | /** |
1 | // Unsafe mechanics |
继续流程,执行完doExec()
,方法返回到execTask()
,接下来由于当前工作线程自身的任务队列中并没有任务,所以queueTop == queueBase
成立,execTask()
退出到scan()
,scan()
返回false到ForkJoinPool#work()
,进行下一次扫描(scan
),由于工作线程本身的任务队列和Pool
的任务队列都为空,所以下一次扫描一定是个空扫描,然后程序会走到work()
的tryAwaitWork
分支,看下tryAwaitWork()
。
ForkJoinPool#tryAwaitWork()
1 | /** |
标注代码分析
w#nextWait
保存的是等待之前Pool
的控制信息。- 这里是将当前线程的ID信息(下标取反)记录到
Pool
控制信息上,同时将控制信息上的活动工作线程计数减1。 - 如果和另外的一个窃取线程竞争并失败,这里返回true,
work()
中会继续扫描。 - 将工作线程上的
stealCount
原子累加到Pool#stealCount
上面。 - 如果
eventCount
发生变化,重试。 Pool
未关闭且有工作线程且活动的工作线程数量等于cpu
核心数量,且没有工作线程在合并过程中阻塞且没有工作线程休眠。- 如果满足条件,说明当前
Pool
休眠,需要调用下idleAwaitWork
进行处理。 - 如果
eventCount
发生变化,重试。 - 这里再重新扫描一下,如果从其他工作线程任务队列里找到任务,尝试唤醒等待的工作线程。
- 发生竞争,再次扫描。
scanGuard
发生变化或者从Pool
任务队列中找到任务,再次扫描。- 出让cpu,减少竞争。
park
前清除中断标记。- 设置
park
标记。
这个方法的目的就是阻塞工作线程w,但过程中有一些细节。
- 首先,方法中要调整
Pool
的控制信息ctl
,将w#ID
信息设置到ctl
上并将ctl
上保存的活动工作线程数量减1。 - 其次,在阻塞前,还会将w的窃取任务数量累计到
Pool
的总窃取任务数量(stealCount
)上;再次,如果当前Pool
正好处理休眠状态,那么要调用idleAwaitWork
处理一下(可能会结束工作线程)。 - 最后,再真正阻塞前还要扫描一下工作线程任务队列和
Pool
任务队列,如果发现有任务,会尝试唤醒一个等待工作线程(可能是自己),这个过程要结束时会清除一下当前线程的中断标记,然后进行阻塞(以Pool
为Blocker
,相当于进入Pool
的等待队列)。ForkJoinPool#idleAwaitWork()
如果按照流程,到这儿就结束了,工作线程已经执行提交的任务,然后阻塞等待了。
但如果当前Pool
正好处于休眠状态了,看看会怎么样,继续看下上面方法中调用的idleAwaitWork()
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47/**
* If inactivating worker w has caused pool to become
* quiescent, check for pool termination, and wait for event
* for up to SHRINK_RATE nanosecs (rescans are unnecessary in
* this case because quiescence reflects consensus about lack
* of work). On timeout, if ctl has not changed, terminate the
* worker. Upon its termination (see deregisterWorker), it may
* wake up another worker to possibly repeat this process.
*
* @param w the calling worker
* @param currentCtl the ctl value after enqueuing w
* @param prevCtl the ctl value if w terminated
* @param v the eventCount w awaits change
*/
private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
long prevCtl, int v) {
if (w.eventCount == v) {
if (shutdown)
// 1
tryTerminate(false);
// 2
ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
while (ctl == currentCtl) {
long startTime = System.nanoTime();
w.parked = true;
if (w.eventCount == v) // must recheck
// 3
LockSupport.parkNanos(this, SHRINK_RATE);
w.parked = false;
if (w.eventCount != v)
break;
else if (System.nanoTime() - startTime <
SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
// 4
Thread.interrupted(); // spurious wakeup
else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
currentCtl, prevCtl)) {// 5
// 6
w.terminate = true; // restore previous
// 7
w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
break;
}
}
}
}
标注代码分析
- 如果关闭方法已经被调用,那么调用
tryTerminate()
。 - 清理一下异常表中
weak key
。 - 阻塞给定时间(4秒)。
- 如果发生伪唤醒,清除中断标志。
- 恢复之前的
ctl
,如果ctl
一直没发生变化,会进入if。 - 结束工作线程,设置结束标志。
- 设置
w#eventCount
。
idleAwaitWork()
中开始会检测一下Pool
是否正在关闭,是的话要调用tryTerminate
;否则会先将工作线程w阻塞一段时间(4s),如果超过了这个时间,Pool
的控制信息还没发生变化(说明Pool
还是休眠状态),那么就需要将w结束掉,会设置w的结束标记为true,同时设置w#eventCount
,然后退出到tryAwaitWork()
,tryAwaitWork()
中检测到w#eventCount
发生变化,会退出到work()
,work()
中检测到w的结束标记为true,主循环回退出,工作线程w就要结束了,结束时会调用w#onTermination()
。
ForkJoinWorkerThread#onTermination()
1 | /** |
ForkJoinWorkerThread#cancelTasks()
取消任务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/**
* Removes and cancels all tasks in queue. Can be called from any
* thread.
*/
final void cancelTasks() {
ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
if (cj != null && cj.status >= 0)
// 1
cj.cancelIgnoringExceptions();
ForkJoinTask<?> cs = currentSteal;
if (cs != null && cs.status >= 0)
// 2
cs.cancelIgnoringExceptions();
while (queueBase != queueTop) {
// 3
ForkJoinTask<?> t = deqTask();
if (t != null)
t.cancelIgnoringExceptions();
}
}
标注代码分析
- 取消正在合并的任务。
- 取消窃取的任务。
- 取消工作线程任务队列中的所有任务。
ForkJoinTask#cancel()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35.....
public boolean cancel(boolean mayInterruptIfRunning) {
return setCompletion(CANCELLED) == CANCELLED;
}
/**
* Cancels, ignoring any exceptions thrown by cancel. Used during
* worker and pool shutdown. Cancel is spec'ed not to throw any
* exceptions, but if it does anyway, we have no recourse during
* shutdown, so guard against this case.
*/
final void cancelIgnoringExceptions() {
try {
cancel(false);
} catch (Throwable ignore) {
}
}
/**
* Marks completion and wakes up threads waiting to join this task,
* also clearing signal request bits.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
*/
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (s != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}
ForkJoinPool#deregisterWorker()
从Pool中注销自己。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45/**
* Final callback from terminating worker. Removes record of
* worker from array, and adjusts counts. If pool is shutting
* down, tries to complete termination.
*
* @param w the worker
*/
final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) {
int idx = w.poolIndex;
int sc = w.stealCount;
int steps = 0;
// Remove from array, adjust worker counts and collect steal count.
// We can intermix failed removes or adjusts with steal updates
do {
long s, c;
int g;
if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 &&
UNSAFE.compareAndSwapInt(this, scanGuardOffset,
g, g |= SG_UNIT)) {
ForkJoinWorkerThread[] ws = workers;
if (ws != null && idx >= 0 &&
idx < ws.length && ws[idx] == w)
ws[idx] = null; // verify
nextWorkerIndex = idx;
scanGuard = g + SG_UNIT;
steps = 1;
}
if (steps == 1 &&
UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
(((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))))
steps = 2;
if (sc != 0 &&
UNSAFE.compareAndSwapLong(this, stealCountOffset,
s = stealCount, s + sc))
sc = 0;
} while (steps != 2 || sc != 0);
if (!tryTerminate(false)) {
if (ex != null) // possibly replace if died abnormally
signalWork();
else
tryReleaseWaiter();
}
}
deregisterWorker()
中首先在一个while无限循环中完成工作线程注销的工作,包括3个阶段,全部完成后再执行下一步。之所以这样做是由于每个阶段都可能发生竞争,需要重试。
- 第1阶段,会在获取顺序锁(
scanGuard
高16位)的情况下将Pool
中工作线程对应的数组位置置空,并调整nextWorkerIndex
。 - 第2阶段,将控制信息
ctl
中的活动工作线程数量和总工作线程数量减1。 - 第3阶段,将要注销工作线程的窃取任务数量累加到
Pool
的总窃取任务数量上。
deregisterWorker()
在完成注销线程工作后,还有可能会唤醒其他等待线程,首先会调用一下tryTerminate(false)
。
ForkJoinPool#tryTerminate()
1 | /** |
参数为false
参数为false(表示不会马上关闭Pool),那么实现中会查看是否有活动的工作线程,有的话返回false。然后检查Pool
是否还在运行中(包括Pool
有没有被关闭、有没有等待合并的工作线程、有没有空闲的工作线程、提交队列中是否有任务等),如果还在运行中,返回false。 否则会尝试设置关闭标志到控制信息,然后调用startTerminating()
。
参数为true
参数为true的话也会直接进行这些操作。然后再检测下如果当前总的工作线程为0,就会唤醒在termination
条件上等待的线程了。
ForkJoinPool#startTerminating()
1 | /** |
标注代码分析
- 取消
Pool
提交任务队列中的任务。 - 结束工作线程。
- 取消工作线程中任务队列的任务。
- 中断工作线程。
- 结束等待的工作线程。
startTerminating流程
- 取消
Pool#submissionQueue
中的任务。 - 将所有的工作线程的结束状态设置为true。
- 取消所有工作线程的任务队列中未完成的任务。
- 中断所有工作线程。
- 结束还在等待的工作线程。
ForkJoinPool#cancelSubmissions()
1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* Polls and cancels all submissions. Called only during termination.
*/
private void cancelSubmissions() {
while (queueBase != queueTop) {
ForkJoinTask<?> task = pollSubmission();
if (task != null) {
try {
task.cancel(false);
} catch (Throwable ignore) {
}
}
}
}
ForkJoinPool#pollSubmission()
1 | /** |
ForkJoinPool#terminateWaiters()
1 | /** |
隐式的等待工作线程组成的链(也叫Treiber stack
)。这里可能猛地一看,方法内部只唤醒了一个等待线程,这个等待线程的ID
信息存储在Pool
的控制信息中。但仔细看会发现,在设置(调整)ctl的时候,有这句w.nextWait & E_MASK
,也就是说,唤醒w
之后,ctl
里又会有另一个等待工作线程的ID
信息(这个信息之前是存在w#nextWait
上面的)。deregisterWorker()
里,假设当前Pool
还在运行,那么tryTerminate(false)
返回false,就会执行if里的语句。1
2
3
4
5
6
7if (!tryTerminate(false)) {
// 1
if (ex != null) // possibly replace if died abnormally
signalWork();
else// 2
tryReleaseWaiter();
}
标注代码分析
- 如果有异常发生,会调用
signalWork()
来唤醒或者创建一个工作线程。 - 调用
tryReleaseWaiter()
来尝试唤醒一个等待工作线程。ForkJoinPool#tryReleaseWaiter()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29/**
* Variant of signalWork to help release waiters on rescans.
* Tries once to release a waiter if active count < 0.
*
* @return false if failed due to contention, else true
*/
private boolean tryReleaseWaiter() {
long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
// 1
if ((e = (int)(c = ctl)) > 0 &&
(int)(c >> AC_SHIFT) < 0 &&
(ws = workers) != null &&
(i = ~e & SMASK) < ws.length &&
(w = ws[i]) != null) {
// 2
long nc = ((long)(w.nextWait & E_MASK) |
((c + AC_UNIT) & (AC_MASK|TC_MASK)));
if (w.eventCount != e ||
!UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
// 3
return false;
// 4
w.eventCount = (e + EC_UNIT) & E_MASK;
if (w.parked)
// 5
UNSAFE.unpark(w);
}
return true;
}
标注代码分析
(e = (int)(c = ctl)) > 0
如果有等待线程,(int)(c >> AC_SHIFT) < 0
当前活动线程数小于CPU核数,(ws = workers) != null
检测工作线程数组合法性,(i = ~e & SMASK) < ws.length
检测控制信息中等待的工作线程的ID
信息的合法性,(w = ws[i]) != null
检测工作线程的合法性。- 尝试调整控制信息,增加活动工作线程计数,将
Treiber stack
下一个等待线程的ID
信息设置到ctl
。 - 如果发生冲突。
- 累加
w#eventCount
。 - 唤醒w。
ForkJoinWorkerThread#execTask()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* Runs the given task, plus any local tasks until queue is empty
*/
final void execTask(ForkJoinTask<?> t) {
currentSteal = t;
for (;;) {
if (t != null)
t.doExec();
if (queueTop == queueBase)
break;
t = locallyFifo ? locallyDeqTask() : popTask();
}
++stealCount;
currentSteal = null;
}
假如执行完t,发现当前工作的任务队列中还有任务,那么接下来就会根据当前Pool
的工作模式(是否是同步模式),来通过locallyDeqTask()
或者popTask()
获取一个任务出来继续执行。
工作线程中的任务队列(Pool
中的任务队列也一样),形式上是一个数组,但概念上是一个双端队列。但和其他双端队列(JDK里另外的双端队列实现)不一样的是,这里的队列只定义了三种操作:从队列首部入队(push)、从队列首部出队(pop)、从队列尾部出队(deg)。这里既然说是队列,但又说什么push、pop可能会让人感觉晕晕的,其实可以这样理解,双端队列就可以看成是栈和队列的混血。结合使用Pool
内部工作原理来说,如果不是异步模式(默认),那么就会把任务队列当成一个FIFO队列来使用;否则就相当于把任务队列当成一个栈来使用。
ForkJoinWorkerThread#popTask()
1 | /** |
如果Pool
不是异步模式(locallyFifo
为false),那么会执行popTask()
。从当前任务队列pop一个任务出来。
ForkJoinWorkerThread#locallyDeqTask()
1 | /** |
如果Pool
是异步模式(locallyFifo
为true
),那么会执行locallyDeqTask()
。就是从当前任务队列deg
一个任务出来。
总结
- 被创建,注册到
Pool
中,然后启动。 - 扫描所有工作线程的队列和
Pool
中的任务队列,窃取一个任务来执行。 - 执行完毕后,在从自身工作队列中获取任务来执行。
- 没任务执行了,会阻塞等待,如果正好赶上了
Pool
休眠,那么会被结束掉,从Pool
中注销。