JDK1.7 FutureTask

功能简介

  • FutureTask是一种异步任务(或异步计算),举个栗子,主线程的逻辑中需要使用某个值,但这个值需要负责的运算得来,那么主线程可以提前建立一个异步任务来计算这个值(在其他的线程中计算),然后去做其他事情,当需要这个值的时候再通过刚才建立的异步任务来获取这个值,有点并行的意思,这样可以缩短整个主线程逻辑的执行时间。
  • 1.7与1.6版本不同,1.7的FutureTask不再基于AQS来构建,而是在内部采用简单的Treiber Stack来保存等待线程。

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FutureTask<V> implements RunnableFuture<V> {  


private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 1
private Callable<V> callable;
// 2
private Object outcome;
// 3
private volatile Thread runner;
// 4
private volatile WaitNode waiters;

内部状态可能得迁转过程

  • NEW -> COMPLETING -> NORMAL //正常完成
  • NEW -> COMPLETING -> EXCEPTIONAL //发生异常
  • NEW -> CANCELLED //取消
  • NEW -> INTERRUPTING -> INTERRUPTED //中断

标注代码分析

  1. 内部的callable,运行完成后设置为null。
  2. 如果正常完成,就是执行结果,通过get方法获取;如果发生异常,就是具体的异常对象,通过get方法抛出。本身没有volatile修饰, 依赖state的读写来保证可见性。
  3. 执行内部callable的线程。
  4. 存放等待线程的Treiber Stack。

WaitNode

1
2
3
4
5
static final class WaitNode {  
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

包含当前线程对象,并有指向下一个WaitNode的指针,所谓的Treiber Stack就是由WaitNode组成的(一个单向链表)。

分析源码

按照FutureTask的运行过程来分析。

  1. 创建任务,实际使用时,一般会结合线程池(ThreadPoolExecutor)使用,所以是在线程池内部创建FutureTask
  2. 执行任务,一般会有由工作线程(对于我们当前线程来说的其他线程)调用FutureTask#run(),完成执行。
  3. 获取结果,一般会有我们的当前线程去调用get()来获取执行结果,如果获取时,任务并没有被执行完毕,当前线程就会被阻塞,直到任务被执行完毕,然后获取结果。
  4. 取消任务,某些情况下会放弃任务的执行,进行任务取消。

    创建任务

    FutureTask

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public FutureTask(Callable<V> callable) {  
    if (callable == null)
    throw new NullPointerException();
    this.callable = callable;
    this.state = NEW; // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; // ensure visibility of callable
    }

构造一个FutureTask很简单,可以通过一个Callable来构建,也可以通过一个Runnable和一个result来构建。
这里要注意的是必须把state的写放到最后,因为state本身由volatile修饰,所以可以保证callable的可见性。(因为后续读callable之前会先读state,还记得这个volatile写读的HB规则吧)。

执行任务

run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void run() {  
//1
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//2
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//3
setException(ex);
}
if (ran)
set(result); //4
}
} finally {
//5
runner = null;
//6
int s = state;
if (s >= INTERRUPTING)
//7
handlePossibleCancellationInterrupt(s);
}
}

标注代码分析

  1. 如果state不为null或者尝试设置runner为当前线程,失败就退出。
  2. 执行任务。
  3. 如果发生异常,设置异常。
  4. 如果正常执行完成,设置执行结果。
  5. runner必须在设置了state之后再置空,避免run方法出现并发问题。
  6. 这里还必须再读一次state,避免丢失中断。
  7. 处理可能发生的取消中断(cancel(true))。

    set()

    1
    2
    3
    4
    5
    6
    7
    8
    protected void set(V v) {  
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    //1
    finishCompletion();
    }
    }

set过程中,首先尝试将当前任务状态stateNEW改为COMPLETING。如果成功的话,再设置执行结果到outcome。然后将state再次设置为NORMAL,注意这次使用的是putOrderedInt,其实就是原子量的LazySet内部使用的方法。为什么使用这个方法?首先LazySet相对于Volatile-Write来说更廉价,因为它没有昂贵的Store/Load屏障,只有Store/Store屏障(x86下Store/Store屏障是一个空操作),其次,后续线程不会及时的看到stateCOMPLETING变为NORMAL,但这没什么关系,而且NORMALstate的最终状态之一,以后不会在变化了。
标注代码分析

  1. 唤醒Treiber Stack中所有等待线程。

