/** * Attempts to cancel execution of this task. This attempt will * fail if the task has already completed or could not be * cancelled for some other reason. If successful, and this task * has not started when {@code cancel} is called, execution of * this task is suppressed. After this method returns * successfully, unless there is an intervening call to {@link * #reinitialize}, subsequent calls to {@link #isCancelled}, * {@link #isDone}, and {@code cancel} will return {@code true} * and calls to {@link #join} and related methods will result in * {@code CancellationException}. * * <p>This method may be overridden in subclasses, but if so, must * still ensure that these properties hold. In particular, the * {@code cancel} method itself must not throw exceptions. * * <p>This method is designed to be invoked by <em>other</em> * tasks. To terminate the current task, you can just return or * throw an unchecked exception from its computation method, or * invoke {@link #completeExceptionally}. * * @param mayInterruptIfRunning this value has no effect in the * default implementation because interrupts are not used to * control cancellation. * * @return {@code true} if this task is now cancelled */ publicbooleancancel(boolean mayInterruptIfRunning){ return setCompletion(CANCELLED) == CANCELLED; }
ForkJoinTask#setCompletion()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/** * Marks completion and wakes up threads waiting to join this task, * also clearing signal request bits. * * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL * @return completion status on exit */ privateintsetCompletion(int completion){ for (int s;;) { if ((s = status) < 0) return s; if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) { if (s != 0) synchronized (this) { notifyAll(); } return completion; } } }
/** * Main pool control -- a long packed with: * AC: Number of active running workers minus target parallelism (16 bits) * TC: Number of total workers minus target parallelism (16bits) * ST: true if pool is terminating (1 bit) * EC: the wait count of top waiting thread (15 bits) * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits) * * When convenient, we can extract the upper 32 bits of counts and * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e = * (int)ctl. The ec field is never accessed alone, but always * together with id and st. The offsets of counts by the target * parallelism and the positionings of fields makes it possible to * perform the most common checks via sign tests of fields: When * ac is negative, there are not enough active workers, when tc is * negative, there are not enough total workers, when id is * negative, there is at least one waiting worker, and when e is * negative, the pool is terminating. To deal with these possibly * negative fields, we use casts in and out of "short" and/or * signed shifts to maintain signedness. */ volatilelong ctl;
// units for incrementing and decrementing privatestaticfinallong TC_UNIT = 1L << TC_SHIFT; privatestaticfinallong AC_UNIT = 1L << AC_SHIFT;
// masks and units for dealing with u = (int)(ctl >>> 32) privatestaticfinalint UAC_SHIFT = AC_SHIFT - 32; privatestaticfinalint UTC_SHIFT = TC_SHIFT - 32; privatestaticfinalint UAC_MASK = SMASK << UAC_SHIFT; privatestaticfinalint UTC_MASK = SMASK << UTC_SHIFT; privatestaticfinalint UAC_UNIT = 1 << UAC_SHIFT; privatestaticfinalint UTC_UNIT = 1 << UTC_SHIFT;
// masks and units for dealing with e = (int)ctl privatestaticfinalint E_MASK = 0x7fffffff; // no STOP_BIT privatestaticfinalint EC_UNIT = 1 << EC_SHIFT;
/** * Arranges to asynchronously execute this task. While it is not * necessarily enforced, it is a usage error to fork a task more * than once unless it has completed and been reinitialized. * Subsequent modifications to the state of this task or any data * it operates on are not necessarily consistently observable by * any thread other than the one executing it unless preceded by a * call to {@link #join} or related methods, or a call to {@link * #isDone} returning {@code true}. * * <p>This method may be invoked only from within {@code * ForkJoinPool} computations (as may be determined using method * {@link #inForkJoinPool}). Attempts to invoke in other contexts * result in exceptions or errors, possibly including {@code * ClassCastException}. * * @return {@code this}, to simplify usage */ publicfinal ForkJoinTask<V> fork(){ ((ForkJoinWorkerThread) Thread.currentThread()) .pushTask(this); returnthis; }
/** * Pushes a task. Call only from this thread. * * @param t the task. Caller must ensure non-null. */ finalvoidpushTask(ForkJoinTask<?> t){ ForkJoinTask<?>[] q; int s, m; if ((q = queue) != null) { // ignore if queue removed // 1 long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE; // 2 UNSAFE.putOrderedObject(q, u, t); // 3 queueTop = s + 1; // or use putOrderedInt if ((s -= queueBase) <= 2) pool.signalWork(); elseif (s == m) // 4 growQueue(); } }
/** prefix for all proxy class names */ // #1 privatefinalstatic String proxyClassNamePrefix = "$Proxy";
/** parameter types of a proxy class constructor */ privatefinalstatic Class[] constructorParams = { InvocationHandler.class };
/** maps a class loader to the proxy class cache for that loader */ // #2 privatestatic Map loaderToCache = new WeakHashMap();
/** marks that a particular proxy class is currently being generated */ // #3 privatestatic Object pendingGenerationMarker = new Object();
/** next number to use for generation of unique proxy class names */ privatestaticlong nextUniqueNumber = 0; privatestatic Object nextUniqueNumberLock = new Object();
/** set of all generated proxy classes, for isProxyClass implementation */ // #4 privatestatic Map proxyClasses = Collections.synchronizedMap(new WeakHashMap());