Fork-Join其他方法

ForkJoinTask

ForkJoinTask本身也是Future的实现,所以也会有取消过程。

ForkJoinTask#cancel()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed or could not be
* cancelled for some other reason. If successful, and this task
* has not started when {@code cancel} is called, execution of
* this task is suppressed. After this method returns
* successfully, unless there is an intervening call to {@link
* #reinitialize}, subsequent calls to {@link #isCancelled},
* {@link #isDone}, and {@code cancel} will return {@code true}
* and calls to {@link #join} and related methods will result in
* {@code CancellationException}.
*
* <p>This method may be overridden in subclasses, but if so, must
* still ensure that these properties hold. In particular, the
* {@code cancel} method itself must not throw exceptions.
*
* <p>This method is designed to be invoked by <em>other</em>
* tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}.
*
* @param mayInterruptIfRunning this value has no effect in the
* default implementation because interrupts are not used to
* control cancellation.
*
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
return setCompletion(CANCELLED) == CANCELLED;
}

ForkJoinTask#setCompletion()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Marks completion and wakes up threads waiting to join this task,
* also clearing signal request bits.
*
* @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
* @return completion status on exit
*/
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (s != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}

设置任务运行状态为取消,然后唤醒在任务上等待的线程。

非Fork/Join源码分析

ForkJoinPool属性

部分参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* Main pool control -- a long packed with:
* AC: Number of active running workers minus target parallelism (16 bits)
* TC: Number of total workers minus target parallelism (16bits)
* ST: true if pool is terminating (1 bit)
* EC: the wait count of top waiting thread (15 bits)
* ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
*
* When convenient, we can extract the upper 32 bits of counts and
* the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
* (int)ctl. The ec field is never accessed alone, but always
* together with id and st. The offsets of counts by the target
* parallelism and the positionings of fields makes it possible to
* perform the most common checks via sign tests of fields: When
* ac is negative, there are not enough active workers, when tc is
* negative, there are not enough total workers, when id is
* negative, there is at least one waiting worker, and when e is
* negative, the pool is terminating. To deal with these possibly
* negative fields, we use casts in and out of "short" and/or
* signed shifts to maintain signedness.
*/
volatile long ctl;

// bit positions/shifts for fields
private static final int AC_SHIFT = 48;
private static final int TC_SHIFT = 32;
private static final int ST_SHIFT = 31;
private static final int EC_SHIFT = 16;

// bounds
private static final int MAX_ID = 0x7fff; // max poolIndex
private static final int SMASK = 0xffff; // mask short bits
private static final int SHORT_SIGN = 1 << 15;
private static final int INT_SIGN = 1 << 31;

// masks
private static final long STOP_BIT = 0x0001L << ST_SHIFT;
private static final long AC_MASK = ((long)SMASK) << AC_SHIFT;
private static final long TC_MASK = ((long)SMASK) << TC_SHIFT;

// units for incrementing and decrementing
private static final long TC_UNIT = 1L << TC_SHIFT;
private static final long AC_UNIT = 1L << AC_SHIFT;

// masks and units for dealing with u = (int)(ctl >>> 32)
private static final int UAC_SHIFT = AC_SHIFT - 32;
private static final int UTC_SHIFT = TC_SHIFT - 32;
private static final int UAC_MASK = SMASK << UAC_SHIFT;
private static final int UTC_MASK = SMASK << UTC_SHIFT;
private static final int UAC_UNIT = 1 << UAC_SHIFT;
private static final int UTC_UNIT = 1 << UTC_SHIFT;

// masks and units for dealing with e = (int)ctl
private static final int E_MASK = 0x7fffffff; // no STOP_BIT
private static final int EC_UNIT = 1 << EC_SHIFT;

ForkJoinPool的总控制信息,包含在一个long(64bit)里面。

  • AC: 表示当前活动的工作线程的数量减去并行度得到的数值。(16 bits)
  • TC: 表示全部工作线程的数量减去并行度得到的数值。(16bits)
  • ST: 表示当前ForkJoinPool是否正在关闭。(1 bit)
  • EC: 表示Treiber stack顶端的等待工作线程的等待次数。(15 bits)
  • ID: Treiber stack顶端的等待工作线程的下标取反。(16 bits)
1111111111111111 1111111111111111 1 111111111111111 1111111111111111
AC TC ST EC ID
  • 如果AC为负数,说明没有足够的活动工作线程。
  • 如果TC为负数,说明工作线程数量没达到最大工作线程数量。
  • 如果ID为负数,说明至少有一个等待的工作线程。
  • 如果(int)ctl为负数,说明ForkJoinPool正在关闭。

Fork-Join源码分析

这篇源码分析基于JDK7。

Fork

通过分析一个Fork-Join任务的执行过程来分析Fork-Join的相关代码,主要侧重于分裂(Fork)/合并(Join)过程。

SumTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SumTask extends RecursiveTask<Long>{  
private static final int THRESHOLD = 10;

private long start;
private long end;

public SumTask(long n) {
this(1,n);
}

private SumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if((end - start) <= THRESHOLD){
for(long l = start; l <= end; l++){
sum += l;
}
}else{
long mid = (start + end) >>> 1;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork();
right.fork();
sum = left.join() + right.join();
}
return sum;
}
private static final long serialVersionUID = 1L;
}

