JDK1.6 FutureTask

功能简介

  • FutureTask是一种异步任务(或异步计算),举个栗子,主线程的逻辑中需要使用某个值,但这个值需要复杂的运算得来,那么主线程可以提前建立一个异步任务来计算这个值(在其他的线程中计算),然后去做其他事情,当需要这个值的时候再通过刚才建立的异步任务来获取这个值,有点并行的意思,这样可以缩短整个主线程逻辑的执行时间。
  • FutureTask是基于AQS来构建的,使用共享模式,使用AQS的状态来表示异步任务的运行状态。

源码分析

RunnableFuture接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the <tt>run</tt> method causes completion of the <tt>Future</tt>
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's <tt>get</tt> method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

Runnable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* The <code>Runnable</code> interface should be implemented by any
* class whose instances are intended to be executed by a thread. The
* class must define a method of no arguments called <code>run</code>.
* <p>
* This interface is designed to provide a common protocol for objects that
* wish to execute code while they are active. For example,
* <code>Runnable</code> is implemented by class <code>Thread</code>.
* Being active simply means that a thread has been started and has not
* yet been stopped.
*
* ......
* @author Arthur van Hoff
* @version %I%, %G%
* @see java.lang.Thread
* @see java.util.concurrent.Callable
* @since JDK1.0
*/
public
interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}

Future接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/**
* A <tt>Future</tt> represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* <tt>get</tt> when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* <tt>cancel</tt> method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a <tt>Future</tt> for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form <tt>Future&lt;?&gt;</tt> and
* return <tt>null</tt> as a result of the underlying task.
*
* .....
*
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's <tt>get</tt> method
*/
public interface Future<V> {

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run. If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return <tt>true</tt>. Subsequent calls to {@link #isCancelled}
* will always return <tt>true</tt> if this method returned <tt>true</tt>.
*
* @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return <tt>false</tt> if the task could not be cancelled,
* typically because it has already completed normally;
* <tt>true</tt> otherwise
*/
// 1
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns <tt>true</tt> if this task was cancelled before it completed
* normally.
*
* @return <tt>true</tt> if this task was cancelled before it completed
*/
// 2
boolean isCancelled();

/**
* Returns <tt>true</tt> if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* <tt>true</tt>.
*
* @return <tt>true</tt> if this task completed
*/
// 3
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
// 4
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
// 5
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

标注代码分析

  1. 尝试取消任务的执行,如果任务已经完成或者已经被取消或者由于某种原因无法取消,方法返回false。如果任务取消成功,或者任务开始执行之前调用了取消方法,那么任务就永远不会执行了。mayInterruptIfRunning参数决定了是否要中断执行任务的线程。
  2. 判断任务是否在完成之前被取消。
  3. 判断任务是否完成。
  4. 等待,直到获取任务的执行结果。如果任务还没执行完,这个方法会阻塞。
  5. 等待,在给定的时间内获取任务的执行结果。

Future提供了取消任务接口,并提供了查看任务状态的接口,最重要的是提供有阻塞行为的获取任务执行结果的接口。

FutureTask#Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Synchronization control for FutureTask. Note that this must be
* a non-static inner class in order to invoke the protected
* <tt>done</tt> method. For clarity, all inner class support
* methods are same as outer, prefixed with "inner".
*
* Uses AQS sync state to represent run status
*/
private final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7828117401763700385L;

/** State value representing that task is running */
// 1
private static final int RUNNING = 1;
/** State value representing that task ran */
// 2
private static final int RAN = 2;
/** State value representing that task was cancelled */
// 3
private static final int CANCELLED = 4;

/** The underlying callable */
// 4
private final Callable<V> callable;
/** The result to return from get() */
// 5
private V result;
/** The exception to throw from get() */
// 6
private Throwable exception;

/**
* The thread running task. When nulled after set/cancel, this
* indicates that the results are accessible. Must be
* volatile, to ensure visibility upon completion.
*/
// 7
private volatile Thread runner;

Sync(Callable<V> callable) {
this.callable = callable;
}
}
  1. 任务正在执行。
  2. 任务已经运行完毕。
  3. 任务被取消。
  4. 内部变量callable
  5. 执行结果。
  6. 执行过程中发生的异常。
  7. 执行当前任务的线程。在set()/cancel()之后置空,说明可以了。必须使用volatile来修饰,以确保任务完成后的可见性。

Callable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* A task that returns a result and may throw an exception.
* Implementors define a single method with no arguments called
* <tt>call</tt>.
*
* <p>The <tt>Callable</tt> interface is similar to {@link
* java.lang.Runnable}, in that both are designed for classes whose
* instances are potentially executed by another thread. A
* <tt>Runnable</tt>, however, does not return a result and cannot
* throw a checked exception.
*
* <p> The {@link Executors} class contains utility methods to
* convert from other common forms to <tt>Callable</tt> classes.
*
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> the result type of method <tt>call</tt>
*/
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

FutureTask#innerRun()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void innerRun() {
// 1
if (!compareAndSetState(0, RUNNING))
// 2
return;
try {
// 3
runner = Thread.currentThread();
// 4
if (getState() == RUNNING) // recheck after setting thread
// 5
innerSet(callable.call());
else
// 6
releaseShared(0); // cancel
} catch (Throwable ex) {
// 7
innerSetException(ex);
}
}

