/**
 * An {@link ExecutorService} that can schedule commands to run after a given
 * delay, or to execute periodically.
 *
 * <p>The {@code schedule} methods create tasks with various delays
 * and return a task object that can be used to cancel or check
 * execution. The {@code scheduleAtFixedRate} and
 * {@code scheduleWithFixedDelay} methods create and execute tasks
 * that run periodically until cancelled.
 *
 * <p>Commands submitted using the {@link Executor#execute} and
 * {@link ExecutorService} {@code submit} methods are scheduled with
 * a requested delay of zero. Zero and negative delays (but not
 * periods) are also allowed in {@code schedule} methods, and are
 * treated as requests for immediate execution.
 *
 * .....
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface ScheduledExecutorService extends ExecutorService {

    /**
     * Creates and executes a one-shot action that becomes enabled
     * after the given delay.
     *
     * @param command the task to execute
     * @param delay the time from now to delay execution
     * @param unit the time unit of the delay parameter
     * @return a ScheduledFuture representing pending completion of
     *         the task and whose {@code get()} method will return
     *         {@code null} upon completion
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if command is null
     */
    // 1
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    /**
     * Creates and executes a ScheduledFuture that becomes enabled after the
     * given delay.
     *
     * @param callable the function to execute
     * @param delay the time from now to delay execution
     * @param unit the time unit of the delay parameter
     * @param <V> the result type of the callable
     * @return a ScheduledFuture that can be used to extract result or cancel
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if callable is null
     */
    // 2
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    /**
     * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the given
     * period; that is, executions will commence after
     * {@code initialDelay}, then {@code initialDelay + period}, then
     * {@code initialDelay + 2 * period}, and so on.
     * If any execution of the task encounters an exception, subsequent
     * executions are suppressed. Otherwise, the task will only terminate
     * via cancellation or termination of the executor. If any execution
     * of this task takes longer than its period, then subsequent
     * executions may start late, but will not concurrently execute.
     *
     * @param command the task to execute
     * @param initialDelay the time to delay first execution
     * @param period the period between successive executions
     * @param unit the time unit of the initialDelay and period parameters
     * @return a ScheduledFuture representing pending completion of
     *         the task, and whose {@code get()} method will throw an
     *         exception upon cancellation
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if command is null
     * @throws IllegalArgumentException if period less than or equal to zero
     */
    // 3
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the
     * given delay between the termination of one execution and the
     * commencement of the next. If any execution of the task
     * encounters an exception, subsequent executions are suppressed.
     * Otherwise, the task will only terminate via cancellation or
     * termination of the executor.
     *
     * @param command the task to execute
     * @param initialDelay the time to delay first execution
     * @param delay the delay between the termination of one
     *        execution and the commencement of the next
     * @param unit the time unit of the initialDelay and delay parameters
     * @return a ScheduledFuture representing pending completion of
     *         the task, and whose {@code get()} method will throw an
     *         exception upon cancellation
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if command is null
     * @throws IllegalArgumentException if delay less than or equal to zero
     */
    // 4
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}
/** * False if should cancel/suppress periodic tasks on shutdown. */ // 1 privatevolatileboolean continueExistingPeriodicTasksAfterShutdown;
/** * False if should cancel non-periodic tasks on shutdown. */ // 2 privatevolatileboolean executeExistingDelayedTasksAfterShutdown = true;
/** * Sequence number to break scheduling ties, and in turn to * guarantee FIFO order among tied entries. */ // 3 privatestaticfinal AtomicLong sequencer = new AtomicLong(0);
/** Base of nanosecond timings, to avoid wrapping */ privatestaticfinallong NANO_ORIGIN = System.nanoTime(); ....
/**
 * Creates a new ScheduledThreadPoolExecutor with the given core
 * pool size.
 *
 * <p>Delegates to the superclass constructor with
 * {@code Integer.MAX_VALUE}, a zero time value in nanoseconds, and a
 * fresh {@code DelayedWorkQueue} as the work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 *        even if they are idle
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
/**
 * Creates a new ScheduledThreadPoolExecutor with the given
 * initial parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 *        even if they are idle
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if threadFactory is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
/**
 * Creates a new ScheduledThreadPoolExecutor with the given
 * initial parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 *        even if they are idle
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if handler is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

/**
 * Creates a new ScheduledThreadPoolExecutor with the given
 * initial parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 *        even if they are idle
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if threadFactory or handler is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
/** * An annoying wrapper class to convince javac to use a * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable> */ privatestaticclassDelayedWorkQueue extendsAbstractCollection<Runnable> implementsBlockingQueue<Runnable> {
privatefinal DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>(); public Runnable poll(){ return dq.poll(); } public Runnable peek(){ return dq.peek(); } public Runnable take()throws InterruptedException { return dq.take(); } public Runnable poll(long timeout, TimeUnit unit)throws InterruptedException { return dq.poll(timeout, unit); }
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) thrownew NullPointerException(); // 1 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
// 2 delayedExecute(t); return t; }
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit){ if (callable == null || unit == null) thrownew NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
标注代码分析
1. 通过 decorateTask() 将任务包装成一个 RunnableScheduledFuture。
2. 然后调用 delayedExecute() 延迟执行这个 RunnableScheduledFuture。
ScheduledThreadPoolExecutor#triggerTime()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * Returns the trigger time of a delayed action. */ // 1 privatelongtriggerTime(long delay, TimeUnit unit){ return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); }
/** * Returns the trigger time of a delayed action. */ longtriggerTime(long delay){ // 2 return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
标注代码分析
1. 返回一个延迟动作的触发时间(负的 delay 按 0 处理)。
2. 如果 delay 不小于 Long.MAX_VALUE >> 1(即数值非常大),就要调用 overflowFree() 调整延迟值,防止之后在 compareTo 中比较时发生溢出。
ScheduledThreadPoolExecutor#overflowFree()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * Constrains the values of all delays in the queue to be within * Long.MAX_VALUE of each other, to avoid overflow in compareTo. * This may occur if a task is eligible to be dequeued, but has * not yet been, while some other task is added with a delay of * Long.MAX_VALUE. */ privatelongoverflowFree(long delay){ Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(TimeUnit.NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
/**
 * Modifies or replaces the task used to execute a runnable.
 * This method can be used to override the concrete
 * class used for managing internal tasks.
 * The default implementation simply returns the given task.
 *
 * @param runnable the submitted Runnable
 * @param task the task created to execute the runnable
 * @param <V> the result type of the task
 * @return a task that can execute the runnable
 * @since 1.6
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}
/**
 * Modifies or replaces the task used to execute a callable.
 * This method can be used to override the concrete
 * class used for managing internal tasks.
 * The default implementation simply returns the given task.
 *
 * @param callable the submitted Callable
 * @param task the task created to execute the callable
 * @param <V> the result type of the task
 * @return a task that can execute the callable
 * @since 1.6
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable, RunnableScheduledFuture<V> task) {
    return task;
}
/** Sequence number to break ties FIFO */ // 1 privatefinallong sequenceNumber; /** The time the task is enabled to execute in nanoTime units */ // 2 privatelong time; /** * Period in nanoseconds for repeating tasks. A positive * value indicates fixed-rate execution. A negative value * indicates fixed-delay execution. A value of 0 indicates a * non-repeating task. */ // 3 privatefinallong period;
/**
 * Creates a one-shot action with given nanoTime-based trigger time.
 *
 * @param r the runnable to execute
 * @param result the result handed to the superclass constructor
 * @param ns the trigger time, in nanoTime units
 */
ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0; // 0 marks a non-repeating task
    this.sequenceNumber = sequencer.getAndIncrement();
}
/**
 * Creates a periodic action with given nano time and period.
 *
 * @param r the runnable to execute
 * @param result the result handed to the superclass constructor
 * @param ns the first trigger time, in nanoTime units
 * @param period the period in nanoseconds, stored unchanged
 */
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}
/**
 * Creates a one-shot action with given nanoTime-based trigger.
 *
 * @param callable the callable to execute
 * @param ns the trigger time, in nanoTime units
 */
ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0; // 0 marks a non-repeating task
    this.sequenceNumber = sequencer.getAndIncrement();
}

