--- jsr166/src/jsr166y/ForkJoinPool.java 2012/10/28 22:36:01 1.135 +++ jsr166/src/jsr166y/ForkJoinPool.java 2012/10/29 17:23:34 1.136 @@ -43,14 +43,13 @@ import java.util.concurrent.locks.Condit * tasks that are never joined. * *

A static {@link #commonPool} is available and appropriate for - * most applications. The common pool is constructed upon first - * access, or upon usage by any ForkJoinTask that is not explictly - * submitted to a specified pool. Using the common pool normally - * reduces resource usage (its threads are slowly reclaimed during - * periods of non-use, and reinstated upon subsequent use). The - * common pool is by default constructed with default parameters, but - * these may be controlled by setting any or all of the three - * properties {@code + * most applications. The common pool is used by any ForkJoinTask that + * is not explicitly submitted to a specified pool. Using the common + * pool normally reduces resource usage (its threads are slowly + * reclaimed during periods of non-use, and reinstated upon subsequent + * use). The common pool is by default constructed with default + * parameters, but these may be controlled by setting any or all of + * the three properties {@code * java.util.concurrent.ForkJoinPool.common.{parallelism, * threadFactory, exceptionHandler}}. * @@ -236,16 +235,15 @@ public class ForkJoinPool extends Abstra * when locked remains available to check consistency. * * Recording WorkQueues. WorkQueues are recorded in the - * "workQueues" array that is created upon pool construction and - * expanded if necessary. Updates to the array while recording - * new workers and unrecording terminated ones are protected from - * each other by a lock but the array is otherwise concurrently - * readable, and accessed directly. To simplify index-based - * operations, the array size is always a power of two, and all - * readers must tolerate null slots. Shared (submission) queues - * are at even indices, worker queues at odd indices. Grouping - * them together in this way simplifies and speeds up task - * scanning. + * "workQueues" array that is created upon first use and expanded + * if necessary. Updates to the array while recording new workers + * and unrecording terminated ones are protected from each other + * by a lock but the array is otherwise concurrently readable, and + * accessed directly. To simplify index-based operations, the + * array size is always a power of two, and all readers must + * tolerate null slots. Shared (submission) queues are at even + * indices, worker queues at odd indices. Grouping them together + * in this way simplifies and speeds up task scanning. * * All worker thread creation is on-demand, triggered by task * submissions, replacement of terminated workers, and/or @@ -503,29 +501,6 @@ public class ForkJoinPool extends Abstra } /** - * A simple non-reentrant lock used for exclusion when managing - * queues and workers. We use a custom lock so that we can readily - * probe lock state in constructions that check among alternative - * actions. The lock is normally only very briefly held, and - * sometimes treated as a spinlock, but other usages block to - * reduce overall contention in those cases where locked code - * bodies perform allocation/resizing. - */ - static final class Mutex extends AbstractQueuedSynchronizer { - public final boolean tryAcquire(int ignore) { - return compareAndSetState(0, 1); - } - public final boolean tryRelease(int ignore) { - setState(0); - return true; - } - public final void lock() { acquire(0); } - public final void unlock() { release(0); } - public final boolean isHeldExclusively() { return getState() == 1; } - public final Condition newCondition() { return new ConditionObject(); } - } - - /** * Class for artificial tasks that are used to replace the target * of local joins if they are removed from an interior queue slot * in WorkQueue.tryRemoveAndExec. We don't need the proxy to @@ -813,23 +788,18 @@ public class ForkJoinPool extends Abstra /** * Version of tryUnpush for shared queues; called by non-FJ - * submitters. Conservatively fails to unpush if all workers - * are active unless there are multiple tasks in queue. + * submitters after prechecking that task probably exists. */ - final boolean trySharedUnpush(ForkJoinTask task, ForkJoinPool p) { + final boolean trySharedUnpush(ForkJoinTask t) { boolean success = false; - if (task != null && top != base && runState == 0 && - U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { try { - ForkJoinTask[] a; int n, s; - if ((a = array) != null && (n = (s = top) - base) > 0 && - (n > 1 || p == null || (int)(p.ctl >> AC_SHIFT) < 0)) { - int j = (((a.length - 1) & --s) << ASHIFT) + ABASE; - if (U.getObjectVolatile(a, j) == task && - U.compareAndSwapObject(a, j, task, null)) { - top = s; - success = true; - } + ForkJoinTask[] a; int s; + if ((a = array) != null && (s = top) != base && + U.compareAndSwapObject + (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { + top = s; + success = true; } } finally { runState = 0; // unlock @@ -1112,6 +1082,23 @@ public class ForkJoinPool extends Abstra public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; + + /** Property prefix for constructing common pool */ + private static final String propPrefix = + "java.util.concurrent.ForkJoinPool.common."; + + /** + * Common (static) pool. Non-null for public use unless a static + * construction exception, but internal usages must null-check on + * use. + */ + static final ForkJoinPool commonPool; + + /** + * Common pool parallelism. Must equal commonPool.parallelism. + */ + static final int commonPoolParallelism; + /** * Generator for assigning sequence numbers as pool names. */ @@ -1136,23 +1123,10 @@ public class ForkJoinPool extends Abstra */ private static final ThreadSubmitter submitters; - /** Common default pool */ - static volatile ForkJoinPool commonPool; - - // commonPool construction parameters - private static final String propPrefix = - "java.util.concurrent.ForkJoinPool.common."; - private static final Thread.UncaughtExceptionHandler commonPoolUEH; - private static final ForkJoinWorkerThreadFactory commonPoolFactory; - static final int commonPoolParallelism; - - /** Static initialization lock */ - private static final Mutex initializationLock; - // static constants /** - * Initial timeout value (in nanoseconds) for the tread triggering + * Initial timeout value (in nanoseconds) for the thread triggering * quiescence to park waiting for new work. On timeout, the thread * will instead try to shrink the number of workers. */ @@ -1281,20 +1255,57 @@ public class ForkJoinPool extends Abstra * empirically works OK on current JVMs. */ + volatile long stealCount; // collects worker counts volatile long ctl; // main pool control final int parallelism; // parallelism level final int localMode; // per-worker scheduling mode + volatile int nextWorkerNumber; // to create worker name string final int submitMask; // submit queue index bound int nextSeed; // for initializing worker seeds + volatile int mainLock; // spinlock for array updates volatile int runState; // shutdown status and seq WorkQueue[] workQueues; // main registry - final Mutex lock; // for registration - final Condition termination; // for awaitTermination final ForkJoinWorkerThreadFactory factory; // factory for new workers final Thread.UncaughtExceptionHandler ueh; // per-worker UEH - final AtomicLong stealCount; // collect counts when terminated - final AtomicInteger nextWorkerNumber; // to create worker name string - String workerNamePrefix; // to create worker name string + final String workerNamePrefix; // to create worker name string + + /* + * Mechanics for main lock protecting worker array updates. Uses + * the same strategy as ConcurrentHashMap bins -- a spinLock for + * normal cases, but falling back to builtin lock when (rarely) + * needed. See internal ConcurrentHashMap documentation for + * explanation. + */ + + static final int LOCK_WAITING = 2; // bit to indicate need for signal + static final int MAX_LOCK_SPINS = 1 << 8; + + private void tryAwaitMainLock() { + int spins = MAX_LOCK_SPINS, r = 0, h; + while (((h = mainLock) & 1) != 0) { + if (r == 0) + r = ThreadLocalRandom.current().nextInt(); // randomize spins + else if (spins >= 0) { + r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift + if (r >= 0) + --spins; + } + else if (U.compareAndSwapInt(this, MAINLOCK, h, h | LOCK_WAITING)) { + synchronized (this) { + if ((mainLock & LOCK_WAITING) != 0) { + try { + wait(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + else + notifyAll(); // possibly won race vs signaller + } + break; + } + } + } // Creating, registering, and deregistering workers @@ -1322,8 +1333,10 @@ public class ForkJoinPool extends Abstra * ForkJoinWorkerThread. */ final String nextWorkerName() { - return workerNamePrefix.concat - (Integer.toString(nextWorkerNumber.addAndGet(1))); + int n; + do {} while(!U.compareAndSwapInt(this, NEXTWORKERNUMBER, + n = nextWorkerNumber, ++n)); + return workerNamePrefix.concat(Integer.toString(n)); } /** @@ -1336,12 +1349,14 @@ public class ForkJoinPool extends Abstra * @param w the worker's queue */ final void registerWorker(WorkQueue w) { - Mutex lock = this.lock; - lock.lock(); + while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) + tryAwaitMainLock(); try { - WorkQueue[] ws = workQueues; - if (w != null && ws != null) { // skip on shutdown/failure - int rs, n = ws.length, m = n - 1; + WorkQueue[] ws; + if ((ws = workQueues) == null) + ws = workQueues = new WorkQueue[submitMask + 1]; + if (w != null) { + int rs, n = ws.length, m = n - 1; int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence w.seed = (s == 0) ? 1 : s; // ensure non-zero seed int r = (s << 1) | 1; // use odd-numbered indices @@ -1361,8 +1376,12 @@ public class ForkJoinPool extends Abstra runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN); } } finally { - lock.unlock(); + if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) { + mainLock = 0; + synchronized (this) { notifyAll(); }; + } } + } /** @@ -1375,19 +1394,24 @@ public class ForkJoinPool extends Abstra * @param ex the exception causing failure, or null if none */ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { - Mutex lock = this.lock; WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { w.runState = -1; // ensure runState is set - stealCount.getAndAdd(w.totalSteals + w.nsteals); + long steals = w.totalSteals + w.nsteals, sc; + do {} while(!U.compareAndSwapLong(this, STEALCOUNT, + sc = stealCount, sc + steals)); int idx = w.poolIndex; - lock.lock(); - try { // remove record from array + while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) + tryAwaitMainLock(); + try { WorkQueue[] ws = workQueues; if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) ws[idx] = null; } finally { - lock.unlock(); + if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) { + mainLock = 0; + synchronized (this) { notifyAll(); }; + } } } @@ -1426,20 +1450,36 @@ public class ForkJoinPool extends Abstra for (int r = s.seed, m = submitMask;;) { WorkQueue[] ws; WorkQueue q; int k = r & m & SQMASK; // use only even indices - if (runState < 0 || (ws = workQueues) == null || ws.length <= k) + if (runState < 0) throw new RejectedExecutionException(); // shutting down + else if ((ws = workQueues) == null || ws.length <= k) { + while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) + tryAwaitMainLock(); + try { + if (workQueues == null) + workQueues = new WorkQueue[submitMask + 1]; + } finally { + if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) { + mainLock = 0; + synchronized (this) { notifyAll(); }; + } + } + } else if ((q = ws[k]) == null) { // create new queue WorkQueue nq = new WorkQueue(this, null, SHARED_QUEUE); - Mutex lock = this.lock; // construct outside lock - lock.lock(); - try { // recheck under lock + while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) + tryAwaitMainLock(); + try { int rs = runState; // to update seq if (ws == workQueues && ws[k] == null) { ws[k] = nq; runState = ((rs & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN)); } } finally { - lock.unlock(); + if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) { + mainLock = 0; + synchronized (this) { notifyAll(); }; + } } } else if (q.trySharedPush(task)) { @@ -1462,7 +1502,7 @@ public class ForkJoinPool extends Abstra static void submitToCommonPool(ForkJoinTask task) { ForkJoinPool p; if ((p = commonPool) == null) - p = ensureCommonPool(); + throw new RejectedExecutionException("Common Pool Unavailable"); p.doSubmit(task); } @@ -1476,13 +1516,20 @@ public class ForkJoinPool extends Abstra * @return true if successful */ static boolean tryUnsubmitFromCommonPool(ForkJoinTask task) { + // Peek, looking for task and eligibility before + // using trySharedUnpush to actually take it under lock ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + ForkJoinTask[] a; int t, s, n; int k = submitters.get().seed & SQMASK; return ((p = commonPool) != null && (ws = p.workQueues) != null && ws.length > (k &= p.submitMask) && (q = ws[k]) != null && - q.trySharedUnpush(task, p)); + (a = q.array) != null && + (n = (t = q.top) - q.base) > 0 && + (n > 1 || (int)(p.ctl >> AC_SHIFT) < 0) && + (s = t - 1) >= 0 && s < a.length && a[s] == task && + q.trySharedUnpush(task)); } // Maintaining ctl counts @@ -1956,12 +2003,7 @@ public class ForkJoinPool extends Abstra */ private WorkQueue findNonEmptyStealQueue(WorkQueue w) { // Similar to loop in scan(), but ignoring submissions - int r; - if (w == null) // allow external callers - r = ThreadLocalRandom.current().nextInt(); - else { - r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; - } + int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; int step = (r >>> 16) | 1; for (WorkQueue[] ws;;) { int rs = runState, m; @@ -2102,22 +2144,28 @@ public class ForkJoinPool extends Abstra * @return true if now terminating or terminated */ private boolean tryTerminate(boolean now, boolean enable) { - Mutex lock = this.lock; for (long c;;) { if (((c = ctl) & STOP_BIT) != 0) { // already terminating if ((short)(c >>> TC_SHIFT) == -parallelism) { - lock.lock(); // don't need try/finally - termination.signalAll(); // signal when 0 workers - lock.unlock(); + synchronized(this) { + notifyAll(); // signal when 0 workers + } } return true; } if (runState >= 0) { // not yet enabled if (!enable) return false; - lock.lock(); - runState |= SHUTDOWN; - lock.unlock(); + while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) + tryAwaitMainLock(); + try { + runState |= SHUTDOWN; + } finally { + if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) { + mainLock = 0; + synchronized (this) { notifyAll(); }; + } + } } if (!now) { // check if idle & no tasks if ((int)(c >> AC_SHIFT) != -parallelism || @@ -2250,49 +2298,42 @@ public class ForkJoinPool extends Abstra // Use nearest power 2 for workQueues size. See Hackers Delight sec 3.2. int n = parallelism - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; - int size = (n + 1) << 1; // #slots = 2*#workers - this.submitMask = size - 1; // room for max # of submit queues - this.workQueues = new WorkQueue[size]; - this.termination = (this.lock = new Mutex()).newCondition(); - this.stealCount = new AtomicLong(); - this.nextWorkerNumber = new AtomicInteger(); + this.submitMask = ((n + 1) << 1) - 1; int pn = poolNumberGenerator.incrementAndGet(); StringBuilder sb = new StringBuilder("ForkJoinPool-"); sb.append(Integer.toString(pn)); sb.append("-worker-"); this.workerNamePrefix = sb.toString(); - lock.lock(); this.runState = 1; // set init flag - lock.unlock(); } /** - * Returns the common pool instance + * Constructor for common pool, suitable only for static initialization. + * Basically the same as above, but uses smallest possible initial footprint. + */ + ForkJoinPool(int parallelism, int submitMask, + ForkJoinWorkerThreadFactory factory, + Thread.UncaughtExceptionHandler handler) { + this.factory = factory; + this.ueh = handler; + this.submitMask = submitMask; + this.parallelism = parallelism; + long np = (long)(-parallelism); + this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); + this.localMode = LIFO_QUEUE; + this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; + this.runState = 1; + } + + /** + * Returns the common pool instance. * * @return the common pool instance */ public static ForkJoinPool commonPool() { ForkJoinPool p; - return (p = commonPool) != null? p : ensureCommonPool(); - } - - private static ForkJoinPool ensureCommonPool() { - ForkJoinPool p; - if ((p = commonPool) == null) { - final Mutex lock = initializationLock; - lock.lock(); - try { - if ((p = commonPool) == null) { - p = commonPool = new ForkJoinPool(commonPoolParallelism, - commonPoolFactory, - commonPoolUEH, false); - // use a more informative name string for workers - p.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; - } - } finally { - lock.unlock(); - } - } + if ((p = commonPool) == null) + throw new Error("Common Pool Unavailable"); return p; } @@ -2558,7 +2599,7 @@ public class ForkJoinPool extends Abstra * @return the number of steals */ public long getStealCount() { - long count = stealCount.get(); + long count = stealCount; WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { for (int i = 1; i < ws.length; i += 2) { @@ -2688,7 +2729,7 @@ public class ForkJoinPool extends Abstra public String toString() { // Use a single pass through workQueues to collect counts long qt = 0L, qs = 0L; int rc = 0; - long st = stealCount.get(); + long st = stealCount; long c = ctl; WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { @@ -2826,19 +2867,21 @@ public class ForkJoinPool extends Abstra public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); - final Mutex lock = this.lock; - lock.lock(); - try { - for (;;) { - if (isTerminated()) - return true; - if (nanos <= 0) - return false; - nanos = termination.awaitNanos(nanos); + if (isTerminated()) + return true; + long startTime = System.nanoTime(); + boolean terminated = false; + synchronized(this) { + for (long waitTime = nanos, millis = 0L;;) { + if (terminated = isTerminated() || + waitTime <= 0L || + (millis = unit.toMillis(waitTime)) <= 0L) + break; + wait(millis); + waitTime = nanos - (System.nanoTime() - startTime); } - } finally { - lock.unlock(); } + return terminated; } /** @@ -2970,6 +3013,9 @@ public class ForkJoinPool extends Abstra private static final long PARKBLOCKER; private static final int ABASE; private static final int ASHIFT; + private static final long NEXTWORKERNUMBER; + private static final long STEALCOUNT; + private static final long MAINLOCK; static { poolNumberGenerator = new AtomicInteger(); @@ -2978,7 +3024,6 @@ public class ForkJoinPool extends Abstra defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); submitters = new ThreadSubmitter(); - initializationLock = new Mutex(); int s; try { U = getUnsafe(); @@ -2986,37 +3031,45 @@ public class ForkJoinPool extends Abstra Class ak = ForkJoinTask[].class; CTL = U.objectFieldOffset (k.getDeclaredField("ctl")); + NEXTWORKERNUMBER = U.objectFieldOffset + (k.getDeclaredField("nextWorkerNumber")); + STEALCOUNT = U.objectFieldOffset + (k.getDeclaredField("stealCount")); + MAINLOCK = U.objectFieldOffset + (k.getDeclaredField("mainLock")); Class tk = Thread.class; PARKBLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); ABASE = U.arrayBaseOffset(ak); s = U.arrayIndexScale(ak); + ASHIFT = 31 - Integer.numberOfLeadingZeros(s); } catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0) throw new Error("data type scale not a power of two"); - ASHIFT = 31 - Integer.numberOfLeadingZeros(s); - - // Establish configuration for default pool - try { + try { // Establish common pool String pp = System.getProperty(propPrefix + "parallelism"); String fp = System.getProperty(propPrefix + "threadFactory"); String up = System.getProperty(propPrefix + "exceptionHandler"); + ForkJoinWorkerThreadFactory fac = (fp == null) ? + defaultForkJoinWorkerThreadFactory : + ((ForkJoinWorkerThreadFactory)ClassLoader. + getSystemClassLoader().loadClass(fp).newInstance()); + Thread.UncaughtExceptionHandler ueh = (up == null)? null : + ((Thread.UncaughtExceptionHandler)ClassLoader. + getSystemClassLoader().loadClass(up).newInstance()); int par; if ((pp == null || (par = Integer.parseInt(pp)) <= 0)) par = Runtime.getRuntime().availableProcessors(); + if (par > MAX_CAP) + par = MAX_CAP; commonPoolParallelism = par; - if (fp != null) - commonPoolFactory = (ForkJoinWorkerThreadFactory) - ClassLoader.getSystemClassLoader().loadClass(fp).newInstance(); - else - commonPoolFactory = defaultForkJoinWorkerThreadFactory; - if (up != null) - commonPoolUEH = (Thread.UncaughtExceptionHandler) - ClassLoader.getSystemClassLoader().loadClass(up).newInstance(); - else - commonPoolUEH = null; + int n = par - 1; // precompute submit mask + n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; + n |= n >>> 8; n |= n >>> 16; + int mask = ((n + 1) << 1) - 1; + commonPool = new ForkJoinPool(par, mask, fac, ueh); } catch (Exception e) { throw new Error(e); }