--- jsr166/src/jsr166e/ForkJoinPool.java 2012/08/13 15:52:33 1.1 +++ jsr166/src/jsr166e/ForkJoinPool.java 2012/10/28 22:35:45 1.7 @@ -5,6 +5,7 @@ */ package jsr166e; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -17,6 +18,7 @@ import java.util.concurrent.ExecutorServ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -41,14 +43,27 @@ import java.util.concurrent.locks.Condit * ForkJoinPool}s may also be appropriate for use with event-style * tasks that are never joined. * - *

A {@code ForkJoinPool} is constructed with a given target - * parallelism level; by default, equal to the number of available - * processors. The pool attempts to maintain enough active (or - * available) threads by dynamically adding, suspending, or resuming - * internal worker threads, even if some tasks are stalled waiting to - * join others. However, no such adjustments are guaranteed in the - * face of blocked IO or other unmanaged synchronization. The nested - * {@link ManagedBlocker} interface enables extension of the kinds of + *

A static {@link #commonPool} is available and appropriate for + * most applications. The common pool is constructed upon first + * access, or upon usage by any ForkJoinTask that is not explictly + * submitted to a specified pool. Using the common pool normally + * reduces resource usage (its threads are slowly reclaimed during + * periods of non-use, and reinstated upon subsequent use). The + * common pool is by default constructed with default parameters, but + * these may be controlled by setting any or all of the three + * properties {@code + * java.util.concurrent.ForkJoinPool.common.{parallelism, + * threadFactory, exceptionHandler}}. + * + *

For applications that require separate or custom pools, a {@code + * ForkJoinPool} may be constructed with a given target parallelism + * level; by default, equal to the number of available processors. The + * pool attempts to maintain enough active (or available) threads by + * dynamically adding, suspending, or resuming internal worker + * threads, even if some tasks are stalled waiting to join + * others. However, no such adjustments are guaranteed in the face of + * blocked IO or other unmanaged synchronization. The nested {@link + * ManagedBlocker} interface enables extension of the kinds of * synchronization accommodated. * *

In addition to execution and lifecycle control methods, this @@ -93,23 +108,6 @@ import java.util.concurrent.locks.Condit * * * - *

Sample Usage. Normally a single {@code ForkJoinPool} is - * used for all parallel task execution in a program or subsystem. - * Otherwise, use would not usually outweigh the construction and - * bookkeeping overhead of creating a large set of threads. For - * example, a common pool could be used for the {@code SortTasks} - * illustrated in {@link RecursiveAction}. Because {@code - * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon - * daemon} mode, there is typically no need to explicitly {@link - * #shutdown} such a pool upon program exit. - * - *

 {@code
- * static final ForkJoinPool mainPool = new ForkJoinPool();
- * ...
- * public void sort(long[] array) {
- *   mainPool.invoke(new SortTask(array, 0, array.length));
- * }}
- * *

Implementation notes: This implementation restricts the * maximum number of running threads to 32767. Attempts to create * pools with greater than the maximum number result in @@ -320,9 +318,11 @@ public class ForkJoinPool extends Abstra * * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will - * time out and terminate if the pool has remained quiescent for - * SHRINK_RATE nanosecs. This will slowly propagate, eventually - * terminating all workers after long periods of non-use. + * time out and terminate if the pool has remained quiescent for a + * given period -- a short period if there are more threads than + * parallelism, longer as the number of threads decreases. This + * will slowly propagate, eventually terminating all workers after + * periods of non-use. * * Shutdown and Termination. A call to shutdownNow atomically sets * a runState bit and then (non-atomically) sets each worker's @@ -813,6 +813,33 @@ public class ForkJoinPool extends Abstra } /** + * Version of tryUnpush for shared queues; called by non-FJ + * submitters. Conservatively fails to unpush if all workers + * are active unless there are multiple tasks in queue. + */ + final boolean trySharedUnpush(ForkJoinTask task, ForkJoinPool p) { + boolean success = false; + if (task != null && top != base && runState == 0 && + U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int n, s; + if ((a = array) != null && (n = (s = top) - base) > 0 && + (n > 1 || p == null || (int)(p.ctl >> AC_SHIFT) < 0)) { + int j = (((a.length - 1) & --s) << ASHIFT) + ABASE; + if (U.getObjectVolatile(a, j) == task && + U.compareAndSwapObject(a, j, task, null)) { + top = s; + success = true; + } + } + } finally { + runState = 0; // unlock + } + } + return success; + } + + /** * Polls the given task only if it is at the current base. */ final boolean pollFor(ForkJoinTask task) { @@ -1047,6 +1074,7 @@ public class ForkJoinPool extends Abstra ASHIFT = 31 - Integer.numberOfLeadingZeros(s); } } + /** * Per-thread records for threads that submit to pools. Currently * holds only pseudo-random seed / index that is used to choose @@ -1103,30 +1131,38 @@ public class ForkJoinPool extends Abstra private static final RuntimePermission modifyThreadPermission; /** - * Per-thread submission bookeeping. Shared across all pools + * Per-thread submission bookkeeping. Shared across all pools * to reduce ThreadLocal pollution and because random motion * to avoid contention in one pool is likely to hold for others. */ private static final ThreadSubmitter submitters; + /** Common default pool */ + static volatile ForkJoinPool commonPool; + + // commonPool construction parameters + private static final String propPrefix = + "java.util.concurrent.ForkJoinPool.common."; + private static final Thread.UncaughtExceptionHandler commonPoolUEH; + private static final ForkJoinWorkerThreadFactory commonPoolFactory; + static final int commonPoolParallelism; + + /** Static initialization lock */ + private static final Mutex initializationLock; + // static constants /** - * The wakeup interval (in nanoseconds) for a worker waiting for a - * task when the pool is quiescent to instead try to shrink the - * number of workers. The exact value does not matter too - * much. It must be short enough to release resources during - * sustained periods of idleness, but not so short that threads - * are continually re-created. + * Initial timeout value (in nanoseconds) for the tread triggering + * quiescence to park waiting for new work. On timeout, the thread + * will instead try to shrink the number of workers. */ - private static final long SHRINK_RATE = - 4L * 1000L * 1000L * 1000L; // 4 seconds + private static final long IDLE_TIMEOUT = 1000L * 1000L * 1000L; // 1sec /** - * The timeout value for attempted shrinkage, includes - * some slop to cope with system timer imprecision. + * Timeout value when there are more threads than parallelism level */ - private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10); + private static final long FAST_IDLE_TIMEOUT = 100L * 1000L * 1000L; /** * The maximum stolen->joining link depth allowed in method @@ -1259,7 +1295,7 @@ public class ForkJoinPool extends Abstra final Thread.UncaughtExceptionHandler ueh; // per-worker UEH final AtomicLong stealCount; // collect counts when terminated final AtomicInteger nextWorkerNumber; // to create worker name string - final String workerNamePrefix; // to create worker name string + String workerNamePrefix; // to create worker name string // Creating, registering, and deregistering workers @@ -1300,7 +1336,6 @@ public class ForkJoinPool extends Abstra * * @param w the worker's queue */ - final void registerWorker(WorkQueue w) { Mutex lock = this.lock; lock.lock(); @@ -1375,7 +1410,6 @@ public class ForkJoinPool extends Abstra U.throwException(ex); } - // Submissions /** @@ -1423,6 +1457,35 @@ public class ForkJoinPool extends Abstra } } + /** + * Submits the given (non-null) task to the common pool, if possible. + */ + static void submitToCommonPool(ForkJoinTask task) { + ForkJoinPool p; + if ((p = commonPool) == null) + p = ensureCommonPool(); + p.doSubmit(task); + } + + /** + * Returns true if the given task was submitted to common pool + * and has not yet commenced execution, and is available for + * removal according to execution policies; if so removing the + * submission from the pool. + * + * @param task the task + * @return true if successful + */ + static boolean tryUnsubmitFromCommonPool(ForkJoinTask task) { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + int k = submitters.get().seed & SQMASK; + return ((p = commonPool) != null && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null && + q.trySharedUnpush(task, p)); + } + // Maintaining ctl counts /** @@ -1434,7 +1497,7 @@ public class ForkJoinPool extends Abstra } /** - * Tries to activate or create a worker if too few are active. + * Tries to create one or activate one or more workers if too few are active. */ final void signalWork() { long c; int u; @@ -1518,7 +1581,7 @@ public class ForkJoinPool extends Abstra * awaiting signal, * * @param w the worker (via its WorkQueue) - * @return a task or null of none found + * @return a task or null if none found */ private final ForkJoinTask scan(WorkQueue w) { WorkQueue[] ws; // first update random seed @@ -1535,7 +1598,7 @@ public class ForkJoinPool extends Abstra t = (ForkJoinTask)U.getObjectVolatile(a, i); if (q.base == b && ec >= 0 && t != null && U.compareAndSwapObject(a, i, t, null)) { - if (q.top - (q.base = b + 1) > 1) + if (q.top - (q.base = b + 1) > 0) signalWork(); // help pushes signal return t; } @@ -1581,12 +1644,10 @@ public class ForkJoinPool extends Abstra } } else if (w.eventCount < 0) { // already queued - if ((nr = w.rescans) > 0) { // continue rescanning - int ac = a + parallelism; - if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0) - Thread.yield(); // yield before block - } - else { + int ac = a + parallelism; + if ((nr = w.rescans) > 0) // continue rescanning + w.rescans = (ac < nr) ? ac : nr - 1; + else if (((w.seed >>> 16) & ac) == 0) { // randomize park Thread.interrupted(); // clear status Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); @@ -1604,8 +1665,8 @@ public class ForkJoinPool extends Abstra /** * If inactivating worker w has caused the pool to become * quiescent, checks for pool termination, and, so long as this is - * not the only worker, waits for event for up to SHRINK_RATE - * nanosecs. On timeout, if ctl has not changed, terminates the + * not the only worker, waits for event for up to a given + * duration. On timeout, if ctl has not changed, terminates the * worker, which will in turn wake up another worker to possibly * repeat this process. * @@ -1616,20 +1677,21 @@ public class ForkJoinPool extends Abstra private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { if (w.eventCount < 0 && !tryTerminate(false, false) && (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) { + int dc = -(short)(currentCtl >>> TC_SHIFT); + long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; + long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop Thread wt = Thread.currentThread(); - Thread.yield(); // yield before block while (ctl == currentCtl) { - long startTime = System.nanoTime(); Thread.interrupted(); // timed variant of version in scan() U.putObject(wt, PARKBLOCKER, this); w.parker = wt; if (ctl == currentCtl) - U.park(false, SHRINK_RATE); + U.park(false, parkTime); w.parker = null; U.putObject(wt, PARKBLOCKER, null); if (ctl != currentCtl) break; - if (System.nanoTime() - startTime >= SHRINK_TIMEOUT && + if (deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { w.eventCount = (w.eventCount + E_SEQ) | E_MASK; w.runState = -1; // shrink @@ -1895,7 +1957,12 @@ public class ForkJoinPool extends Abstra */ private WorkQueue findNonEmptyStealQueue(WorkQueue w) { // Similar to loop in scan(), but ignoring submissions - int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; + int r; + if (w == null) // allow external callers + r = ThreadLocalRandom.current().nextInt(); + else { + r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; + } int step = (r >>> 16) | 1; for (WorkQueue[] ws;;) { int rs = runState, m; @@ -1914,7 +1981,6 @@ public class ForkJoinPool extends Abstra } } - /** * Runs tasks until {@code isQuiescent()}. We piggyback on * active count ctl maintenance, but rather than blocking @@ -1957,6 +2023,23 @@ public class ForkJoinPool extends Abstra } /** + * Restricted version of helpQuiescePool for non-FJ callers + */ + static void externalHelpQuiescePool() { + ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q; + ForkJoinTask t; int b; + int k = submitters.get().seed & SQMASK; + if ((p = commonPool) != null && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (w = ws[k]) != null && + (q = p.findNonEmptyStealQueue(w)) != null && + (b = q.base) - q.top < 0 && + (t = q.pollAt(b)) != null) + t.doExec(); + } + + /** * Gets and removes a local or stolen task for the given worker. * * @return a task, if available @@ -1989,6 +2072,20 @@ public class ForkJoinPool extends Abstra 8); } + /** + * Returns approximate submission queue length for the given caller + */ + static int getEstimatedSubmitterQueueLength() { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + int k = submitters.get().seed & SQMASK; + return ((p = commonPool) != null && + p.runState >= 0 && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null) ? + q.queueSize() : 0; + } + // Termination /** @@ -2170,6 +2267,36 @@ public class ForkJoinPool extends Abstra lock.unlock(); } + /** + * Returns the common pool instance + * + * @return the common pool instance + */ + public static ForkJoinPool commonPool() { + ForkJoinPool p; + return (p = commonPool) != null? p : ensureCommonPool(); + } + + private static ForkJoinPool ensureCommonPool() { + ForkJoinPool p; + if ((p = commonPool) == null) { + final Mutex lock = initializationLock; + lock.lock(); + try { + if ((p = commonPool) == null) { + p = commonPool = new ForkJoinPool(commonPoolParallelism, + commonPoolFactory, + commonPoolUEH, false); + // use a more informative name string for workers + p.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; + } + } finally { + lock.unlock(); + } + } + return p; + } + // Execution methods /** @@ -2343,6 +2470,15 @@ public class ForkJoinPool extends Abstra } /** + * Returns the targeted parallelism level of the common pool. + * + * @return the targeted parallelism level of the common pool + */ + public static int getCommonPoolParallelism() { + return commonPoolParallelism; + } + + /** * Returns the number of worker threads that have started but not * yet terminated. The result returned by this method may differ * from {@link #getParallelism} when threads are created to @@ -2594,11 +2730,13 @@ public class ForkJoinPool extends Abstra } /** - * Initiates an orderly shutdown in which previously submitted - * tasks are executed, but no new tasks will be accepted. - * Invocation has no additional effect if already shut down. - * Tasks that are in the process of being submitted concurrently - * during the course of this method may or may not be rejected. + * Possibly initiates an orderly shutdown in which previously + * submitted tasks are executed, but no new tasks will be + * accepted. Invocation has no effect on execution state if this + * is the {@link #commonPool}, and no additional effect if + * already shut down. Tasks that are in the process of being + * submitted concurrently during the course of this method may or + * may not be rejected. * * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads @@ -2607,18 +2745,21 @@ public class ForkJoinPool extends Abstra */ public void shutdown() { checkPermission(); - tryTerminate(false, true); + if (this != commonPool) + tryTerminate(false, true); } /** - * Attempts to cancel and/or stop all tasks, and reject all - * subsequently submitted tasks. Tasks that are in the process of - * being submitted or executed concurrently during the course of - * this method may or may not be rejected. This method cancels - * both existing and unexecuted tasks, in order to permit - * termination in the presence of task dependencies. So the method - * always returns an empty list (unlike the case for some other - * Executors). + * Possibly attempts to cancel and/or stop all tasks, and reject + * all subsequently submitted tasks. Invocation has no effect on + * execution state if this is the {@link #commonPool}, and no + * additional effect if already shut down. Otherwise, tasks that + * are in the process of being submitted or executed concurrently + * during the course of this method may or may not be + * rejected. This method cancels both existing and unexecuted + * tasks, in order to permit termination in the presence of task + * dependencies. So the method always returns an empty list + * (unlike the case for some other Executors). * * @return an empty list * @throws SecurityException if a security manager exists and @@ -2628,7 +2769,8 @@ public class ForkJoinPool extends Abstra */ public List shutdownNow() { checkPermission(); - tryTerminate(true, true); + if (this != commonPool) + tryTerminate(true, true); return Collections.emptyList(); } @@ -2837,6 +2979,7 @@ public class ForkJoinPool extends Abstra defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); submitters = new ThreadSubmitter(); + initializationLock = new Mutex(); int s; try { U = getUnsafe(); @@ -2855,6 +2998,29 @@ public class ForkJoinPool extends Abstra if ((s & (s-1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(s); + + // Establish configuration for default pool + try { + String pp = System.getProperty(propPrefix + "parallelism"); + String fp = System.getProperty(propPrefix + "threadFactory"); + String up = System.getProperty(propPrefix + "exceptionHandler"); + int par; + if ((pp == null || (par = Integer.parseInt(pp)) <= 0)) + par = Runtime.getRuntime().availableProcessors(); + commonPoolParallelism = par; + if (fp != null) + commonPoolFactory = (ForkJoinWorkerThreadFactory) + ClassLoader.getSystemClassLoader().loadClass(fp).newInstance(); + else + commonPoolFactory = defaultForkJoinWorkerThreadFactory; + if (up != null) + commonPoolUEH = (Thread.UncaughtExceptionHandler) + ClassLoader.getSystemClassLoader().loadClass(up).newInstance(); + else + commonPoolUEH = null; + } catch (Exception e) { + throw new Error(e); + } } /**