// .... (the excerpt elides further members here)
/**
 * A {@link ScheduledFuture} that is {@link Runnable}. Successful
 * execution of the {@code run} method causes completion of the
 * {@code Future} and allows access to its results.
 *
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface RunnableScheduledFuture<V>
        extends RunnableFuture<V>, ScheduledFuture<V> {

    /**
     * Returns true if this is a periodic task. A periodic task may
     * re-run according to some schedule. A non-periodic task can be
     * run only once.
     *
     * @return true if this task is periodic
     */
    // 1
    boolean isPeriodic();
}
publicintcompareTo(Delayed other){ if (other == this) // compare zero ONLY if same object return0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; elseif (diff > 0) return1; elseif (sequenceNumber < x.sequenceNumber)// 1 return -1; else return1; } // 2 long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0)? 0 : ((d < 0)? -1 : 1); }
标注代码分析
1. 如果触发时间相等,那么比较序列号,序列号小的排在前面,从而保证先进先出(FIFO)的顺序。
2. 如果要比较的对象不是 ScheduledFutureTask,那么按照两者剩余延迟值(getDelay)的差进行比较。
ScheduledThreadPoolExecutor#getQueue()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/**
 * Returns the task queue used by this executor. Each element of
 * this queue is a {@link ScheduledFuture}, including those
 * tasks submitted using {@code execute} which are for scheduling
 * purposes used as the basis of a zero-delay
 * {@code ScheduledFuture}. Iteration over this queue is
 * <em>not</em> guaranteed to traverse tasks in the order in
 * which they will execute.
 *
 * @return the task queue
 */
public BlockingQueue<Runnable> getQueue() {
    return super.getQueue();
}
通过 getDelay() 实现延迟计算。overflowFree() 通过 `Delayed head = (Delayed) super.getQueue().peek();` 取得队首的延迟对象。getQueue() 返回的队列声明为 BlockingQueue<Runnable>,即队列中存放的元素类型是 Runnable,因此取出后需要强制转换为 Delayed。
ScheduledThreadPoolExecutor#delayedExecute()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/** * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. */ privatevoiddelayedExecute(Runnable command){ if (isShutdown()) { // 1 reject(command); return; } // Prestart a thread if necessary. We cannot prestart it // running the task because the task (probably) shouldn't be // run yet, so thread will just idle until delay elapses. // 2 if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); // 3 super.getQueue().add(command); }
/** * Runs a periodic task. */ privatevoidrunPeriodic(){ boolean ok = ScheduledFutureTask.super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { long p = period; if (p > 0) time += p; else time = triggerTime(-p); ScheduledThreadPoolExecutor.super.getQueue().add(this); } // This might have been the final executed delayed // task. Wake up threads to check. elseif (down) interruptIdleWorkers(); }
/** * Overrides FutureTask version so as to reset/requeue if periodic. */ publicvoidrun(){ // 1 if (isPeriodic()) // 2 runPeriodic(); else // 3 ScheduledFutureTask.super.run(); }
/** * Runs a periodic task. */ privatevoidrunPeriodic(){ // 1 boolean ok = ScheduledFutureTask.super.runAndReset(); // 2 boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows // 3 if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { // 4 long p = period; if (p > 0) // 5 time += p; else // 6 time = triggerTime(-p); // 7 ScheduledThreadPoolExecutor.super.getQueue().add(this); } // This might have been the final executed delayed // task. Wake up threads to check. // 8 elseif (down) interruptIdleWorkers(); }
/** * Sets the policy on whether to continue executing existing periodic * tasks even when this executor has been <tt>shutdown</tt>. In * this case, these tasks will only terminate upon * <tt>shutdownNow</tt>, or after setting the policy to * <tt>false</tt> when already shutdown. This value is by default * false. * * @param value if true, continue after shutdown, else don't. * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy */ publicvoidsetContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value){ continueExistingPeriodicTasksAfterShutdown = value; if (!value && isShutdown()) cancelUnwantedTasks(); }
/** * Sets the policy on whether to execute existing delayed * tasks even when this executor has been <tt>shutdown</tt>. In * this case, these tasks will only terminate upon * <tt>shutdownNow</tt>, or after setting the policy to * <tt>false</tt> when already shutdown. This value is by default * true. * * @param value if true, execute after shutdown, else don't. * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy */ publicvoidsetExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value){ executeExistingDelayedTasksAfterShutdown = value; if (!value && isShutdown()) cancelUnwantedTasks(); }
ScheduledThreadPoolExecutor#shutdown()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. If the * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has * been set <tt>false</tt>, existing delayed tasks whose delays * have not yet elapsed are cancelled. And unless the * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has * been set <tt>true</tt>, future executions of existing periodic * tasks will be cancelled. */ publicvoidshutdown(){ cancelUnwantedTasks(); super.shutdown(); }
/** * Cancels and clears the queue of all tasks that should not be run * due to shutdown policy. */ privatevoidcancelUnwantedTasks(){ // 1 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // 2 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) // 3 super.getQueue().clear(); elseif (keepDelayed || keepPeriodic) { // 4 Object[] entries = super.getQueue().toArray(); for (int i = 0; i < entries.length; ++i) { Object e = entries[i]; if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if (t.isPeriodic()? !keepPeriodic : !keepDelayed) t.cancel(false); } } entries = null; // 5 purge(); } }