finishCompletion()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void finishCompletion() {  
for (WaitNode q; (q = waiters) != null;) {
//1
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//2
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 3
done();
// 4
callable = null;
}

finishCompletion主要就是在任务执行完毕后,移除Treiber Stack,并将Treiber Stack中所有等待获取任务结果的线程唤醒,然后回调下done钩子方法。
run过程中如果发生异常,调用的setException()
标注代码分析

  1. 尝试将waiters设置为null。
  2. 然后将waiters中的等待线程全部唤醒。
  3. 回调下钩子方法。
  4. 置空callable,减少内存占用。

setException()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected void setException(Throwable t) {  
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
```
#### handlePossibleCancellationInterrupt()
``` java
/**
* 确保cancel(true)产生的中断发生在run或runAndReset方法过程中。
*/
private void handlePossibleCancellationInterrupt(int s) {
//1
if (s == INTERRUPTING)
//2
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt

// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}

标注代码分析

  1. 如果当前正在中断过程中,自旋等待一下,等中断完成。
  2. 这里的state状态一定是INTERRUPTED; 这里不能清除中断标记,因为没办法区分来自cancel(true)的中断。

    总结run()

  3. 只有state为NEW的时候才执行任务(调用内部callable#run())。执行前会原子的设置执行线程(runner),防止竞争。
  4. 如果任务执行成功,任务状态从NEW迁转为COMPLETING(原子),设置执行结果,任务状态从COMPLETING迁转为NORMAL(LazySet);如果任务执行过程中发生异常,任务状态从NEW迁转为COMPLETING(原子),设置异常结果,任务状态从COMPLETING迁转为EXCEPTIONAL(LazySet)
  5. 将Treiber Stack中等待当前任务执行结果的等待节点中的线程全部唤醒,同时删除这些等待节点,将整个Treiber Stack置空。
  6. 最后别忘了等一下可能发生的cancel(true)中引起的中断,让这些中断发生在执行任务过程中(别泄露出去)。

runAndReset()

周期性任务的时候用到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected boolean runAndReset() {  
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

可见runAndResetrun方法的区别只是执行完毕后不设置结果、而且有返回值表示是否执行成功。

获取结果

get()

1
2
3
4
5
6
7
8
public V get() throws InterruptedException, ExecutionException {  
int s = state;
if (s <= COMPLETING)
//1
s = awaitDone(false, 0L);
//2
return report(s);
}

标注代码分析

  1. 如果任务还没执行完毕,等待任务执行完毕。
  2. 如果任务执行完毕,获取执行结果。

awaitDone()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private int awaitDone(boolean timed, long nanos)  
throws InterruptedException {
//1
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
//2
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
//3
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
//4
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
//5
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//6
removeWaiter(q);
return state;
}
//7
LockSupport.parkNanos(this, nanos);
}
else
//8
LockSupport.park(this);
}
}

标注代码分析

  1. 先算出到期时间。
  2. 如果当前线程被中断,移除等待节点q,然后抛出中断异常。
  3. 如果任务已经执行完毕,设置thread=null。
  4. 如果当前正在完成过程中,出让CPU。
  5. 将q(包含当前线程的等待节点)入队。
  6. 如果超时,移除等待节点q。
  7. 超时的话,就阻塞给定时间。
  8. 没设置超时的话,就阻塞当前线程。

removeWaiter()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void removeWaiter(WaitNode node) {  
if (node != null) {
//1
node.thread = null;
//2
retry:
for (;;) {
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

标注代码分析

  1. 将node的thread域置空。
  2. 下面过程中会将node从等待队列中移除,以thread域为null为依据,如果过程中发生了竞争,重试。

get()#report

1
2
3
4
5
6
7
8
private V report(int s) throws ExecutionException {  
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

get(long timeout, TimeUnit unit)

1
2
3
4
5
6
7
8
9
10
public V get(long timeout, TimeUnit unit)  
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

总结get()

  1. 首先检查当前任务的状态,如果状态表示执行完成,进入第2步。
  2. 获取执行结果,也可能得到取消或者执行异常,get过程结束。
  3. 如果当前任务状态表示未执行或者正在执行,那么当前线程放入一个新建的等待节点,然后进入Treiber Stack进行阻塞等待。
  4. 如果任务被工作线程(对当前线程来说是其他线程)执行完毕,执行完毕时工作线程会唤醒Treiber Stack上等待的所有线程,所以当前线程被唤醒,清空当前等待节点上的线程域,然后进入第2步。
  5. 当前线程在阻塞等待结果过程中可能被中断,如果被中断,那么会移除当前线程在Treiber Stack上对应的等待节点,然后抛出中断异常,get过程结束。
  6. 当前线程也可能执行带有超时时间的阻塞等待,如果超时时间过了,还没得到执行结果,那么会除当前线程在Treiber Stack上对应的等待节点,然后抛出超时异常,get过程结束。

cancel()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean cancel(boolean mayInterruptIfRunning) {  
//1
if (state != NEW)
return false;
if (mayInterruptIfRunning) {
//2
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
//3
if (t != null)
t.interrupt();
//4
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
//5
else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
//6
finishCompletion();
return true;
}

在设置mayInterruptIfRunningtrue的情况下,内部首先通过一个原子操作将stateNEW转变为INTERRUPTING,然后中断执行任务的线程,然后在通过一个LazySet的操作将stateINTERRUPTING转变为INTERRUPTED,由于后面这个操作对其他线程并不会立即可见,所以handlePossibleCancellationInterrupt才会有一个自旋等待stateINTERRUPTING变为INTERRUPTED的过程。
标注代码分析

  1. 如果任务已经执行完毕,返回false。
  2. 如果有中断任务的标志,尝试将任务状态设置为INTERRUPTING。
  3. 上面设置成功的话,这里进行线程中断。
  4. 最后将任务状态设置为INTERRUPTED,注意这里又是LazySet。
  5. 如果没有中断任务的标志,尝试将任务状态设置为CANCELLED。
  6. 最后唤醒Treiber Stack中所有等待线程。

    查看任务状态

    1
    2
    3
    4
    5
    6
    public boolean isCancelled() {  
    return state >= CANCELLED;
    }
    public boolean isDone() {
    return state != NEW;
    }

JDK1.7和JDK1.6的区别

JDK1.7的FutureTask不像1.6那样基于AQS构建。
前面贴代码了时候故意去掉了一些注释,避免读代码的时候受影响,现在我们来看一下关键的一段。

/*

* Revision notes: This differs from previous versions of this
* class that relied on AbstractQueuedSynchronizer, mainly to
* avoid surprising users about retaining interrupt status during
* cancellation races. Sync control in the current design relies
* on a "state" field updated via CAS to track completion, along
* with a simple Treiber stack to hold waiting threads.
*
* Style note: As usual, we bypass overhead of using
* AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
*/

使用AQS的方式,可能会在取消发生竞争过程中诡异的保留了中断状态。这里之所以没有采用这种方式,是为了避免这种情况的发生。
具体什么情况下会发生呢?

1
2
3
ThreadPoolExecutor executor = ...;  
executor.submit(task1).cancel(true);
executor.submit(task2);

看上面的代码,虽然中断的是task1,但可能task2得到中断信号。
原因是什么呢?看下JDK1.6的FutureTask的中断代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
boolean innerCancel(boolean mayInterruptIfRunning) {  
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
//1
if (r != null)
//2
r.interrupt();
}
releaseShared(0);
done();
return true;
}

结合上面代码例子看一下,如果主线程执行到标注1的时候,线程池可能会认为task1已经执行结束(被取消),然后让之前执行task1工作线程去执行task2,工作线程开始执行task2之后,然后主线程执行标注2(我们会发现并没有任何同步机制来阻止这种情况的发生),这样就会导致task2被中断了。
所以现在就能更好的理解JDK1.7 FutureTask的handlePossibleCancellationInterrupt中为什么要将cancel(true)中的中断保留在当前run方法运行范围内!

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×