--- jsr166/src/jsr166y/ForkJoinPool.java 2009/07/16 15:32:34 1.6 +++ jsr166/src/jsr166y/ForkJoinPool.java 2009/07/22 01:36:51 1.13 @@ -28,7 +28,7 @@ import java.lang.reflect.*; * most tasks spawn other subtasks (as do most ForkJoinTasks), as well * as the mixed execution of some plain Runnable- or Callable- based * activities along with ForkJoinTasks. When setting - * setAsyncMode, a ForkJoinPools may also be appropriate for + * {@code setAsyncMode}, a ForkJoinPools may also be appropriate for * use with fine-grained tasks that are never joined. Otherwise, other * ExecutorService implementations are typically more appropriate * choices. @@ -38,24 +38,27 @@ import java.lang.reflect.*; * adding, suspending, or resuming threads, even if some tasks are * waiting to join others. However, no such adjustments are performed * in the face of blocked IO or other unmanaged synchronization. The - * nested ManagedBlocker interface enables extension of + * nested {@code ManagedBlocker} interface enables extension of * the kinds of synchronization accommodated. The target parallelism - * level may also be changed dynamically (setParallelism) + * level may also be changed dynamically ({@code setParallelism}) * and thread construction can be limited using methods - * setMaximumPoolSize and/or - * setMaintainsParallelism. + * {@code setMaximumPoolSize} and/or + * {@code setMaintainsParallelism}. * *

In addition to execution and lifecycle control methods, this * class provides status check methods (for example - * getStealCount) that are intended to aid in developing, + * {@code getStealCount}) that are intended to aid in developing, * tuning, and monitoring fork/join applications. Also, method - * toString returns indications of pool state in a + * {@code toString} returns indications of pool state in a * convenient form for informal monitoring. * *

