/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the <tt>run</tt> method causes completion of the <tt>Future</tt>
 * and allows access to its results.
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's <tt>get</tt> method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
/**
 * The <code>Runnable</code> interface should be implemented by any
 * class whose instances are intended to be executed by a thread. The
 * class must define a method of no arguments called <code>run</code>.
 * <p>
 * This interface is designed to provide a common protocol for objects that
 * wish to execute code while they are active. For example,
 * <code>Runnable</code> is implemented by class <code>Thread</code>.
 * Being active simply means that a thread has been started and has not
 * yet been stopped.
 *
 * ......
 * @author  Arthur van Hoff
 * @version %I%, %G%
 * @see     java.lang.Thread
 * @see     java.util.concurrent.Callable
 * @since   JDK1.0
 */
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}
/**
 * A <tt>Future</tt> represents the result of an asynchronous
 * computation.  Methods are provided to check if the computation is
 * complete, to wait for its completion, and to retrieve the result of
 * the computation.  The result can only be retrieved using method
 * <tt>get</tt> when the computation has completed, blocking if
 * necessary until it is ready.  Cancellation is performed by the
 * <tt>cancel</tt> method.  Additional methods are provided to
 * determine if the task completed normally or was cancelled. Once a
 * computation has completed, the computation cannot be cancelled.
 * If you would like to use a <tt>Future</tt> for the sake
 * of cancellability but not provide a usable result, you can
 * declare types of the form <tt>Future<?></tt> and
 * return <tt>null</tt> as a result of the underlying task.
 *
 * .....
 *
 * @see FutureTask
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future's <tt>get</tt> method
 */
public interface Future<V> {

    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when <tt>cancel</tt> is called,
     * this task should never run.  If the task has already started,
     * then the <tt>mayInterruptIfRunning</tt> parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return <tt>true</tt>.  Subsequent calls to {@link #isCancelled}
     * will always return <tt>true</tt> if this method returned <tt>true</tt>.
     *
     * @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return <tt>false</tt> if the task could not be cancelled,
     * typically because it has already completed normally;
     * <tt>true</tt> otherwise
     */
    // 1
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Returns <tt>true</tt> if this task was cancelled before it completed
     * normally.
     *
     * @return <tt>true</tt> if this task was cancelled before it completed
     */
    // 2
    boolean isCancelled();

    /**
     * Returns <tt>true</tt> if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * <tt>true</tt>.
     *
     * @return <tt>true</tt> if this task completed
     */
    // 3
    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    // 4
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    // 5
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
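The contract described in the Javadoc above can be exercised with a small sketch. The class and method names (`FutureBasicsDemo`, `runAndGet`, `cancelBeforeRun`) are made up for illustration, and the lambdas assume Java 8 or later rather than the JDK 1.6 sources quoted in this post.

```java
import java.util.concurrent.FutureTask;

class FutureBasicsDemo {
    /** Runs a task on a helper thread and blocks in get() until the result is ready. */
    static int runAndGet() {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 6 * 7);
        new Thread(task).start();
        try {
            return task.get(); // waits if the worker has not finished yet
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Cancels a task before it starts; returns {cancel(), isCancelled(), isDone(), bodyRan}. */
    static boolean[] cancelBeforeRun() {
        boolean[] ran = { false };
        FutureTask<Void> task = new FutureTask<Void>(() -> { ran[0] = true; return null; });
        boolean cancelled = task.cancel(false);
        task.run(); // has no effect: a cancelled task never runs
        return new boolean[] { cancelled, task.isCancelled(), task.isDone(), ran[0] };
    }

    public static void main(String[] args) {
        System.out.println(runAndGet());
        System.out.println(java.util.Arrays.toString(cancelBeforeRun()));
    }
}
```

Note that `isDone()` reports `true` for a cancelled task as well, which matches the "normal termination, an exception, or cancellation" wording of the interface.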
/**
 * Synchronization control for FutureTask. Note that this must be
 * a non-static inner class in order to invoke the protected
 * <tt>done</tt> method. For clarity, all inner class support
 * methods are same as outer, prefixed with "inner".
 *
 * Uses AQS sync state to represent run status
 */
private final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -7828117401763700385L;

    /** State value representing that task is running */
    // 1
    private static final int RUNNING   = 1;
    /** State value representing that task ran */
    // 2
    private static final int RAN       = 2;
    /** State value representing that task was cancelled */
    // 3
    private static final int CANCELLED = 4;

    /** The underlying callable */
    // 4
    private final Callable<V> callable;
    /** The result to return from get() */
    // 5
    private V result;
    /** The exception to throw from get() */
    // 6
    private Throwable exception;

    /**
     * The thread running task. When nulled after set/cancel, this
     * indicates that the results are accessible.  Must be
     * volatile, to ensure visibility upon completion.
     */
    // 7
    private volatile Thread runner;
/**
 * A task that returns a result and may throw an exception.
 * Implementors define a single method with no arguments called
 * <tt>call</tt>.
 *
 * <p>The <tt>Callable</tt> interface is similar to {@link
 * java.lang.Runnable}, in that both are designed for classes whose
 * instances are potentially executed by another thread.  A
 * <tt>Runnable</tt>, however, does not return a result and cannot
 * throw a checked exception.
 *
 * <p> The {@link Executors} class contains utility methods to
 * convert from other common forms to <tt>Callable</tt> classes.
 *
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> the result type of method <tt>call</tt>
 */
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
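To make the difference from `Runnable` concrete, here is a minimal sketch of a `Callable` that both returns a value and throws a checked exception, executed through a `FutureTask`. The names `CallableVsRunnableDemo`, `parser` and `outcome` are hypothetical, and the lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class CallableVsRunnableDemo {
    /** A Callable may return a value and may throw a checked exception. */
    static Callable<Integer> parser(String text) {
        return () -> {
            if (text.isEmpty()) {
                throw new Exception("empty input"); // checked: not allowed in Runnable.run()
            }
            return Integer.parseInt(text);
        };
    }

    /** Runs the callable through a FutureTask and describes the outcome as text. */
    static String outcome(String text) {
        FutureTask<Integer> task = new FutureTask<Integer>(parser(text));
        task.run(); // executes call() on the current thread
        try {
            return "value=" + task.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome("12"));
        System.out.println(outcome(""));
    }
}
```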
void innerSet(V v) {
    for (;;) {
        // 1
        int s = getState();
        if (s == RAN)
            // 2
            return;
        if (s == CANCELLED) {
            // 3
            // aggressively release to set runner to null,
            // in case we are racing with a cancel request
            // that will try to interrupt runner
            releaseShared(0);
            return;
        }
        // 4
        if (compareAndSetState(s, RAN)) {
            // 5
            result = v;
            // 6
            releaseShared(0);
            // 7
            done();
            return;
        }
    }
}
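Annotated code analysis
1. Read the current task state.
2. If the task has already run to completion (RAN), return immediately.
3. If the task was cancelled, release through AQS, which also sets runner to null, in case we are racing with a cancel request that is about to interrupt the runner thread.
4. Try to move the task state to RAN with a CAS; if the CAS fails, loop and re-read the state.
5. Store the result.
6. Release through AQS, which wakes up threads waiting for the result.
7. Call done(), a hook that subclasses can override for custom processing.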
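The "set once, then ignore" behaviour of this CAS loop is visible through the public API. The sketch below assumes a hypothetical subclass `SettableTask` that exposes the protected `set()` method; it shows that only the first call stores a result and triggers `done()`.

```java
import java.util.concurrent.FutureTask;

class FirstSetWinsDemo {
    /** Hypothetical subclass that exposes the protected set() for this demo. */
    static class SettableTask extends FutureTask<String> {
        int doneCalls = 0;

        SettableTask() {
            super(() -> "from-callable"); // the callable is never run in this demo
        }

        void publish(String v) {
            set(v);
        }

        @Override
        protected void done() {
            doneCalls++;
        }
    }

    /** Returns {value seen by get(), number of done() invocations}. */
    static String[] demo() {
        SettableTask task = new SettableTask();
        task.publish("first");
        task.publish("second"); // ignored: the task is already completed
        try {
            return new String[] { task.get(), String.valueOf(task.doneCalls) };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", demo()));
    }
}
```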
AbstractQueuedSynchronizer#releaseShared()
/**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

/**
 * Attempts to set the state to reflect a release in shared mode.
 *
 * <p>This method is always invoked by the thread performing release.
 *
 * <p>The default implementation throws
 * {@link UnsupportedOperationException}.
 *
 * @param arg the release argument. This value is always the one
 *        passed to a release method, or the current state value upon
 *        entry to a condition wait.  The value is otherwise
 *        uninterpreted and can represent anything you like.
 * @return {@code true} if this release of shared mode may permit a
 *         waiting acquire (shared or exclusive) to succeed; and
 *         {@code false} otherwise
 * @throws IllegalMonitorStateException if releasing would place this
 *         synchronizer in an illegal state. This exception must be
 *         thrown in a consistent fashion for synchronization to work
 *         correctly.
 * @throws UnsupportedOperationException if shared mode is not supported
 */
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
FutureTask#Sync#tryReleaseShared()
/**
 * Implements AQS base release to always signal after setting
 * final done status by nulling runner thread.
 */
protected boolean tryReleaseShared(int ignore) {
    runner = null;
    return true;
}

void innerSetException(Throwable t) {
    for (;;) {
        int s = getState();
        if (s == RAN)
            return;
        if (s == CANCELLED) {
            // aggressively release to set runner to null,
            // in case we are racing with a cancel request
            // that will try to interrupt runner
            releaseShared(0);
            return;
        }
        if (compareAndSetState(s, RAN)) {
            exception = t;
            result = null;
            releaseShared(0);
            done();
            return;
        }
    }
}
innerSetException() is similar to innerSet(), except that at the end it records the exception and clears result.
FutureTask#Sync#innerRunAndReset()
boolean innerRunAndReset() {
    if (!compareAndSetState(0, RUNNING))
        return false;
    try {
        runner = Thread.currentThread();
        if (getState() == RUNNING)
            callable.call(); // don't set result
        runner = null;
        return compareAndSetState(RUNNING, 0);
    } catch (Throwable ex) {
        innerSetException(ex);
        return false;
    }
}
innerRunAndReset() does not set a result; once the run finishes normally, it resets the state of the asynchronous task to 0.
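The effect of the reset can be observed from a subclass, since `runAndReset()` is protected. The following is a sketch only: `RepeatableTask` and its `runOnce()` method are hypothetical helpers introduced for the demo.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class RunAndResetDemo {
    /** Hypothetical subclass that exposes the protected runAndReset(). */
    static class RepeatableTask extends FutureTask<Void> {
        RepeatableTask(Runnable body) {
            super(body, null);
        }

        boolean runOnce() {
            return runAndReset();
        }
    }

    /** Returns {successful runs, counter value, 1 if the task is done else 0}. */
    static int[] repeat(int times) {
        AtomicInteger counter = new AtomicInteger();
        RepeatableTask task = new RepeatableTask(counter::incrementAndGet);
        int ok = 0;
        for (int i = 0; i < times; i++) {
            if (task.runOnce()) {
                ok++; // the task ran and went back to its initial state
            }
        }
        return new int[] { ok, counter.get(), task.isDone() ? 1 : 0 };
    }

    /** A throwing body: returns {runAndReset() result, isDone()}. */
    static boolean[] failing() {
        RepeatableTask task = new RepeatableTask(() -> {
            throw new IllegalStateException("boom");
        });
        boolean reset = task.runOnce();
        return new boolean[] { reset, task.isDone() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(repeat(3)));
        System.out.println(java.util.Arrays.toString(failing()));
    }
}
```

A task that keeps succeeding never becomes done, which is what makes this method suitable for tasks that intrinsically execute more than once; the first exception ends the cycle and completes the task exceptionally.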
FutureTask#Sync#innerGet()
V innerGet() throws InterruptedException, ExecutionException {
    // 1
    acquireSharedInterruptibly(0);
    if (getState() == CANCELLED)
        // 2
        throw new CancellationException();
    if (exception != null)
        // 3
        throw new ExecutionException(exception);
    // 4
    return result;
}
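Annotated code analysis
1. Acquire in shared mode, blocking while the acquire cannot succeed.
2. If the task state is CANCELLED, throw CancellationException.
3. If the task failed with an exception, throw ExecutionException wrapping that exception.
4. The task completed successfully, so return its result.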
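The three outcomes listed above map directly onto what a caller of `get()` sees. This is a minimal sketch against the public `FutureTask` API; the class `GetOutcomesDemo` and its helper methods are illustrative names, not part of the JDK.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class GetOutcomesDemo {
    /** Calls get() and turns each possible outcome into a short description. */
    static String describe(FutureTask<String> task) {
        try {
            return "result:" + task.get();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "failed:" + e.getCause().getMessage(); // original exception is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String normal() {
        FutureTask<String> task = new FutureTask<String>(() -> "ok");
        task.run();
        return describe(task);
    }

    static String failing() {
        FutureTask<String> task = new FutureTask<String>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run();
        return describe(task);
    }

    static String cancelled() {
        FutureTask<String> task = new FutureTask<String>(() -> "never");
        task.cancel(true); // cancelled before it ever started
        return describe(task);
    }

    public static void main(String[] args) {
        System.out.println(normal() + " " + failing() + " " + cancelled());
    }
}
```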
FutureTask#Sync#innerGet(long nanosTimeout)
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
    if (!tryAcquireSharedNanos(0, nanosTimeout))
        throw new TimeoutException();
    if (getState() == CANCELLED)
        throw new CancellationException();
    if (exception != null)
        throw new ExecutionException(exception);
    return result;
}
Whether the task is done is decided from its state (ran or cancelled).
FutureTask#get()
/**
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    return sync.innerGet();
}

/**
 * @throws CancellationException {@inheritDoc}
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return sync.innerGet(unit.toNanos(timeout));
}
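A short sketch of the timed variant: a task that is never run cannot complete, so the timed `get` gives up with `TimeoutException`, while a completed task returns at once. `TimedGetDemo` and its methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {
    /** Waits on a task with the given timeout and reports what happened. */
    static String timedGet(FutureTask<String> task, long timeoutMillis) {
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout"; // the task did not complete in time
        } catch (ExecutionException e) {
            return "failed";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    /** The task is never started, so the wait must time out. */
    static String neverStarted() {
        return timedGet(new FutureTask<String>(() -> "unused"), 50);
    }

    /** The task has already run, so get returns without waiting. */
    static String alreadyRun() {
        FutureTask<String> task = new FutureTask<String>(() -> "done");
        task.run();
        return timedGet(task, 50);
    }

    public static void main(String[] args) {
        System.out.println(neverStarted() + " " + alreadyRun());
    }
}
```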
/**
 * Acquires in shared mode, aborting if interrupted.  Implemented
 * by first checking interrupt status, then invoking at least once
 * {@link #tryAcquireShared}, returning on success.  Otherwise the
 * thread is queued, possibly repeatedly blocking and unblocking,
 * invoking {@link #tryAcquireShared} until success or the thread
 * is interrupted.
 * @param arg the acquire argument.
 * This value is conveyed to {@link #tryAcquireShared} but is
 * otherwise uninterpreted and can represent anything
 * you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

/**
 * Attempts to acquire in shared mode. This method should query if
 * the state of the object permits it to be acquired in the shared
 * mode, and if so to acquire it.
 *
 * <p>This method is always invoked by the thread performing
 * acquire.  If this method reports failure, the acquire method
 * may queue the thread, if it is not already queued, until it is
 * signalled by a release from some other thread.
 *
 * <p>The default implementation throws {@link
 * UnsupportedOperationException}.
 *
 * @param arg the acquire argument. This value is always the one
 *        passed to an acquire method, or is the value saved on entry
 *        to a condition wait.  The value is otherwise uninterpreted
 *        and can represent anything you like.
 * @return a negative value on failure; zero if acquisition in shared
 *         mode succeeded but no subsequent shared-mode acquire can
 *         succeed; and a positive value if acquisition in shared
 *         mode succeeded and subsequent shared-mode acquires might
 *         also succeed, in which case a subsequent waiting thread
 *         must check availability. (Support for three different
 *         return values enables this method to be used in contexts
 *         where acquires only sometimes act exclusively.)  Upon
 *         success, this object has been acquired.
 * @throws IllegalMonitorStateException if acquiring would place this
 *         synchronizer in an illegal state. This exception must be
 *         thrown in a consistent fashion for synchronization to work
 *         correctly.
 * @throws UnsupportedOperationException if shared mode is not supported
 */
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
FutureTask#Sync#tryAcquireShared()
/**
 * Implements AQS base acquire to succeed if ran or cancelled
 */
protected int tryAcquireShared(int ignore) {
    return innerIsDone() ? 1 : -1;
}
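The same acquire/release pairing can be tried in isolation. The sketch below is not FutureTask's Sync: it is a simplified one-shot gate (the name `OneShotGate` is made up) that, unlike Sync, flips the AQS state inside `tryReleaseShared` instead of doing a CAS beforehand. It only illustrates how a shared-mode `tryAcquireShared` that returns 1 or -1 makes waiters block until a release.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class OneShotGate {
    private static final int DONE = 1;

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignore) {
            return getState() == DONE ? 1 : -1; // succeed only once the gate is open
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(DONE);
            return true; // always signal waiting threads
        }
    }

    private final Sync sync = new Sync();

    void open() {
        sync.releaseShared(0);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    boolean awaitMillis(long millis) throws InterruptedException {
        return sync.tryAcquireSharedNanos(0, TimeUnit.MILLISECONDS.toNanos(millis));
    }

    /** Returns {timed wait before open, timed wait after open}. */
    static boolean[] demo() {
        OneShotGate gate = new OneShotGate();
        try {
            boolean before = gate.awaitMillis(20); // gate still closed
            new Thread(gate::open).start();
            gate.await();                          // blocks until open() has run
            boolean after = gate.awaitMillis(20);  // gate stays open
            return new boolean[] { before, after };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```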
/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the <tt>get</tt>
 * method will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled.
 *
 * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
 * {@link java.lang.Runnable} object.  Because <tt>FutureTask</tt>
 * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
 * submitted to an {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * <tt>protected</tt> functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /** Synchronization control for FutureTask */
    private final Sync sync;

    /**
     * Creates a <tt>FutureTask</tt> that will upon running, execute the
     * given <tt>Callable</tt>.
     *
     * @param  callable the callable task
     * @throws NullPointerException if callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

    /**
     * Creates a <tt>FutureTask</tt> that will upon running, execute the
     * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
     * given result on successful completion.
     *
     * @param  runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * <tt>Future<?> f = new FutureTask<Object>(runnable, null)</tt>
     * @throws NullPointerException if runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }

    /**
     * Protected method invoked when this task transitions to state
     * <tt>isDone</tt> (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this Future to the given value unless
     * this future has already been set or has been cancelled.
     * This method is invoked internally by the <tt>run</tt> method
     * upon successful completion of the computation.
     * @param v the value
     */
    protected void set(V v) {
        sync.innerSet(v);
    }

    /**
     * Causes this future to report an <tt>ExecutionException</tt>
     * with the given throwable as its cause, unless this Future has
     * already been set or has been cancelled.
     * This method is invoked internally by the <tt>run</tt> method
     * upon failure of the computation.
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        sync.innerSetException(t);
    }

    // The following (duplicated) doc comment can be removed once
    //
    // 6270645: Javadoc comments should be inherited from most derived
    //          superinterface or superclass
    // is fixed.
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    public void run() {
        sync.innerRun();
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this Future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     * @return true if successfully run and reset
     */
    protected boolean runAndReset() {
        return sync.innerRunAndReset();
    }
    ...
These methods are all implemented by delegating to FutureTask#sync.
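Of the protected methods, `done()` is the one meant as an extension point. As a rough sketch, a subclass can record how the task finished by querying its status inside the hook; `NotifyingTask` and its `log` field are hypothetical names for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class DoneHookDemo {
    /** Hypothetical subclass that records how the task finished. */
    static class NotifyingTask extends FutureTask<Integer> {
        final List<String> log = new ArrayList<String>();

        NotifyingTask(Callable<Integer> callable) {
            super(callable);
        }

        @Override
        protected void done() {
            // Status can be queried inside the hook.
            log.add(isCancelled() ? "cancelled" : "completed");
        }
    }

    /** Runs one task to completion and cancels another; returns both log entries. */
    static List<String> demo() {
        NotifyingTask finished = new NotifyingTask(() -> 1);
        finished.run();
        NotifyingTask cancelled = new NotifyingTask(() -> 2);
        cancelled.cancel(false);

        List<String> out = new ArrayList<String>(finished.log);
        out.addAll(cancelled.log);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```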
FutureTask constructor
/**
 * Creates a <tt>FutureTask</tt> that will upon running, execute the
 * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
 * given result on successful completion.
 *
 * @param  runnable the runnable task
 * @param result the result to return on successful completion. If
 * you don't need a particular result, consider using
 * constructions of the form:
 * <tt>Future<?> f = new FutureTask<Object>(runnable, null)</tt>
 * @throws NullPointerException if runnable is null
 */
public FutureTask(Runnable runnable, V result) {
    sync = new Sync(Executors.callable(runnable, result));
}
Executors#callable()
/**
 * Returns a {@link Callable} object that, when
 * called, runs the given task and returns the given result.  This
 * can be useful when applying methods requiring a
 * <tt>Callable</tt> to an otherwise resultless action.
 * @param task the task to run
 * @param result the result to return
 * @return a callable object
 * @throws NullPointerException if task null
 */
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
Executors#RunnableAdapter
/**
 * A callable that runs given task and returns given result
 */
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
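The adapter can be seen at work through the public `Executors.callable(Runnable, T)` factory. In this small sketch the class name `RunnableAdapterDemo` and the strings are illustrative; it shows that calling the adapted object first runs the `Runnable` and then hands back the fixed result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

class RunnableAdapterDemo {
    /** Adapts a Runnable to a Callable and returns the side effect plus the result. */
    static String demo() {
        StringBuilder trace = new StringBuilder();
        Runnable task = () -> trace.append("ran;");
        Callable<String> adapted = Executors.callable(task, "fixed-result");
        try {
            String result = adapted.call(); // runs task.run(), then returns the given result
            return trace + result;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```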