ForkJoinTask#fork()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Arranges to asynchronously execute this task. While it is not
* necessarily enforced, it is a usage error to fork a task more
* than once unless it has completed and been reinitialized.
* Subsequent modifications to the state of this task or any data
* it operates on are not necessarily consistently observable by
* any thread other than the one executing it unless preceded by a
* call to {@link #join} or related methods, or a call to {@link
* #isDone} returning {@code true}.
*
* <p>This method may be invoked only from within {@code
* ForkJoinPool} computations (as may be determined using method
* {@link #inForkJoinPool}). Attempts to invoke in other contexts
* result in exceptions or errors, possibly including {@code
* ClassCastException}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}

ForkJoinWorkerThread#pushTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Pushes a task. Call only from this thread.
*
* @param t the task. Caller must ensure non-null.
*/
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
// 1
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
// 2
UNSAFE.putOrderedObject(q, u, t);
// 3
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork();
else if (s == m)
// 4
growQueue();
}
}

标注代码分析

  1. 这里首先根据当前的queueTop对队列(数组)长度取模来算出放置任务的下标,然后再通过下标算出偏移地址,提供给Unsafe使用。
  2. 设置任务。
  3. 修改queueTop
  4. 如果队列满了,扩展一下队列容量。

    ForkJoinWorkerThread#growQueue()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    /**
    * Creates or doubles queue array. Transfers elements by
    * emulating steals (deqs) from old array and placing, oldest
    * first, into new array.
    */
    private void growQueue() {
    ForkJoinTask<?>[] oldQ = queue;
    int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
    // 1
    if (size > MAXIMUM_QUEUE_CAPACITY)
    throw new RejectedExecutionException("Queue capacity exceeded");
    // 2
    if (size < INITIAL_QUEUE_CAPACITY)
    size = INITIAL_QUEUE_CAPACITY;
    ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
    int mask = size - 1;
    int top = queueTop;
    int oldMask;
    if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
    for (int b = queueBase; b != top; ++b) {
    long u = ((b & oldMask) << ASHIFT) + ABASE;
    Object x = UNSAFE.getObjectVolatile(oldQ, u);
    if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
    UNSAFE.putObjectVolatile
    (q, ((b & mask) << ASHIFT) + ABASE, x);
    }
    }
    }

标注代码分析

  1. 容量为原来的2倍,不超过MAXIMUM_QUEUE_CAPACITY(1 << 24)
  2. 最小为INITIAL_QUEUE_CAPACITY(1 << 13)

Fork-Join模式

什么是Fork-Join

“分治”问题可以很容易地通过Callable线程的Executor接口来解决。通过为每个任务实例化一 个Callable实例,并在ExecutorService类中汇总计算结果来得出最终结果可以实现这一目的。那么自然而然想到的问题就是,如果这接口已经做得不错了,我们为什么还需要Java 7的其他框架?
使用ExecutorServiceCallable的主要问题是,Callable实例在本质上是阻塞的。一旦一个Callable实例开始执行,其他所有Callable都会被阻塞。由于队列后面的Callable实例在前一实例未执行完成的时候不会被执行,因此许多资源无法得到利用。Fork-Join框架被引入来解决这一并行问题,而Executor解决的是并发问题(译者注:并发和并行的区别就是一个处理器同时处理多个任务和多个处理器或者是多核的处理器同时处理多个不同的任务)。
Fork-Join模式,分而治之,然后合并结果,这么一种编程模式。(注:Fork-Join是一个单机框架,类似的分布式的框架有Hadoop这类的,它们的计算模型是MapReduce,体现了和Fork-Join一样的思想-分而治之。)
Fork-Join框架是一个”多核友好的、轻量级并行框架”,它支持并行编程风格,将问题递归拆分成多个更小片断,以并行和调配的方式解决。Fork-join融合了分而治之技术;获取问题后,递归地将它分成多个子问题,直到每个子问题都足够小,以至于可以高效地串行地解决它们。递归的过程将会把问题分成两个或者多个子问题,然后把这些问题放入队列中等待处理(fork步骤),接下来等待所有子问题的结果(join步骤),把多个结果合并到一起。
Fork-Join模式有自己的适用范围。如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用Fork-Join模式来解决。
一个Fork-Join模式的示意图,位于图上部的Task依赖于位于其下的Task的执行,只有当所有的子任务都完成之后,调用者才能获得Task 0的返回结果。如下图。


