Fork/Join Source Code Analysis: Non-Fork/Join Tasks

ForkJoinPool Fields

Selected fields

/**
* Main pool control -- a long packed with:
* AC: Number of active running workers minus target parallelism (16 bits)
* TC: Number of total workers minus target parallelism (16bits)
* ST: true if pool is terminating (1 bit)
* EC: the wait count of top waiting thread (15 bits)
* ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
*
* When convenient, we can extract the upper 32 bits of counts and
* the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
* (int)ctl. The ec field is never accessed alone, but always
* together with id and st. The offsets of counts by the target
* parallelism and the positionings of fields makes it possible to
* perform the most common checks via sign tests of fields: When
* ac is negative, there are not enough active workers, when tc is
* negative, there are not enough total workers, when id is
* negative, there is at least one waiting worker, and when e is
* negative, the pool is terminating. To deal with these possibly
* negative fields, we use casts in and out of "short" and/or
* signed shifts to maintain signedness.
*/
volatile long ctl;

// bit positions/shifts for fields
private static final int AC_SHIFT = 48;
private static final int TC_SHIFT = 32;
private static final int ST_SHIFT = 31;
private static final int EC_SHIFT = 16;

// bounds
private static final int MAX_ID = 0x7fff; // max poolIndex
private static final int SMASK = 0xffff; // mask short bits
private static final int SHORT_SIGN = 1 << 15;
private static final int INT_SIGN = 1 << 31;

// masks
private static final long STOP_BIT = 0x0001L << ST_SHIFT;
private static final long AC_MASK = ((long)SMASK) << AC_SHIFT;
private static final long TC_MASK = ((long)SMASK) << TC_SHIFT;

// units for incrementing and decrementing
private static final long TC_UNIT = 1L << TC_SHIFT;
private static final long AC_UNIT = 1L << AC_SHIFT;

// masks and units for dealing with u = (int)(ctl >>> 32)
private static final int UAC_SHIFT = AC_SHIFT - 32;
private static final int UTC_SHIFT = TC_SHIFT - 32;
private static final int UAC_MASK = SMASK << UAC_SHIFT;
private static final int UTC_MASK = SMASK << UTC_SHIFT;
private static final int UAC_UNIT = 1 << UAC_SHIFT;
private static final int UTC_UNIT = 1 << UTC_SHIFT;

// masks and units for dealing with e = (int)ctl
private static final int E_MASK = 0x7fffffff; // no STOP_BIT
private static final int EC_UNIT = 1 << EC_SHIFT;

ForkJoinPool's overall control information, packed into a single long (64 bits):

  • AC: the number of active workers minus the target parallelism. (16 bits)
  • TC: the total number of workers minus the target parallelism. (16 bits)
  • ST: whether this ForkJoinPool is terminating. (1 bit)
  • EC: the wait count of the waiting worker at the top of the Treiber stack. (15 bits)
  • ID: the inverted index (~poolIndex) of the waiting worker at the top of the Treiber stack. (16 bits)

Layout, from high bits to low bits:

AC (16) | TC (16) | ST (1) | EC (15) | ID (16)

  • If AC is negative, there are not enough active workers.
  • If TC is negative, the total worker count has not reached the target parallelism.
  • If ID is negative, at least one worker is waiting.
  • If (int)ctl is negative, the ForkJoinPool is terminating.

ctl is the most important and most intricately designed field in ForkJoinPool; it is the pool's master control word. Everything is packed into one long (64 bits): the current number of active workers, the current total number of workers, the pool's termination flag, the wait count of the worker at the top of the Treiber stack (the implicit chain of all waiting workers), and the ID of that worker (its index, inverted).
    ctl also plays a secondary, less important role: some non-volatile fields rely on reads and writes of ctl for visibility.
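As a sketch of this packing scheme (a hypothetical helper class, not the JDK code itself), the five fields can be packed and extracted with plain shifts, masks, and short casts, reproducing the sign tests described in the javadoc:

```java
// Sketch of ctl's bit packing: AC(16) | TC(16) | ST(1) | EC(15) | ID(16).
public class CtlSketch {
    static final int AC_SHIFT = 48, TC_SHIFT = 32, ST_SHIFT = 31, EC_SHIFT = 16;
    static final long SMASK = 0xffffL;

    // Pack the five fields into one long; ac and tc are already
    // offset by -parallelism, as in the real pool.
    static long pack(int ac, int tc, boolean st, int ec, int id) {
        return ((ac & SMASK) << AC_SHIFT)
             | ((tc & SMASK) << TC_SHIFT)
             | ((st ? 1L : 0L) << ST_SHIFT)
             | ((long)(ec & 0x7fff) << EC_SHIFT)
             | (id & SMASK);
    }

    // Signed extraction via short casts, so "not enough workers"
    // becomes a simple sign test.
    static int ac(long ctl) { return (short)(ctl >>> AC_SHIFT); }
    static int tc(long ctl) { return (short)(ctl >>> TC_SHIFT); }
    static boolean terminating(long ctl) { return (int)ctl < 0; }
}
```

With parallelism 4, a fresh pool would pack AC = TC = -4, and both extractors recover the negative values via the short casts.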
    /**
    * Array holding all worker threads in the pool. Initialized upon
    * construction. Array size must be a power of two. Updates and
    * replacements are protected by scanGuard, but the array is
    * always kept in a consistent enough state to be randomly
    * accessed without locking by workers performing work-stealing,
    * as well as other traversal-based methods in this class, so long
    * as reads memory-acquire by first reading ctl. All readers must
    * tolerate that some array slots may be null.
    */
    ForkJoinWorkerThread[] workers;

workers is the array in ForkJoinPool that holds the worker threads; updates to it are protected by a lock (scanGuard).

/**
* SeqLock and index masking for updates to workers array. Locked
* when SG_UNIT is set. Unlocking clears bit by adding
* SG_UNIT. Staleness of read-only operations can be checked by
* comparing scanGuard to value before the reads. The low 16 bits
* (i.e, anding with SMASK) hold (the smallest power of two
* covering all worker indices, minus one, and is used to avoid
* dealing with large numbers of null slots when the workers array
* is overallocated.
*/
volatile int scanGuard;

scanGuard is another fairly important field, with two roles:

  1. A (sequence) lock used when updating the workers array.
  2. A scan boundary used when traversing the workers array, to avoid useless scanning when the array is over-allocated.
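The seqlock half of the protocol can be sketched with an AtomicInteger in place of Unsafe (a hypothetical class modeling only the lock/sequence behavior; the real scanGuard also keeps the scan boundary in its low 16 bits):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal seqlock sketch in the spirit of scanGuard.
public class SeqLockSketch {
    static final int SG_UNIT = 1 << 16;           // lock bit
    final AtomicInteger guard = new AtomicInteger();

    // Try to acquire: succeed only if the lock bit is clear.
    boolean tryLock() {
        int g = guard.get();
        return (g & SG_UNIT) == 0 && guard.compareAndSet(g, g | SG_UNIT);
    }

    // Release: adding SG_UNIT to a value whose lock bit is set clears
    // that bit and bumps the sequence stored above it.
    void unlock() { guard.addAndGet(SG_UNIT); }

    // Readers snapshot the guard, read, then validate that no writer ran.
    int stamp() { return guard.get(); }
    boolean validate(int s) { return (s & SG_UNIT) == 0 && guard.get() == s; }
}
```

A read-only traversal takes a stamp, scans, and retries if validate() fails, which is exactly how staleness is checked against scanGuard.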
    /**
    * Initial size for submission queue array. Must be a power of
    * two. In many applications, these always stay small so we use a
    * small initial cap.
    */
    private static final int INITIAL_QUEUE_CAPACITY = 8;

    /**
    * Maximum size for submission queue array. Must be a power of two
    * less than or equal to 1 << (31 - width of array entry) to
    * ensure lack of index wraparound, but is capped at a lower
    * value to help users trap runaway computations.
    */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    /**
    * Array serving as submission queue. Initialized upon construction.
    */
    private ForkJoinTask<?>[] submissionQueue;

    /**
    * Lock protecting submissions array for addSubmission
    */
    private final ReentrantLock submissionLock;

ForkJoinPool itself also has a queue, submissionQueue, which holds tasks submitted to the pool from outside (i.e., by threads that are not ForkJoin workers).

/**
* Index (mod submission queue length) of next element to take
* from submission queue. Usage is identical to that for
* per-worker queues -- see ForkJoinWorkerThread internal
* documentation.
*/
volatile int queueBase;

/**
* Index (mod submission queue length) of next element to add
* in submission queue. Usage is identical to that for
* per-worker queues -- see ForkJoinWorkerThread internal
* documentation.
*/
int queueTop;

These two fields are the base and top indices of submissionQueue, respectively.

/**
* True if use local fifo, not default lifo, for local polling
* Read by, and replicated by ForkJoinWorkerThreads
*/
final boolean locallyFifo;

The locallyFifo field is also important; it is set from the asyncMode parameter of ForkJoinPool's constructor. When locallyFifo is true, each worker polls its local task queue in FIFO order; tasks in this mode may still be forked but are best never joined, which makes it well suited to event-style asynchronous tasks. The default is false (LIFO).
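Constructing an async-mode pool uses the standard public constructor, for example:

```java
import java.util.concurrent.ForkJoinPool;

// A pool in async mode: forked, never-joined tasks run in FIFO order
// within each worker (locallyFifo = true internally).
public class AsyncModeDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(
                Runtime.getRuntime().availableProcessors(),
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,     // no custom uncaught-exception handler
                true);    // asyncMode
        pool.submit(() -> System.out.println("event-style task"));
        pool.shutdown();
    }
}
```

The chosen mode can later be read back with ForkJoinPool#getAsyncMode().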

ForkJoinWorkerThread Fields

ForkJoinWorkerThread is the worker thread that executes the actual tasks in the Fork/Join framework.

/**
* Capacity of work-stealing queue array upon initialization.
* Must be a power of two. Initial size must be at least 4, but is
* padded to minimize cache effects.
*/
private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

/**
* Maximum size for queue array. Must be a power of two
* less than or equal to 1 << (31 - width of array entry) to
* ensure lack of index wraparound, but is capped at a lower
* value to help users trap runaway computations.
*/
private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

/**
* The work-stealing queue array. Size must be a power of two.
* Initialized when started (as oposed to when constructed), to
* improve memory locality.
*/
ForkJoinTask<?>[] queue;

/**
* The pool this thread works in. Accessed directly by ForkJoinTask.
*/
final ForkJoinPool pool;

/**
* Index (mod queue.length) of next queue slot to push to or pop
* from. It is written only by owner thread, and accessed by other
* threads only after reading (volatile) queueBase. Both queueTop
* and queueBase are allowed to wrap around on overflow, but
* (queueTop - queueBase) still estimates size.
*/
int queueTop;

/**
* Index (mod queue.length) of least valid queue slot, which is
* always the next position to steal from if nonempty.
*/
volatile int queueBase;

queue is the worker's task queue; when other workers steal, this is the queue they steal from.
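The owner/thief protocol can be sketched as a toy deque (single-threaded illustration only, with none of the resizing or memory-ordering machinery; class and method names are hypothetical): the owner pushes and pops at queueTop, thieves take at queueBase.

```java
// Toy work-stealing deque: owner works LIFO at top, thieves FIFO at base.
public class WorkQueueSketch<T> {
    final Object[] slots = new Object[8];      // power-of-two capacity
    int top;                                   // owner-only index
    volatile int base;                         // advanced by stealers

    void push(T t) { slots[top & 7] = t; top++; }

    @SuppressWarnings("unchecked")
    T pop() {                                  // owner: LIFO end
        if (top == base) return null;
        top--;
        T t = (T) slots[top & 7];
        slots[top & 7] = null;
        return t;
    }

    @SuppressWarnings("unchecked")
    T steal() {                                // thief: FIFO end
        if (base == top) return null;
        T t = (T) slots[base & 7];
        slots[base & 7] = null;
        base++;
        return t;
    }

    int size() { return top - base; }          // cf. (queueTop - queueBase)
}
```

Note how size() matches the javadoc's remark that both indices may wrap, yet (queueTop - queueBase) still estimates the size.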

/**
* Index of this worker in pool array. Set once by pool before
* running, and accessed directly by pool to locate this worker in
* its workers array.
*/
final int poolIndex;

The index of this worker in the pool's workers array.

/**
* The index of most recent stealer, used as a hint to avoid
* traversal in method helpJoinTask. This is only a hint because a
* worker might have had multiple steals and this only holds one
* of them (usually the most current). Declared non-volatile,
* relying on other prevailing sync to keep reasonably current.
*/
int stealHint;

stealHint records the index (poolIndex) of the most recent stealer (a worker that stole a task from this one). Note that this value is only a hint: several stealers may be active at once, and it records just one of them.

/**
* Encoded record for pool task waits. Usages are always
* surrounded by volatile reads/writes
*/
int nextWait;

nextWait is one of the harder fields to understand. All waiting workers form an implicit chain (also called a Treiber stack in the code, since it behaves like a stack). Information about the worker at the top of the chain is kept in the pool's ctl; a newly waiting worker copies the previous top waiter's information from ctl into its own nextWait, then installs its own information into ctl.
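The push protocol is exactly that of a classic Treiber stack. As a sketch, here is a generic version with an AtomicReference top pointer (the real code uses packed ints in ctl instead of object references):

```java
import java.util.concurrent.atomic.AtomicReference;

// Classic Treiber stack: each waiter saves the previous top into its
// own next field (cf. nextWait), then CASes itself on top (cf. ctl).
public class TreiberStack<T> {
    static final class Node<T> {
        final T item; Node<T> next;
        Node(T item) { this.item = item; }
    }
    final AtomicReference<Node<T>> top = new AtomicReference<>();

    void push(T item) {
        Node<T> n = new Node<>(item);
        do {
            n.next = top.get();          // remember the old top
        } while (!top.compareAndSet(n.next, n));
    }

    T pop() {
        Node<T> t;
        do {
            t = top.get();
            if (t == null) return null;
        } while (!top.compareAndSet(t, t.next));
        return t.item;
    }
}
```

Releasing a waiter in signalWork() corresponds to pop(): the top waiter's nextWait becomes the new stack top in ctl.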

/**
* True if use local fifo, not default lifo, for local polling.
* Shadows value from ForkJoinPool.
*/
final boolean locallyFifo;

This mirrors ForkJoinPool#locallyFifo.

/**
* The task most recently stolen from another worker (or
* submission queue). All uses are surrounded by enough volatile
* reads/writes to maintain as non-volatile.
*/
ForkJoinTask<?> currentSteal;

The task this worker stole most recently. Note that a task may be stolen either from another worker's task queue or from the pool's submission queue (submissionQueue).

/**
* The task currently being joined, set only when actively trying
* to help other stealers in helpJoinTask. All uses are surrounded
* by enough volatile reads/writes to maintain as non-volatile.
*/
ForkJoinTask<?> currentJoin;

The task this worker is currently joining.

ForkJoinTask Fields

/*
* The status field holds run control status bits packed into a
* single int to minimize footprint and to ensure atomicity (via
* CAS). Status is initially zero, and takes on nonnegative
* values until completed, upon which status holds value
* NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
* waits by other threads have the SIGNAL bit set. Completion of
* a stolen task with SIGNAL set awakens any waiters via
* notifyAll. Even though suboptimal for some purposes, we use
* basic builtin wait/notify to take advantage of "monitor
* inflation" in JVMs that we would otherwise need to emulate to
* avoid adding further per-task bookkeeping overhead. We want
* these monitors to be "fat", i.e., not use biasing or thin-lock
* techniques, so use some odd coding idioms that tend to avoid
* them.
*/

/** The run status of this task */
volatile int status; // accessed directly by pool and workers
private static final int NORMAL = -1;
private static final int CANCELLED = -2;
private static final int EXCEPTIONAL = -3;
private static final int SIGNAL = 1;

ForkJoinTask has a single field describing its run status. It starts at 0; 1 (SIGNAL) means some thread is waiting for this task to complete; any negative value means the task has completed:

  • -1 (NORMAL): completed normally.
  • -2 (CANCELLED): cancelled.
  • -3 (EXCEPTIONAL): completed with an exception.
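The encoding makes the common checks cheap sign tests. A sketch (hypothetical helper names, mirroring the constants above):

```java
// Status encoding: nonnegative while running, negative once complete,
// with the completion kind in the value itself.
public class StatusSketch {
    static final int NORMAL = -1, CANCELLED = -2, EXCEPTIONAL = -3, SIGNAL = 1;

    static boolean isDone(int status)      { return status < 0; }
    static boolean isCancelled(int status) { return status == CANCELLED; }
    // SIGNAL is a bit set while the task is still running.
    static boolean hasWaiters(int status)  { return status >= 0 && (status & SIGNAL) != 0; }
}
```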

    Execution of a Non-Fork/Join Task

    We analyze the relevant Fork/Join code by tracing the execution of a non-ForkJoin (Runnable or Callable) task. Note that such a "non-ForkJoin" task is in fact still a ForkJoinTask; it simply has no fork/join phases.

    Sample code for a non-ForkJoin task
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<?> task = forkJoinPool.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("AAA");
            }
        });
        try {
            task.get();
            forkJoinPool.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

The code submits a Runnable task to a ForkJoinPool; all the task does is print a line.
Below we analyze the source along the path from submitting that Runnable to the ForkJoinPool until the task is executed.

ForkJoinPool

.....

/**
* Creates a {@code ForkJoinPool} with the given parameters.
*
* @param parallelism the parallelism level. For default value,
* use {@link java.lang.Runtime#availableProcessors}.
* @param factory the factory for creating new threads. For default value,
* use {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that
* terminate due to unrecoverable errors encountered while executing
* tasks. For default value, use {@code null}.
* @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
* worker threads only process event-style asynchronous tasks.
* For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
Thread.UncaughtExceptionHandler handler,
boolean asyncMode) {
checkPermission();
if (factory == null)
throw new NullPointerException();
if (parallelism <= 0 || parallelism > MAX_ID)
throw new IllegalArgumentException();
// 1
this.parallelism = parallelism;
// 2
this.factory = factory;
// 3
this.ueh = handler;
// 4
this.locallyFifo = asyncMode;
// 5
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
// 6
this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
// initialize workers array with room for 2*parallelism if possible
// 7
int n = parallelism << 1;
if (n >= MAX_ID)
n = MAX_ID;
else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
}
// 8
workers = new ForkJoinWorkerThread[n + 1];
this.submissionLock = new ReentrantLock();
this.termination = submissionLock.newCondition();
// 9
StringBuilder sb = new StringBuilder("ForkJoinPool-");
sb.append(poolNumberGenerator.incrementAndGet());
sb.append("-worker-");
this.workerNamePrefix = sb.toString();
}

Annotated steps

  1. Set the parallelism.
  2. Set the worker thread factory.
  3. Set the handler for uncaught exceptions in worker threads.
  4. Set whether async mode is used.
  5. Recall from the discussion of ctl that AC holds the active worker count minus the parallelism, so that offset must be folded into the initial ctl (likewise for TC).
  6. Initialize the submission queue.
  7. Compute the workers array size from the parallelism. Since the size must be a power of two, the algorithm finds the smallest power of two greater than 2 * parallelism, capped at MAX_ID + 1 (1 << 16).
  8. Allocate the workers array.
  9. Build the worker thread name prefix.

    Notes on the ForkJoinPool constructor

  • If no parallelism is supplied, the default is the number of available processors.
  • When the control word is initialized, note that the parallelism is subtracted from both AC and TC. For example, with parallelism 4 the initial AC and TC are both -4, so if AC later reads 0, the number of active workers exactly equals the parallelism.
  • The workers array size is determined as follows: start from a number n, by default twice the parallelism, then apply a bit trick from Hacker's Delight that turns every bit below n's leading 1 into 1, producing (the smallest power of two greater than n) minus 1. n is capped at MAX_ID; the final array size is n + 1, a power of two, which simplifies the later modulo (masking) operations.
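The bit trick above can be lifted out and checked directly (the helper names are illustrative; the shift cascade is the one from the constructor, valid for n < 1 << 16 as guaranteed by the MAX_ID cap):

```java
// "Smear" trick from Hackers Delight, sec 3.2: fill every bit below
// the leading 1, giving 2^k - 1 >= n for 16-bit-sized n.
public class SmearSketch {
    static int smear16(int n) {
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
        return n;
    }

    // Mirrors the constructor's sizing of the workers array.
    static int workersArraySize(int parallelism) {
        int n = parallelism << 1;
        int MAX_ID = 0x7fff;
        if (n >= MAX_ID) n = MAX_ID; else n = smear16(n);
        return n + 1;                       // always a power of two
    }
}
```

For parallelism 4: n = 8, smear gives 15, so the array size is 16.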

    ForkJoinPool's default worker thread factory

    static {
    poolNumberGenerator = new AtomicInteger();
    workerSeedGenerator = new Random();
    modifyThreadPermission = new RuntimePermission("modifyThread");
    defaultForkJoinWorkerThreadFactory =
    new DefaultForkJoinWorkerThreadFactory();
    int s;
    try {
    UNSAFE = sun.misc.Unsafe.getUnsafe();
    Class k = ForkJoinPool.class;
    ctlOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("ctl"));
    stealCountOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("stealCount"));
    blockedCountOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("blockedCount"));
    quiescerCountOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("quiescerCount"));
    scanGuardOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("scanGuard"));
    nextWorkerNumberOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("nextWorkerNumber"));
    Class a = ForkJoinTask[].class;
    ABASE = UNSAFE.arrayBaseOffset(a);
    s = UNSAFE.arrayIndexScale(a);
    } catch (Exception e) {
    throw new Error(e);
    }
    if ((s & (s-1)) != 0)
    throw new Error("data type scale not a power of two");
    ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
    }
/**
* Default ForkJoinWorkerThreadFactory implementation; creates a
* new ForkJoinWorkerThread.
*/
static class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
/**
* Factory for creating new {@link ForkJoinWorkerThread}s.
* A {@code ForkJoinWorkerThreadFactory} must be defined and used
* for {@code ForkJoinWorkerThread} subclasses that extend base
* functionality or initialize threads with different contexts.
*/
public static interface ForkJoinWorkerThreadFactory {
/**
* Returns a new worker thread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if the pool is null
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}

ForkJoinPool#submit()

/**
* Submits a ForkJoinTask for execution.
*
* @param task the task to submit
* @return the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
return task;
}

/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Callable<T> task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task);
forkOrSubmit(job);
return job;
}

/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
forkOrSubmit(job);
return job;
}

/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public ForkJoinTask<?> submit(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
forkOrSubmit(job);
return job;
}

ForkJoinPool defines a series of overloaded submit() methods; internally they first wrap (adapt) the Callable or Runnable into a ForkJoinTask, then submit it.

ForkJoinTask#adapt()

/**
* Returns a new {@code ForkJoinTask} that performs the {@code run}
* method of the given {@code Runnable} as its action, and returns
* a null result upon {@link #join}.
*
* @param runnable the runnable action
* @return the task
*/
public static ForkJoinTask<?> adapt(Runnable runnable) {
return new AdaptedRunnable<Void>(runnable, null);
}

/**
* Returns a new {@code ForkJoinTask} that performs the {@code run}
* method of the given {@code Runnable} as its action, and returns
* the given result upon {@link #join}.
*
* @param runnable the runnable action
* @param result the result upon completion
* @return the task
*/
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
return new AdaptedRunnable<T>(runnable, result);
}

/**
* Returns a new {@code ForkJoinTask} that performs the {@code call}
* method of the given {@code Callable} as its action, and returns
* its result upon {@link #join}, translating any checked exceptions
* encountered into {@code RuntimeException}.
*
* @param callable the callable action
* @return the task
*/
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
return new AdaptedCallable<T>(callable);
}

ForkJoinTask.AdaptedCallable

A Callable is wrapped into an AdaptedCallable:

/**
* Adaptor for Callables
*/
static final class AdaptedCallable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
final Callable<? extends T> callable;
T result;
AdaptedCallable(Callable<? extends T> callable) {
if (callable == null) throw new NullPointerException();
this.callable = callable;
}
public T getRawResult() { return result; }
public void setRawResult(T v) { result = v; }
public boolean exec() {
try {
result = callable.call();
return true;
} catch (Error err) {
throw err;
} catch (RuntimeException rex) {
throw rex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public void run() { invoke(); }
private static final long serialVersionUID = 2838392045355241008L;
}

ForkJoinTask.AdaptedRunnable

A Runnable is wrapped into an AdaptedRunnable:

/**
* Adaptor for Runnables. This implements RunnableFuture
* to be compliant with AbstractExecutorService constraints
* when used in ForkJoinPool.
*/
static final class AdaptedRunnable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
final Runnable runnable;
final T resultOnCompletion;
T result;
AdaptedRunnable(Runnable runnable, T result) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
this.resultOnCompletion = result;
}
public T getRawResult() { return result; }
public void setRawResult(T v) { result = v; }
public boolean exec() {
runnable.run();
result = resultOnCompletion;
return true;
}
public void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;
}

ForkJoinPool#forkOrSubmit()

/**
* Unless terminating, forks task if within an ongoing FJ
* computation in the current pool, else submits as external task.
*/
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
    ForkJoinWorkerThread w;
    Thread t = Thread.currentThread();
    if (shutdown)
        throw new RejectedExecutionException();
    if ((t instanceof ForkJoinWorkerThread) &&
        (w = (ForkJoinWorkerThread)t).pool == this)
        w.pushTask(task);
    else
        addSubmission(task);
}

If the current thread is a ForkJoin worker, the task is being submitted from inside a ForkJoinTask (for example, a subtask split off and submitted for execution); in that case the task is pushed onto the worker's own task queue. If the current thread is not a ForkJoin worker, this is an initial submission (an external thread handing a ForkJoinTask to the ForkJoinPool for the first time); in that case the task is added to the pool's submission queue.

ForkJoinPool#addSubmission()

/**
* Enqueues the given task in the submissionQueue. Same idea as
* ForkJoinWorkerThread.pushTask except for use of submissionLock.
*
* @param t the task
*/
private void addSubmission(ForkJoinTask<?> t) {
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
ForkJoinTask<?>[] q; int s, m;
if ((q = submissionQueue) != null) { // ignore if queue removed
// 1
long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
// 2
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1;
if (s - queueBase == m)
// 3
growSubmissionQueue();
}
} finally {
lock.unlock();
}
// 4
signalWork();
}

Annotated steps

  1. Compute the memory offset of the slot.
  2. Store t into the corresponding slot of q (a lazySet / ordered write).
  3. If the queue is full, grow it.
  4. Signal a worker.

    addSubmission()

    Adds a task to the submission queue (under the lock), growing the queue if it is full, then wakes a worker.
    Since this is a non-fork/join submission, forkOrSubmit() necessarily takes the addSubmission() branch here.
    Note how the task is written into the array through Unsafe; all later task-array writes use the same approach. It is essentially what the array atomics (AtomicReferenceArray) do internally, but it avoids two sources of overhead:
  • No bounds check on each access, unlike inside AtomicReferenceArray (validity is guaranteed by the callers).
  • Because queue capacity is bounded (the same holds for worker task queues), the offset computation does not need the int-to-long widening that AtomicReferenceArray performs.
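The offset arithmetic can be illustrated on its own (a sketch with hypothetical helper names; the real ABASE and ASHIFT come from Unsafe.arrayBaseOffset and Unsafe.arrayIndexScale at class-initialization time, as in the static block shown earlier):

```java
// Sketch of the ((index & mask) << ASHIFT) + ABASE byte-offset math.
public class OffsetSketch {
    // ASHIFT is derived from the element scale, exactly as in the
    // static initializer: 31 - numberOfLeadingZeros(scale).
    static int ashift(int scale) {
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        return 31 - Integer.numberOfLeadingZeros(scale);
    }

    // Byte offset of slot (s mod length) in an array with the given base.
    static long slotOffset(int s, int lengthMask, int abase, int shift) {
        return ((long)(s & lengthMask) << shift) + abase;
    }
}
```

For a 4-byte reference scale, ashift(4) is 2, so slot s sits at base + 4 * (s & mask).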

    ForkJoinPool#growSubmissionQueue()

    /**
    * Creates or doubles submissionQueue array.
    * Basically identical to ForkJoinWorkerThread version.
    */
    private void growSubmissionQueue() {
    ForkJoinTask<?>[] oldQ = submissionQueue;
    int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
    if (size > MAXIMUM_QUEUE_CAPACITY)
    throw new RejectedExecutionException("Queue capacity exceeded");
    if (size < INITIAL_QUEUE_CAPACITY)
    size = INITIAL_QUEUE_CAPACITY;
    ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
    int mask = size - 1;
    int top = queueTop;
    int oldMask;
    if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
    for (int b = queueBase; b != top; ++b) {
    long u = ((b & oldMask) << ASHIFT) + ABASE;
    Object x = UNSAFE.getObjectVolatile(oldQ, u);
    if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
    UNSAFE.putObjectVolatile
    (q, ((b & mask) << ASHIFT) + ABASE, x);
    }
    }
    }

The logic is easy to follow: the minimum capacity is INITIAL_QUEUE_CAPACITY = 8, each growth doubles the array, and the capacity cannot exceed MAXIMUM_QUEUE_CAPACITY = 1 << 24 (16777216).
At this point the task has been placed into ForkJoinPool#submissionQueue. A wake-up action follows, so a worker thread will come and execute the task we submitted.
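The copy loop in growSubmissionQueue() re-hashes each live slot from the old mask to the new one, preserving base..top order. A simplified sketch without the Unsafe/CAS machinery (hypothetical helper, single-threaded):

```java
// Sketch of the doubling copy: every index b in [base, top) moves from
// oldQ[b & oldMask] to q[b & mask].
public class GrowSketch {
    static Object[] grow(Object[] oldQ, int base, int top) {
        int size = oldQ.length << 1;
        Object[] q = new Object[size];
        int oldMask = oldQ.length - 1, mask = size - 1;
        for (int b = base; b != top; ++b)
            q[b & mask] = oldQ[b & oldMask];
        return q;
    }
}
```

Because both masks are powers of two minus one, entries that wrapped around in the old array land unwrapped in the larger one.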

ForkJoinPool#signalWork()

/**
* Wakes up or creates a worker.
*/
final void signalWork() {
/*
* The while condition is true if: (there is are too few total
* workers OR there is at least one waiter) AND (there are too
* few active workers OR the pool is terminating). The value
* of e distinguishes the remaining cases: zero (no waiters)
* for create, negative if terminating (in which case do
* nothing), else release a waiter. The secondary checks for
* release (non-null array etc) can fail if the pool begins
* terminating after the test, and don't impose any added cost
* because JVMs must perform null and bounds checks anyway.
*/
long c; int e, u;
// 0
while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
(INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
if (e > 0) { // release a waiting worker
// 1
int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
// 2
if ((ws = workers) == null ||
(i = ~e & SMASK) >= ws.length ||
(w = ws[i]) == null)
break;
// 3
long nc = (((long)(w.nextWait & E_MASK)) |
((long)(u + UAC_UNIT) << 32));
if (w.eventCount == e &&
UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
// 4
w.eventCount = (e + EC_UNIT) & E_MASK;
if (w.parked)
// 5
UNSAFE.unpark(w);
break;
}
}//6
else if (UNSAFE.compareAndSwapLong
(this, ctlOffset, c,
(long)(((u + UTC_UNIT) & UTC_MASK) |
((u + UAC_UNIT) & UAC_MASK)) << 32)) {
// 8
addWorker();
break;
}
}
}

Annotated steps

  1. Release a waiting worker.
  2. Break if the waiter's index exceeds the size of workers, or the waiting thread cannot be obtained.
  3. Build the new control word: install the next waiter's information (index) from the Treiber stack, and add one AC unit.
  4. Bump w's wait (event) count.
  5. Unpark w.
  6. Prepare to add a worker: CAS the control word, incrementing both AC and TC.
  7. Add the worker (mark 8 in the code).

    signalWork

    Mark 0
    The cases inside the while loop are distinguished by e: when e == 0 there is no waiting worker, so one must be created; when e > 0 there is a waiter, so it is released.
    The condition before e >= 0
  • The loop runs when (bit 31 of e or of u is set) AND (bit 15 of e or of u is set). Combining this with the ctl layout described earlier:
  • Bit 31 of e set means ST is 1: the ForkJoinPool is terminating.
  • Bit 31 of u set means AC is negative: not enough active workers.
  • Bit 15 of e set means ID is negative: at least one worker is waiting.
  • Bit 15 of u set means TC is negative: not enough workers in total.
    The e >= 0 clause
  • Excludes the case where the ForkJoinPool is terminating.
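The combined sign test can be isolated into a sketch (hypothetical helper; the constants match those defined on ForkJoinPool):

```java
// signalWork's loop condition: both sign bits must appear in e | u,
// i.e. (too few total workers OR a waiter exists) AND (too few active
// workers OR terminating), and e >= 0 rules out termination.
public class SignalTestSketch {
    static final int INT_SIGN = 1 << 31, SHORT_SIGN = 1 << 15;

    static boolean shouldLoop(long ctl) {
        int e = (int) ctl, u = (int) (ctl >>> 32);
        return ((e | u) & (INT_SIGN | SHORT_SIGN)) == (INT_SIGN | SHORT_SIGN)
                && e >= 0;
    }
}
```

For a freshly constructed pool with parallelism 4 (AC = TC = -4, no waiters), both sign bits are set in u and e is 0, so the loop runs; a fully saturated pool (ctl == 0) or a terminating one (e < 0) does not loop.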

    ForkJoinPool#addWorker()

    A freshly created pool has no worker threads yet, so signalWork() is certain to take the add-worker path.
    /**
    * Tries to create and start a worker; minimally rolls back counts
    * on failure.
    */
    private void addWorker() {
    Throwable ex = null;
    ForkJoinWorkerThread t = null;
    try {
    t = factory.newThread(this);
    } catch (Throwable e) {
    ex = e;
    }
    if (t == null) { // null or exceptional factory return
    // 1
    long c; // adjust counts
    do {} while (!UNSAFE.compareAndSwapLong
    (this, ctlOffset, c = ctl,
    (((c - AC_UNIT) & AC_MASK) |
    ((c - TC_UNIT) & TC_MASK) |
    (c & ~(AC_MASK|TC_MASK)))));
    // Propagate exception if originating from an external caller
    // 2
    if (!tryTerminate(false) && ex != null &&
    !(Thread.currentThread() instanceof ForkJoinWorkerThread))
    UNSAFE.throwException(ex);
    }
    else // 3
    t.start();
    }

Annotated steps

  1. If an exception caused worker creation to fail, the AC and TC counts previously added to the control word must be subtracted back out.
  2. If the call came from an external caller, propagate the exception.
  3. On success, start the worker thread.
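The rollback at mark 1 is a retry loop that subtracts one unit from both packed fields while leaving all other bits intact. A sketch with AtomicLong in place of Unsafe (hypothetical class; masks and units are those defined on ForkJoinPool):

```java
import java.util.concurrent.atomic.AtomicLong;

// addWorker's rollback: CAS-retry a decrement of both AC and TC,
// preserving the ST/EC/ID bits untouched.
public class RollbackSketch {
    static final long AC_MASK = 0xffffL << 48, TC_MASK = 0xffffL << 32;
    static final long AC_UNIT = 1L << 48, TC_UNIT = 1L << 32;

    static void rollback(AtomicLong ctl) {
        long c;
        do {
            c = ctl.get();
        } while (!ctl.compareAndSet(c,
                ((c - AC_UNIT) & AC_MASK) |
                ((c - TC_UNIT) & TC_MASK) |
                (c & ~(AC_MASK | TC_MASK))));
    }
}
```

Masking each adjusted field before recombining is what stops a borrow in AC or TC from corrupting the neighboring fields.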

    ForkJoinPool#ForkJoinWorkerThreadFactory#newThread()

    How a worker thread is created: by calling the ForkJoinWorkerThread constructor.
    /**
    * Factory for creating new {@link ForkJoinWorkerThread}s.
    * A {@code ForkJoinWorkerThreadFactory} must be defined and used
    * for {@code ForkJoinWorkerThread} subclasses that extend base
    * functionality or initialize threads with different contexts.
    */
    public static interface ForkJoinWorkerThreadFactory {
    /**
    * Returns a new worker thread operating in the given pool.
    *
    * @param pool the pool this thread works in
    * @throws NullPointerException if the pool is null
    */
    public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }

    /**
    * Default ForkJoinWorkerThreadFactory implementation; creates a
    * new ForkJoinWorkerThread.
    */
    static class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    return new ForkJoinWorkerThread(pool);
    }
    }

ForkJoinPool#nextWorkerName()

Assigns the worker thread's name:

/**
* Counter for worker Thread names (unrelated to their poolIndex)
*/
private volatile int nextWorkerNumber;
/**
* Callback from ForkJoinWorkerThread constructor to assign a
* public name
*/
final String nextWorkerName() {
for (int n;;) {
if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset,
n = nextWorkerNumber, ++n))
return workerNamePrefix + n;
}
}
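The same loop can be sketched with AtomicInteger instead of raw Unsafe (hypothetical class; the naming behavior matches nextWorkerName above):

```java
import java.util.concurrent.atomic.AtomicInteger;

// CAS-retry a counter increment, then append the winning number.
public class WorkerNameSketch {
    final AtomicInteger nextWorkerNumber = new AtomicInteger();
    final String prefix;

    WorkerNameSketch(String prefix) { this.prefix = prefix; }

    String nextWorkerName() {
        for (;;) {
            int n = nextWorkerNumber.get();
            if (nextWorkerNumber.compareAndSet(n, n + 1))
                return prefix + (n + 1);
        }
    }
}
```

Each successful CAS hands out a unique number, so concurrently created workers never share a name.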

ForkJoinPool#registerWorker()

Registers a ForkJoinWorkerThread with the ForkJoinPool:

/**
* Callback from ForkJoinWorkerThread constructor to
* determine its poolIndex and record in workers array.
*
* @param w the worker
* @return the worker's pool index
*/
final int registerWorker(ForkJoinWorkerThread w) {
/*
* In the typical case, a new worker acquires the lock, uses
* next available index and returns quickly. Since we should
* not block callers (ultimately from signalWork or
* tryPreBlock) waiting for the lock needed to do this, we
* instead help release other workers while waiting for the
* lock.
*/
for (int g;;) {
ForkJoinWorkerThread[] ws;
// 1
if (((g = scanGuard) & SG_UNIT) == 0 &&
UNSAFE.compareAndSwapInt(this, scanGuardOffset,
g, g | SG_UNIT)) {
int k = nextWorkerIndex;
try {
if ((ws = workers) != null) { // ignore on shutdown
int n = ws.length;
if (k < 0 || k >= n || ws[k] != null) {
for (k = 0; k < n && ws[k] != null; ++k)
;
if (k == n)
ws = workers = Arrays.copyOf(ws, n << 1);
}
ws[k] = w;
nextWorkerIndex = k + 1;
int m = g & SMASK;
g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
}
} finally {
scanGuard = g;
}
return k;
}
else if ((ws = workers) != null) { // help release others
for (ForkJoinWorkerThread u : ws) {
if (u != null && u.queueBase != u.queueTop) {
if (tryReleaseWaiter())
break;
}
}
}
}
}

Annotated code analysis

  1. If scanGuard does not currently carry the SG_UNIT bit, try to set SG_UNIT with a CAS. This is the lock-acquisition step.

    registerWorker()

    The main job of registerWorker() is to register a worker thread with the ForkJoinPool and return the worker's index in the pool's internal worker array. Two points:
  • In the method's main loop (an infinite loop), it first tries to acquire a sequence lock. If that fails, it walks over all workers; whenever it finds one whose task queue still holds unprocessed tasks, it tries to wake a waiting worker, then retries the lock.
  • If it acquires the sequence lock, it stores the given worker into the appropriate slot (growing the worker array when necessary) and returns the index, adjusting the low 16 bits of scanGuard along the way.

Another look at the remarkable scanGuard field:

  • Its high 16 bits act as a sequence lock. The main loop first checks whether scanGuard carries the SG_UNIT bit; if so, some other thread already holds the lock. If not, the lock can be acquired with a CAS that sets SG_UNIT on scanGuard, and the bit is cleared once the internal work is done.
  • Its low 16 bits are, as the name suggests, the scan bound. As the code above shows, registering a worker adjusts this bound; the rule is: bound = (the smallest power of two greater than the current maximum worker index) - 1. For example, maxIndex=5 gives guard=7, and maxIndex=10 gives guard=15. The value exists to avoid pointless scanning: the pool's worker array can be much larger than the actual worker count (at construction it is already much larger than the parallelism, see the pool constructor, and it can also grow), e.g. an array of size 16 holding only 4 workers and 12 empty slots. Scanning the entire array would waste time on the empty slots; using guard (rather than length-1) as the bound avoids that waste.
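The bound rule above can be sketched as a tiny helper. This is hypothetical illustration code, not the JDK implementation: registerWorker grows the mask incrementally via `(m << 1) + 1` rather than calling a function like this.

```java
public class ScanGuardDemo {
    // Hypothetical helper: smallest value of the form 2^k - 1 that is >= maxIndex,
    // mirroring how registerWorker grows the low 16 bits of scanGuard.
    static int guardFor(int maxIndex) {
        int m = 0;
        while (m < maxIndex) m = (m << 1) + 1; // 0 -> 1 -> 3 -> 7 -> 15 ...
        return m;
    }

    public static void main(String[] args) {
        System.out.println(guardFor(5));  // 7
        System.out.println(guardFor(10)); // 15
    }
}
```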

    ForkJoinWorkerThread#run()

    ForkJoinPool#addWorker() calls t.start(), which actually runs ForkJoinWorkerThread#run().
    /**
    * This method is required to be public, but should never be
    * called explicitly. It performs the main run loop to execute
    * {@link ForkJoinTask}s.
    */
    public void run() {
    Throwable exception = null;
    try {
    onStart();
    pool.work(this);
    } catch (Throwable ex) {
    exception = ex;
    } finally {
    onTermination(exception);
    }
    }

Before starting, the worker calls back onStart(), then runs the main loop (pool#work); when the method ends, it calls back onTermination().

ForkJoinWorkerThread#onStart()

/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
* invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
// 1
queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
// 2
int r = pool.workerSeedGenerator.nextInt();
// 3
seed = (r == 0) ? 1 : r; // must be nonzero
}

Annotated code analysis

  1. Initialize the worker's task queue.
  2. Generate the worker's random seed.
  3. Ensure the seed is nonzero.

    ForkJoinWorkerThread#onTermination()

    /**
    * Performs cleanup associated with termination of this worker
    * thread. If you override this method, you must invoke
    * {@code super.onTermination} at the end of the overridden method.
    *
    * @param exception the exception causing this thread to abort due
    * to an unrecoverable error, or {@code null} if completed normally
    */
    protected void onTermination(Throwable exception) {
    try {
    // 1
    terminate = true;
    // 2
    cancelTasks();
    // 3
    pool.deregisterWorker(this, exception);
    } catch (Throwable ex) { // Shouldn't ever happen
    if (exception == null) // but if so, at least rethrown
    exception = ex;
    } finally {
    if (exception != null)
    UNSAFE.throwException(exception);
    }
    }

Annotated code analysis

  1. Set the termination flag.
  2. Cancel tasks.
  3. Deregister this worker from the ForkJoinPool.

    ForkJoinPool#work

    ForkJoinWorkerThread#run() invokes pool#work(this).
    /**
    * Top-level loop for worker threads: On each step: if the
    * previous step swept through all queues and found no tasks, or
    * there are excess threads, then possibly blocks. Otherwise,
    * scans for and, if found, executes a task. Returns when pool
    * and/or worker terminate.
    *
    * @param w the worker
    */
    final void work(ForkJoinWorkerThread w) {
    boolean swept = false; // true on empty scans
    long c;
    // 1
    while (!w.terminate && (int)(c = ctl) >= 0) {
    int a; // active count
    // 2
    if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
    swept = scan(w, a);
    else if (tryAwaitWork(w, c))// 3
    swept = false;
    }
    }

Annotated code analysis

  1. The while condition: this worker has not terminated and the pool is not shutting down. Inside the loop the worker keeps either scanning (scan) or waiting (tryAwaitWork).
  2. If the sweep flag (initially false) says the previous scan was not empty, and the active worker count is at or below the target parallelism (recall the definition of AC in the control word), run scan(w, a).
  3. Otherwise, when no task was found or there are already enough active workers, wait via tryAwaitWork(w, c).

    ForkJoinPool#scan()

    /**
    * Scans for and, if found, executes one task. Scans start at a
    * random index of workers array, and randomly select the first
    * (2*#workers)-1 probes, and then, if all empty, resort to 2
    * circular sweeps, which is necessary to check quiescence. and
    * taking a submission only if no stealable tasks were found. The
    * steal code inside the loop is a specialized form of
    * ForkJoinWorkerThread.deqTask, followed bookkeeping to support
    * helpJoinTask and signal propagation. The code for submission
    * queues is almost identical. On each steal, the worker completes
    * not only the task, but also all local tasks that this task may
    * have generated. On detecting staleness or contention when
    * trying to take a task, this method returns without finishing
    * sweep, which allows global state rechecks before retry.
    *
    * @param w the worker
    * @param a the number of active workers
    * @return true if swept all queues without finding a task
    */
    private boolean scan(ForkJoinWorkerThread w, int a) {
    int g = scanGuard; // mask 0 avoids useless scans if only one active
    // 1
    int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
    ForkJoinWorkerThread[] ws = workers;
    if (ws == null || ws.length <= m) // staleness check
    return false;
    for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
    ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
    // 2
    ForkJoinWorkerThread v = ws[k & m];
    if (v != null && (b = v.queueBase) != v.queueTop &&
    (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
    // 3
    long u = (i << ASHIFT) + ABASE;
    if ((t = q[i]) != null && v.queueBase == b &&
    UNSAFE.compareAndSwapObject(q, u, t, null)) {
    // 4
    int d = (v.queueBase = b + 1) - v.queueTop;
    // 5
    v.stealHint = w.poolIndex;
    if (d != 0)
    // 6
    signalWork(); // propagate if nonempty
    w.execTask(t);
    }
    // 7
    r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
    // not an empty scan
    return false; // store next seed
    }
    else if (j < 0) { // xorshift
    // 8
    r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
    }
    else // 9
    ++k;
    }
    if (scanGuard != g) // staleness check
    return false;
    else { // try to take submission
    // 10
    ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
    if ((b = queueBase) != queueTop &&
    (q = submissionQueue) != null &&
    (i = (q.length - 1) & b) >= 0) {
    long u = (i << ASHIFT) + ABASE;
    if ((t = q[i]) != null && queueBase == b &&
    UNSAFE.compareAndSwapObject(q, u, t, null)) {
    queueBase = b + 1;
    w.execTask(t);
    }
    return false;
    }
    // 11
    return true; // all queues empty
    }
    }

Annotated code analysis

  1. If only one worker is active, set m to 0 to avoid useless scanning; otherwise use the low bits of the guard value.
  2. Pick a victim (another worker) from the computed slot.
  3. If the victim's task queue still holds tasks, try to steal one.
  4. After a successful steal, advance the victim's queueBase.
  5. Set the victim's stealHint to this worker's index in the pool.
  6. If the victim's queue is still nonempty, keep waking (or creating) workers.
  7. Compute the next random seed.
  8. For the first 2*m probes, scan at random indices.
  9. For the remaining probes, scan sequentially.
  10. If the sweep found nothing to steal, take a task from the pool's submission queue and execute it.
  11. If every queue (the workers' task queues and the pool's submission queue) is empty, return true.

    scan()

  • First determine the scan bound m. With only one worker, m is 0 to avoid extra scanning; with more workers, m is the value held in the low 16 bits of scanGuard (see the scanGuard discussion above).
  • With m fixed, a for loop does the scanning. The goal is to use the worker's seed (a field not mentioned before; it is used to choose a steal victim) to compute the index of a victim, which is another worker, and then steal a task from the victim's queue to execute. If the computed slot holds no worker, or the victim's queue is empty, the loop tries again. The loop makes at most 4*m+1 probes: the first 2*m compute the index randomly, generating a new k with the xorshift algorithm each time; the rest increment k sequentially.
  • If the loop finishes without finding a task, a task is stolen from the Pool's submissionQueue instead.
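The probe-index computation above uses Marsaglia's xorshift generator, exactly the three shift-and-xor steps in scan(). A standalone sketch; the mask and seed below are example inputs, not pool state:

```java
public class XorShiftProbe {
    // One xorshift step, as in scan(): r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    public static void main(String[] args) {
        int m = 7; // example scan mask (low bits of scanGuard)
        int r = 1; // seed must be nonzero, as ensured in onStart()
        for (int j = 0; j < 4; ++j) {
            r = next(r);
            // victim slot in the workers array
            System.out.println("probe index: " + (r & m));
        }
    }
}
```

A nonzero seed can never reach zero through these steps, which is why onStart() forces the seed to be nonzero.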

    ForkJoinWorkerThread#execTask()

    Since this is the initial submission, scan() necessarily takes the submitted task from ForkJoinPool's submissionQueue and then runs execTask().
    /**
    * Runs the given task, plus any local tasks until queue is empty
    */
    final void execTask(ForkJoinTask<?> t) {
    currentSteal = t;
    for (;;) {
    if (t != null)
    // 1
    t.doExec();
    // 2
    if (queueTop == queueBase)
    break;
    // 3
    t = locallyFifo ? locallyDeqTask() : popTask();
    }
    // 4
    ++stealCount;
    currentSteal = null;
    }

Annotated code analysis

  1. Execute the task.
  2. If this worker's task queue is empty, exit the loop.
  3. Fetch the next task according to the mode: if the pool was configured for async mode, take from the tail of the queue; otherwise pop from the head.
  4. Finally, bump the steal count.

    ForkJoinTask#doExec()

    /**
    * Primary execution method for stolen tasks. Unless done, calls
    * exec and records status if completed, but doesn't wait for
    * completion otherwise.
    */
    final void doExec() {
    if (status >= 0) {
    boolean completed;
    try {
    // 1
    completed = exec();
    } catch (Throwable rex) {
    // 2
    setExceptionalCompletion(rex);
    return;
    }
    if (completed)
    // 3
    setCompletion(NORMAL); // must be outside try block
    }
    }

Annotated code analysis

  1. Call exec() to run the task.
  2. Record exceptional completion.
  3. Record normal completion.

ForkJoinTask#exec() is abstract; the concrete execution logic is left to subclasses. In the adapter classes seen earlier, AdaptedRunnable and AdaptedCallable, exec() calls runnable#run() and callable#call() respectively; in ForkJoinTask's two subclasses RecursiveAction and RecursiveTask, exec() calls compute().
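As a usage example (not from the text), here is a RecursiveTask whose compute() is invoked through exec(), with the return value stored as the task's result. The class name and threshold are illustrative choices:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums the range [lo, hi) by recursive splitting.
public class SumTask extends RecursiveTask<Long> {
    final long lo, hi;
    public SumTask(long lo, long hi) { this.lo = lo; this.hi = hi; }

    @Override
    protected Long compute() {
        if (hi - lo <= 1000) {                    // small enough: compute directly
            long s = 0;
            for (long i = lo; i < hi; ++i) s += i;
            return s;
        }
        long mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(lo, mid);
        left.fork();                              // push onto this worker's deque
        long right = new SumTask(mid, hi).compute();
        return right + left.join();
    }

    public static void main(String[] args) {
        long s = new ForkJoinPool().invoke(new SumTask(0, 10000));
        System.out.println(s);                    // sum of 0..9999 = 49995000
    }
}
```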

RecursiveAction#exec()

/**
* Implements execution conventions for RecursiveActions.
*/
protected final boolean exec() {
compute();
return true;
}

RecursiveAction#compute()

/**
* The main computation performed by this task.
*/
protected abstract void compute();

RecursiveTask#exec()

/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute();
return true;
}

RecursiveTask#compute()

/**
* The main computation performed by this task.
*/
protected abstract V compute();

ForkJoinTask#exec() returns a value indicating whether the task ran to completion.
doExec() uses this return value to set the task's completion status; on normal completion it calls setCompletion(NORMAL).

ForkJoinTask#setCompletion()

volatile int status; // accessed directly by pool and workers
private static final int NORMAL = -1;
/**
* Marks completion and wakes up threads waiting to join this task,
* also clearing signal request bits.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
*/
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
// 1
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (s != 0)
// 2
synchronized (this) { notifyAll(); }
return completion;
}
}
}

Annotated code analysis

  1. Try to CAS the task's status to the normal-completion value.
  2. Also wake up any threads waiting to join this task.

    ForkJoinTask#setExceptionalCompletion()

    If exec() throws inside doExec(), setExceptionalCompletion() is called to record the exceptional completion status.
    /**
    * Records exception and sets exceptional completion.
    *
    * @return status on exit
    */
    private int setExceptionalCompletion(Throwable ex) {
    // 1
    int h = System.identityHashCode(this);
    final ReentrantLock lock = exceptionTableLock;
    lock.lock();
    try {
    // 2
    expungeStaleExceptions();
    // 3
    ExceptionNode[] t = exceptionTable;
    // 4
    int i = h & (t.length - 1);
    for (ExceptionNode e = t[i]; ; e = e.next) {
    if (e == null) {
    t[i] = new ExceptionNode(this, ex, t[i]);
    break;
    }
    if (e.get() == this) // already present
    break;
    }
    } finally {
    lock.unlock();
    }
    return setCompletion(EXCEPTIONAL);
    }

Annotated code analysis

  1. Compute this object's identity hashCode.
  2. Expunge stale entries from the exception table.
  3. Fetch the exception table array.
  4. Use the identity hashCode to compute the bucket index in the table.

Note that setExceptionalCompletion() still ends by calling setCompletion, but it first records the exception in an exception table.
Two questions help understand this table.

Why is there an exception table at all?

Because exceptions are rare, they are not stored inside task objects but in a weak-reference exception table (cancellation exceptions are never stored there).

What does the table look like?

Structurally it is a hash table: each bucket holds a singly linked chain, keyed by weak references.
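A minimal sketch of that shape. This is not the JDK code: the class names and the simplified, lock-free API are assumptions; the real table also drains a ReferenceQueue and takes a ReentrantLock on every operation.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

public class WeakExceptionTable {
    static final int CAPACITY = 32; // fixed power-of-two capacity, as in ForkJoinTask

    // Weak-keyed chain node: the task is the (weak) key, the throwable the value.
    static final class Node extends WeakReference<Object> {
        final Throwable ex;
        final Node next;
        Node(Object task, Throwable ex, Node next, ReferenceQueue<Object> q) {
            super(task, q);
            this.ex = ex;
            this.next = next;
        }
    }

    final Node[] table = new Node[CAPACITY];
    final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();

    void record(Object task, Throwable ex) {
        int i = System.identityHashCode(task) & (CAPACITY - 1); // bucket index
        for (Node e = table[i]; e != null; e = e.next)
            if (e.get() == task) return;                        // already present
        table[i] = new Node(task, ex, table[i], refQueue);      // push onto chain
    }

    Throwable lookup(Object task) {
        int i = System.identityHashCode(task) & (CAPACITY - 1);
        for (Node e = table[i]; e != null; e = e.next)
            if (e.get() == task) return e.ex;
        return null;
    }
}
```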

Exception table

/**
* Table of exceptions thrown by tasks, to enable reporting by
* callers. Because exceptions are rare, we don't directly keep
* them with task objects, but instead use a weak ref table. Note
* that cancellation exceptions don't appear in the table, but are
* instead recorded as status values.
*
* Note: These statics are initialized below in static block.
*/
private static final ExceptionNode[] exceptionTable;
private static final ReentrantLock exceptionTableLock;
private static final ReferenceQueue<Object> exceptionTableRefQueue;

/**
* Fixed capacity for exceptionTable.
*/
private static final int EXCEPTION_MAP_CAPACITY = 32;

/**
* Key-value nodes for exception table. The chained hash table
* uses identity comparisons, full locking, and weak references
* for keys. The table has a fixed capacity because it only
* maintains task exceptions long enough for joiners to access
* them, so should never become very large for sustained
* periods. However, since we do not know when the last joiner
* completes, we must use weak references and expunge them. We do
* so on each operation (hence full locking). Also, some thread in
* any ForkJoinPool will call helpExpungeStaleExceptions when its
* pool becomes isQuiescent.
*/
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>>{
final Throwable ex;
ExceptionNode next;
final long thrower; // use id not ref to avoid weak cycles
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
super(task, exceptionTableRefQueue);
this.ex = ex;
this.next = next;
this.thrower = Thread.currentThread().getId();
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long statusOffset;
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue<Object>();
exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
statusOffset = UNSAFE.objectFieldOffset
(ForkJoinTask.class.getDeclaredField("status"));
} catch (Exception e) {
throw new Error(e);
}
}

Continuing the flow: after doExec() finishes, control returns to execTask(). Since this worker's own task queue holds no tasks, queueTop == queueBase holds, so execTask() returns to scan(); scan() returns false to ForkJoinPool#work(), which starts the next scan. Because both the worker's own queue and the pool's submission queue are now empty, that next scan is necessarily an empty one, so work() takes the tryAwaitWork branch. Let's look at tryAwaitWork().

ForkJoinPool#tryAwaitWork()

/**
* Tries to enqueue worker w in wait queue and await change in
* worker's eventCount. If the pool is quiescent and there is
* more than one worker, possibly terminates worker upon exit.
* Otherwise, before blocking, rescans queues to avoid missed
* signals. Upon finding work, releases at least one worker
* (which may be the current worker). Rescans restart upon
* detected staleness or failure to release due to
* contention. Note the unusual conventions about Thread.interrupt
* here and elsewhere: Because interrupts are used solely to alert
* threads to check termination, which is checked here anyway, we
* clear status (using Thread.interrupted) before any call to
* park, so that park does not immediately return due to status
* being set via some other unrelated call to interrupt in user
* code.
*
* @param w the calling worker
* @param c the ctl value on entry
* @return true if waited or another thread was released upon enq
*/
private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
int v = w.eventCount;
// 1
w.nextWait = (int)c; // w's successor record
// 2
long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
long d = ctl; // return true if lost to a deq, to force scan
// 3
return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
}
for (int sc = w.stealCount; sc != 0;) { // accumulate stealCount
// 4
long s = stealCount;
if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
sc = w.stealCount = 0;
else if (w.eventCount != v)
// 5
return true; // update next time
}
// 6
if ((!shutdown || !tryTerminate(false)) &&
(int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
blockedCount == 0 && quiescerCount == 0)
// 7
idleAwaitWork(w, nc, c, v); // quiescent
for (boolean rescanned = false;;) {
if (w.eventCount != v)
// 8
return true;
if (!rescanned) {
int g = scanGuard, m = g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws != null && m < ws.length) {
rescanned = true;
// 9
for (int i = 0; i <= m; ++i) {
ForkJoinWorkerThread u = ws[i];
if (u != null) {
if (u.queueBase != u.queueTop &&
!tryReleaseWaiter())
// 10
rescanned = false; // contended
if (w.eventCount != v)
return true;
}
}
}
// 11
if (scanGuard != g || // stale
(queueBase != queueTop && !tryReleaseWaiter()))
rescanned = false;
if (!rescanned)
// 12
Thread.yield(); // reduce contention
else
// 13
Thread.interrupted(); // clear before park
}
else {
// 14
w.parked = true; // must recheck
if (w.eventCount != v) {
w.parked = false;
return true;
}
LockSupport.park(this);
rescanned = w.parked = false;
}
}
}

Annotated code analysis

  1. w#nextWait records the pool's control word from before the wait.
  2. Record this thread's ID info (the bitwise complement of its index) into the pool's control word, and decrement the active worker count stored there.
  3. If we raced with a dequeuing thread and lost, return true so that work() scans again.
  4. Atomically fold the worker's stealCount into Pool#stealCount.
  5. If eventCount changed, return true and update next time.
  6. The shutdown sequence is not in progress, there is a prior wait record ((int)c != 0), the active worker count has dropped to zero (parallelism + AC == 0), no worker is blocked in a join, and no worker is quiescing.
  7. If those conditions hold, the pool is quiescent; call idleAwaitWork to handle it.
  8. If eventCount changed, return.
  9. Rescan here: if a task turns up in another worker's queue, try to wake a waiting worker.
  10. Contention: scan again.
  11. scanGuard changed, or a task was found in the pool's submission queue: scan again.
  12. Yield the CPU to reduce contention.
  13. Clear the interrupt flag before parking.
  14. Set the parked flag.

The purpose of this method is to block worker w, with a few details along the way.

  • First, it adjusts the pool's control word ctl, recording w's ID info into ctl and decrementing the active worker count held there.
  • Second, before blocking it folds w's steal count into the pool's total steal count (stealCount); and if the pool happens to be quiescent, it calls idleAwaitWork to handle that (which may terminate the worker).
  • Finally, just before actually blocking, it scans the worker queues and the pool queue once more; if a task is found, it tries to wake one waiting worker (possibly itself). At the end of this process it clears the current thread's interrupt flag and then parks (with the pool as the Blocker, effectively joining the pool's wait queue).
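The field extraction used throughout these checks follows the ctl layout quoted at the top of this article; the shifts and casts below mirror the expressions in the source (`(int)(c >> AC_SHIFT)`, `(short)(c >>> TC_SHIFT)`, `(int)c`). The example ctl value in main is hypothetical:

```java
public class CtlFields {
    static final int AC_SHIFT = 48, TC_SHIFT = 32;

    static int activeCount(long ctl) { return (int)(ctl >> AC_SHIFT); }    // AC, signed
    static int totalCount(long ctl)  { return (short)(ctl >>> TC_SHIFT); } // TC, signed
    static int eventWord(long ctl)   { return (int)ctl; }                  // e = ST|EC|ID

    public static void main(String[] args) {
        // Hypothetical state: AC = -2, TC = -1, low word 0 (no waiters, not terminating).
        long ctl = ((long)(-2 & 0xffff) << AC_SHIFT) | ((long)(-1 & 0xffff) << TC_SHIFT);
        System.out.println(activeCount(ctl)); // -2: fewer active workers than parallelism
        System.out.println(totalCount(ctl));  // -1: fewer total workers than parallelism
    }
}
```

Because every field is offset by the target parallelism, "not enough workers" and "someone is waiting" all reduce to cheap sign tests, as the javadoc quoted earlier explains.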

    ForkJoinPool#idleAwaitWork()

    Following the normal flow, this is where things end: the worker has executed the submitted task and is now blocked waiting.
    But what happens if the pool is quiescent at that point? Let's continue with idleAwaitWork(), called from the method above.
    /**
    * If inactivating worker w has caused pool to become
    * quiescent, check for pool termination, and wait for event
    * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
    * this case because quiescence reflects consensus about lack
    * of work). On timeout, if ctl has not changed, terminate the
    * worker. Upon its termination (see deregisterWorker), it may
    * wake up another worker to possibly repeat this process.
    *
    * @param w the calling worker
    * @param currentCtl the ctl value after enqueuing w
    * @param prevCtl the ctl value if w terminated
    * @param v the eventCount w awaits change
    */
    private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
    long prevCtl, int v) {
    if (w.eventCount == v) {
    if (shutdown)
    // 1
    tryTerminate(false);
    // 2
    ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
    while (ctl == currentCtl) {
    long startTime = System.nanoTime();
    w.parked = true;
    if (w.eventCount == v) // must recheck
    // 3
    LockSupport.parkNanos(this, SHRINK_RATE);
    w.parked = false;
    if (w.eventCount != v)
    break;
    else if (System.nanoTime() - startTime <
    SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
    // 4
    Thread.interrupted(); // spurious wakeup
    else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
    currentCtl, prevCtl)) {// 5

    // 6
    w.terminate = true; // restore previous
    // 7
    w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
    break;
    }
    }
    }
    }

Annotated code analysis

  1. If shutdown has been requested, call tryTerminate().
  2. Help expunge stale weak keys from the exception table.
  3. Park for the given interval (4 seconds).
  4. On a spurious wakeup, clear the interrupt flag.
  5. Restore the previous ctl; the if is entered only when ctl never changed in the meantime.
  6. Terminate the worker: set its termination flag.
  7. Set w#eventCount.

idleAwaitWork() first checks whether the pool is shutting down and calls tryTerminate if so; otherwise it parks worker w for a while (4s). If the pool's control word still has not changed after that interval (meaning the pool is still quiescent), the worker needs to be terminated: its termination flag is set to true and w#eventCount is updated, then control returns to tryAwaitWork(). There, the changed w#eventCount causes a return to work(); work() sees that w's termination flag is true and exits its main loop, so worker w terminates, calling w#onTermination() on the way out.

ForkJoinWorkerThread#onTermination()

/**
* Performs cleanup associated with termination of this worker
* thread. If you override this method, you must invoke
* {@code super.onTermination} at the end of the overridden method.
*
* @param exception the exception causing this thread to abort due
* to an unrecoverable error, or {@code null} if completed normally
*/
protected void onTermination(Throwable exception) {
try {
terminate = true;
cancelTasks();
pool.deregisterWorker(this, exception);
} catch (Throwable ex) { // Shouldn't ever happen
if (exception == null) // but if so, at least rethrown
exception = ex;
} finally {
if (exception != null)
UNSAFE.throwException(exception);
}
}

ForkJoinWorkerThread#cancelTasks()

Cancels tasks.

/**
* Removes and cancels all tasks in queue. Can be called from any
* thread.
*/
final void cancelTasks() {
ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
if (cj != null && cj.status >= 0)
// 1
cj.cancelIgnoringExceptions();
ForkJoinTask<?> cs = currentSteal;
if (cs != null && cs.status >= 0)
// 2
cs.cancelIgnoringExceptions();
while (queueBase != queueTop) {
// 3
ForkJoinTask<?> t = deqTask();
if (t != null)
t.cancelIgnoringExceptions();
}
}

Annotated code analysis

  1. Cancel the task currently being joined.
  2. Cancel the currently stolen task.
  3. Cancel every task in the worker's task queue.

    ForkJoinTask#cancel()

    .....
    public boolean cancel(boolean mayInterruptIfRunning) {
    return setCompletion(CANCELLED) == CANCELLED;
    }

    /**
    * Cancels, ignoring any exceptions thrown by cancel. Used during
    * worker and pool shutdown. Cancel is spec'ed not to throw any
    * exceptions, but if it does anyway, we have no recourse during
    * shutdown, so guard against this case.
    */
    final void cancelIgnoringExceptions() {
    try {
    cancel(false);
    } catch (Throwable ignore) {
    }
    }
    /**
    * Marks completion and wakes up threads waiting to join this task,
    * also clearing signal request bits.
    *
    * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
    * @return completion status on exit
    */
    private int setCompletion(int completion) {
    for (int s;;) {
    if ((s = status) < 0)
    return s;
    if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
    if (s != 0)
    synchronized (this) { notifyAll(); }
    return completion;
    }
    }
    }

ForkJoinPool#deregisterWorker()

Deregisters this worker from the pool.

/**
* Final callback from terminating worker. Removes record of
* worker from array, and adjusts counts. If pool is shutting
* down, tries to complete termination.
*
* @param w the worker
*/
final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) {
int idx = w.poolIndex;
int sc = w.stealCount;
int steps = 0;
// Remove from array, adjust worker counts and collect steal count.
// We can intermix failed removes or adjusts with steal updates
do {
long s, c;
int g;
if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 &&
UNSAFE.compareAndSwapInt(this, scanGuardOffset,
g, g |= SG_UNIT)) {
ForkJoinWorkerThread[] ws = workers;
if (ws != null && idx >= 0 &&
idx < ws.length && ws[idx] == w)
ws[idx] = null; // verify
nextWorkerIndex = idx;
scanGuard = g + SG_UNIT;
steps = 1;
}
if (steps == 1 &&
UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
(((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))))
steps = 2;
if (sc != 0 &&
UNSAFE.compareAndSwapLong(this, stealCountOffset,
s = stealCount, s + sc))
sc = 0;
} while (steps != 2 || sc != 0);
if (!tryTerminate(false)) {
if (ex != null) // possibly replace if died abnormally
signalWork();
else
tryReleaseWaiter();
}
}

deregisterWorker() first completes the deregistration work inside a do-while loop. It has 3 stages and only moves on once all of them have succeeded; the loop exists because each stage can hit contention and need a retry.

  • Stage 1: holding the sequence lock (the high 16 bits of scanGuard), null out the worker's slot in the pool's worker array and adjust nextWorkerIndex.
  • Stage 2: decrement both the active worker count and the total worker count in the control word ctl.
  • Stage 3: fold the departing worker's steal count into the pool's total steal count.

After deregistration, deregisterWorker() may also wake other waiting threads; it first calls tryTerminate(false).

ForkJoinPool#tryTerminate()

/**
* Possibly initiates and/or completes termination.
*
* @param now if true, unconditionally terminate, else only
* if shutdown and empty queue and no active workers
* @return true if now terminating or terminated
*/
private boolean tryTerminate(boolean now) {
long c;
while (((c = ctl) & STOP_BIT) == 0) {
if (!now) {
if ((int)(c >> AC_SHIFT) != -parallelism)
return false;
if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
queueBase != queueTop) {
if (ctl == c) // staleness check
return false;
continue;
}
}
if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
startTerminating();
}
if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
termination.signalAll();
} finally {
lock.unlock();
}
}
return true;
}

With argument false

With false (do not shut the pool down immediately), the implementation first checks for active workers and returns false if any exist. It then checks whether the pool is still running (has shutdown been requested, are workers blocked in joins, are workers quiescing, does the submission queue hold tasks, and so on); if it is still running, return false. Otherwise it tries to CAS the stop bit into the control word and calls startTerminating().

With argument true

With true those operations are performed unconditionally. In either case, if the total worker count has reached zero, the threads waiting on the termination condition are signalled.

ForkJoinPool#startTerminating()

/**
* Runs up to three passes through workers: (0) Setting
* termination status for each worker, followed by wakeups up to
* queued workers; (1) helping cancel tasks; (2) interrupting
* lagging threads (likely in external tasks, but possibly also
* blocked in joins). Each pass repeats previous steps because of
* potential lagging thread creation.
*/
private void startTerminating() {
// 1
cancelSubmissions();
for (int pass = 0; pass < 3; ++pass) {
ForkJoinWorkerThread[] ws = workers;
if (ws != null) {
for (ForkJoinWorkerThread w : ws) {
if (w != null) {
// 2
w.terminate = true;
if (pass > 0) {
// 3
w.cancelTasks();
if (pass > 1 && !w.isInterrupted()) {
try {
// 4
w.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
}
// 5
terminateWaiters();
}
}
}

Annotated code analysis

  1. Cancel the tasks in the pool's submission queue.
  2. Terminate the worker.
  3. Cancel the tasks in the worker's queue.
  4. Interrupt the worker.
  5. Terminate the waiting workers.

    The startTerminating flow:

  • Cancel the tasks in Pool#submissionQueue.
  • Set every worker's termination flag to true.
  • Cancel the unfinished tasks in every worker's task queue.
  • Interrupt every worker.
  • Terminate the workers still waiting.

    ForkJoinPool#cancelSubmissions()

    /**
    * Polls and cancels all submissions. Called only during termination.
    */
    private void cancelSubmissions() {
    while (queueBase != queueTop) {
    ForkJoinTask<?> task = pollSubmission();
    if (task != null) {
    try {
    task.cancel(false);
    } catch (Throwable ignore) {
    }
    }
    }
    }

ForkJoinPool#pollSubmission()

/**
* Removes and returns the next unexecuted submission if one is
* available. This method may be useful in extensions to this
* class that re-assign work in systems with multiple pools.
*
* @return the next submission, or {@code null} if none
*/
protected ForkJoinTask<?> pollSubmission() {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
while ((b = queueBase) != queueTop &&
(q = submissionQueue) != null &&
(i = (q.length - 1) & b) >= 0) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null &&
queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueBase = b + 1;
return t;
}
}
return null;
}

ForkJoinPool#terminateWaiters()

/**
* Tries to set the termination status of waiting workers, and
* then wakes them up (after which they will terminate).
*/
private void terminateWaiters() {
ForkJoinWorkerThread[] ws = workers;
if (ws != null) {
ForkJoinWorkerThread w; long c; int i, e;
int n = ws.length;
while ((i = ~(e = (int)(c = ctl)) & SMASK) < n &&
(w = ws[i]) != null && w.eventCount == (e & E_MASK)) {
if (UNSAFE.compareAndSwapLong(this, ctlOffset, c,
(long)(w.nextWait & E_MASK) |
((c + AC_UNIT) & AC_MASK) |
(c & (TC_MASK|STOP_BIT)))) {
w.terminate = true;
w.eventCount = e + EC_UNIT;
if (w.parked)
UNSAFE.unpark(w);
}
}
}
}

The waiting workers form an implicit chain (also called a Treiber stack). At first glance the method seems to wake only one waiting thread, the one whose ID info is stored in the pool's control word. But look closely at how ctl is adjusted: the expression w.nextWait & E_MASK means that after waking w, ctl again holds the ID info of another waiting worker (previously stored in w#nextWait).
Back in deregisterWorker(): assuming the pool is still running, tryTerminate(false) returns false, so the statements inside the if are executed.
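The implicit wait stack can be sketched with plain fields. This is a single-threaded toy under stated simplifications: the real code packs the id (complemented) into ctl's low word together with the event count and swaps it with CAS; here "ctl" is just an int id, with 0 meaning "no waiters", so worker ids start at 1.

```java
public class WaiterStack {
    static final class Worker {
        final int id;   // 1-based in this sketch; 0 is the "empty" sentinel
        int nextWait;   // id of the waiter below us, like ForkJoinWorkerThread.nextWait
        Worker(int id) { this.id = id; }
    }

    int ctl = 0; // top-of-stack id; 0 means no waiters

    void push(Worker w) {
        w.nextWait = ctl; // remember the previous top in the worker itself
        ctl = w.id;       // this worker becomes the top
    }

    Worker pop(Worker[] byId) {
        if (ctl == 0) return null;
        Worker w = byId[ctl];
        ctl = w.nextWait; // expose the next waiter, as tryReleaseWaiter does
        return w;
    }
}
```

The key point the sketch shows: the "stack" has no node objects of its own; the links live inside the workers, and the pool only stores the top.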

if (!tryTerminate(false)) {
// 1
if (ex != null) // possibly replace if died abnormally
signalWork();
else// 2
tryReleaseWaiter();
}

Annotated code analysis

  1. If an exception occurred, call signalWork() to wake or create a worker (possibly replacing the one that died abnormally).
  2. Otherwise, call tryReleaseWaiter() to try to wake one waiting worker.

    ForkJoinPool#tryReleaseWaiter()

    /**
    * Variant of signalWork to help release waiters on rescans.
    * Tries once to release a waiter if active count < 0.
    *
    * @return false if failed due to contention, else true
    */
    private boolean tryReleaseWaiter() {
    long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
    // 1
    if ((e = (int)(c = ctl)) > 0 &&
    (int)(c >> AC_SHIFT) < 0 &&
    (ws = workers) != null &&
    (i = ~e & SMASK) < ws.length &&
    (w = ws[i]) != null) {
    // 2
    long nc = ((long)(w.nextWait & E_MASK) |
    ((c + AC_UNIT) & (AC_MASK|TC_MASK)));
    if (w.eventCount != e ||
    !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
    // 3
    return false;
    // 4
    w.eventCount = (e + EC_UNIT) & E_MASK;
    if (w.parked)
    // 5
    UNSAFE.unpark(w);
    }
    return true;
    }

Annotated code analysis

  1. (e = (int)(c = ctl)) > 0: there is a waiting worker; (int)(c >> AC_SHIFT) < 0: the active worker count is below the target parallelism; (ws = workers) != null: the worker array is valid; (i = ~e & SMASK) < ws.length: the waiter ID stored in the control word is in range; (w = ws[i]) != null: the worker slot is occupied.
  2. Try to adjust the control word: increment the active worker count and install the ID of the next waiter on the Treiber stack into ctl.
  3. Contention occurred: fail.
  4. Bump w#eventCount.
  5. Unpark w.

    ForkJoinWorkerThread#execTask()

    /**
    * Runs the given task, plus any local tasks until queue is empty
    */
    final void execTask(ForkJoinTask<?> t) {
    currentSteal = t;
    for (;;) {
    if (t != null)
    t.doExec();
    if (queueTop == queueBase)
    break;
    t = locallyFifo ? locallyDeqTask() : popTask();
    }
    ++stealCount;
    currentSteal = null;
    }

Suppose that after finishing t the worker finds its own task queue still nonempty; it then fetches another task with locallyDeqTask() or popTask(), depending on the pool's mode (async or not), and keeps executing.
A worker's task queue (the pool's submission queue works the same way) is an array in form but a double-ended queue in concept. Unlike the other deque implementations in the JDK, this queue defines only three operations: push at the top (push), pop from the top (pop), and dequeue from the base (deq). Talking about a "queue" while saying push and pop can be confusing; just think of a deque as a hybrid of a stack and a queue. In terms of how the pool actually uses it: in the default (non-async) mode, the worker treats its own task queue as a LIFO stack; in async mode, it treats it as a FIFO queue.
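A single-threaded toy version of those three operations; the class and field names are illustrative. The real queue publishes queueBase/queueTop as volatile fields, steals with a CAS on the base slot, and grows the array, none of which this sketch models:

```java
public class WorkDeque<T> {
    private final Object[] q = new Object[16]; // capacity is a power of two
    private int base, top;                     // like queueBase / queueTop

    // Owner side: enqueue at the top of the queue.
    void push(T t) { q[top++ & (q.length - 1)] = t; }

    // Owner side, default (non-async) mode: LIFO, take back the newest task.
    @SuppressWarnings("unchecked")
    T pop() { return top == base ? null : (T) q[--top & (q.length - 1)]; }

    // Base side: FIFO, used in async mode and by stealers.
    @SuppressWarnings("unchecked")
    T deq() { return top == base ? null : (T) q[base++ & (q.length - 1)]; }
}
```

Usage: after push("a"), push("b"), push("c"), pop() returns "c" (stack behavior) while deq() returns "a" (queue behavior), which is exactly the LIFO/FIFO split described above.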

ForkJoinWorkerThread#popTask()

/**
* Returns a popped task, or null if empty.
* Called only by this thread.
*/
private ForkJoinTask<?> popTask() {
int m;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
for (int s; (s = queueTop) != queueBase;) {
int i = m & --s;
long u = (i << ASHIFT) + ABASE; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueTop = s; // or putOrderedInt
return t;
}
}
}
return null;
}

If the pool is not in async mode (locallyFifo is false), popTask() runs: it pops a task off the top of this worker's task queue.

ForkJoinWorkerThread#locallyDeqTask()

/**
* Tries to take a task from the base of own queue. Called only
* by this thread.
*
* @return a task, or null if none
*/
final ForkJoinTask<?> locallyDeqTask() {
ForkJoinTask<?> t; int m, b, i;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
while (queueTop != (b = queueBase)) {
if ((t = q[i = m & b]) != null &&
queueBase == b &&
UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
t, null)) {
queueBase = b + 1;
return t;
}
}
}
return null;
}

If the pool is in async mode (locallyFifo is true), locallyDeqTask() runs: it dequeues a task from the base of this worker's task queue.

Summary

  1. The worker is created, registered with the pool, and started.
  2. It scans all workers' task queues and the pool's submission queue, stealing a task to execute.
  3. Once that is done, it takes tasks from its own work queue and executes them.
  4. When nothing is left to run, it blocks and waits; if the pool happens to be quiescent at that point, the worker is terminated and deregistered from the pool.