Implementation notes: This implementation restricts the * maximum number of running threads to 32767. Attempts to create * pools with greater than the maximum result in * IllegalArgumentExceptions. + * + * @since 1.7 + * @author Doug Lea */ public class ForkJoinPool extends AbstractExecutorService { @@ -81,7 +84,7 @@ public class ForkJoinPool extends Abstra * Returns a new worker thread operating in the given pool. * * @param pool the pool this thread works in - * @throws NullPointerException if pool is null; + * @throws NullPointerException if pool is null */ public ForkJoinWorkerThread newThread(ForkJoinPool pool); } @@ -153,7 +156,7 @@ public class ForkJoinPool extends Abstra /** * The uncaught exception handler used when any worker - * abrupty terminates + * abruptly terminates */ private Thread.UncaughtExceptionHandler ueh; @@ -226,9 +229,9 @@ public class ForkJoinPool extends Abstra private static int workerCountsFor(int t, int r) { return (t << 16) + r; } /** - * Add delta (which may be negative) to running count. This must + * Adds delta (which may be negative) to running count. This must * be called before (with negative arg) and after (with positive) - * any managed synchronization (i.e., mainly, joins) + * any managed synchronization (i.e., mainly, joins). * @param delta the number to add */ final void updateRunningCount(int delta) { @@ -237,7 +240,7 @@ public class ForkJoinPool extends Abstra } /** - * Add delta (which may be negative) to both total and running + * Adds delta (which may be negative) to both total and running * count. This must be called upon creation and termination of * worker threads. * @param delta the number to add @@ -273,7 +276,7 @@ public class ForkJoinPool extends Abstra /** * Try incrementing active count; fail on contention. Called by * workers before/during executing tasks. - * @return true on success; + * @return true on success */ final boolean tryIncrementActiveCount() { int c = runControl; @@ -281,8 +284,8 @@ public class ForkJoinPool extends Abstra } /** - * Try decrementing active count; fail on contention. - * Possibly trigger termination on success + * Tries decrementing active count; fails on contention. + * Possibly triggers termination on success. * Called by workers when they can't find tasks. * @return true on success */ @@ -297,7 +300,7 @@ public class ForkJoinPool extends Abstra } /** - * Return true if argument represents zero active count and + * Returns true if argument represents zero active count and * nonzero runstate, which is the triggering condition for * terminating on shutdown. */ @@ -333,7 +336,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public ForkJoinPool() { this(Runtime.getRuntime().availableProcessors(), @@ -341,7 +344,7 @@ public class ForkJoinPool extends Abstra } /** - * Creates a ForkJoinPool with the indicated parellelism level + * Creates a ForkJoinPool with the indicated parallelism level * threads, and using the default ForkJoinWorkerThreadFactory, * @param parallelism the number of worker threads * @throws IllegalArgumentException if parallelism less than or @@ -349,7 +352,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory); @@ -364,7 +367,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public ForkJoinPool(ForkJoinWorkerThreadFactory factory) { this(Runtime.getRuntime().availableProcessors(), factory); @@ -376,12 +379,12 @@ public class ForkJoinPool extends Abstra * @param parallelism the targeted number of worker threads * @param factory the factory for creating new threads * @throws IllegalArgumentException if parallelism less than or - * equal to zero, or greater than implementation limit. + * equal to zero, or greater than implementation limit * @throws NullPointerException if factory is null * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) { if (parallelism <= 0 || parallelism > MAX_THREADS) @@ -421,7 +424,7 @@ public class ForkJoinPool extends Abstra } /** - * Return a good size for worker array given pool size. + * Returns a good size for worker array given pool size. * Currently requires size to be a power of two. */ private static int arraySizeFor(int ps) { @@ -429,8 +432,8 @@ public class ForkJoinPool extends Abstra } /** - * Create or resize array if necessary to hold newLength. - * Call only under exlusion or lock + * Creates or resizes array if necessary to hold newLength. + * Call only under exclusion or lock. * @return the array */ private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) { @@ -689,7 +692,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public Thread.UncaughtExceptionHandler setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) { @@ -716,14 +719,14 @@ public class ForkJoinPool extends Abstra /** - * Sets the target paralleism level of this pool. + * Sets the target parallelism level of this pool. * @param parallelism the target parallelism * @throws IllegalArgumentException if parallelism less than or - * equal to zero or greater than maximum size bounds. + * equal to zero or greater than maximum size bounds * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public void setParallelism(int parallelism) { checkPermission(); @@ -758,7 +761,7 @@ public class ForkJoinPool extends Abstra /** * Returns the number of worker threads that have started but not * yet terminated. This result returned by this method may differ - * from getParallelism when threads are created to + * from {@code getParallelism} when threads are created to * maintain parallelism when others are cooperatively blocked. * * @return the number of worker threads @@ -782,7 +785,7 @@ public class ForkJoinPool extends Abstra * Setting this value has no effect on current pool size. It * controls construction of new threads. * @throws IllegalArgumentException if negative or greater then - * internal implementation limit. + * internal implementation limit */ public void setMaximumPoolSize(int newMax) { if (newMax < 0 || newMax > MAX_THREADS) @@ -819,10 +822,10 @@ public class ForkJoinPool extends Abstra * worker threads only process asynchronous tasks. This method is * designed to be invoked only when pool is quiescent, and * typically only before any tasks are submitted. The effects of - * invocations at ather times may be unpredictable. + * invocations at other times may be unpredictable. * * @param async if true, use locally FIFO scheduling - * @return the previous mode. + * @return the previous mode */ public boolean setAsyncMode(boolean async) { boolean oldMode = locallyFifo; @@ -840,9 +843,9 @@ public class ForkJoinPool extends Abstra /** * Returns true if this pool uses local first-in-first-out - * scheduling mode for forked tasks that are never joined. + * scheduling mode for forked tasks that are never joined. * - * @return true if this pool uses async mode. + * @return true if this pool uses async mode */ public boolean getAsyncMode() { return locallyFifo; @@ -863,7 +866,7 @@ public class ForkJoinPool extends Abstra * Returns an estimate of the number of threads that are currently * stealing or executing tasks. This method may overestimate the * number of active threads. - * @return the number of active threads. + * @return the number of active threads */ public int getActiveThreadCount() { return activeCountOf(runControl); @@ -873,7 +876,7 @@ public class ForkJoinPool extends Abstra * Returns an estimate of the number of threads that are currently * idle waiting for tasks. This method may underestimate the * number of idle threads. - * @return the number of idle threads. + * @return the number of idle threads */ final int getIdleThreadCount() { int c = runningCountOf(workerCounts) - activeCountOf(runControl); @@ -902,7 +905,7 @@ public class ForkJoinPool extends Abstra * tuning fork/join programs: In general, steal counts should be * high enough to keep threads busy, but low enough to avoid * overhead and contention across threads. - * @return the number of steals. + * @return the number of steals */ public long getStealCount() { return stealCount.get(); @@ -925,7 +928,7 @@ public class ForkJoinPool extends Abstra * an approximation, obtained by iterating across all threads in * the pool. This method may be useful for tuning task * granularities. - * @return the number of queued tasks. + * @return the number of queued tasks */ public long getQueuedTaskCount() { long count = 0; @@ -944,7 +947,7 @@ public class ForkJoinPool extends Abstra * Returns an estimate of the number tasks submitted to this pool * that have not yet begun executing. This method takes time * proportional to the number of submissions. - * @return the number of queued submissions. + * @return the number of queued submissions */ public int getQueuedSubmissionCount() { return submissionQueue.size(); @@ -953,7 +956,7 @@ public class ForkJoinPool extends Abstra /** * Returns true if there are any tasks submitted to this pool * that have not yet begun executing. - * @return true if there are any queued submissions. + * @return {@code true} if there are any queued submissions */ public boolean hasQueuedSubmissions() { return !submissionQueue.isEmpty(); @@ -973,11 +976,11 @@ public class ForkJoinPool extends Abstra * Removes all available unexecuted submitted and forked tasks * from scheduling queues and adds them to the given collection, * without altering their execution status. These may include - * artifically generated or wrapped tasks. This method id designed + * artificially generated or wrapped tasks. This method is designed * to be invoked only when the pool is known to be * quiescent. Invocations at other times may not remove all * tasks. A failure encountered while attempting to add elements - * to collection c may result in elements being in + * to collection {@code c} may result in elements being in * neither, either or both collections when the associated * exception is thrown. The behavior of this operation is * undefined if the specified collection is modified while the @@ -1045,7 +1048,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public void shutdown() { checkPermission(); @@ -1061,13 +1064,13 @@ public class ForkJoinPool extends Abstra * method may or may not be rejected. Unlike some other executors, * this method cancels rather than collects non-executed tasks * upon termination, so always returns an empty list. However, you - * can use method drainTasksTo before invoking this + * can use method {@code drainTasksTo} before invoking this * method to transfer unexecuted tasks to another collection. * @return an empty list * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}{@code ("modifyThread")}, */ public List shutdownNow() { checkPermission(); @@ -1076,28 +1079,28 @@ public class ForkJoinPool extends Abstra } /** - * Returns true if all tasks have completed following shut down. + * Returns {@code true} if all tasks have completed following shut down. * - * @return true if all tasks have completed following shut down + * @return {@code true} if all tasks have completed following shut down */ public boolean isTerminated() { return runStateOf(runControl) == TERMINATED; } /** - * Returns true if the process of termination has + * Returns {@code true} if the process of termination has * commenced but possibly not yet completed. * - * @return true if terminating + * @return {@code true} if terminating */ public boolean isTerminating() { return runStateOf(runControl) >= TERMINATING; } /** - * Returns true if this pool has been shut down. + * Returns {@code true} if this pool has been shut down. * - * @return true if this pool has been shut down + * @return {@code true} if this pool has been shut down */ public boolean isShutdown() { return runStateOf(runControl) >= SHUTDOWN; @@ -1110,8 +1113,8 @@ public class ForkJoinPool extends Abstra * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument - * @return true if this executor terminated and - * false if the timeout elapsed before termination + * @return {@code true} if this executor terminated and + * {@code false} if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ public boolean awaitTermination(long timeout, TimeUnit unit) @@ -1183,7 +1186,7 @@ public class ForkJoinPool extends Abstra } /** - * Possibly terminate when on shutdown state + * Possibly terminates when on shutdown state. */ private void terminateOnShutdown() { if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl)) @@ -1191,7 +1194,7 @@ public class ForkJoinPool extends Abstra } /** - * Clear out and cancel submissions + * Clears out and cancels submissions. */ private void cancelQueuedSubmissions() { ForkJoinTask task; @@ -1200,7 +1203,7 @@ public class ForkJoinPool extends Abstra } /** - * Clean out worker queues. + * Cleans out worker queues. */ private void cancelQueuedWorkerTasks() { final ReentrantLock lock = this.workerLock; @@ -1220,8 +1223,8 @@ public class ForkJoinPool extends Abstra } /** - * Set each worker's status to terminating. Requires lock to avoid - * conflicts with add/remove + * Sets each worker's status to terminating. Requires lock to avoid + * conflicts with add/remove. */ private void stopAllWorkers() { final ReentrantLock lock = this.workerLock; @@ -1241,7 +1244,7 @@ public class ForkJoinPool extends Abstra } /** - * Interrupt all unterminated workers. This is not required for + * Interrupts all unterminated workers. This is not required for * sake of internal control, but may help unstick user code during * shutdown. */ @@ -1311,7 +1314,7 @@ public class ForkJoinPool extends Abstra } /** - * Wake up waiter, returning false if known to already + * Wakes up waiter, returning false if known to already */ boolean signal() { ForkJoinWorkerThread t = thread; @@ -1323,7 +1326,7 @@ public class ForkJoinPool extends Abstra } /** - * Await release on sync + * Awaits release on sync. */ void awaitSyncRelease(ForkJoinPool p) { while (thread != null && !p.syncIsReleasable(this)) @@ -1331,7 +1334,7 @@ public class ForkJoinPool extends Abstra } /** - * Await resumption as spare + * Awaits resumption as spare. */ void awaitSpareRelease() { while (thread != null) { @@ -1371,7 +1374,7 @@ public class ForkJoinPool extends Abstra } /** - * Signal threads waiting to poll a task. Because method sync + * Signals threads waiting to poll a task. Because method sync * rechecks availability, it is OK to only proceed if queue * appears to be non-empty, and OK to skip under contention to * increment count (since some other thread succeeded). @@ -1458,7 +1461,7 @@ public class ForkJoinPool extends Abstra // Parallelism maintenance /** - * Decrement running count; if too low, add spare. + * Decrements running count; if too low, adds spare. * * Conceptually, all we need to do here is add or resume a * spare thread when one is about to block (and remove or @@ -1478,7 +1481,7 @@ public class ForkJoinPool extends Abstra * only be suspended or removed when they are idle, not * immediately when they aren't needed. So adding threads will * raise parallelism level for longer than necessary. Also, - * FJ applications often enounter highly transient peaks when + * FJ applications often encounter highly transient peaks when * many threads are blocked joining, but for less time than it * takes to create or resume spares. * @@ -1548,8 +1551,8 @@ public class ForkJoinPool extends Abstra } /** - * Add a spare worker if lock available and no more than the - * expected numbers of threads exist + * Adds a spare worker if lock available and no more than the + * expected numbers of threads exist. * @return true if successful */ private boolean tryAddSpare(int expectedCounts) { @@ -1582,7 +1585,7 @@ public class ForkJoinPool extends Abstra } /** - * Add the kth spare worker. On entry, pool coounts are already + * Adds the kth spare worker. On entry, pool counts are already * adjusted to reflect addition. */ private void createAndStartSpare(int k) { @@ -1604,11 +1607,11 @@ public class ForkJoinPool extends Abstra } /** - * Suspend calling thread w if there are excess threads. Called - * only from sync. Spares are enqueued in a Treiber stack - * using the same WaitQueueNodes as barriers. They are resumed - * mainly in preJoin, but are also woken on pool events that - * require all threads to check run state. + * Suspends calling thread w if there are excess threads. Called + * only from sync. Spares are enqueued in a Treiber stack using + * the same WaitQueueNodes as barriers. They are resumed mainly + * in preJoin, but are also woken on pool events that require all + * threads to check run state. * @param w the caller */ private boolean suspendIfSpare(ForkJoinWorkerThread w) { @@ -1629,7 +1632,7 @@ public class ForkJoinPool extends Abstra } /** - * Try to pop and resume a spare thread. + * Tries to pop and resume a spare thread. * @param updateCount if true, increment running count on success * @return true if successful */ @@ -1647,7 +1650,7 @@ public class ForkJoinPool extends Abstra } /** - * Pop and resume all spare threads. Same idea as ensureSync. + * Pops and resumes all spare threads. Same idea as ensureSync. * @return true if any spares released */ private boolean resumeAllSpares() { @@ -1665,7 +1668,7 @@ public class ForkJoinPool extends Abstra } /** - * Pop and shutdown excessive spare threads. Call only while + * Pops and shuts down excessive spare threads. Call only while * holding lock. This is not guaranteed to eliminate all excess * threads, only those suspended as spares, which are the ones * unlikely to be needed in the future. @@ -1690,8 +1693,8 @@ public class ForkJoinPool extends Abstra /** * Interface for extending managed parallelism for tasks running * in ForkJoinPools. A ManagedBlocker provides two methods. - * Method isReleasable must return true if blocking is not - * necessary. Method block blocks the current thread + * Method {@code isReleasable} must return true if blocking is not + * necessary. Method {@code block} blocks the current thread * if necessary (perhaps internally invoking isReleasable before * actually blocking.). *

For example, here is a ManagedBlocker based on a @@ -1717,9 +1720,9 @@ public class ForkJoinPool extends Abstra * Possibly blocks the current thread, for example waiting for * a lock or condition. * @return true if no additional blocking is necessary (i.e., - * if isReleasable would return true). + * if isReleasable would return true) * @throws InterruptedException if interrupted while waiting - * (the method is not required to do so, but is allowe to). + * (the method is not required to do so, but is allowed to). */ boolean block() throws InterruptedException; @@ -1734,7 +1737,7 @@ public class ForkJoinPool extends Abstra * is a ForkJoinWorkerThread, this method possibly arranges for a * spare thread to be activated if necessary to ensure parallelism * while the current thread is blocked. If - * maintainParallelism is true and the pool supports + * {@code maintainParallelism} is true and the pool supports * it ({@link #getMaintainsParallelism}), this method attempts to * maintain the pool's nominal parallelism. Otherwise if activates * a thread only if necessary to avoid complete starvation. This @@ -1756,7 +1759,7 @@ public class ForkJoinPool extends Abstra * attempt to maintain the pool's nominal parallelism; otherwise * activate a thread only if necessary to avoid complete * starvation. - * @throws InterruptedException if blocker.block did so. + * @throws InterruptedException if blocker.block did so */ public static void managedBlock(ManagedBlocker blocker, boolean maintainParallelism) @@ -1818,11 +1821,11 @@ public class ForkJoinPool extends Abstra private static long fieldOffset(String fieldName) throws NoSuchFieldException { - return _unsafe.objectFieldOffset + return UNSAFE.objectFieldOffset (ForkJoinPool.class.getDeclaredField(fieldName)); } - static final Unsafe _unsafe; + static final Unsafe UNSAFE; static final long eventCountOffset; static final long workerCountsOffset; static final long runControlOffset; @@ -1831,7 +1834,7 @@ public class ForkJoinPool extends Abstra static { try { - _unsafe = getUnsafe(); + UNSAFE = getUnsafe(); eventCountOffset = fieldOffset("eventCount"); workerCountsOffset = fieldOffset("workerCounts"); runControlOffset = fieldOffset("runControl"); @@ -1843,18 +1846,18 @@ public class ForkJoinPool extends Abstra } private boolean casEventCount(long cmp, long val) { - return _unsafe.compareAndSwapLong(this, eventCountOffset, cmp, val); + return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val); } private boolean casWorkerCounts(int cmp, int val) { - return _unsafe.compareAndSwapInt(this, workerCountsOffset, cmp, val); + return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val); } private boolean casRunControl(int cmp, int val) { - return _unsafe.compareAndSwapInt(this, runControlOffset, cmp, val); + return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val); } private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) { - return _unsafe.compareAndSwapObject(this, spareStackOffset, cmp, val); + return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val); } private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) { - return _unsafe.compareAndSwapObject(this, syncStackOffset, cmp, val); + return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val); } }