Other Fork-Join Methods

ForkJoinTask

ForkJoinTask itself implements Future, so it also supports cancellation.

ForkJoinTask#cancel()

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed or could not be
* cancelled for some other reason. If successful, and this task
* has not started when {@code cancel} is called, execution of
* this task is suppressed. After this method returns
* successfully, unless there is an intervening call to {@link
* #reinitialize}, subsequent calls to {@link #isCancelled},
* {@link #isDone}, and {@code cancel} will return {@code true}
* and calls to {@link #join} and related methods will result in
* {@code CancellationException}.
*
* <p>This method may be overridden in subclasses, but if so, must
* still ensure that these properties hold. In particular, the
* {@code cancel} method itself must not throw exceptions.
*
* <p>This method is designed to be invoked by <em>other</em>
* tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}.
*
* @param mayInterruptIfRunning this value has no effect in the
* default implementation because interrupts are not used to
* control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
    return setCompletion(CANCELLED) == CANCELLED;
}
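The contract described in the Javadoc above can be observed from the outside. The following is a minimal sketch, not part of the JDK source: `CancelDemo`, `Answer`, and `describe` are hypothetical names introduced for illustration. It cancels a task that was never started and then checks `isCancelled()`, `isDone()`, and `join()`.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.RecursiveTask;

final class CancelDemo {
    /** Hypothetical task used only for this illustration; it is never run. */
    static final class Answer extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        protected Integer compute() {
            return 42;
        }
    }

    /** Cancels an unstarted task and reports what the Future-style queries return. */
    static String describe() {
        Answer task = new Answer();
        // The boolean argument has no effect in the default implementation.
        boolean cancelled = task.cancel(true);
        String joinOutcome;
        try {
            joinOutcome = "value " + task.join();
        } catch (CancellationException e) {
            joinOutcome = "CancellationException";
        }
        return cancelled + "," + task.isCancelled() + "," + task.isDone() + "," + joinOutcome;
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Because the task is cancelled before it runs, its `compute()` method is never invoked, which matches the "execution of this task is suppressed" wording in the Javadoc.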

ForkJoinTask#setCompletion()

/**
* Marks completion and wakes up threads waiting to join this task,
* also clearing signal request bits.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
*/
private int setCompletion(int completion) {
    for (int s;;) {
        if ((s = status) < 0)
            return s;
        if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
            if (s != 0)
                synchronized (this) { notifyAll(); }
            return completion;
        }
    }
}

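When called from cancel(), this sets the task's status to CANCELLED and then, if the previous status was non-zero (a waiter had requested a signal), wakes up the threads waiting on the task. If the task has already completed (status < 0), the existing status is returned unchanged.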
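The same compare-and-set retry loop can be written without Unsafe. The sketch below is an illustration only, using `AtomicInteger` in place of the raw `status` field; the class name `CompletionSketch` and the constant values shown are assumptions made for this example, not a copy of the JDK class.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CompletionSketch {
    // Assumed values for illustration: completed states are negative,
    // 0 means "not done", and a positive value means a waiter wants a signal.
    static final int NORMAL = -1;
    static final int CANCELLED = -2;
    static final int EXCEPTIONAL = -3;
    static final int SIGNAL = 1;

    private final AtomicInteger status = new AtomicInteger(0);

    /** Tries to record a completion state; returns the state that is in effect on exit. */
    int setCompletion(int completion) {
        for (;;) {
            int s = status.get();
            if (s < 0) {
                return s; // already completed: first writer wins
            }
            if (status.compareAndSet(s, completion)) {
                if (s != 0) {
                    // A waiter registered interest, so wake everyone up.
                    synchronized (this) {
                        notifyAll();
                    }
                }
                return completion;
            }
            // CAS lost a race with another thread: re-read and retry.
        }
    }

    boolean cancel() {
        return setCompletion(CANCELLED) == CANCELLED;
    }

    public static void main(String[] args) {
        CompletionSketch fresh = new CompletionSketch();
        System.out.println(fresh.cancel());   // first cancel wins
        System.out.println(fresh.cancel());   // still reports cancelled

        CompletionSketch done = new CompletionSketch();
        done.setCompletion(NORMAL);
        System.out.println(done.cancel());    // cannot cancel a completed task
    }
}
```

The design point is that a completed status is never overwritten: once any thread has installed a negative value, every later caller simply observes it.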

Source Analysis Beyond Fork/Join

ForkJoinPool Fields

Selected Fields and Constants

/**
* Main pool control -- a long packed with:
* AC: Number of active running workers minus target parallelism (16 bits)
* TC: Number of total workers minus target parallelism (16bits)
* ST: true if pool is terminating (1 bit)
* EC: the wait count of top waiting thread (15 bits)
* ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
*
* When convenient, we can extract the upper 32 bits of counts and
* the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
* (int)ctl. The ec field is never accessed alone, but always
* together with id and st. The offsets of counts by the target
* parallelism and the positionings of fields makes it possible to
* perform the most common checks via sign tests of fields: When
* ac is negative, there are not enough active workers, when tc is
* negative, there are not enough total workers, when id is
* negative, there is at least one waiting worker, and when e is
* negative, the pool is terminating. To deal with these possibly
* negative fields, we use casts in and out of "short" and/or
* signed shifts to maintain signedness.
*/
volatile long ctl;

// bit positions/shifts for fields
private static final int AC_SHIFT = 48;
private static final int TC_SHIFT = 32;
private static final int ST_SHIFT = 31;
private static final int EC_SHIFT = 16;

// bounds
private static final int MAX_ID = 0x7fff; // max poolIndex
private static final int SMASK = 0xffff; // mask short bits
private static final int SHORT_SIGN = 1 << 15;
private static final int INT_SIGN = 1 << 31;

// masks
private static final long STOP_BIT = 0x0001L << ST_SHIFT;
private static final long AC_MASK = ((long)SMASK) << AC_SHIFT;
private static final long TC_MASK = ((long)SMASK) << TC_SHIFT;

// units for incrementing and decrementing
private static final long TC_UNIT = 1L << TC_SHIFT;
private static final long AC_UNIT = 1L << AC_SHIFT;

// masks and units for dealing with u = (int)(ctl >>> 32)
private static final int UAC_SHIFT = AC_SHIFT - 32;
private static final int UTC_SHIFT = TC_SHIFT - 32;
private static final int UAC_MASK = SMASK << UAC_SHIFT;
private static final int UTC_MASK = SMASK << UTC_SHIFT;
private static final int UAC_UNIT = 1 << UAC_SHIFT;
private static final int UTC_UNIT = 1 << UTC_SHIFT;

// masks and units for dealing with e = (int)ctl
private static final int E_MASK = 0x7fffffff; // no STOP_BIT
private static final int EC_UNIT = 1 << EC_SHIFT;

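The main control state of a ForkJoinPool is packed into a single long (64 bits).

  • AC: the number of active worker threads minus the target parallelism. (16 bits)
  • TC: the total number of worker threads minus the target parallelism. (16 bits)
  • ST: whether the ForkJoinPool is terminating. (1 bit)
  • EC: the wait count of the waiting worker at the top of the Treiber stack. (15 bits)
  • ID: the bitwise complement (~poolIndex) of the index of the waiting worker at the top of the Treiber stack. (16 bits)
1111111111111111 1111111111111111 1  111111111111111 1111111111111111
AC               TC               ST EC              ID
  • If AC is negative, there are not enough active workers.
  • If TC is negative, there are not enough total workers (fewer than the target parallelism).
  • If ID is negative, there is at least one waiting worker.
  • If (int)ctl is negative, the ForkJoinPool is terminating.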
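To make the layout concrete, here is a small sketch that packs and unpacks the five fields using the same shift amounts as the constants above. The class `CtlFields` and its methods are hypothetical helpers written for this illustration; the pool itself does not expose such an API.

```java
final class CtlFields {
    // Shift amounts copied from the constants listed above.
    static final int AC_SHIFT = 48;
    static final int TC_SHIFT = 32;
    static final int ST_SHIFT = 31;
    static final int EC_SHIFT = 16;
    static final int SMASK = 0xffff;

    /** Packs the five fields into one long (illustration only). */
    static long pack(int ac, int tc, boolean terminating, int ec, int id) {
        return ((long) (ac & SMASK) << AC_SHIFT)
             | ((long) (tc & SMASK) << TC_SHIFT)
             | (terminating ? 1L << ST_SHIFT : 0L)
             | ((long) (ec & 0x7fff) << EC_SHIFT)
             | (long) (id & SMASK);
    }

    // A signed shift keeps the sign of the top 16 bits.
    static int ac(long ctl) { return (int) (ctl >> AC_SHIFT); }

    // Casting to short re-creates the sign of a 16-bit field.
    static int tc(long ctl) { return (short) (ctl >>> TC_SHIFT); }

    // The stop bit is the sign bit of the lower 32 bits.
    static boolean terminating(long ctl) { return (int) ctl < 0; }

    static int ec(long ctl) { return ((int) ctl >>> EC_SHIFT) & 0x7fff; }

    static int id(long ctl) { return (short) ctl; }

    public static void main(String[] args) {
        // Two active workers short, one total worker short, waiter with poolIndex 3 on top.
        long ctl = pack(-2, -1, false, 5, ~3);
        System.out.println("ac=" + ac(ctl) + " tc=" + tc(ctl)
            + " terminating=" + terminating(ctl)
            + " ec=" + ec(ctl) + " poolIndex=" + ~id(ctl));
    }
}
```

Storing the counts as offsets from the target parallelism is what allows the sign tests listed above: a negative AC or TC directly means "below target" without any subtraction.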

Fork-Join Source Analysis

This source analysis is based on JDK 7.

Fork

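This section walks through the execution of a single Fork-Join task to examine the relevant Fork-Join code, focusing mainly on the splitting (Fork) and merging (Join) steps.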

SumTask

import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;

    // Ranges with (end - start) <= THRESHOLD are summed sequentially.
    private static final int THRESHOLD = 10;

    private final long start;
    private final long end;

    // Sums the integers 1..n.
    public SumTask(long n) {
        this(1, n);
    }

    private SumTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        if ((end - start) <= THRESHOLD) {
            // Small enough: add the numbers directly.
            for (long l = start; l <= end; l++) {
                sum += l;
            }
        } else {
            // Split the range in half, fork both halves, then join the results.
            long mid = (start + end) >>> 1;
            SumTask left = new SumTask(start, mid);
            SumTask right = new SumTask(mid + 1, end);
            left.fork();
            right.fork();
            sum = left.join() + right.join();
        }
        return sum;
    }
}
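For completeness, here is one way such a task might be submitted to a pool. This is a sketch under assumptions: `RangeSumDemo`, `RangeSum`, and `sumTo` are hypothetical names, and the task is a self-contained variant of the one above rather than the article's exact code. It forks only the left half and computes the right half in the current thread, a commonly used idiom that saves one queue push per split.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class RangeSumDemo {
    /** Sums the integers in [start, end] by recursive splitting. */
    static final class RangeSum extends RecursiveTask<Long> {
        private static final long serialVersionUID = 1L;
        private static final int THRESHOLD = 10;

        private final long start;
        private final long end;

        RangeSum(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                long sum = 0;
                for (long l = start; l <= end; l++) {
                    sum += l;
                }
                return sum;
            }
            long mid = (start + end) >>> 1;
            RangeSum left = new RangeSum(start, mid);
            RangeSum right = new RangeSum(mid + 1, end);
            left.fork();                            // queue the left half for any worker
            return right.compute() + left.join();   // run the right half here, then merge
        }
    }

    /** Runs the task in a fresh pool and returns 1 + 2 + ... + n. */
    static long sumTo(long n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSum(1, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumTo(1000));
    }
}
```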

ForkJoinTask#fork()

/**
* Arranges to asynchronously execute this task. While it is not
* necessarily enforced, it is a usage error to fork a task more
* than once unless it has completed and been reinitialized.
* Subsequent modifications to the state of this task or any data
* it operates on are not necessarily consistently observable by
* any thread other than the one executing it unless preceded by a
* call to {@link #join} or related methods, or a call to {@link
* #isDone} returning {@code true}.
*
* <p>This method may be invoked only from within {@code
* ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
    ((ForkJoinWorkerThread) Thread.currentThread())
        .pushTask(this);
    return this;
}
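The Javadoc above mentions `inForkJoinPool` as the way to tell whether the current code is running inside a pool computation. The sketch below illustrates that check; `ForkContextDemo`, `Parent`, and `Child` are hypothetical names introduced for this example. A parent task forks a child, and the child reports whether it is executing inside a ForkJoinPool.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

final class ForkContextDemo {
    /** Reports whether it is being executed as a ForkJoinPool computation. */
    static final class Child extends RecursiveTask<Boolean> {
        private static final long serialVersionUID = 1L;

        @Override
        protected Boolean compute() {
            return ForkJoinTask.inForkJoinPool();
        }
    }

    /** Forks a Child from inside the pool and joins it. */
    static final class Parent extends RecursiveTask<Boolean> {
        private static final long serialVersionUID = 1L;

        @Override
        protected Boolean compute() {
            Child child = new Child();
            ForkJoinTask<Boolean> forked = child.fork(); // fork() returns the task itself
            return forked == child && child.join();
        }
    }

    static boolean forkedChildRunsInPool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.submit(new Parent()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Evaluated on the calling thread, which is assumed to be an ordinary thread. */
    static boolean callerIsInPool() {
        return ForkJoinTask.inForkJoinPool();
    }

    public static void main(String[] args) {
        System.out.println("child in pool: " + forkedChildRunsInPool());
        System.out.println("caller in pool: " + callerIsInPool());
    }
}
```

In the JDK 7 code shown above, calling `fork()` from an ordinary thread fails on the cast to `ForkJoinWorkerThread`, which is why the fork in this sketch happens inside `Parent.compute()`.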

ForkJoinWorkerThread#pushTask()

/**
* Pushes a task. Call only from this thread.
*
* @param t the task. Caller must ensure non-null.
*/
final void pushTask(ForkJoinTask<?> t) {
    ForkJoinTask<?>[] q; int s, m;
    if ((q = queue) != null) {    // ignore if queue removed
        // 1
        long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
        // 2
        UNSAFE.putOrderedObject(q, u, t);
        // 3
        queueTop = s + 1;         // or use putOrderedInt
        if ((s -= queueBase) <= 2)
            pool.signalWork();
        else if (s == m)
            // 4
            growQueue();
    }
}

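Notes on the annotated code

  1. The slot index is computed by masking the current queueTop with (length - 1), which is equivalent to queueTop modulo the array length because the length is a power of two. The index is then converted into a memory offset for Unsafe to use.
  2. Store the task in that slot.
  3. Advance queueTop.
  4. If the queue is full, grow its capacity.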

    ForkJoinWorkerThread#growQueue()

    /**
    * Creates or doubles queue array. Transfers elements by
    * emulating steals (deqs) from old array and placing, oldest
    * first, into new array.
    */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
        // 1
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        // 2
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE;
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }

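Notes on the annotated code

  1. The capacity doubles on each growth and may not exceed MAXIMUM_QUEUE_CAPACITY (1 << 24); going beyond that throws RejectedExecutionException.
  2. The capacity is never smaller than INITIAL_QUEUE_CAPACITY (1 << 13).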
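The indexing and growth logic of pushTask() and growQueue() can be sketched without Unsafe. The class below is a simplified, single-threaded illustration only: `RingQueueSketch` and its small `INITIAL_CAPACITY` of 8 are assumptions made for this example, and it omits the volatile/CAS handling that the real queue needs to cope with concurrent stealing.

```java
import java.util.concurrent.RejectedExecutionException;

final class RingQueueSketch {
    static final int INITIAL_CAPACITY = 8;        // hypothetical; kept small for the demo
    static final int MAXIMUM_CAPACITY = 1 << 24;

    private Object[] queue = new Object[INITIAL_CAPACITY]; // length is always a power of two
    private int base; // index of the oldest element (the end a thief would take from)
    private int top;  // index of the next free slot (the owner's end)

    void push(Object task) {
        Object[] q = queue;
        int s = top;
        q[s & (q.length - 1)] = task; // mask instead of modulo
        top = s + 1;
        if (top - base == q.length) { // array is full after this push
            grow();
        }
    }

    /** Owner side: take the most recently pushed task (LIFO). */
    Object pop() {
        if (top == base) {
            return null;
        }
        int i = --top & (queue.length - 1);
        Object t = queue[i];
        queue[i] = null;
        return t;
    }

    /** Thief side: take the oldest task (FIFO). */
    Object poll() {
        if (top == base) {
            return null;
        }
        int i = base++ & (queue.length - 1);
        Object t = queue[i];
        queue[i] = null;
        return t;
    }

    private void grow() {
        Object[] oldQ = queue;
        int size = oldQ.length << 1;
        if (size > MAXIMUM_CAPACITY) {
            throw new RejectedExecutionException("Queue capacity exceeded");
        }
        Object[] q = new Object[size];
        int oldMask = oldQ.length - 1;
        int mask = size - 1;
        // Copy oldest first; the logical indices stay the same, only the mask changes.
        for (int b = base; b != top; ++b) {
            q[b & mask] = oldQ[b & oldMask];
        }
        queue = q;
    }

    int size() { return top - base; }

    int capacity() { return queue.length; }

    public static void main(String[] args) {
        RingQueueSketch q = new RingQueueSketch();
        for (int i = 0; i < 20; i++) {
            q.push(i);
        }
        System.out.println("size=" + q.size() + " capacity=" + q.capacity());
        System.out.println("oldest=" + q.poll() + " newest=" + q.pop());
    }
}
```

Because base and top only ever increase and are masked on access, growing the array never requires renumbering the elements; each one is simply re-placed at its logical index under the new mask.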

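The Fork-Join Pattern

What Is Fork-Join

Divide-and-conquer problems can be solved fairly easily with Callable tasks and the Executor framework: create a Callable instance for each subtask, submit them to an ExecutorService, and combine the partial results into the final answer. The natural question, then, is why Java 7 needed another framework if this interface already works well.
The main problem with using ExecutorService and Callable is that a Callable instance is inherently blocking: it occupies its worker thread until it returns, including any time it spends waiting. Callable instances queued behind it cannot start until an earlier one finishes, so many resources go unused. The Fork-Join framework was introduced to solve this parallelism problem, whereas Executor addresses concurrency. (Translator's note: concurrency means one processor handling several tasks over the same period, while parallelism means multiple processors or cores executing different tasks at the same time.)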
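As an illustration of the ExecutorService approach described above, the following sketch splits a sum into a fixed number of Callable chunks and adds up the Future results. It is an example under assumptions, not code from the original article: `ExecutorSumSketch`, `sum`, the chunking scheme, and the pool size of 4 are all hypothetical choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExecutorSumSketch {
    /** Sums 1..n by splitting the range into roughly `chunks` Callable tasks. */
    static long sum(long n, int chunks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            long step = Math.max(1, n / chunks);
            for (long lo = 1; lo <= n; lo += step) {
                final long from = lo;
                final long to = Math.min(n, lo + step - 1);
                tasks.add(new Callable<Long>() {
                    @Override
                    public Long call() {
                        long s = 0;
                        for (long i = from; i <= to; i++) {
                            s += i;
                        }
                        return s;
                    }
                });
            }
            long total = 0;
            // invokeAll blocks until every chunk has finished.
            for (Future<Long> f : pool.invokeAll(tasks)) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1000, 8));
    }
}
```

Note that the split here is decided once, up front, by the caller. If a chunk needed to split itself further and wait for its own subtasks inside the same fixed pool, it would hold its thread while waiting, which is the resource problem the paragraph above describes.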
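Fork-Join is a divide-and-conquer programming pattern: split the work, solve the pieces, then merge the results. (Note: Fork-Join is a single-machine framework. Comparable distributed frameworks such as Hadoop use the MapReduce computation model, which embodies the same divide-and-conquer idea.)
The Fork-Join framework is a "multicore-friendly, lightweight parallel framework". It supports a parallel programming style in which a problem is recursively split into smaller pieces that are solved in parallel and coordinated by the framework. Fork-Join builds on divide and conquer: given a problem, it recursively divides it into subproblems until each one is small enough to be solved efficiently in sequence. Each recursive step splits the problem into two or more subproblems and places them in a queue to await processing (the fork step), then waits for the results of all subproblems (the join step) and merges them.
The Fork-Join pattern has its own range of applicability. If an application can be decomposed into multiple subtasks, and combining the results of those subtasks yields the final answer, then it is a good fit for the Fork-Join pattern.
In a diagram of the Fork-Join pattern, each Task in the upper part of the figure depends on the execution of the Tasks below it; only after all subtasks have completed can the caller obtain the result of Task 0. See the figure below.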


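The Fork-Join pattern can solve many kinds of parallel problems. With the Fork-Join framework provided by Doug Lea, developers only need to focus on how to divide tasks and combine intermediate results in order to take full advantage of a parallel platform. Many other hard problems in parallel programming, such as load balancing and synchronization, are handled by the framework in a uniform way. This makes it possible to gain the benefits of parallelism while avoiding much of the difficulty and error-proneness of parallel programming.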