Fork-Join模式能够解决很多种类的并行问题。通过使用Doug Lea提供的Fork-Join框架,软件开发人员只需要关注任务的划分和中间结果的组合就能充分利用并行平台的优良性能。其他和并行相关的诸多难于处理的问题,例如负载平衡、同步等,都可以由框架采用统一的方式解决。这样,我们就能够轻松地获得并行的好处而避免了并行编程的困难且容易出错的缺点。

深入理解Java内存模型(总结)

处理器内存模型

顺序一致性内存模型是一个理论参考模型,JMM和处理器内存模型在设计时通常会把顺序一致性内存模型作为参照。JMM和处理器内存模型在设计时会对顺序一致性模型做一些放松,因为如果完全按照顺序一致性模型来实现处理器和JMM,那么很多的处理器和编译器优化都要被禁止,这对执行性能将会有很大的影响。
根据对不同类型读/写操作组合的执行顺序的放松,可以把常见处理器的内存模型划分为下面几种类型。

  1. 放松程序中写-读操作的顺序,由此产生了total store ordering内存模型(简称为TSO)。
  2. 在前面1的基础上,继续放松程序中写-写操作的顺序,由此产生了partial store order内存模型(简称为PSO)。
  3. 在前面1和2的基础上,继续放松程序中读-写和读-读操作的顺序,由此产生了relaxed memory order内存模型(简称为RMO)和PowerPC内存模型。

注意,这里处理器对读/写操作的放松,是以两个操作之间不存在数据依赖性为前提的(因为处理器要遵守as-if-serial语义,处理器不会对存在数据依赖性的两个内存操作做重排序)。
下面的表格展示了常见处理器内存模型的细节特征。


在这个表格中,我们可以看到所有处理器内存模型都允许写-读重排序,原因在第一章以说明过:它们都使用了写缓存区,写缓存区可能导致写-读操作重排序(处理器执行的内存操作和实际内存执行的不一样)。同时,我们可以看到这些处理器内存模型都允许更早读到当前处理器的写,原因同样是因为写缓存区:由于写缓存区仅对当前处理器可见,这个特性导致当前处理器可以比其他处理器先看到临时保存在自己的写缓存区中的写。
上面表格中的各种处理器内存模型,从上到下,模型由强变弱。越是追求性能的处理器,内存模型设计的会越弱。因为这些处理器希望内存模型对它们的束缚越少越好,这样它们就可以做尽可能多的优化来提高性能。
由于常见的处理器内存模型比JMM要弱,Java编译器在生成字节码时,会在执行指令序列的适当位置插入内存屏障来限制处理器的重排序。同时,由于各种处理器内存模型的强弱并不相同,为了在不同的处理器平台向程序员展示一个一致的内存模型,JMM在不同的处理器中需要插入的内存屏障的数量和种类也不相同。
下图展示了JMM在不同处理器内存模型中需要插入的内存屏障的示意图。

深入理解Java内存模型(Final)

与前面介绍的锁和volatile相比较,对final域的读和写更像是普通的变量访问。对于final域,编译器和处理器要遵守两个重排序规则。

  • 在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序。
  • 初次读一个包含final域的对象的引用,与随后初次读这个final域,这两个操作之间不能重排序。

下面,我们通过一些示例性的代码来分别说明这两个规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FinalExample {
int i; //普通变量
final int j; //final变量
static FinalExample obj;

public void FinalExample () { //构造函数
i = 1; //写普通域
j = 2; //写final域
}

public static void writer () { //写线程A执行
obj = new FinalExample ();
}

public static void reader () { //读线程B执行
FinalExample object = obj; //读对象引用
int a = object.i; //读普通域
int b = object.j; //读final域
}
}

这里假设一个线程A执行writer(),随后另一个线程B执行reader()。下面我们通过这两个线程的交互来说明这两个规则。

深入理解Java内存模型(锁)

锁的释放-获取建立的Happens-Before关系

锁是Java并发编程中最重要的同步机制。锁除了让临界区互斥执行外,还可以让释放锁的线程向获取同一个锁的线程发送消息。
下面是锁释放-获取的示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
class MonitorExample {
int a = 0;

public synchronized void writer() { //1
a++; //2
} //3

public synchronized void reader() { //4
int i = a; //5
……
} //6
}

