--- jsr166/src/jsr166y/ForkJoinPool.java 2011/04/01 20:20:37 1.100 +++ jsr166/src/jsr166y/ForkJoinPool.java 2011/07/01 01:15:06 1.106 @@ -19,7 +19,6 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; @@ -102,13 +101,12 @@ import java.util.concurrent.locks.Condit * daemon} mode, there is typically no need to explicitly {@link * #shutdown} such a pool upon program exit. * - *
+ *  
 {@code
  * static final ForkJoinPool mainPool = new ForkJoinPool();
  * ...
  * public void sort(long[] array) {
  *   mainPool.invoke(new SortTask(array, 0, array.length));
- * }
- * 
+ * }}
* *

Implementation notes: This implementation restricts the * maximum number of running threads to 32767. Attempts to create @@ -292,7 +290,7 @@ public class ForkJoinPool extends Abstra * "terminate" status, cancels all unprocessed tasks, and wakes up * all waiting workers. Detecting whether termination should * commence after a non-abrupt shutdown() call requires more work - * and bookkeeping. We need consensus about quiesence (i.e., that + * and bookkeeping. We need consensus about quiescence (i.e., that * there is no more work) which is reflected in active counts so * long as there are no current blockers, as well as possible * re-evaluations during independent changes in blocking or @@ -467,7 +465,7 @@ public class ForkJoinPool extends Abstra /** * Main pool control -- a long packed with: * AC: Number of active running workers minus target parallelism (16 bits) - * TC: Number of total workers minus target parallelism (16bits) + * TC: Number of total workers minus target parallelism (16 bits) * ST: true if pool is terminating (1 bit) * EC: the wait count of top waiting thread (15 bits) * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits) @@ -795,7 +793,8 @@ public class ForkJoinPool extends Abstra else if (w.eventCount != v) return true; // update next time } - if ((int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 && + if ((!shutdown || !tryTerminate(false)) && + (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 && blockedCount == 0 && quiescerCount == 0) idleAwaitWork(w, nc, c, v); // quiescent for (boolean rescanned = false;;) { @@ -865,7 +864,7 @@ public class ForkJoinPool extends Abstra w.parked = false; if (w.eventCount != v) break; - else if (System.nanoTime() - startTime < + else if (System.nanoTime() - startTime < SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop Thread.interrupted(); // spurious wakeup else if (UNSAFE.compareAndSwapLong(this, ctlOffset, @@ -948,7 +947,7 @@ public class ForkJoinPool extends Abstra int pc = parallelism; do { ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w; - int e, ac, tc, rc, i; + int e, ac, tc, i; long c = ctl; int u = (int)(c >>> 32); if ((e = (int)c) < 0) { @@ -988,15 +987,15 @@ public class ForkJoinPool extends Abstra } /** - * Decrements blockedCount and increments active count + * Decrements blockedCount and increments active count. */ private void postBlock() { long c; do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, // no mask c = ctl, c + AC_UNIT)); int b; - do {} while(!UNSAFE.compareAndSwapInt(this, blockedCountOffset, - b = blockedCount, b - 1)); + do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset, + b = blockedCount, b - 1)); } /** @@ -1006,7 +1005,6 @@ public class ForkJoinPool extends Abstra * @param joinMe the task */ final void tryAwaitJoin(ForkJoinTask joinMe) { - int s; Thread.interrupted(); // clear interrupts before checking termination if (joinMe.status >= 0) { if (tryPreBlock()) { @@ -1020,7 +1018,7 @@ public class ForkJoinPool extends Abstra /** * Possibly blocks the given worker waiting for joinMe to - * complete or timeout + * complete or timeout. * * @param joinMe the task * @param millis the wait time for underlying Object.wait @@ -1056,7 +1054,7 @@ public class ForkJoinPool extends Abstra } /** - * If necessary, compensates for blocker, and blocks + * If necessary, compensates for blocker, and blocks. */ private void awaitBlocker(ManagedBlocker blocker) throws InterruptedException { @@ -1148,7 +1146,7 @@ public class ForkJoinPool extends Abstra ws[k] = w; nextWorkerIndex = k + 1; int m = g & SMASK; - g = k > m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); + g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); } } finally { scanGuard = g; @@ -1331,8 +1329,8 @@ public class ForkJoinPool extends Abstra */ final void addQuiescerCount(int delta) { int c; - do {} while(!UNSAFE.compareAndSwapInt(this, quiescerCountOffset, - c = quiescerCount, c + delta)); + do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset, + c = quiescerCount, c + delta)); } /** @@ -1357,12 +1355,12 @@ public class ForkJoinPool extends Abstra final int idlePerActive() { // Approximate at powers of two for small values, saturate past 4 int p = parallelism; - int a = p + (int)(ctl >> AC_SHIFT); - return (a > (p >>>= 1) ? 0 : - a > (p >>>= 1) ? 1 : - a > (p >>>= 1) ? 2 : - a > (p >>>= 1) ? 4 : - 8); + int a = p + (int)(ctl >> AC_SHIFT); + return (a > (p >>>= 1) ? 0 : + a > (p >>>= 1) ? 1 : + a > (p >>>= 1) ? 2 : + a > (p >>>= 1) ? 4 : + 8); } // Exported methods @@ -1685,7 +1683,7 @@ public class ForkJoinPool extends Abstra */ public int getRunningThreadCount() { int r = parallelism + (int)(ctl >> AC_SHIFT); - return r <= 0? 0 : r; // suppress momentarily negative values + return (r <= 0) ? 0 : r; // suppress momentarily negative values } /** @@ -1697,7 +1695,7 @@ public class ForkJoinPool extends Abstra */ public int getActiveThreadCount() { int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount; - return r <= 0? 0 : r; // suppress momentarily negative values + return (r <= 0) ? 0 : r; // suppress momentarily negative values } /** @@ -1852,9 +1850,9 @@ public class ForkJoinPool extends Abstra int ac = rc + blockedCount; String level; if ((c & STOP_BIT) != 0) - level = (tc == 0)? "Terminated" : "Terminating"; + level = (tc == 0) ? "Terminated" : "Terminating"; else - level = shutdown? "Shutting down" : "Running"; + level = shutdown ? "Shutting down" : "Running"; return super.toString() + "[" + level + ", parallelism = " + pc + @@ -2117,10 +2115,9 @@ public class ForkJoinPool extends Abstra modifyThreadPermission = new RuntimePermission("modifyThread"); defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); - int s; try { UNSAFE = getUnsafe(); - Class k = ForkJoinPool.class; + Class k = ForkJoinPool.class; ctlOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("ctl")); stealCountOffset = UNSAFE.objectFieldOffset @@ -2133,12 +2130,12 @@ public class ForkJoinPool extends Abstra (k.getDeclaredField("scanGuard")); nextWorkerNumberOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("nextWorkerNumber")); - Class a = ForkJoinTask[].class; - ABASE = UNSAFE.arrayBaseOffset(a); - s = UNSAFE.arrayIndexScale(a); } catch (Exception e) { throw new Error(e); } + Class a = ForkJoinTask[].class; + ABASE = UNSAFE.arrayBaseOffset(a); + int s = UNSAFE.arrayIndexScale(a); if ((s & (s-1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(s);