--- jsr166/src/jsr166e/ForkJoinPool.java 2012/10/21 04:14:31 1.5 +++ jsr166/src/jsr166e/ForkJoinPool.java 2012/10/30 16:05:35 1.10 @@ -5,6 +5,7 @@ */ package jsr166e; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -17,6 +18,7 @@ import java.util.concurrent.ExecutorServ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -41,14 +43,26 @@ import java.util.concurrent.locks.Condit * ForkJoinPool}s may also be appropriate for use with event-style * tasks that are never joined. * - *

A {@code ForkJoinPool} is constructed with a given target - * parallelism level; by default, equal to the number of available - * processors. The pool attempts to maintain enough active (or - * available) threads by dynamically adding, suspending, or resuming - * internal worker threads, even if some tasks are stalled waiting to - * join others. However, no such adjustments are guaranteed in the - * face of blocked IO or other unmanaged synchronization. The nested - * {@link ManagedBlocker} interface enables extension of the kinds of + *

A static {@link #commonPool} is available and appropriate for + * most applications. The common pool is used by any ForkJoinTask that + * is not explicitly submitted to a specified pool. Using the common + * pool normally reduces resource usage (its threads are slowly + * reclaimed during periods of non-use, and reinstated upon subsequent + * use). The common pool is by default constructed with default + * parameters, but these may be controlled by setting any or all of + * the three properties {@code + * java.util.concurrent.ForkJoinPool.common.{parallelism, + * threadFactory, exceptionHandler}}. + * + *

For applications that require separate or custom pools, a {@code + * ForkJoinPool} may be constructed with a given target parallelism + * level; by default, equal to the number of available processors. The + * pool attempts to maintain enough active (or available) threads by + * dynamically adding, suspending, or resuming internal worker + * threads, even if some tasks are stalled waiting to join + * others. However, no such adjustments are guaranteed in the face of + * blocked IO or other unmanaged synchronization. The nested {@link + * ManagedBlocker} interface enables extension of the kinds of * synchronization accommodated. * *

In addition to execution and lifecycle control methods, this @@ -93,23 +107,6 @@ import java.util.concurrent.locks.Condit * * * - *

Sample Usage. Normally a single {@code ForkJoinPool} is - * used for all parallel task execution in a program or subsystem. - * Otherwise, use would not usually outweigh the construction and - * bookkeeping overhead of creating a large set of threads. For - * example, a common pool could be used for the {@code SortTasks} - * illustrated in {@link RecursiveAction}. Because {@code - * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon - * daemon} mode, there is typically no need to explicitly {@link - * #shutdown} such a pool upon program exit. - * - *

 {@code
- * static final ForkJoinPool mainPool = new ForkJoinPool();
- * ...
- * public void sort(long[] array) {
- *   mainPool.invoke(new SortTask(array, 0, array.length));
- * }}
- * *

Implementation notes: This implementation restricts the * maximum number of running threads to 32767. Attempts to create * pools with greater than the maximum number result in @@ -239,16 +236,15 @@ public class ForkJoinPool extends Abstra * when locked remains available to check consistency. * * Recording WorkQueues. WorkQueues are recorded in the - * "workQueues" array that is created upon pool construction and - * expanded if necessary. Updates to the array while recording - * new workers and unrecording terminated ones are protected from - * each other by a lock but the array is otherwise concurrently - * readable, and accessed directly. To simplify index-based - * operations, the array size is always a power of two, and all - * readers must tolerate null slots. Shared (submission) queues - * are at even indices, worker queues at odd indices. Grouping - * them together in this way simplifies and speeds up task - * scanning. + * "workQueues" array that is created upon first use and expanded + * if necessary. Updates to the array while recording new workers + * and unrecording terminated ones are protected from each other + * by a lock but the array is otherwise concurrently readable, and + * accessed directly. To simplify index-based operations, the + * array size is always a power of two, and all readers must + * tolerate null slots. Shared (submission) queues are at even + * indices, worker queues at odd indices. Grouping them together + * in this way simplifies and speeds up task scanning. * * All worker thread creation is on-demand, triggered by task * submissions, replacement of terminated workers, and/or @@ -320,9 +316,11 @@ public class ForkJoinPool extends Abstra * * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will - * time out and terminate if the pool has remained quiescent for - * SHRINK_RATE nanosecs. This will slowly propagate, eventually - * terminating all workers after long periods of non-use. + * time out and terminate if the pool has remained quiescent for a + * given period -- a short period if there are more threads than + * parallelism, longer as the number of threads decreases. This + * will slowly propagate, eventually terminating all workers after + * periods of non-use. * * Shutdown and Termination. A call to shutdownNow atomically sets * a runState bit and then (non-atomically) sets each worker's @@ -504,29 +502,6 @@ public class ForkJoinPool extends Abstra } /** - * A simple non-reentrant lock used for exclusion when managing - * queues and workers. We use a custom lock so that we can readily - * probe lock state in constructions that check among alternative - * actions. The lock is normally only very briefly held, and - * sometimes treated as a spinlock, but other usages block to - * reduce overall contention in those cases where locked code - * bodies perform allocation/resizing. - */ - static final class Mutex extends AbstractQueuedSynchronizer { - public final boolean tryAcquire(int ignore) { - return compareAndSetState(0, 1); - } - public final boolean tryRelease(int ignore) { - setState(0); - return true; - } - public final void lock() { acquire(0); } - public final void unlock() { release(0); } - public final boolean isHeldExclusively() { return getState() == 1; } - public final Condition newCondition() { return new ConditionObject(); } - } - - /** * Class for artificial tasks that are used to replace the target * of local joins if they are removed from an interior queue slot * in WorkQueue.tryRemoveAndExec. We don't need the proxy to @@ -717,8 +692,7 @@ public class ForkJoinPool extends Abstra /** * Takes next task, if one exists, in LIFO order. Call only - * by owner in unshared queues. (We do not have a shared - * version of this method because it is never needed.) + * by owner in unshared queues. */ final ForkJoinTask pop() { ForkJoinTask[] a; ForkJoinTask t; int m; @@ -736,6 +710,90 @@ public class ForkJoinPool extends Abstra return null; } + final ForkJoinTask sharedPop() { + ForkJoinTask task = null; + if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + task = t; + break; + } + } + } + } finally { + runState = 0; + } + } + return task; + } + + /** + * Version of pop that takes top element only if it + * its root is the given CountedCompleter. + */ + final ForkJoinTask popCC(CountedCompleter root) { + ForkJoinTask[] a; int m; + if (root != null && (a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null || !(t instanceof CountedCompleter) || + ((CountedCompleter)t).getRoot() != root) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + return t; + } + if (root.status < 0) + break; + } + } + return null; + } + + /** + * Shared version of popCC + */ + final ForkJoinTask sharedPopCC(CountedCompleter root) { + ForkJoinTask task = null; + if (root != null && + runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null || !(t instanceof CountedCompleter) || + ((CountedCompleter)t).getRoot() != root) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + task = t; + break; + } + if (root.status < 0) + break; + } + } + } finally { + runState = 0; + } + } + return task; + } + /** * Takes a task in FIFO order if b is base of queue and a task * can be claimed without contention. Specialized versions @@ -813,6 +871,28 @@ public class ForkJoinPool extends Abstra } /** + * Version of tryUnpush for shared queues; called by non-FJ + * submitters after prechecking that task probably exists. + */ + final boolean trySharedUnpush(ForkJoinTask t) { + boolean success = false; + if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int s; + if ((a = array) != null && (s = top) != base && + U.compareAndSwapObject + (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { + top = s; + success = true; + } + } finally { + runState = 0; // unlock + } + } + return success; + } + + /** * Polls the given task only if it is at the current base. */ final boolean pollFor(ForkJoinTask task) { @@ -1086,6 +1166,23 @@ public class ForkJoinPool extends Abstra public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; + + /** Property prefix for constructing common pool */ + private static final String propPrefix = + "java.util.concurrent.ForkJoinPool.common."; + + /** + * Common (static) pool. Non-null for public use unless a static + * construction exception, but internal usages must null-check on + * use. + */ + static final ForkJoinPool commonPool; + + /** + * Common pool parallelism. Must equal commonPool.parallelism. + */ + static final int commonPoolParallelism; + /** * Generator for assigning sequence numbers as pool names. */ @@ -1113,21 +1210,16 @@ public class ForkJoinPool extends Abstra // static constants /** - * The wakeup interval (in nanoseconds) for a worker waiting for a - * task when the pool is quiescent to instead try to shrink the - * number of workers. The exact value does not matter too - * much. It must be short enough to release resources during - * sustained periods of idleness, but not so short that threads - * are continually re-created. + * Initial timeout value (in nanoseconds) for the thread triggering + * quiescence to park waiting for new work. On timeout, the thread + * will instead try to shrink the number of workers. */ - private static final long SHRINK_RATE = - 4L * 1000L * 1000L * 1000L; // 4 seconds + private static final long IDLE_TIMEOUT = 1000L * 1000L * 1000L; // 1sec /** - * The timeout value for attempted shrinkage, includes - * some slop to cope with system timer imprecision. + * Timeout value when there are more threads than parallelism level */ - private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10); + private static final long FAST_IDLE_TIMEOUT = 100L * 1000L * 1000L; /** * The maximum stolen->joining link depth allowed in method @@ -1247,21 +1339,58 @@ public class ForkJoinPool extends Abstra * empirically works OK on current JVMs. */ + volatile long stealCount; // collects worker counts volatile long ctl; // main pool control final int parallelism; // parallelism level final int localMode; // per-worker scheduling mode + volatile int nextWorkerNumber; // to create worker name string final int submitMask; // submit queue index bound int nextSeed; // for initializing worker seeds + volatile int mainLock; // spinlock for array updates volatile int runState; // shutdown status and seq WorkQueue[] workQueues; // main registry - final Mutex lock; // for registration - final Condition termination; // for awaitTermination final ForkJoinWorkerThreadFactory factory; // factory for new workers final Thread.UncaughtExceptionHandler ueh; // per-worker UEH - final AtomicLong stealCount; // collect counts when terminated - final AtomicInteger nextWorkerNumber; // to create worker name string final String workerNamePrefix; // to create worker name string + /* + * Mechanics for main lock protecting worker array updates. Uses + * the same strategy as ConcurrentHashMap bins -- a spinLock for + * normal cases, but falling back to builtin lock when (rarely) + * needed. See internal ConcurrentHashMap documentation for + * explanation. + */ + + static final int LOCK_WAITING = 2; // bit to indicate need for signal + static final int MAX_LOCK_SPINS = 1 << 8; + + private void tryAwaitMainLock() { + int spins = MAX_LOCK_SPINS, r = 0, h; + while (((h = mainLock) & 1) != 0) { + if (r == 0) + r = ThreadLocalRandom.current().nextInt(); // randomize spins + else if (spins >= 0) { + r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift + if (r >= 0) + --spins; + } + else if (U.compareAndSwapInt(this, MAINLOCK, h, h | LOCK_WAITING)) { + synchronized (this) { + if ((mainLock & LOCK_WAITING) != 0) { + try { + wait(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + else + notifyAll(); // possibly won race vs signaller + } + break; + } + } + } + // Creating, registering, and deregistering workers /** @@ -1288,8 +1417,10 @@ public class ForkJoinPool extends Abstra * ForkJoinWorkerThread. */ final String nextWorkerName() { - return workerNamePrefix.concat - (Integer.toString(nextWorkerNumber.addAndGet(1))); + int n; + do {} while (!U.compareAndSwapInt(this, NEXTWORKERNUMBER, + n = nextWorkerNumber, ++n)); + return workerNamePrefix.concat(Integer.toString(n)); } /** @@ -1301,14 +1432,15 @@ public class ForkJoinPool extends Abstra * * @param w the worker's queue */ - final void registerWorker(WorkQueue w) { - Mutex lock = this.lock; - lock.lock(); + while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) + tryAwaitMainLock(); try { - WorkQueue[] ws = workQueues; - if (w != null && ws != null) { // skip on shutdown/failure - int rs, n = ws.length, m = n - 1; + WorkQueue[] ws; + if ((ws = workQueues) == null) + ws = workQueues = new WorkQueue[submitMask + 1]; + if (w != null) { + int rs, n = ws.length, m = n - 1; int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence w.seed = (s == 0) ? 1 : s; // ensure non-zero seed int r = (s << 1) | 1; // use odd-numbered indices @@ -1328,8 +1460,12 @@ public class ForkJoinPool extends Abstra runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN); } } finally { - lock.unlock(); + if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) { + mainLock = 0; + synchronized (this) { notifyAll(); }; + } } + } /** @@ -1342,19 +1478,24 @@ public class ForkJoinPool extends Abstra * @param ex the exception causing failure, or null if none */ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { - Mutex lock = this.lock; WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { w.runState = -1; // ensure runState is set - stealCount.getAndAdd(w.totalSteals + w.nsteals); + long steals = w.totalSteals + w.nsteals, sc; + do {} while (!U.compareAndSwapLong(this, STEALCOUNT, + sc = stealCount, sc + steals)); int idx = w.poolIndex; - lock.lock(); - try { // remove record from array + while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) + tryAwaitMainLock(); + try { WorkQueue[] ws = workQueues; if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) ws[idx] = null; } finally { - lock.unlock(); + if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) { + mainLock = 0; + synchronized (this) { notifyAll(); }; + } } } @@ -1376,7 +1517,6 @@ public class ForkJoinPool extends Abstra U.throwException(ex); } - // Submissions /** @@ -1394,20 +1534,36 @@ public class ForkJoinPool extends Abstra for (int r = s.seed, m = submitMask;;) { WorkQueue[] ws; WorkQueue q; int k = r & m & SQMASK; // use only even indices - if (runState < 0 || (ws = workQueues) == null || ws.length <= k) + if (runState < 0) throw new RejectedExecutionException(); // shutting down + else if ((ws = workQueues) == null || ws.length <= k) { + while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) + tryAwaitMainLock(); + try { + if (workQueues == null) + workQueues = new WorkQueue[submitMask + 1]; + } finally { + if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) { + mainLock = 0; + synchronized (this) { notifyAll(); }; + } + } + } else if ((q = ws[k]) == null) { // create new queue WorkQueue nq = new WorkQueue(this, null, SHARED_QUEUE); - Mutex lock = this.lock; // construct outside lock - lock.lock(); - try { // recheck under lock + while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) + tryAwaitMainLock(); + try { int rs = runState; // to update seq if (ws == workQueues && ws[k] == null) { ws[k] = nq; runState = ((rs & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN)); } } finally { - lock.unlock(); + if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) { + mainLock = 0; + synchronized (this) { notifyAll(); }; + } } } else if (q.trySharedPush(task)) { @@ -1424,6 +1580,79 @@ public class ForkJoinPool extends Abstra } } + /** + * Submits the given (non-null) task to the common pool, if possible. + */ + static void submitToCommonPool(ForkJoinTask task) { + ForkJoinPool p; + if ((p = commonPool) == null) + throw new RejectedExecutionException("Common Pool Unavailable"); + p.doSubmit(task); + } + + /** + * Returns true if caller is (or may be) submitter to the common + * pool, and not all workers are active, and there appear to be + * tasks in the associated submission queue. + */ + static boolean canHelpCommonPool() { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + int k = submitters.get().seed & SQMASK; + return ((p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null && + q.top - q.base > 0); + } + + /** + * Returns true if the given task was submitted to common pool + * and has not yet commenced execution, and is available for + * removal according to execution policies; if so removing the + * submission from the pool. + * + * @param task the task + * @return true if successful + */ + static boolean tryUnsubmitFromCommonPool(ForkJoinTask task) { + // Peek, looking for task and eligibility before + // using trySharedUnpush to actually take it under lock + ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + ForkJoinTask[] a; int s; + int k = submitters.get().seed & SQMASK; + return ((p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null && + (a = q.array) != null && + (s = q.top - 1) - q.base >= 0 && + s >= 0 && s < a.length && + a[s] == task && + q.trySharedUnpush(task)); + } + + /** + * Tries to pop a task from common pool with given root + */ + static ForkJoinTask popCCFromCommonPool(CountedCompleter root) { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + ForkJoinTask t; + int k = submitters.get().seed & SQMASK; + if (root != null && + (p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null && q.top - q.base > 0 && + root.status < 0 && + (t = q.sharedPopCC(root)) != null) + return t; + return null; + } + + // Maintaining ctl counts /** @@ -1435,7 +1664,7 @@ public class ForkJoinPool extends Abstra } /** - * Tries to activate or create a worker if too few are active. + * Tries to create one or activate one or more workers if too few are active. */ final void signalWork() { long c; int u; @@ -1536,7 +1765,7 @@ public class ForkJoinPool extends Abstra t = (ForkJoinTask)U.getObjectVolatile(a, i); if (q.base == b && ec >= 0 && t != null && U.compareAndSwapObject(a, i, t, null)) { - if (q.top - (q.base = b + 1) > 1) + if (q.top - (q.base = b + 1) > 0) signalWork(); // help pushes signal return t; } @@ -1582,12 +1811,10 @@ public class ForkJoinPool extends Abstra } } else if (w.eventCount < 0) { // already queued - if ((nr = w.rescans) > 0) { // continue rescanning - int ac = a + parallelism; - if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0) - Thread.yield(); // yield before block - } - else { + int ac = a + parallelism; + if ((nr = w.rescans) > 0) // continue rescanning + w.rescans = (ac < nr) ? ac : nr - 1; + else if (((w.seed >>> 16) & ac) == 0) { // randomize park Thread.interrupted(); // clear status Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); @@ -1605,8 +1832,8 @@ public class ForkJoinPool extends Abstra /** * If inactivating worker w has caused the pool to become * quiescent, checks for pool termination, and, so long as this is - * not the only worker, waits for event for up to SHRINK_RATE - * nanosecs. On timeout, if ctl has not changed, terminates the + * not the only worker, waits for event for up to a given + * duration. On timeout, if ctl has not changed, terminates the * worker, which will in turn wake up another worker to possibly * repeat this process. * @@ -1617,20 +1844,21 @@ public class ForkJoinPool extends Abstra private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { if (w.eventCount < 0 && !tryTerminate(false, false) && (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) { + int dc = -(short)(currentCtl >>> TC_SHIFT); + long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; + long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop Thread wt = Thread.currentThread(); - Thread.yield(); // yield before block while (ctl == currentCtl) { - long startTime = System.nanoTime(); Thread.interrupted(); // timed variant of version in scan() U.putObject(wt, PARKBLOCKER, this); w.parker = wt; if (ctl == currentCtl) - U.park(false, SHRINK_RATE); + U.park(false, parkTime); w.parker = null; U.putObject(wt, PARKBLOCKER, null); if (ctl != currentCtl) break; - if (System.nanoTime() - startTime >= SHRINK_TIMEOUT && + if (deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { w.eventCount = (w.eventCount + E_SEQ) | E_MASK; w.runState = -1; // shrink @@ -1915,7 +2143,6 @@ public class ForkJoinPool extends Abstra } } - /** * Runs tasks until {@code isQuiescent()}. We piggyback on * active count ctl maintenance, but rather than blocking @@ -1958,6 +2185,31 @@ public class ForkJoinPool extends Abstra } /** + * Restricted version of helpQuiescePool for non-FJ callers + */ + static void externalHelpQuiescePool() { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q, sq; + ForkJoinTask[] a; int b; + ForkJoinTask t = null; + int k = submitters.get().seed & SQMASK; + if ((p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null) { + while (q.top - q.base > 0) { + if ((t = q.sharedPop()) != null) + break; + } + if (t == null && (sq = p.findNonEmptyStealQueue(q)) != null && + (b = sq.base) - sq.top < 0) + t = sq.pollAt(b); + if (t != null) + t.doExec(); + } + } + + /** * Gets and removes a local or stolen task for the given worker. * * @return a task, if available @@ -1990,6 +2242,20 @@ public class ForkJoinPool extends Abstra 8); } + /** + * Returns approximate submission queue length for the given caller + */ + static int getEstimatedSubmitterQueueLength() { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + int k = submitters.get().seed & SQMASK; + return ((p = commonPool) != null && + p.runState >= 0 && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null) ? + q.queueSize() : 0; + } + // Termination /** @@ -2007,22 +2273,28 @@ public class ForkJoinPool extends Abstra * @return true if now terminating or terminated */ private boolean tryTerminate(boolean now, boolean enable) { - Mutex lock = this.lock; for (long c;;) { if (((c = ctl) & STOP_BIT) != 0) { // already terminating if ((short)(c >>> TC_SHIFT) == -parallelism) { - lock.lock(); // don't need try/finally - termination.signalAll(); // signal when 0 workers - lock.unlock(); + synchronized (this) { + notifyAll(); // signal when 0 workers + } } return true; } if (runState >= 0) { // not yet enabled if (!enable) return false; - lock.lock(); - runState |= SHUTDOWN; - lock.unlock(); + while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) + tryAwaitMainLock(); + try { + runState |= SHUTDOWN; + } finally { + if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) { + mainLock = 0; + synchronized (this) { notifyAll(); }; + } + } } if (!now) { // check if idle & no tasks if ((int)(c >> AC_SHIFT) != -parallelism || @@ -2155,20 +2427,43 @@ public class ForkJoinPool extends Abstra // Use nearest power 2 for workQueues size. See Hackers Delight sec 3.2. int n = parallelism - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; - int size = (n + 1) << 1; // #slots = 2*#workers - this.submitMask = size - 1; // room for max # of submit queues - this.workQueues = new WorkQueue[size]; - this.termination = (this.lock = new Mutex()).newCondition(); - this.stealCount = new AtomicLong(); - this.nextWorkerNumber = new AtomicInteger(); + this.submitMask = ((n + 1) << 1) - 1; int pn = poolNumberGenerator.incrementAndGet(); StringBuilder sb = new StringBuilder("ForkJoinPool-"); sb.append(Integer.toString(pn)); sb.append("-worker-"); this.workerNamePrefix = sb.toString(); - lock.lock(); this.runState = 1; // set init flag - lock.unlock(); + } + + /** + * Constructor for common pool, suitable only for static initialization. + * Basically the same as above, but uses smallest possible initial footprint. + */ + ForkJoinPool(int parallelism, int submitMask, + ForkJoinWorkerThreadFactory factory, + Thread.UncaughtExceptionHandler handler) { + this.factory = factory; + this.ueh = handler; + this.submitMask = submitMask; + this.parallelism = parallelism; + long np = (long)(-parallelism); + this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); + this.localMode = LIFO_QUEUE; + this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; + this.runState = 1; + } + + /** + * Returns the common pool instance. + * + * @return the common pool instance + */ + public static ForkJoinPool commonPool() { + ForkJoinPool p; + if ((p = commonPool) == null) + throw new Error("Common Pool Unavailable"); + return p; } // Execution methods @@ -2344,6 +2639,15 @@ public class ForkJoinPool extends Abstra } /** + * Returns the targeted parallelism level of the common pool. + * + * @return the targeted parallelism level of the common pool + */ + public static int getCommonPoolParallelism() { + return commonPoolParallelism; + } + + /** * Returns the number of worker threads that have started but not * yet terminated. The result returned by this method may differ * from {@link #getParallelism} when threads are created to @@ -2424,7 +2728,7 @@ public class ForkJoinPool extends Abstra * @return the number of steals */ public long getStealCount() { - long count = stealCount.get(); + long count = stealCount; WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { for (int i = 1; i < ws.length; i += 2) { @@ -2554,7 +2858,7 @@ public class ForkJoinPool extends Abstra public String toString() { // Use a single pass through workQueues to collect counts long qt = 0L, qs = 0L; int rc = 0; - long st = stealCount.get(); + long st = stealCount; long c = ctl; WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { @@ -2595,11 +2899,13 @@ public class ForkJoinPool extends Abstra } /** - * Initiates an orderly shutdown in which previously submitted - * tasks are executed, but no new tasks will be accepted. - * Invocation has no additional effect if already shut down. - * Tasks that are in the process of being submitted concurrently - * during the course of this method may or may not be rejected. + * Possibly initiates an orderly shutdown in which previously + * submitted tasks are executed, but no new tasks will be + * accepted. Invocation has no effect on execution state if this + * is the {@link #commonPool}, and no additional effect if + * already shut down. Tasks that are in the process of being + * submitted concurrently during the course of this method may or + * may not be rejected. * * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads @@ -2608,18 +2914,21 @@ public class ForkJoinPool extends Abstra */ public void shutdown() { checkPermission(); - tryTerminate(false, true); + if (this != commonPool) + tryTerminate(false, true); } /** - * Attempts to cancel and/or stop all tasks, and reject all - * subsequently submitted tasks. Tasks that are in the process of - * being submitted or executed concurrently during the course of - * this method may or may not be rejected. This method cancels - * both existing and unexecuted tasks, in order to permit - * termination in the presence of task dependencies. So the method - * always returns an empty list (unlike the case for some other - * Executors). + * Possibly attempts to cancel and/or stop all tasks, and reject + * all subsequently submitted tasks. Invocation has no effect on + * execution state if this is the {@link #commonPool}, and no + * additional effect if already shut down. Otherwise, tasks that + * are in the process of being submitted or executed concurrently + * during the course of this method may or may not be + * rejected. This method cancels both existing and unexecuted + * tasks, in order to permit termination in the presence of task + * dependencies. So the method always returns an empty list + * (unlike the case for some other Executors). * * @return an empty list * @throws SecurityException if a security manager exists and @@ -2629,7 +2938,8 @@ public class ForkJoinPool extends Abstra */ public List shutdownNow() { checkPermission(); - tryTerminate(true, true); + if (this != commonPool) + tryTerminate(true, true); return Collections.emptyList(); } @@ -2686,19 +2996,21 @@ public class ForkJoinPool extends Abstra public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); - final Mutex lock = this.lock; - lock.lock(); - try { - for (;;) { - if (isTerminated()) - return true; - if (nanos <= 0) - return false; - nanos = termination.awaitNanos(nanos); + if (isTerminated()) + return true; + long startTime = System.nanoTime(); + boolean terminated = false; + synchronized (this) { + for (long waitTime = nanos, millis = 0L;;) { + if (terminated = isTerminated() || + waitTime <= 0L || + (millis = unit.toMillis(waitTime)) <= 0L) + break; + wait(millis); + waitTime = nanos - (System.nanoTime() - startTime); } - } finally { - lock.unlock(); } + return terminated; } /** @@ -2830,6 +3142,9 @@ public class ForkJoinPool extends Abstra private static final long PARKBLOCKER; private static final int ABASE; private static final int ASHIFT; + private static final long NEXTWORKERNUMBER; + private static final long STEALCOUNT; + private static final long MAINLOCK; static { poolNumberGenerator = new AtomicInteger(); @@ -2845,17 +3160,48 @@ public class ForkJoinPool extends Abstra Class ak = ForkJoinTask[].class; CTL = U.objectFieldOffset (k.getDeclaredField("ctl")); + NEXTWORKERNUMBER = U.objectFieldOffset + (k.getDeclaredField("nextWorkerNumber")); + STEALCOUNT = U.objectFieldOffset + (k.getDeclaredField("stealCount")); + MAINLOCK = U.objectFieldOffset + (k.getDeclaredField("mainLock")); Class tk = Thread.class; PARKBLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); ABASE = U.arrayBaseOffset(ak); s = U.arrayIndexScale(ak); + ASHIFT = 31 - Integer.numberOfLeadingZeros(s); } catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0) throw new Error("data type scale not a power of two"); - ASHIFT = 31 - Integer.numberOfLeadingZeros(s); + try { // Establish common pool + String pp = System.getProperty(propPrefix + "parallelism"); + String fp = System.getProperty(propPrefix + "threadFactory"); + String up = System.getProperty(propPrefix + "exceptionHandler"); + ForkJoinWorkerThreadFactory fac = (fp == null) ? + defaultForkJoinWorkerThreadFactory : + ((ForkJoinWorkerThreadFactory)ClassLoader. + getSystemClassLoader().loadClass(fp).newInstance()); + Thread.UncaughtExceptionHandler ueh = (up == null) ? null : + ((Thread.UncaughtExceptionHandler)ClassLoader. + getSystemClassLoader().loadClass(up).newInstance()); + int par; + if ((pp == null || (par = Integer.parseInt(pp)) <= 0)) + par = Runtime.getRuntime().availableProcessors(); + if (par > MAX_CAP) + par = MAX_CAP; + commonPoolParallelism = par; + int n = par - 1; // precompute submit mask + n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; + n |= n >>> 8; n |= n >>> 16; + int mask = ((n + 1) << 1) - 1; + commonPool = new ForkJoinPool(par, mask, fac, ueh); + } catch (Exception e) { + throw new Error(e); + } } /**