假设线程A执行writer(),随后线程B执行reader()
根据happens-before规则,这个过程包含的happens-before关系可以分为两类。

  1. 根据程序次序规则,1 happens-before 22 happens-before 34 happens-before 55 happens-before 6
  2. 根据监视器锁规则,3 happens-before 4
  3. 根据happens-before的传递性,2 happens-before 5

上述happens-before关系的图形化表现形式如下。

在上图中,每一个箭头链接的两个节点,代表了一个happens-before关系。黑色箭头表示程序顺序规则;橙色箭头表示监视器锁规则;蓝色箭头表示组合这些规则后提供的happens-before保证。
上图表示在线程A释放了锁之后,随后线程B获取同一个锁。在上图中,2 happens-before 5。因此,线程A在释放锁之前所有可见的共享变量,在线程B获取同一个锁之后,将立刻变得对B线程可见。

深入理解Java内存模型(重排序)

数据的依赖性

如果两个操作访问同一个变量,且这两个操作中有一个为写操作,此时这两个操作之间就存在数据依赖性。数据依赖分下列三种类型。

上面三种情况,只要重排序两个操作的执行顺序,程序的执行结果将会被改变。
前面提到过,编译器和处理器可能会对操作做重排序。编译器和处理器在重排序时,会遵守数据依赖性,编译器和处理器不会改变存在数据依赖关系的两个操作的执行顺序。
注意,这里所说的数据依赖性仅针对单个处理器中执行的指令序列和单个线程中执行的操作,不同处理器之间和不同线程之间的数据依赖性不被编译器和处理器考虑。

深入理解Java内存模型(顺序一致性)

数据竞争与顺序一致性的保证

当程序未正确同步时,就会存在数据竞争。Java内存模型规范对数据竞争的定义如下:

  1. 在一个线程中写一个变量,
  2. 在另一个线程读同一个变量,
  3. 而且写和读没有通过同步来排序。

当代码中包含数据竞争时,程序的执行往往产生违反直觉的结果。如果一个多线程程序能正确同步,这个程序将是一个没有数据竞争的程序。
JMM对正确同步的多线程程序的内存一致性做了如下保证:
如果程序是正确同步的,程序的执行将具有顺序一致性(sequentially consistent)–即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。这里的同步是指广义上的同步,包括对常用同步原语(lockvolatilefinal)的正确使用。

深入理解Java内存模型(Volatile-Happens before)

volatile特性

当我们声明共享变量为volatile后,对这个变量的读/写将会很特别。
理解volatile特性的一个好方法,把对volatile变量的单个读/写,看成是使用同一个监视器锁对这些单个读/写操作做了同步。下面我们通过具体的示例来说明,请看下面的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class VolatileFeaturesExample {
volatile long vl = 0L; //使用volatile声明64位的long型变量

public void set(long l) {
vl = l; //单个volatile变量的写
}

public void getAndIncrement () {
vl++; //复合(多个)volatile变量的读/写
}


public long get() {
return vl; //单个volatile变量的读
}
}

假设有多个线程分别调用上面程序的三个方法,这个程序在语意上和下面程序等价:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class VolatileFeaturesExample {
long vl = 0L; // 64位的long型普通变量

public synchronized void set(long l) { //对单个的普通 变量的写用同一个监视器同步
vl = l;
}

public void getAndIncrement () { //普通方法调用
long temp = get(); //调用已同步的读方法
temp += 1L; //普通写操作
set(temp); //调用已同步的写方法
}

public synchronized long get() {
//对单个的普通变量的读用同一个监视器同步
return vl;
}
}

如上面示例程序所示,对一个volatile变量的单个读/写操作,与对一个普通变量的读/写操作使用同一个监视器锁来同步,它们之间的执行效果相同。
监视器锁的happens-before规则保证释放监视器和获取监视器的两个线程之间的内存可见性,这意味着对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。
监视器锁的语义决定了临界区代码的执行具有原子性。这意味着即使是64位的long型和double型变量,只要它是volatile变量,对该变量的读写就将具有原子性。如果是多个volatile操作或类似于volatile++这种复合操作,这些操作整体上不具有原子性。
简而言之,volatile变量自身具有下列特性:

  • 可见性。对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。
  • 原子性。对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性。

volatile的简单变量如果当前值由该变量以前的值相关,那么volatile关键字不起作用,也就是说如下的表达式都不是原子操作。

1
2
n=n+1 ;
n++;

只有当变量的值和自身上一个值无关时对该变量的操作才是原子级别的,如n = m + 1,这个就是原子级别的。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×