--- jsr166/src/jsr166y/ForkJoinPool.java 2009/07/20 21:54:51 1.8 +++ jsr166/src/jsr166y/ForkJoinPool.java 2009/07/22 01:36:51 1.13 @@ -56,6 +56,9 @@ import java.lang.reflect.*; * maximum number of running threads to 32767. Attempts to create * pools with greater than the maximum result in * IllegalArgumentExceptions. + * + * @since 1.7 + * @author Doug Lea */ public class ForkJoinPool extends AbstractExecutorService { @@ -81,7 +84,7 @@ public class ForkJoinPool extends Abstra * Returns a new worker thread operating in the given pool. * * @param pool the pool this thread works in - * @throws NullPointerException if pool is null; + * @throws NullPointerException if pool is null */ public ForkJoinWorkerThread newThread(ForkJoinPool pool); } @@ -153,7 +156,7 @@ public class ForkJoinPool extends Abstra /** * The uncaught exception handler used when any worker - * abrupty terminates + * abruptly terminates */ private Thread.UncaughtExceptionHandler ueh; @@ -226,9 +229,9 @@ public class ForkJoinPool extends Abstra private static int workerCountsFor(int t, int r) { return (t << 16) + r; } /** - * Add delta (which may be negative) to running count. This must + * Adds delta (which may be negative) to running count. This must * be called before (with negative arg) and after (with positive) - * any managed synchronization (i.e., mainly, joins) + * any managed synchronization (i.e., mainly, joins). * @param delta the number to add */ final void updateRunningCount(int delta) { @@ -237,7 +240,7 @@ public class ForkJoinPool extends Abstra } /** - * Add delta (which may be negative) to both total and running + * Adds delta (which may be negative) to both total and running * count. This must be called upon creation and termination of * worker threads. * @param delta the number to add @@ -273,7 +276,7 @@ public class ForkJoinPool extends Abstra /** * Try incrementing active count; fail on contention. Called by * workers before/during executing tasks. - * @return true on success; + * @return true on success */ final boolean tryIncrementActiveCount() { int c = runControl; @@ -281,8 +284,8 @@ public class ForkJoinPool extends Abstra } /** - * Try decrementing active count; fail on contention. - * Possibly trigger termination on success + * Tries decrementing active count; fails on contention. + * Possibly triggers termination on success. * Called by workers when they can't find tasks. * @return true on success */ @@ -297,7 +300,7 @@ public class ForkJoinPool extends Abstra } /** - * Return true if argument represents zero active count and + * Returns true if argument represents zero active count and * nonzero runstate, which is the triggering condition for * terminating on shutdown. */ @@ -341,7 +344,7 @@ public class ForkJoinPool extends Abstra } /** - * Creates a ForkJoinPool with the indicated parellelism level + * Creates a ForkJoinPool with the indicated parallelism level * threads, and using the default ForkJoinWorkerThreadFactory, * @param parallelism the number of worker threads * @throws IllegalArgumentException if parallelism less than or @@ -376,7 +379,7 @@ public class ForkJoinPool extends Abstra * @param parallelism the targeted number of worker threads * @param factory the factory for creating new threads * @throws IllegalArgumentException if parallelism less than or - * equal to zero, or greater than implementation limit. + * equal to zero, or greater than implementation limit * @throws NullPointerException if factory is null * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads @@ -421,7 +424,7 @@ public class ForkJoinPool extends Abstra } /** - * Return a good size for worker array given pool size. + * Returns a good size for worker array given pool size. * Currently requires size to be a power of two. */ private static int arraySizeFor(int ps) { @@ -429,8 +432,8 @@ public class ForkJoinPool extends Abstra } /** - * Create or resize array if necessary to hold newLength. - * Call only under exlusion or lock + * Creates or resizes array if necessary to hold newLength. + * Call only under exclusion or lock. * @return the array */ private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) { @@ -716,10 +719,10 @@ public class ForkJoinPool extends Abstra /** - * Sets the target paralleism level of this pool. + * Sets the target parallelism level of this pool. * @param parallelism the target parallelism * @throws IllegalArgumentException if parallelism less than or - * equal to zero or greater than maximum size bounds. + * equal to zero or greater than maximum size bounds * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link @@ -782,7 +785,7 @@ public class ForkJoinPool extends Abstra * Setting this value has no effect on current pool size. It * controls construction of new threads. * @throws IllegalArgumentException if negative or greater then - * internal implementation limit. + * internal implementation limit */ public void setMaximumPoolSize(int newMax) { if (newMax < 0 || newMax > MAX_THREADS) @@ -819,10 +822,10 @@ public class ForkJoinPool extends Abstra * worker threads only process asynchronous tasks. This method is * designed to be invoked only when pool is quiescent, and * typically only before any tasks are submitted. The effects of - * invocations at ather times may be unpredictable. + * invocations at other times may be unpredictable. * * @param async if true, use locally FIFO scheduling - * @return the previous mode. + * @return the previous mode */ public boolean setAsyncMode(boolean async) { boolean oldMode = locallyFifo; @@ -842,7 +845,7 @@ public class ForkJoinPool extends Abstra * Returns true if this pool uses local first-in-first-out * scheduling mode for forked tasks that are never joined. * - * @return true if this pool uses async mode. + * @return true if this pool uses async mode */ public boolean getAsyncMode() { return locallyFifo; @@ -863,7 +866,7 @@ public class ForkJoinPool extends Abstra * Returns an estimate of the number of threads that are currently * stealing or executing tasks. This method may overestimate the * number of active threads. - * @return the number of active threads. + * @return the number of active threads */ public int getActiveThreadCount() { return activeCountOf(runControl); @@ -873,7 +876,7 @@ public class ForkJoinPool extends Abstra * Returns an estimate of the number of threads that are currently * idle waiting for tasks. This method may underestimate the * number of idle threads. - * @return the number of idle threads. + * @return the number of idle threads */ final int getIdleThreadCount() { int c = runningCountOf(workerCounts) - activeCountOf(runControl); @@ -902,7 +905,7 @@ public class ForkJoinPool extends Abstra * tuning fork/join programs: In general, steal counts should be * high enough to keep threads busy, but low enough to avoid * overhead and contention across threads. - * @return the number of steals. + * @return the number of steals */ public long getStealCount() { return stealCount.get(); @@ -925,7 +928,7 @@ public class ForkJoinPool extends Abstra * an approximation, obtained by iterating across all threads in * the pool. This method may be useful for tuning task * granularities. - * @return the number of queued tasks. + * @return the number of queued tasks */ public long getQueuedTaskCount() { long count = 0; @@ -944,7 +947,7 @@ public class ForkJoinPool extends Abstra * Returns an estimate of the number tasks submitted to this pool * that have not yet begun executing. This method takes time * proportional to the number of submissions. - * @return the number of queued submissions. + * @return the number of queued submissions */ public int getQueuedSubmissionCount() { return submissionQueue.size(); @@ -953,7 +956,7 @@ public class ForkJoinPool extends Abstra /** * Returns true if there are any tasks submitted to this pool * that have not yet begun executing. - * @return {@code true} if there are any queued submissions. + * @return {@code true} if there are any queued submissions */ public boolean hasQueuedSubmissions() { return !submissionQueue.isEmpty(); @@ -973,7 +976,7 @@ public class ForkJoinPool extends Abstra * Removes all available unexecuted submitted and forked tasks * from scheduling queues and adds them to the given collection, * without altering their execution status. These may include - * artifically generated or wrapped tasks. This method id designed + * artificially generated or wrapped tasks. This method is designed * to be invoked only when the pool is known to be * quiescent. Invocations at other times may not remove all * tasks. A failure encountered while attempting to add elements @@ -1183,7 +1186,7 @@ public class ForkJoinPool extends Abstra } /** - * Possibly terminate when on shutdown state + * Possibly terminates when on shutdown state. */ private void terminateOnShutdown() { if (!hasQueuedSubmissions() && canTerminateOnShutdown(runControl)) @@ -1191,7 +1194,7 @@ public class ForkJoinPool extends Abstra } /** - * Clear out and cancel submissions + * Clears out and cancels submissions. */ private void cancelQueuedSubmissions() { ForkJoinTask task; @@ -1200,7 +1203,7 @@ public class ForkJoinPool extends Abstra } /** - * Clean out worker queues. + * Cleans out worker queues. */ private void cancelQueuedWorkerTasks() { final ReentrantLock lock = this.workerLock; @@ -1220,8 +1223,8 @@ public class ForkJoinPool extends Abstra } /** - * Set each worker's status to terminating. Requires lock to avoid - * conflicts with add/remove + * Sets each worker's status to terminating. Requires lock to avoid + * conflicts with add/remove. */ private void stopAllWorkers() { final ReentrantLock lock = this.workerLock; @@ -1241,7 +1244,7 @@ public class ForkJoinPool extends Abstra } /** - * Interrupt all unterminated workers. This is not required for + * Interrupts all unterminated workers. This is not required for * sake of internal control, but may help unstick user code during * shutdown. */ @@ -1311,7 +1314,7 @@ public class ForkJoinPool extends Abstra } /** - * Wake up waiter, returning false if known to already + * Wakes up waiter, returning false if known to already */ boolean signal() { ForkJoinWorkerThread t = thread; @@ -1323,7 +1326,7 @@ public class ForkJoinPool extends Abstra } /** - * Await release on sync + * Awaits release on sync. */ void awaitSyncRelease(ForkJoinPool p) { while (thread != null && !p.syncIsReleasable(this)) @@ -1331,7 +1334,7 @@ public class ForkJoinPool extends Abstra } /** - * Await resumption as spare + * Awaits resumption as spare. */ void awaitSpareRelease() { while (thread != null) { @@ -1371,7 +1374,7 @@ public class ForkJoinPool extends Abstra } /** - * Signal threads waiting to poll a task. Because method sync + * Signals threads waiting to poll a task. Because method sync * rechecks availability, it is OK to only proceed if queue * appears to be non-empty, and OK to skip under contention to * increment count (since some other thread succeeded). @@ -1458,7 +1461,7 @@ public class ForkJoinPool extends Abstra // Parallelism maintenance /** - * Decrement running count; if too low, add spare. + * Decrements running count; if too low, adds spare. * * Conceptually, all we need to do here is add or resume a * spare thread when one is about to block (and remove or @@ -1478,7 +1481,7 @@ public class ForkJoinPool extends Abstra * only be suspended or removed when they are idle, not * immediately when they aren't needed. So adding threads will * raise parallelism level for longer than necessary. Also, - * FJ applications often enounter highly transient peaks when + * FJ applications often encounter highly transient peaks when * many threads are blocked joining, but for less time than it * takes to create or resume spares. * @@ -1548,8 +1551,8 @@ public class ForkJoinPool extends Abstra } /** - * Add a spare worker if lock available and no more than the - * expected numbers of threads exist + * Adds a spare worker if lock available and no more than the + * expected numbers of threads exist. * @return true if successful */ private boolean tryAddSpare(int expectedCounts) { @@ -1582,7 +1585,7 @@ public class ForkJoinPool extends Abstra } /** - * Add the kth spare worker. On entry, pool coounts are already + * Adds the kth spare worker. On entry, pool counts are already * adjusted to reflect addition. */ private void createAndStartSpare(int k) { @@ -1604,11 +1607,11 @@ public class ForkJoinPool extends Abstra } /** - * Suspend calling thread w if there are excess threads. Called - * only from sync. Spares are enqueued in a Treiber stack - * using the same WaitQueueNodes as barriers. They are resumed - * mainly in preJoin, but are also woken on pool events that - * require all threads to check run state. + * Suspends calling thread w if there are excess threads. Called + * only from sync. Spares are enqueued in a Treiber stack using + * the same WaitQueueNodes as barriers. They are resumed mainly + * in preJoin, but are also woken on pool events that require all + * threads to check run state. * @param w the caller */ private boolean suspendIfSpare(ForkJoinWorkerThread w) { @@ -1629,7 +1632,7 @@ public class ForkJoinPool extends Abstra } /** - * Try to pop and resume a spare thread. + * Tries to pop and resume a spare thread. * @param updateCount if true, increment running count on success * @return true if successful */ @@ -1647,7 +1650,7 @@ public class ForkJoinPool extends Abstra } /** - * Pop and resume all spare threads. Same idea as ensureSync. + * Pops and resumes all spare threads. Same idea as ensureSync. * @return true if any spares released */ private boolean resumeAllSpares() { @@ -1665,7 +1668,7 @@ public class ForkJoinPool extends Abstra } /** - * Pop and shutdown excessive spare threads. Call only while + * Pops and shuts down excessive spare threads. Call only while * holding lock. This is not guaranteed to eliminate all excess * threads, only those suspended as spares, which are the ones * unlikely to be needed in the future. @@ -1717,9 +1720,9 @@ public class ForkJoinPool extends Abstra * Possibly blocks the current thread, for example waiting for * a lock or condition. * @return true if no additional blocking is necessary (i.e., - * if isReleasable would return true). + * if isReleasable would return true) * @throws InterruptedException if interrupted while waiting - * (the method is not required to do so, but is allowe to). + * (the method is not required to do so, but is allowed to). */ boolean block() throws InterruptedException; @@ -1756,7 +1759,7 @@ public class ForkJoinPool extends Abstra * attempt to maintain the pool's nominal parallelism; otherwise * activate a thread only if necessary to avoid complete * starvation. - * @throws InterruptedException if blocker.block did so. + * @throws InterruptedException if blocker.block did so */ public static void managedBlock(ManagedBlocker blocker, boolean maintainParallelism) @@ -1818,11 +1821,11 @@ public class ForkJoinPool extends Abstra private static long fieldOffset(String fieldName) throws NoSuchFieldException { - return _unsafe.objectFieldOffset + return UNSAFE.objectFieldOffset (ForkJoinPool.class.getDeclaredField(fieldName)); } - static final Unsafe _unsafe; + static final Unsafe UNSAFE; static final long eventCountOffset; static final long workerCountsOffset; static final long runControlOffset; @@ -1831,7 +1834,7 @@ public class ForkJoinPool extends Abstra static { try { - _unsafe = getUnsafe(); + UNSAFE = getUnsafe(); eventCountOffset = fieldOffset("eventCount"); workerCountsOffset = fieldOffset("workerCounts"); runControlOffset = fieldOffset("runControl"); @@ -1843,18 +1846,18 @@ public class ForkJoinPool extends Abstra } private boolean casEventCount(long cmp, long val) { - return _unsafe.compareAndSwapLong(this, eventCountOffset, cmp, val); + return UNSAFE.compareAndSwapLong(this, eventCountOffset, cmp, val); } private boolean casWorkerCounts(int cmp, int val) { - return _unsafe.compareAndSwapInt(this, workerCountsOffset, cmp, val); + return UNSAFE.compareAndSwapInt(this, workerCountsOffset, cmp, val); } private boolean casRunControl(int cmp, int val) { - return _unsafe.compareAndSwapInt(this, runControlOffset, cmp, val); + return UNSAFE.compareAndSwapInt(this, runControlOffset, cmp, val); } private boolean casSpareStack(WaitQueueNode cmp, WaitQueueNode val) { - return _unsafe.compareAndSwapObject(this, spareStackOffset, cmp, val); + return UNSAFE.compareAndSwapObject(this, spareStackOffset, cmp, val); } private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) { - return _unsafe.compareAndSwapObject(this, syncStackOffset, cmp, val); + return UNSAFE.compareAndSwapObject(this, syncStackOffset, cmp, val); } }