/** * Attempts to cancel execution of this task. This attempt will * fail if the task has already completed or could not be * cancelled for some other reason. If successful, and this task * has not started when {@code cancel} is called, execution of * this task is suppressed. After this method returns * successfully, unless there is an intervening call to {@link * #reinitialize}, subsequent calls to {@link #isCancelled}, * {@link #isDone}, and {@code cancel} will return {@code true} * and calls to {@link #join} and related methods will result in * {@code CancellationException}. * * <p>This method may be overridden in subclasses, but if so, must * still ensure that these properties hold. In particular, the * {@code cancel} method itself must not throw exceptions. * * <p>This method is designed to be invoked by <em>other</em> * tasks. To terminate the current task, you can just return or * throw an unchecked exception from its computation method, or * invoke {@link #completeExceptionally}. * * @param mayInterruptIfRunning this value has no effect in the * default implementation because interrupts are not used to * control cancellation. * * @return {@code true} if this task is now cancelled */ publicbooleancancel(boolean mayInterruptIfRunning){ return setCompletion(CANCELLED) == CANCELLED; }
ForkJoinTask#setCompletion()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/** * Marks completion and wakes up threads waiting to join this task, * also clearing signal request bits. * * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL * @return completion status on exit */ privateintsetCompletion(int completion){ for (int s;;) { if ((s = status) < 0) return s; if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) { if (s != 0) synchronized (this) { notifyAll(); } return completion; } } }
设置任务运行状态为取消,然后唤醒在任务上等待的线程。
ForkJoinPool#shutdown()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * Tasks that are in the process of being submitted concurrently * during the course of this method may or may not be rejected. * * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")} */ publicvoidshutdown(){ checkPermission(); shutdown = true; tryTerminate(false); }
/** * Attempts to cancel and/or stop all tasks, and reject all * subsequently submitted tasks. Tasks that are in the process of * being submitted or executed concurrently during the course of * this method may or may not be rejected. This method cancels * both existing and unexecuted tasks, in order to permit * termination in the presence of task dependencies. So the method * always returns an empty list (unlike the case for some other * Executors). * * @return an empty list * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")} */ public List<Runnable> shutdownNow(){ checkPermission(); shutdown = true; tryTerminate(true); return Collections.emptyList(); }
/** * Runs up to three passes through workers: (0) Setting * termination status for each worker, followed by wakeups up to * queued workers; (1) helping cancel tasks; (2) interrupting * lagging threads (likely in external tasks, but possibly also * blocked in joins). Each pass repeats previous steps because of * potential lagging thread creation. */ privatevoidstartTerminating(){ cancelSubmissions(); for (int pass = 0; pass < 3; ++pass) { ForkJoinWorkerThread[] ws = workers; if (ws != null) { for (ForkJoinWorkerThread w : ws) { if (w != null) { w.terminate = true; if (pass > 0) { w.cancelTasks(); if (pass > 1 && !w.isInterrupted()) { try { w.interrupt(); } catch (SecurityException ignore) { } } } } } terminateWaiters(); } } } ``` 1. 取消`pool`中`submissionQueue`中的任务。 2. 将所有的工作线程的结束状态设置为true。 3. 取消所有工作线程的任务队列中未完成的任务。 4. 中断所有工作线程。 5. 结束还在等待的工作线程。 ## ForkJoinPool#awaitTermination() ``` java /** * Blocks until all tasks have completed execution after a shutdown * request, or the timeout occurs, or the current thread is * interrupted, whichever happens first. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return {@code true} if this executor terminated and * {@code false} if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ publicbooleanawaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.submissionLock; lock.lock(); try { for (;;) { if (isTerminated()) returntrue; if (nanos <= 0) returnfalse; nanos = termination.awaitNanos(nanos); } } finally { lock.unlock(); } }
/** * Returns {@code true} if all tasks have completed following shut down. * * @return {@code true} if all tasks have completed following shut down */ publicbooleanisTerminated(){ long c = ctl; return ((c & STOP_BIT) != 0L && (short)(c >>> TC_SHIFT) == -parallelism); }
同时满足pool的总控信息中有停止标记且总的工作线程数为0。
ForkJoinPool#isTerminating()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/** * Returns {@code true} if the process of termination has * commenced but not yet completed. This method may be useful for * debugging. A return of {@code true} reported a sufficient * period after shutdown may indicate that submitted tasks have * ignored or suppressed interruption, or are waiting for IO, * causing this executor not to properly terminate. (See the * advisory notes for class {@link ForkJoinTask} stating that * tasks should not normally entail blocking operations. But if * they do, they must abort them on interrupt.) * * @return {@code true} if terminating but not yet terminated */ publicbooleanisTerminating(){ long c = ctl; return ((c & STOP_BIT) != 0L && (short)(c >>> TC_SHIFT) != -parallelism); }
判断pool是否正在结束过程中,判断逻辑是pool的总控信息中有停止标记,但总的工作线程数不为0。
ForkJoinPool#isAtLeastTerminating()
1 2 3 4 5 6
/** * Returns true if terminating or terminated. Used by ForkJoinWorkerThread. */ finalbooleanisAtLeastTerminating(){ return (ctl & STOP_BIT) != 0L; }
判断pool是否正在结束过程中或者已经结束,只要pool的总控信息中有停止标记就可以了。
ForkJoinPool#isShutdown()
1 2 3 4 5 6 7 8
/** * Returns {@code true} if this pool has been shut down. * * @return {@code true} if this pool has been shut down */ publicbooleanisShutdown(){ return shutdown; }
/** * CASes slot i of array q from t to null. Caller must ensure q is * non-null and index is in range. */ privatestaticfinalbooleancasSlotNull(ForkJoinTask<?>[] q, int i, ForkJoinTask<?> t){ return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null); }
ForkJoinWorkerThread#writeSlot()
1 2 3 4 5 6 7 8 9
/** * Performs a volatile write of the given task at given slot of * array q. Caller must ensure q is non-null and index is in * range. This method is used only during resets and backouts. */ privatestaticfinalvoidwriteSlot(ForkJoinTask<?>[] q, int i, ForkJoinTask<?> t){ UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t); }
ForkJoinWorkerThread#peekTask()
1 2 3 4 5 6 7 8 9 10 11
/** * Returns next task, or null if empty or contended. */ final ForkJoinTask<?> peekTask() { int m; ForkJoinTask<?>[] q = queue; if (q == null || (m = q.length - 1) < 0) returnnull; int i = locallyFifo ? queueBase : (queueTop - 1); return q[i & m]; }
/** * Returns, but does not unschedule or execute, a task queued by * the current thread but not yet executed, if one is immediately * available. There is no guarantee that this task will actually * be polled or executed next. Conversely, this method may return * null even if a task exists but cannot be accessed without * contention with other threads. This method is designed * primarily to support extensions, and is unlikely to be useful * otherwise. * * <p>This method may be invoked only from within {@code * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the next task, or {@code null} if none are available */ protectedstatic ForkJoinTask<?> peekNextLocalTask() { return ((ForkJoinWorkerThread) Thread.currentThread()) .peekTask(); }
/** * Drains tasks to given collection c. * * @return the number of tasks drained */ finalintdrainTasksTo(Collection<? super ForkJoinTask<?>> c){ int n = 0; while (queueBase != queueTop) { ForkJoinTask<?> t = deqTask(); if (t != null) { c.add(t); ++n; } } return n; }
/** * Removes all available unexecuted submitted and forked tasks * from scheduling queues and adds them to the given collection, * without altering their execution status. These may include * artificially generated or wrapped tasks. This method is * designed to be invoked only when the pool is known to be * quiescent. Invocations at other times may not remove all * tasks. A failure encountered while attempting to add elements * to collection {@code c} may result in elements being in * neither, either or both collections when the associated * exception is thrown. The behavior of this operation is * undefined if the specified collection is modified while the * operation is in progress. * * @param c the collection to transfer elements into * @return the number of elements transferred */ protectedintdrainTasksTo(Collection<? super ForkJoinTask<?>> c){ int count = 0; while (queueBase != queueTop) { ForkJoinTask<?> t = pollSubmission(); if (t != null) { c.add(t); ++count; } } ForkJoinWorkerThread[] ws; if ((short)(ctl >>> TC_SHIFT) > -parallelism && (ws = workers) != null) { for (ForkJoinWorkerThread w : ws) if (w != null) count += w.drainTasksTo(c); } return count; }
/** * Returns an estimate of the number of tasks in the queue. */ finalintgetQueueSize(){ return queueTop - queueBase; }
返回当前工作线程任务队列中的任务数量。
ForkJoinTask#getQueuedTaskCount()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/** * Returns an estimate of the number of tasks that have been * forked by the current worker thread but not yet executed. This * value may be useful for heuristic decisions about whether to * fork other tasks. * * <p>This method may be invoked only from within {@code * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the number of tasks */ publicstaticintgetQueuedTaskCount(){ return ((ForkJoinWorkerThread) Thread.currentThread()) .getQueueSize(); }
/** * Gets and removes a local or stolen task. * * @return a task, if available */ final ForkJoinTask<?> pollTask() { ForkJoinWorkerThread[] ws; ForkJoinTask<?> t = pollLocalTask(); if (t != null || (ws = pool.workers) == null) return t; int n = ws.length; // cheap version of FJP.scan int steps = n << 1; int r = nextSeed(); int i = 0; while (i < steps) { ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)]; if (w != null && w.queueBase != w.queueTop && w.queue != null) { if ((t = w.deqTask()) != null) return t; i = 0; } } returnnull; }
ForkJoinWorkerThread#pollLocalTask()
1 2 3 4 5 6 7 8
/** * Gets and removes a local task. * * @return a task, if available */ final ForkJoinTask<?> pollLocalTask() { return locallyFifo ? locallyDeqTask() : popTask(); }
/** * Unschedules and returns, without executing, the next task * queued by the current thread but not yet executed, if one is * available, or if not available, a task that was forked by some * other thread, if available. Availability may be transient, so a * {@code null} result does not necessarily imply quiescence * of the pool this task is operating in. This method is designed * primarily to support extensions, and is unlikely to be useful * otherwise. * * <p>This method may be invoked only from within {@code * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return a task, or {@code null} if none are available */ protectedstatic ForkJoinTask<?> pollTask() { return ((ForkJoinWorkerThread) Thread.currentThread()) .pollTask(); }
/** * Returns the approximate (non-atomic) number of idle threads per * active thread. */ finalintidlePerActive(){ // Approximate at powers of two for small values, saturate past 4 int p = parallelism; int a = p + (int)(ctl >> AC_SHIFT); return (a > (p >>>= 1) ? 0 : a > (p >>>= 1) ? 1 : a > (p >>>= 1) ? 2 : a > (p >>>= 1) ? 4 : 8); }
/** * Returns an estimate of how many more locally queued tasks are * held by the current worker thread than there are other worker * threads that might steal them. This value may be useful for * heuristic decisions about whether to fork other tasks. In many * usages of ForkJoinTasks, at steady state, each worker should * aim to maintain a small constant surplus (for example, 3) of * tasks, and to process computations locally if this threshold is * exceeded. * * <p>This method may be invoked only from within {@code * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the surplus number of tasks, which may be negative */ publicstaticintgetSurplusQueuedTaskCount(){ return ((ForkJoinWorkerThread) Thread.currentThread()) .getEstimatedSurplusTaskCount(); }
/** * Runs tasks until {@code pool.isQuiescent()}. We piggyback on * pool's active count ctl maintenance, but rather than blocking * when tasks cannot be found, we rescan until all others cannot * find tasks either. The bracketing by pool quiescerCounts * updates suppresses pool auto-shutdown mechanics that could * otherwise prematurely terminate the pool because all threads * appear to be inactive. */ finalvoidhelpQuiescePool(){ boolean active = true; ForkJoinTask<?> ps = currentSteal; // to restore below ForkJoinPool p = pool; // 1 p.addQuiescerCount(1); for (;;) { ForkJoinWorkerThread[] ws = p.workers; ForkJoinWorkerThread v = null; int n; // 2 if (queueTop != queueBase) // 3 v = this; // 4 elseif (ws != null && (n = ws.length) > 1) { ForkJoinWorkerThread w; // 5 int r = nextSeed(); // cheap version of FJP.scan int steps = n << 1; for (int i = 0; i < steps; ++i) { if ((w = ws[(i + r) & (n - 1)]) != null && w.queueBase != w.queueTop) { v = w; break; } } } if (v != null) { ForkJoinTask<?> t; if (!active) { active = true; p.addActiveCount(1); } // 6 if ((t = (v != this) ? v.deqTask() : locallyFifo ? locallyDeqTask() : popTask()) != null) { currentSteal = t; t.doExec(); currentSteal = ps; } } else { if (active) { active = false; p.addActiveCount(-1); } // 7 if (p.isQuiescent()) { p.addActiveCount(1); p.addQuiescerCount(-1); break; } } } }
/** * Returns {@code true} if all worker threads are currently idle. * An idle worker is one that cannot obtain a task to execute * because none are available to steal from other threads, and * there are no pending submissions to the pool. This method is * conservative; it might not return {@code true} immediately upon * idleness of all threads, but will eventually become true if * threads remain inactive. * * @return {@code true} if all threads are currently idle */ publicbooleanisQuiescent(){ return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0; }
逻辑就是当前活动线程数量为0,且阻塞等到join的工作线程数量也为0。
ForkJoinPool#addQuiescerCount()
1 2 3 4 5 6 7 8 9 10 11 12
/** * Increment or decrement quiescerCount. Needed only to prevent * triggering shutdown if a worker is transiently inactive while * checking quiescence. * * @param delta 1 for increment, -1 for decrement */ finalvoidaddQuiescerCount(int delta){ int c; do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset, c = quiescerCount, c + delta)); }
ForkJoinPool#addActiveCount()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/** * Directly increment or decrement active count without * queuing. This method is used to transiently assert inactivation * while checking quiescence. * * @param delta 1 for increment, -1 for decrement */ finalvoidaddActiveCount(int delta){ long d = delta < 0 ? -AC_UNIT : AC_UNIT; long c; do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl, ((c + d) & AC_MASK) | (c & ~AC_MASK))); }
/** * Returns an estimate of how many more locally queued tasks are * held by the current worker thread than there are other worker * threads that might steal them. This value may be useful for * heuristic decisions about whether to fork other tasks. In many * usages of ForkJoinTasks, at steady state, each worker should * aim to maintain a small constant surplus (for example, 3) of * tasks, and to process computations locally if this threshold is * exceeded. * * <p>This method may be invoked only from within {@code * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return the surplus number of tasks, which may be negative */ publicstaticintgetSurplusQueuedTaskCount(){ return ((ForkJoinWorkerThread) Thread.currentThread()) .getEstimatedSurplusTaskCount(); }
/** * Waits if necessary for the computation to complete, and then * retrieves its result. * * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread is not a * member of a ForkJoinPool and was interrupted while waiting */ publicfinal V get()throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : externalInterruptibleAwaitDone(0L); Throwable ex; if (s == CANCELLED) thrownew CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) thrownew ExecutionException(ex); return getRawResult(); }
/** * Waits if necessary for at most the given time for the computation * to complete, and then retrieves its result, if available. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread is not a * member of a ForkJoinPool and was interrupted while waiting * @throws TimeoutException if the wait timed out */ publicfinal V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Thread t = Thread.currentThread(); if (t instanceof ForkJoinWorkerThread) { ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; long nanos = unit.toNanos(timeout); if (status >= 0) { boolean completed = false; if (w.unpushTask(this)) { try { completed = exec(); } catch (Throwable rex) { setExceptionalCompletion(rex); } } if (completed) setCompletion(NORMAL); elseif (status >= 0 && nanos > 0) w.pool.timedAwaitJoin(this, nanos); } } else { long millis = unit.toMillis(timeout); if (millis > 0) externalInterruptibleAwaitDone(millis); } int s = status; if (s != NORMAL) { Throwable ex; if (s == CANCELLED) thrownew CancellationException(); if (s != EXCEPTIONAL) thrownew TimeoutException(); if ((ex = getThrowableException()) != null) thrownew ExecutionException(ex); } return getRawResult(); }
/** * Possibly blocks the given worker waiting for joinMe to * complete or timeout * * @param joinMe the task * @param millis the wait time for underlying Object.wait */ finalvoidtimedAwaitJoin(ForkJoinTask<?> joinMe, long nanos){ // 1 while (joinMe.status >= 0) { // 2 Thread.interrupted(); if ((ctl & STOP_BIT) != 0L) { // 3 joinMe.cancelIgnoringExceptions(); break; } // 4 if (tryPreBlock()) { long last = System.nanoTime(); while (joinMe.status >= 0) { long millis = TimeUnit.NANOSECONDS.toMillis(nanos); if (millis <= 0) break; // 5 joinMe.tryAwaitDone(millis); if (joinMe.status < 0) break; if ((ctl & STOP_BIT) != 0L) { joinMe.cancelIgnoringExceptions(); break; } long now = System.nanoTime(); nanos -= now - last; last = now; } // 6 postBlock(); break; } } }
/** * Tries to block a worker thread until completed or timed out. * Uses Object.wait time argument conventions. * May fail on contention or interrupt. * * @param millis if > 0, wait time. */ finalvoidtryAwaitDone(long millis){ int s; try { if (((s = status) > 0 || (s == 0 && UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) && status > 0) { synchronized (this) { if (status > 0) wait(millis); } } } catch (InterruptedException ie) { // caller must check termination } }
ForkJoinTask#invoke()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/** * Commences performing this task, awaits its completion if * necessary, and returns its result, or throws an (unchecked) * {@code RuntimeException} or {@code Error} if the underlying * computation did so. * * @return the computed result */ publicfinal V invoke(){ if (doInvoke() != NORMAL) return reportResult(); else return getRawResult(); }
ForkJoinTask#doInvoke()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/** * Primary mechanics for invoke, quietlyInvoke. * @return status upon completion */ privateintdoInvoke(){ int s; boolean completed; if ((s = status) < 0) return s; try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) return setCompletion(NORMAL); else return doJoin(); }
..... publicstaticvoidinvokeAll(ForkJoinTask<?>... tasks){ Throwable ex = null; int last = tasks.length - 1; for (int i = last; i >= 0; --i) { ForkJoinTask<?> t = tasks[i]; if (t == null) { if (ex == null) ex = new NullPointerException(); } elseif (i != 0) t.fork(); elseif (t.doInvoke() < NORMAL && ex == null) ex = t.getException(); } for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t = tasks[i]; if (t != null) { if (ex != null) t.cancel(false); elseif (t.doJoin() < NORMAL && ex == null) ex = t.getException(); } } if (ex != null) UNSAFE.throwException(ex); }
..... publicstatic <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks){ if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) { invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()])); return tasks; } @SuppressWarnings("unchecked") List<? extends ForkJoinTask<?>> ts = (List<? extends ForkJoinTask<?>>) tasks; Throwable ex = null; int last = ts.size() - 1; for (int i = last; i >= 0; --i) { ForkJoinTask<?> t = ts.get(i); if (t == null) { if (ex == null) ex = new NullPointerException(); } elseif (i != 0) t.fork(); elseif (t.doInvoke() < NORMAL && ex == null) ex = t.getException(); } for (int i = 1; i <= last; ++i) { ForkJoinTask<?> t = ts.get(i); if (t != null) { if (ex != null) t.cancel(false); elseif (t.doJoin() < NORMAL && ex == null) ex = t.getException(); } } if (ex != null) UNSAFE.throwException(ex); return tasks; }
ForkJoinTask#isCompletedAbnormally()
1 2 3 4 5 6 7 8
/** * Returns {@code true} if this task threw an exception or was cancelled. * * @return {@code true} if this task threw an exception or was cancelled */ publicfinalbooleanisCompletedAbnormally(){ return status < NORMAL; }
判断任务是否异常结束。
ForkJoinTask#isCompletedNormally()
1 2 3 4 5 6 7 8 9 10
/** * Returns {@code true} if this task completed without throwing an * exception and was not cancelled. * * @return {@code true} if this task completed without throwing an * exception and was not cancelled */ publicfinalbooleanisCompletedNormally(){ return status == NORMAL; }
判断任务是否正常完成。
ForkJoinTask#quietlyJoin()
1 2 3 4 5 6 7 8 9
/** * Joins this task, without returning its result or throwing its * exception. This method may be useful when processing * collections of tasks when some have been cancelled or otherwise * known to have aborted. */ publicfinalvoidquietlyJoin(){ doJoin(); }
ForkJoinTask#quietlyInvoke()
1 2 3 4 5 6 7 8
/** * Commences performing this task and awaits its completion if * necessary, without returning its result or throwing its * exception. */ publicfinalvoidquietlyInvoke(){ doInvoke(); }
quietlyJoin()和quietlyInvoke()它们不会返回结果或者抛出异常。
ForkJoinTask#reinitialize()
1 2 3 4 5 6 7 8
..... publicvoidreinitialize(){ if (status == EXCEPTIONAL) // 1 clearExceptionalCompletion(); else status = 0; }
/** * Tries to unschedule this task for execution. This method will * typically succeed if this task is the most recently forked task * by the current thread, and has not commenced executing in * another thread. This method may be useful when arranging * alternative local processing of tasks that could have been, but * were not, stolen. * * <p>This method may be invoked only from within {@code * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return {@code true} if unforked */ publicbooleantryUnfork(){ return ((ForkJoinWorkerThread) Thread.currentThread()) .unpushTask(this); }
方法内部会尝试将自身从当前工作线程的队列顶端移除,相当于在当前任务没有被调度执行之前取消了自己。
ForkJoinTask#getPool()
1 2 3 4 5 6 7 8 9 10 11 12
/** * Returns the pool hosting the current task execution, or null * if this task is executing outside of any ForkJoinPool. * * @see #inForkJoinPool * @return the pool, or {@code null} if none */ publicstatic ForkJoinPool getPool(){ Thread t = Thread.currentThread(); return (t instanceof ForkJoinWorkerThread) ? ((ForkJoinWorkerThread) t).pool : null; }
ForkJoinTask#inForkJoinPool()
1 2 3 4 5 6 7 8 9 10 11
/** * Returns {@code true} if the current thread is a {@link * ForkJoinWorkerThread} executing as a ForkJoinPool computation. * * @return {@code true} if the current thread is a {@link * ForkJoinWorkerThread} executing as a ForkJoinPool computation, * or {@code false} otherwise */ publicstaticbooleaninForkJoinPool(){ return Thread.currentThread() instanceof ForkJoinWorkerThread; }
/** * Blocks in accord with the given blocker. If the current thread * is a {@link ForkJoinWorkerThread}, this method possibly * arranges for a spare thread to be activated if necessary to * ensure sufficient parallelism while the current thread is blocked. * * <p>If the caller is not a {@link ForkJoinTask}, this method is * behaviorally equivalent to * <pre> {@code * while (!blocker.isReleasable()) * if (blocker.block()) * return; * }</pre> * * If the caller is a {@code ForkJoinTask}, then the pool may * first be expanded to ensure parallelism, and later adjusted. * * @param blocker the blocker * @throws InterruptedException if blocker.block did so */ publicstaticvoidmanagedBlock(ManagedBlocker blocker) throws InterruptedException { Thread t = Thread.currentThread(); if (t instanceof ForkJoinWorkerThread) { ForkJoinWorkerThread w = (ForkJoinWorkerThread) t; w.pool.awaitBlocker(blocker); } else { do {} while (!blocker.isReleasable() && !blocker.block()); } }
ForkJoin工作线程
会调用工作线程所在的Pool的awaitBlocker()。
非ForkJoin工作线程
相当于在blocker上阻塞,被唤醒后方法就结束了。
ForkJoinPool#awaitBlocker()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * If necessary, compensates for blocker, and blocks */ privatevoidawaitBlocker(ManagedBlocker blocker) throws InterruptedException { while (!blocker.isReleasable()) { if (tryPreBlock()) { try { do {} while (!blocker.isReleasable() && !blocker.block()); } finally { postBlock(); } break; } } }
/** * Interface for extending managed parallelism for tasks running * in {@link ForkJoinPool}s. * * <p>A {@code ManagedBlocker} provides two methods. Method * {@code isReleasable} must return {@code true} if blocking is * not necessary. Method {@code block} blocks the current thread * if necessary (perhaps internally invoking {@code isReleasable} * before actually blocking). These actions are performed by any * thread invoking {@link ForkJoinPool#managedBlock}. The * unusual methods in this API accommodate synchronizers that may, * but don't usually, block for long periods. Similarly, they * allow more efficient internal handling of cases in which * additional workers may be, but usually are not, needed to * ensure sufficient parallelism. Toward this end, * implementations of method {@code isReleasable} must be amenable * to repeated invocation. * * <p>For example, here is a ManagedBlocker based on a * ReentrantLock: * <pre> {@code * class ManagedLocker implements ManagedBlocker { * final ReentrantLock lock; * boolean hasLock = false; * ManagedLocker(ReentrantLock lock) { this.lock = lock; } * public boolean block() { * if (!hasLock) * lock.lock(); * return true; * } * public boolean isReleasable() { * return hasLock || (hasLock = lock.tryLock()); * } * }}</pre> * * <p>Here is a class that possibly blocks waiting for an * item on a given queue: * <pre> {@code * class QueueTaker<E> implements ManagedBlocker { * final BlockingQueue<E> queue; * volatile E item = null; * QueueTaker(BlockingQueue<E> q) { this.queue = q; } * public boolean block() throws InterruptedException { * if (item == null) * item = queue.take(); * return true; * } * public boolean isReleasable() { * return item != null || (item = queue.poll()) != null; * } * public E getItem() { // call after pool.managedBlock completes * return item; * } * }}</pre> */ publicstaticinterfaceManagedBlocker{ /** * Possibly blocks the current thread, for example waiting for * a lock or condition. * * @return {@code true} if no additional blocking is necessary * (i.e., if isReleasable would return true) * @throws InterruptedException if interrupted while waiting * (the method is not required to do so, but is allowed to) */ booleanblock()throws InterruptedException;
/** * Returns {@code true} if blocking is unnecessary. */ booleanisReleasable(); }
/** * Performs the given task, returning its result upon completion. * If the computation encounters an unchecked Exception or Error, * it is rethrown as the outcome of this invocation. Rethrown * exceptions behave in the same way as regular exceptions, but, * when possible, contain stack traces (as displayed for example * using {@code ex.printStackTrace()}) of both the current thread * as well as the thread actually encountering the exception; * minimally only the latter. * * @param task the task * @return the task's result * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ public <T> T invoke(ForkJoinTask<T> task){ Thread t = Thread.currentThread(); if (task == null) thrownew NullPointerException(); if (shutdown) thrownew RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && ((ForkJoinWorkerThread)t).pool == this) return task.invoke(); // bypass submit if in same pool else { addSubmission(task); return task.join(); } }
/** * Arranges for (asynchronous) execution of the given task. * * @param task the task * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ publicvoidexecute(ForkJoinTask<?> task){ if (task == null) thrownew NullPointerException(); forkOrSubmit(task); }
// AbstractExecutorService methods
/** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution */ publicvoidexecute(Runnable task){ if (task == null) thrownew NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = ForkJoinTask.adapt(task, null); forkOrSubmit(job); }
/** * Returns the factory used for constructing new workers. * * @return the factory used for constructing new workers */ // 1 public ForkJoinWorkerThreadFactory getFactory(){ return factory; }
/** * Returns the handler for internal worker threads that terminate * due to unrecoverable errors encountered while executing tasks. * * @return the handler, or {@code null} if none */ // 2 public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler(){ return ueh; }
/** * Returns the targeted parallelism level of this pool. * * @return the targeted parallelism level of this pool */ // 3 publicintgetParallelism(){ return parallelism; }
/** * Returns the number of worker threads that have started but not * yet terminated. The result returned by this method may differ * from {@link #getParallelism} when threads are created to * maintain parallelism when others are cooperatively blocked. * * @return the number of worker threads */ // 4 publicintgetPoolSize(){ return parallelism + (short)(ctl >>> TC_SHIFT); }
/** * Returns {@code true} if this pool uses local first-in-first-out * scheduling mode for forked tasks that are never joined. * * @return {@code true} if this pool uses async mode */ // 5 publicbooleangetAsyncMode(){ return locallyFifo; }
/** * Returns an estimate of the number of worker threads that are * not blocked waiting to join tasks or for other managed * synchronization. This method may overestimate the * number of running threads. * * @return the number of worker threads */ // 6 publicintgetRunningThreadCount(){ int r = parallelism + (int)(ctl >> AC_SHIFT); return (r <= 0) ? 0 : r; // suppress momentarily negative values }
/** * Returns an estimate of the number of threads that are currently * stealing or executing tasks. This method may overestimate the * number of active threads. * * @return the number of active threads */ // 7 publicintgetActiveThreadCount(){ int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount; return (r <= 0) ? 0 : r; // suppress momentarily negative values }
/** * Returns an estimate of the total number of tasks stolen from * one thread's work queue by another. The reported value * underestimates the actual total number of steals when the pool * is not quiescent. This value may be useful for monitoring and * tuning fork/join programs: in general, steal counts should be * high enough to keep threads busy, but low enough to avoid * overhead and contention across threads. * * @return the number of steals */ // 8 publiclonggetStealCount(){ return stealCount; }
/** * Returns an estimate of the total number of tasks currently held * in queues by worker threads (but not including tasks submitted * to the pool that have not begun executing). This value is only * an approximation, obtained by iterating across all threads in * the pool. This method may be useful for tuning task * granularities. * * @return the number of queued tasks */ // 9 publiclonggetQueuedTaskCount(){ long count = 0; ForkJoinWorkerThread[] ws; if ((short)(ctl >>> TC_SHIFT) > -parallelism && (ws = workers) != null) { for (ForkJoinWorkerThread w : ws) if (w != null) count -= w.queueBase - w.queueTop; // must read base first } return count; }
/** * Returns an estimate of the number of tasks submitted to this * pool that have not yet begun executing. This method may take * time proportional to the number of submissions. * * @return the number of queued submissions */ // 10 publicintgetQueuedSubmissionCount(){ return -queueBase + queueTop; }
/** * Returns {@code true} if there are any tasks submitted to this * pool that have not yet begun executing. * * @return {@code true} if there are any queued submissions */ // 11 publicbooleanhasQueuedSubmissions(){ return queueBase != queueTop; }