标注代码分析

  1. 尝试设置任务运行状态为正在执行。
  2. 如果设置失败,直接返回。
  3. 设置执行线程。
  4. 再次检测任务状态。
  5. 执行任务,然后设置执行结果。
  6. 说明任务已取消。
  7. 如果执行任务过程中发生异常,设置异常。

FutureTask#Sync#innerSet()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  void innerSet(V v) {
for (;;) {
// 1
int s = getState();
if (s == RAN)
// 2
return;
if (s == CANCELLED) {
// 3
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
// 4
if (compareAndSetState(s, RAN)) {
// 5
result = v;
// 6
releaseShared(0);
// 7
done();
return;
}
}
}

标注代码分析

  1. 获取任务执行状态。
  2. 如果任务已经执行完毕,退出。
  3. 这里释放AQS控制权并设置runnernull,为了避免正在和一个试图中断线程的取消请求竞。
  4. 尝试将任务状态设置为执行完成。
  5. 设置执行结果。
  6. 释放AQS控制权。
  7. 子类可覆盖这个方法,做一些定制处理。

AbstractQueuedSynchronizer#releaseShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

AbstractQueuedSynchronizer#tryReleaseShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Attempts to set the state to reflect a release in shared mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this release of shared mode may permit a
* waiting acquire (shared or exclusive) to succeed; and
* {@code false} otherwise
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

FutureTask#Sync#tryReleaseShared()

1
2
3
4
5
6
7
8
/**
* Implements AQS base release to always signal after setting
* final done status by nulling runner thread.
*/
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}

根据注释可以明白,Sync实现AQS#tryReleaseShared()

FutureTask#Sync#innerSetException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void innerSetException(Throwable t) {
for (;;) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
exception = t;
result = null;
releaseShared(0);
done();
return;
}
}
}

innerRun()innerSet()类似,只不过最后要设置异常,清空result

FutureTask#Sync#innerRunAndReset()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
boolean innerRunAndReset() {
if (!compareAndSetState(0, RUNNING))
return false;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
callable.call(); // don't set result
runner = null;
return compareAndSetState(RUNNING, 0);
} catch (Throwable ex) {
innerSetException(ex);
return false;
}
}

innerRunAndReset()不设置执行结果,最后执行完毕后重置异步任务状态为0。

FutureTask#Sync#innerGet()

1
2
3
4
5
6
7
8
9
10
11
12
V innerGet() throws InterruptedException, ExecutionException {
// 1
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
// 2
throw new CancellationException();
if (exception != null)
// 3
throw new ExecutionException(exception);
// 4
return result;
}

标注代码分析

  1. 获取共享锁,无法获取时阻塞等待。
  2. 如果任务状态为取消,那么抛出CancellationException
  3. 如果任务执行异常,抛出ExecutionException,并传递异常。
  4. 成功执行完成,返回执行结果。

FutureTask#Sync#innerGet(long nanosTimeout)

1
2
3
4
5
6
7
8
9
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}

判断任务是否完成,要依据任务(完成或取消)状态来判断。

FutureTask#get()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}

/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}

FutureTask#get()由sync#innerGet()的实现。

AbstractQueuedSynchronizer#acquireSharedInterruptibly()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

AbstractQueuedSynchronizer#tryAcquireShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Attempts to acquire in shared mode. This method should query if
* the state of the object permits it to be acquired in the shared
* mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread.
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return a negative value on failure; zero if acquisition in shared
* mode succeeded but no subsequent shared-mode acquire can
* succeed; and a positive value if acquisition in shared
* mode succeeded and subsequent shared-mode acquires might
* also succeed, in which case a subsequent waiting thread
* must check availability. (Support for three different
* return values enables this method to be used in contexts
* where acquires only sometimes act exclusively.) Upon
* success, this object has been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

FutureTask#Sync#tryAcquireShared()

