--- jsr166/src/jsr166e/ForkJoinPool.java 2012/08/13 15:52:33 1.1 +++ jsr166/src/jsr166e/ForkJoinPool.java 2012/10/28 22:35:45 1.7 @@ -5,6 +5,7 @@ */ package jsr166e; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -17,6 +18,7 @@ import java.util.concurrent.ExecutorServ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -41,14 +43,27 @@ import java.util.concurrent.locks.Condit * ForkJoinPool}s may also be appropriate for use with event-style * tasks that are never joined. * - *
A {@code ForkJoinPool} is constructed with a given target - * parallelism level; by default, equal to the number of available - * processors. The pool attempts to maintain enough active (or - * available) threads by dynamically adding, suspending, or resuming - * internal worker threads, even if some tasks are stalled waiting to - * join others. However, no such adjustments are guaranteed in the - * face of blocked IO or other unmanaged synchronization. The nested - * {@link ManagedBlocker} interface enables extension of the kinds of + *
A static {@link #commonPool} is available and appropriate for + * most applications. The common pool is constructed upon first + * access, or upon usage by any ForkJoinTask that is not explictly + * submitted to a specified pool. Using the common pool normally + * reduces resource usage (its threads are slowly reclaimed during + * periods of non-use, and reinstated upon subsequent use). The + * common pool is by default constructed with default parameters, but + * these may be controlled by setting any or all of the three + * properties {@code + * java.util.concurrent.ForkJoinPool.common.{parallelism, + * threadFactory, exceptionHandler}}. + * + *
For applications that require separate or custom pools, a {@code + * ForkJoinPool} may be constructed with a given target parallelism + * level; by default, equal to the number of available processors. The + * pool attempts to maintain enough active (or available) threads by + * dynamically adding, suspending, or resuming internal worker + * threads, even if some tasks are stalled waiting to join + * others. However, no such adjustments are guaranteed in the face of + * blocked IO or other unmanaged synchronization. The nested {@link + * ManagedBlocker} interface enables extension of the kinds of * synchronization accommodated. * *
In addition to execution and lifecycle control methods, this @@ -93,23 +108,6 @@ import java.util.concurrent.locks.Condit * * * - *
Sample Usage. Normally a single {@code ForkJoinPool} is - * used for all parallel task execution in a program or subsystem. - * Otherwise, use would not usually outweigh the construction and - * bookkeeping overhead of creating a large set of threads. For - * example, a common pool could be used for the {@code SortTasks} - * illustrated in {@link RecursiveAction}. Because {@code - * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon - * daemon} mode, there is typically no need to explicitly {@link - * #shutdown} such a pool upon program exit. - * - *
{@code - * static final ForkJoinPool mainPool = new ForkJoinPool(); - * ... - * public void sort(long[] array) { - * mainPool.invoke(new SortTask(array, 0, array.length)); - * }}- * *
Implementation notes: This implementation restricts the
* maximum number of running threads to 32767. Attempts to create
* pools with greater than the maximum number result in
@@ -320,9 +318,11 @@ public class ForkJoinPool extends Abstra
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
- * time out and terminate if the pool has remained quiescent for
- * SHRINK_RATE nanosecs. This will slowly propagate, eventually
- * terminating all workers after long periods of non-use.
+ * time out and terminate if the pool has remained quiescent for a
+ * given period -- a short period if there are more threads than
+ * parallelism, longer as the number of threads decreases. This
+ * will slowly propagate, eventually terminating all workers after
+ * periods of non-use.
*
* Shutdown and Termination. A call to shutdownNow atomically sets
* a runState bit and then (non-atomically) sets each worker's
@@ -813,6 +813,33 @@ public class ForkJoinPool extends Abstra
}
/**
+ * Version of tryUnpush for shared queues; called by non-FJ
+ * submitters. Conservatively fails to unpush if all workers
+ * are active unless there are multiple tasks in queue.
+ */
+ final boolean trySharedUnpush(ForkJoinTask> task, ForkJoinPool p) {
+ boolean success = false;
+ if (task != null && top != base && runState == 0 &&
+ U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
+ try {
+ ForkJoinTask>[] a; int n, s;
+ if ((a = array) != null && (n = (s = top) - base) > 0 &&
+ (n > 1 || p == null || (int)(p.ctl >> AC_SHIFT) < 0)) {
+ int j = (((a.length - 1) & --s) << ASHIFT) + ABASE;
+ if (U.getObjectVolatile(a, j) == task &&
+ U.compareAndSwapObject(a, j, task, null)) {
+ top = s;
+ success = true;
+ }
+ }
+ } finally {
+ runState = 0; // unlock
+ }
+ }
+ return success;
+ }
+
+ /**
* Polls the given task only if it is at the current base.
*/
final boolean pollFor(ForkJoinTask> task) {
@@ -1047,6 +1074,7 @@ public class ForkJoinPool extends Abstra
ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
}
}
+
/**
* Per-thread records for threads that submit to pools. Currently
* holds only pseudo-random seed / index that is used to choose
@@ -1103,30 +1131,38 @@ public class ForkJoinPool extends Abstra
private static final RuntimePermission modifyThreadPermission;
/**
- * Per-thread submission bookeeping. Shared across all pools
+ * Per-thread submission bookkeeping. Shared across all pools
* to reduce ThreadLocal pollution and because random motion
* to avoid contention in one pool is likely to hold for others.
*/
private static final ThreadSubmitter submitters;
+ /** Common default pool */
+ static volatile ForkJoinPool commonPool;
+
+ // commonPool construction parameters
+ private static final String propPrefix =
+ "java.util.concurrent.ForkJoinPool.common.";
+ private static final Thread.UncaughtExceptionHandler commonPoolUEH;
+ private static final ForkJoinWorkerThreadFactory commonPoolFactory;
+ static final int commonPoolParallelism;
+
+ /** Static initialization lock */
+ private static final Mutex initializationLock;
+
// static constants
/**
- * The wakeup interval (in nanoseconds) for a worker waiting for a
- * task when the pool is quiescent to instead try to shrink the
- * number of workers. The exact value does not matter too
- * much. It must be short enough to release resources during
- * sustained periods of idleness, but not so short that threads
- * are continually re-created.
+ * Initial timeout value (in nanoseconds) for the tread triggering
+ * quiescence to park waiting for new work. On timeout, the thread
+ * will instead try to shrink the number of workers.
*/
- private static final long SHRINK_RATE =
- 4L * 1000L * 1000L * 1000L; // 4 seconds
+ private static final long IDLE_TIMEOUT = 1000L * 1000L * 1000L; // 1sec
/**
- * The timeout value for attempted shrinkage, includes
- * some slop to cope with system timer imprecision.
+ * Timeout value when there are more threads than parallelism level
*/
- private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10);
+ private static final long FAST_IDLE_TIMEOUT = 100L * 1000L * 1000L;
/**
* The maximum stolen->joining link depth allowed in method
@@ -1259,7 +1295,7 @@ public class ForkJoinPool extends Abstra
final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
final AtomicLong stealCount; // collect counts when terminated
final AtomicInteger nextWorkerNumber; // to create worker name string
- final String workerNamePrefix; // to create worker name string
+ String workerNamePrefix; // to create worker name string
// Creating, registering, and deregistering workers
@@ -1300,7 +1336,6 @@ public class ForkJoinPool extends Abstra
*
* @param w the worker's queue
*/
-
final void registerWorker(WorkQueue w) {
Mutex lock = this.lock;
lock.lock();
@@ -1375,7 +1410,6 @@ public class ForkJoinPool extends Abstra
U.throwException(ex);
}
-
// Submissions
/**
@@ -1423,6 +1457,35 @@ public class ForkJoinPool extends Abstra
}
}
+ /**
+ * Submits the given (non-null) task to the common pool, if possible.
+ */
+ static void submitToCommonPool(ForkJoinTask> task) {
+ ForkJoinPool p;
+ if ((p = commonPool) == null)
+ p = ensureCommonPool();
+ p.doSubmit(task);
+ }
+
+ /**
+ * Returns true if the given task was submitted to common pool
+ * and has not yet commenced execution, and is available for
+ * removal according to execution policies; if so removing the
+ * submission from the pool.
+ *
+ * @param task the task
+ * @return true if successful
+ */
+ static boolean tryUnsubmitFromCommonPool(ForkJoinTask> task) {
+ ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
+ int k = submitters.get().seed & SQMASK;
+ return ((p = commonPool) != null &&
+ (ws = p.workQueues) != null &&
+ ws.length > (k &= p.submitMask) &&
+ (q = ws[k]) != null &&
+ q.trySharedUnpush(task, p));
+ }
+
// Maintaining ctl counts
/**
@@ -1434,7 +1497,7 @@ public class ForkJoinPool extends Abstra
}
/**
- * Tries to activate or create a worker if too few are active.
+ * Tries to create one or activate one or more workers if too few are active.
*/
final void signalWork() {
long c; int u;
@@ -1518,7 +1581,7 @@ public class ForkJoinPool extends Abstra
* awaiting signal,
*
* @param w the worker (via its WorkQueue)
- * @return a task or null of none found
+ * @return a task or null if none found
*/
private final ForkJoinTask> scan(WorkQueue w) {
WorkQueue[] ws; // first update random seed
@@ -1535,7 +1598,7 @@ public class ForkJoinPool extends Abstra
t = (ForkJoinTask>)U.getObjectVolatile(a, i);
if (q.base == b && ec >= 0 && t != null &&
U.compareAndSwapObject(a, i, t, null)) {
- if (q.top - (q.base = b + 1) > 1)
+ if (q.top - (q.base = b + 1) > 0)
signalWork(); // help pushes signal
return t;
}
@@ -1581,12 +1644,10 @@ public class ForkJoinPool extends Abstra
}
}
else if (w.eventCount < 0) { // already queued
- if ((nr = w.rescans) > 0) { // continue rescanning
- int ac = a + parallelism;
- if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0)
- Thread.yield(); // yield before block
- }
- else {
+ int ac = a + parallelism;
+ if ((nr = w.rescans) > 0) // continue rescanning
+ w.rescans = (ac < nr) ? ac : nr - 1;
+ else if (((w.seed >>> 16) & ac) == 0) { // randomize park
Thread.interrupted(); // clear status
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
@@ -1604,8 +1665,8 @@ public class ForkJoinPool extends Abstra
/**
* If inactivating worker w has caused the pool to become
* quiescent, checks for pool termination, and, so long as this is
- * not the only worker, waits for event for up to SHRINK_RATE
- * nanosecs. On timeout, if ctl has not changed, terminates the
+ * not the only worker, waits for event for up to a given
+ * duration. On timeout, if ctl has not changed, terminates the
* worker, which will in turn wake up another worker to possibly
* repeat this process.
*
@@ -1616,20 +1677,21 @@ public class ForkJoinPool extends Abstra
private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
if (w.eventCount < 0 && !tryTerminate(false, false) &&
(int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) {
+ int dc = -(short)(currentCtl >>> TC_SHIFT);
+ long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
+ long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop
Thread wt = Thread.currentThread();
- Thread.yield(); // yield before block
while (ctl == currentCtl) {
- long startTime = System.nanoTime();
Thread.interrupted(); // timed variant of version in scan()
U.putObject(wt, PARKBLOCKER, this);
w.parker = wt;
if (ctl == currentCtl)
- U.park(false, SHRINK_RATE);
+ U.park(false, parkTime);
w.parker = null;
U.putObject(wt, PARKBLOCKER, null);
if (ctl != currentCtl)
break;
- if (System.nanoTime() - startTime >= SHRINK_TIMEOUT &&
+ if (deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
w.runState = -1; // shrink
@@ -1895,7 +1957,12 @@ public class ForkJoinPool extends Abstra
*/
private WorkQueue findNonEmptyStealQueue(WorkQueue w) {
// Similar to loop in scan(), but ignoring submissions
- int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
+ int r;
+ if (w == null) // allow external callers
+ r = ThreadLocalRandom.current().nextInt();
+ else {
+ r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
+ }
int step = (r >>> 16) | 1;
for (WorkQueue[] ws;;) {
int rs = runState, m;
@@ -1914,7 +1981,6 @@ public class ForkJoinPool extends Abstra
}
}
-
/**
* Runs tasks until {@code isQuiescent()}. We piggyback on
* active count ctl maintenance, but rather than blocking
@@ -1957,6 +2023,23 @@ public class ForkJoinPool extends Abstra
}
/**
+ * Restricted version of helpQuiescePool for non-FJ callers
+ */
+ static void externalHelpQuiescePool() {
+ ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q;
+ ForkJoinTask> t; int b;
+ int k = submitters.get().seed & SQMASK;
+ if ((p = commonPool) != null &&
+ (ws = p.workQueues) != null &&
+ ws.length > (k &= p.submitMask) &&
+ (w = ws[k]) != null &&
+ (q = p.findNonEmptyStealQueue(w)) != null &&
+ (b = q.base) - q.top < 0 &&
+ (t = q.pollAt(b)) != null)
+ t.doExec();
+ }
+
+ /**
* Gets and removes a local or stolen task for the given worker.
*
* @return a task, if available
@@ -1989,6 +2072,20 @@ public class ForkJoinPool extends Abstra
8);
}
+ /**
+ * Returns approximate submission queue length for the given caller
+ */
+ static int getEstimatedSubmitterQueueLength() {
+ ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
+ int k = submitters.get().seed & SQMASK;
+ return ((p = commonPool) != null &&
+ p.runState >= 0 &&
+ (ws = p.workQueues) != null &&
+ ws.length > (k &= p.submitMask) &&
+ (q = ws[k]) != null) ?
+ q.queueSize() : 0;
+ }
+
// Termination
/**
@@ -2170,6 +2267,36 @@ public class ForkJoinPool extends Abstra
lock.unlock();
}
+ /**
+ * Returns the common pool instance
+ *
+ * @return the common pool instance
+ */
+ public static ForkJoinPool commonPool() {
+ ForkJoinPool p;
+ return (p = commonPool) != null? p : ensureCommonPool();
+ }
+
+ private static ForkJoinPool ensureCommonPool() {
+ ForkJoinPool p;
+ if ((p = commonPool) == null) {
+ final Mutex lock = initializationLock;
+ lock.lock();
+ try {
+ if ((p = commonPool) == null) {
+ p = commonPool = new ForkJoinPool(commonPoolParallelism,
+ commonPoolFactory,
+ commonPoolUEH, false);
+ // use a more informative name string for workers
+ p.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ return p;
+ }
+
// Execution methods
/**
@@ -2343,6 +2470,15 @@ public class ForkJoinPool extends Abstra
}
/**
+ * Returns the targeted parallelism level of the common pool.
+ *
+ * @return the targeted parallelism level of the common pool
+ */
+ public static int getCommonPoolParallelism() {
+ return commonPoolParallelism;
+ }
+
+ /**
* Returns the number of worker threads that have started but not
* yet terminated. The result returned by this method may differ
* from {@link #getParallelism} when threads are created to
@@ -2594,11 +2730,13 @@ public class ForkJoinPool extends Abstra
}
/**
- * Initiates an orderly shutdown in which previously submitted
- * tasks are executed, but no new tasks will be accepted.
- * Invocation has no additional effect if already shut down.
- * Tasks that are in the process of being submitted concurrently
- * during the course of this method may or may not be rejected.
+ * Possibly initiates an orderly shutdown in which previously
+ * submitted tasks are executed, but no new tasks will be
+ * accepted. Invocation has no effect on execution state if this
+ * is the {@link #commonPool}, and no additional effect if
+ * already shut down. Tasks that are in the process of being
+ * submitted concurrently during the course of this method may or
+ * may not be rejected.
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
@@ -2607,18 +2745,21 @@ public class ForkJoinPool extends Abstra
*/
public void shutdown() {
checkPermission();
- tryTerminate(false, true);
+ if (this != commonPool)
+ tryTerminate(false, true);
}
/**
- * Attempts to cancel and/or stop all tasks, and reject all
- * subsequently submitted tasks. Tasks that are in the process of
- * being submitted or executed concurrently during the course of
- * this method may or may not be rejected. This method cancels
- * both existing and unexecuted tasks, in order to permit
- * termination in the presence of task dependencies. So the method
- * always returns an empty list (unlike the case for some other
- * Executors).
+ * Possibly attempts to cancel and/or stop all tasks, and reject
+ * all subsequently submitted tasks. Invocation has no effect on
+ * execution state if this is the {@link #commonPool}, and no
+ * additional effect if already shut down. Otherwise, tasks that
+ * are in the process of being submitted or executed concurrently
+ * during the course of this method may or may not be
+ * rejected. This method cancels both existing and unexecuted
+ * tasks, in order to permit termination in the presence of task
+ * dependencies. So the method always returns an empty list
+ * (unlike the case for some other Executors).
*
* @return an empty list
* @throws SecurityException if a security manager exists and
@@ -2628,7 +2769,8 @@ public class ForkJoinPool extends Abstra
*/
public List