1
2
3
4
5
6
/**
* Implements AQS base acquire to succeed if ran or cancelled
*/
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}

根据注释可以明白,Sync实现AQS#tryAcquireShared()

FutureTask#Sync#innerIsDone()

1
2
3
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}

FutureTask#Sync#ranOrCancelled()

1
2
3
private boolean ranOrCancelled(int state) {
return (state & (RAN | CANCELLED)) != 0;
}

FutureTask#Sync#innerCancel()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
// 1
return false;
// 2
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
// 3
r.interrupt();
}
// 4
releaseShared(0);
// 5
done();
return true;
}

标注代码分析

  1. 如果任务已经执行完毕或者取消。
  2. 否则尝试设置任务状态为取消。
  3. 设置mayInterruptIfRunning=true,需要中断线程,
  4. 释放AQS的控制权。
  5. 这里也会调用done(),定制子类时需要注意下。

FutureTask#cancel()

1
2
3
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}

由sync#innerCancel()实现。

FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/**
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the <tt>get</tt>
* method will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled.
*
* <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
* {@link java.lang.Runnable} object. Because <tt>FutureTask</tt>
* implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
* submitted to an {@link Executor} for execution.
*
* <p>In addition to serving as a standalone class, this class provides
* <tt>protected</tt> functionality that may be useful when creating
* customized task classes.
*
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this FutureTask's <tt>get</tt> method
*/
public class FutureTask<V> implements RunnableFuture<V> {
/** Synchronization control for FutureTask */
private final Sync sync;

/**
* Creates a <tt>FutureTask</tt> that will upon running, execute the
* given <tt>Callable</tt>.
*
* @param callable the callable task
* @throws NullPointerException if callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}

/**
* Creates a <tt>FutureTask</tt> that will upon running, execute the
* given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* <tt>Future&lt;?&gt; f = new FutureTask&lt;Object&gt;(runnable, null)</tt>
* @throws NullPointerException if runnable is null
*/
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}

public boolean isCancelled() {
return sync.innerIsCancelled();
}

public boolean isDone() {
return sync.innerIsDone();
}

public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}

/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}

/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}

/**
* Protected method invoked when this task transitions to state
* <tt>isDone</tt> (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
* implementation of this method to determine whether this task
* has been cancelled.
*/
protected void done() { }

/**
* Sets the result of this Future to the given value unless
* this future has already been set or has been cancelled.
* This method is invoked internally by the <tt>run</tt> method
* upon successful completion of the computation.
* @param v the value
*/
protected void set(V v) {
sync.innerSet(v);
}

/**
* Causes this future to report an <tt>ExecutionException</tt>
* with the given throwable as its cause, unless this Future has
* already been set or has been cancelled.
* This method is invoked internally by the <tt>run</tt> method
* upon failure of the computation.
* @param t the cause of failure
*/
protected void setException(Throwable t) {
sync.innerSetException(t);
}

// The following (duplicated) doc comment can be removed once
//
// 6270645: Javadoc comments should be inherited from most derived
// superinterface or superclass
// is fixed.
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
public void run() {
sync.innerRun();
}

/**
* Executes the computation without setting its result, and then
* resets this Future to initial state, failing to do so if the
* computation encounters an exception or is cancelled. This is
* designed for use with tasks that intrinsically execute more
* than once.
* @return true if successfully run and reset
*/
protected boolean runAndReset() {
return sync.innerRunAndReset();
}
...

方法实现是基于FutureTask#sync实现。

FutureTask构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Creates a <tt>FutureTask</tt> that will upon running, execute the
* given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* <tt>Future&lt;?&gt; f = new FutureTask&lt;Object&gt;(runnable, null)</tt>
* @throws NullPointerException if runnable is null
*/
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}

Executors#callable()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns the given result. This
* can be useful when applying methods requiring a
* <tt>Callable</tt> to an otherwise resultless action.
* @param task the task to run
* @param result the result to return
* @return a callable object
* @throws NullPointerException if task null
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

Executors#RunnableAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

3 总结

  1. 当前线程建立异步任务后,异步任务处于初始状态(内部有一个数值表示状态,初始为0),一般交由其他线程执行任务(比如提交给线程池处理)。当前线程通过异步任务的get()来获取执行结果时,如果异步任务此时还没执行完毕(内部状态既不是完成,也不是取消),那么当前线程会在get()处阻塞。
  2. 当其他线程(比如线程池中的工作线程)执行了异步任务,那么会将异步任务的状态改成”完成”(根据情况也可能是取消),同时将在get()除等待的线程唤醒。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×