--- jsr166/src/jsr166e/ForkJoinPool.java 2012/11/20 06:18:39 1.20 +++ jsr166/src/jsr166e/ForkJoinPool.java 2013/07/19 19:34:43 1.62 @@ -6,6 +6,7 @@ package jsr166e; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -49,9 +50,9 @@ import java.util.concurrent.TimeUnit; * level; by default, equal to the number of available processors. The * pool attempts to maintain enough active (or available) threads by * dynamically adding, suspending, or resuming internal worker - * threads, even if some tasks are stalled waiting to join - * others. However, no such adjustments are guaranteed in the face of - * blocked IO or other unmanaged synchronization. The nested {@link + * threads, even if some tasks are stalled waiting to join others. + * However, no such adjustments are guaranteed in the face of blocked + * I/O or other unmanaged synchronization. The nested {@link * ManagedBlocker} interface enables extension of the kinds of * synchronization accommodated. * @@ -75,38 +76,45 @@ import java.util.concurrent.TimeUnit; * there is little difference among choice of methods. * * + * * * * * * * - * + * * * * * - * + * * * * * - * + * * * * *
Summary of task execution methods
Call from non-fork/join clients Call from within fork/join computations
Arrange async execution Arrange async execution {@link #execute(ForkJoinTask)} {@link ForkJoinTask#fork}
Await and obtain result Await and obtain result {@link #invoke(ForkJoinTask)} {@link ForkJoinTask#invoke}
Arrange exec and obtain Future Arrange exec and obtain Future {@link #submit(ForkJoinTask)} {@link ForkJoinTask#fork} (ForkJoinTasks are Futures)
* *

The common pool is by default constructed with default - * parameters, but these may be controlled by setting three {@link - * System#getProperty properties} with prefix {@code - * java.util.concurrent.ForkJoinPool.common}: {@code parallelism} -- - * an integer greater than zero, {@code threadFactory} -- the class - * name of a {@link ForkJoinWorkerThreadFactory}, and {@code - * exceptionHandler} -- the class name of a {@link - * java.lang.Thread.UncaughtExceptionHandler - * Thread.UncaughtExceptionHandler}. Upon any error in establishing - * these settings, default parameters are used. + * parameters, but these may be controlled by setting three + * {@linkplain System#getProperty system properties}: + *

+ * The system class loader is used to load these classes. + * Upon any error in establishing these settings, default parameters + * are used. It is possible to disable or limit the use of threads in + * the common pool by setting the parallelism property to zero, and/or + * using a factory that may return {@code null}. * *

Implementation notes: This implementation restricts the * maximum number of running threads to 32767. Attempts to create @@ -152,32 +160,35 @@ public class ForkJoinPool extends Abstra * (http://research.sun.com/scalable/pubs/index.html) and * "Idempotent work stealing" by Michael, Saraswat, and Vechev, * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). - * The main differences ultimately stem from GC requirements that - * we null out taken slots as soon as we can, to maintain as small - * a footprint as possible even in programs generating huge - * numbers of tasks. To accomplish this, we shift the CAS - * arbitrating pop vs poll (steal) from being on the indices - * ("base" and "top") to the slots themselves. So, both a - * successful pop and poll mainly entail a CAS of a slot from - * non-null to null. Because we rely on CASes of references, we - * do not need tag bits on base or top. They are simple ints as - * used in any circular array-based queue (see for example - * ArrayDeque). Updates to the indices must still be ordered in a - * way that guarantees that top == base means the queue is empty, - * but otherwise may err on the side of possibly making the queue - * appear nonempty when a push, pop, or poll have not fully - * committed. Note that this means that the poll operation, - * considered individually, is not wait-free. One thief cannot - * successfully continue until another in-progress one (or, if - * previously empty, a push) completes. However, in the - * aggregate, we ensure at least probabilistic non-blockingness. - * If an attempted steal fails, a thief always chooses a different - * random victim target to try next. So, in order for one thief to - * progress, it suffices for any in-progress poll or new push on - * any empty queue to complete. (This is why we normally use - * method pollAt and its variants that try once at the apparent - * base index, else consider alternative actions, rather than - * method poll.) + * See also "Correct and Efficient Work-Stealing for Weak Memory + * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 + * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an + * analysis of memory ordering (atomic, volatile etc) issues. The + * main differences ultimately stem from GC requirements that we + * null out taken slots as soon as we can, to maintain as small a + * footprint as possible even in programs generating huge numbers + * of tasks. To accomplish this, we shift the CAS arbitrating pop + * vs poll (steal) from being on the indices ("base" and "top") to + * the slots themselves. So, both a successful pop and poll + * mainly entail a CAS of a slot from non-null to null. Because + * we rely on CASes of references, we do not need tag bits on base + * or top. They are simple ints as used in any circular + * array-based queue (see for example ArrayDeque). Updates to the + * indices must still be ordered in a way that guarantees that top + * == base means the queue is empty, but otherwise may err on the + * side of possibly making the queue appear nonempty when a push, + * pop, or poll have not fully committed. Note that this means + * that the poll operation, considered individually, is not + * wait-free. One thief cannot successfully continue until another + * in-progress one (or, if previously empty, a push) completes. + * However, in the aggregate, we ensure at least probabilistic + * non-blockingness. If an attempted steal fails, a thief always + * chooses a different random victim target to try next. So, in + * order for one thief to progress, it suffices for any + * in-progress poll or new push on any empty queue to + * complete. (This is why we normally use method pollAt and its + * variants that try once at the apparent base index, else + * consider alternative actions, rather than method poll.) * * This approach also enables support of a user mode in which local * task processing is in FIFO, not LIFO order, simply by using @@ -196,18 +207,18 @@ public class ForkJoinPool extends Abstra * for work-stealing (this would contaminate lifo/fifo * processing). Instead, we randomly associate submission queues * with submitting threads, using a form of hashing. The - * ThreadLocal Submitter class contains a value initially used as - * a hash code for choosing existing queues, but may be randomly - * repositioned upon contention with other submitters. In - * essence, submitters act like workers except that they are - * restricted to executing local tasks that they submitted (or in - * the case of CountedCompleters, others with the same root task). - * However, because most shared/external queue operations are more - * expensive than internal, and because, at steady state, external - * submitters will compete for CPU with workers, ForkJoinTask.join - * and related methods disable them from repeatedly helping to - * process tasks if all workers are active. Insertion of tasks in - * shared mode requires a lock (mainly to protect in the case of + * Submitter probe value serves as a hash code for + * choosing existing queues, and may be randomly repositioned upon + * contention with other submitters. In essence, submitters act + * like workers except that they are restricted to executing local + * tasks that they submitted (or in the case of CountedCompleters, + * others with the same root task). However, because most + * shared/external queue operations are more expensive than + * internal, and because, at steady state, external submitters + * will compete for CPU with workers, ForkJoinTask.join and + * related methods disable them from repeatedly helping to process + * tasks if all workers are active. Insertion of tasks in shared + * mode requires a lock (mainly to protect in the case of * resizing) but we use only a simple spinlock (using bits in * field qlock), because submitters encountering a busy queue move * on to try or create other queues -- they block only when @@ -297,36 +308,35 @@ public class ForkJoinPool extends Abstra * has not yet entered the wait queue. We solve this by requiring * a full sweep of all workers (via repeated calls to method * scan()) both before and after a newly waiting worker is added - * to the wait queue. During a rescan, the worker might release - * some other queued worker rather than itself, which has the same - * net effect. Because enqueued workers may actually be rescanning - * rather than waiting, we set and clear the "parker" field of - * WorkQueues to reduce unnecessary calls to unpark. (This - * requires a secondary recheck to avoid missed signals.) Note - * the unusual conventions about Thread.interrupts surrounding - * parking and other blocking: Because interrupts are used solely - * to alert threads to check termination, which is checked anyway - * upon blocking, we clear status (using Thread.interrupted) - * before any call to park, so that park does not immediately - * return due to status being set via some other unrelated call to - * interrupt in user code. + * to the wait queue. Because enqueued workers may actually be + * rescanning rather than waiting, we set and clear the "parker" + * field of WorkQueues to reduce unnecessary calls to unpark. + * (This requires a secondary recheck to avoid missed signals.) + * Note the unusual conventions about Thread.interrupts + * surrounding parking and other blocking: Because interrupts are + * used solely to alert threads to check termination, which is + * checked anyway upon blocking, we clear status (using + * Thread.interrupted) before any call to park, so that park does + * not immediately return due to status being set via some other + * unrelated call to interrupt in user code. * * Signalling. We create or wake up workers only when there * appears to be at least one task they might be able to find and - * execute. However, many other threads may notice the same task - * and each signal to wake up a thread that might take it. So in - * general, pools will be over-signalled. When a submission is - * added or another worker adds a task to a queue that is - * apparently empty, they signal waiting workers (or trigger - * creation of new ones if fewer than the given parallelism - * level). These primary signals are buttressed by signals - * whenever other threads scan for work or do not have a task to - * process (including the case of leaving a hint to unparked - * threads to help signal others upon wakeup). On most platforms, - * signalling (unpark) overhead time is noticeably long, and the - * time between signalling a thread and it actually making - * progress can be very noticeably long, so it is worth offloading - * these delays from critical paths as much as possible. + * execute. When a submission is added or another worker adds a + * task to a queue that has fewer than two tasks, they signal + * waiting workers (or trigger creation of new ones if fewer than + * the given parallelism level -- signalWork). These primary + * signals are buttressed by others whenever other threads remove + * a task from a queue and notice that there are other tasks there + * as well. So in general, pools will be over-signalled. On most + * platforms, signalling (unpark) overhead time is noticeably + * long, and the time between signalling a thread and it actually + * making progress can be very noticeably long, so it is worth + * offloading these delays from critical paths as much as + * possible. Additionally, workers spin-down gradually, by staying + * alive so long as they see the ctl state changing. Similar + * stability-sensing techniques are also used before blocking in + * awaitJoin and helpComplete. * * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will @@ -439,7 +449,7 @@ public class ForkJoinPool extends Abstra * Common Pool * =========== * - * The static commonPool always exists after static + * The static common pool always exists after static * initialization. Since it (or any other created pool) need * never be used, we minimize initial construction overhead and * footprint to the setup of about a dozen fields, with no nested @@ -447,8 +457,11 @@ public class ForkJoinPool extends Abstra * fullExternalPush during the first submission to the pool. * * When external threads submit to the common pool, they can - * perform some subtask processing (see externalHelpJoin and - * related methods). We do not need to record whether these + * perform subtask processing (see externalHelpJoin and related + * methods). This caller-helps policy makes it sensible to set + * common pool parallelism level to one (or more) less than the + * total number of available cores, or even zero for pure + * caller-runs. We do not need to record whether external * submissions are to the common pool -- if not, externalHelpJoin * returns quickly (at the most helping to signal some common pool * workers). These submitters would otherwise be blocked waiting @@ -517,6 +530,7 @@ public class ForkJoinPool extends Abstra * Returns a new worker thread operating in the given pool. * * @param pool the pool this thread works in + * @return the new worker thread * @throws NullPointerException if the pool is null */ public ForkJoinWorkerThread newThread(ForkJoinPool pool); @@ -592,14 +606,8 @@ public class ForkJoinPool extends Abstra * do not want multiple WorkQueue instances or multiple queue * arrays sharing cache lines. (It would be best for queue objects * and their arrays to share, but there is nothing available to - * help arrange that). Unfortunately, because they are recorded - * in a common array, WorkQueue instances are often moved to be - * adjacent by garbage collectors. To reduce impact, we use field - * padding that works OK on common platforms; this effectively - * trades off slightly slower average field access for the sake of - * avoiding really bad worst-case access. (Until better JVM - * support is in place, this padding is dependent on transient - * properties of JVM field layout rules.) + * help arrange that). The @Contended annotation alerts JVMs to + * try to keep instances apart. */ static final class WorkQueue { /** @@ -622,13 +630,15 @@ public class ForkJoinPool extends Abstra */ static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M - int seed; // for random scanning; initialize nonzero + // Heuristic padding to ameliorate unfortunate memory placements + volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06; + volatile int eventCount; // encoded inactivation count; < 0 if inactive int nextWait; // encoded record of next event waiter - int hint; // steal or signal hint (index) - int poolIndex; // index of this queue in pool (or 0) - final int mode; // 0: lifo, > 0: fifo, < 0: shared int nsteals; // number of steals + int hint; // steal index hint + short poolIndex; // index of this queue in pool + final short mode; // 0: lifo, > 0: fifo, < 0: shared volatile int qlock; // 1: locked, -1: terminate; else 0 volatile int base; // index of next slot for poll int top; // index of next slot for push @@ -639,74 +649,64 @@ public class ForkJoinPool extends Abstra volatile ForkJoinTask currentJoin; // task being joined in awaitJoin ForkJoinTask currentSteal; // current non-local task being executed - // Heuristic padding to ameliorate unfortunate memory placements - Object p00, p01, p02, p03, p04, p05, p06, p07; - Object p08, p09, p0a, p0b, p0c; + volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17; + volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d; WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode, int seed) { - this.array = new ForkJoinTask[WorkQueue.INITIAL_QUEUE_CAPACITY]; this.pool = pool; this.owner = owner; - this.mode = mode; - this.seed = seed; - // Place indices in the center of array + this.mode = (short)mode; + this.hint = seed; // store initial seed for runWorker + // Place indices in the center of array (that is not yet allocated) base = top = INITIAL_QUEUE_CAPACITY >>> 1; } /** - * Pushes a task. Call only by owner in unshared queues. - * Cases needing resizing or rejection are relayed to fullPush - * (that also handles shared queues). + * Returns the approximate number of tasks in the queue. + */ + final int queueSize() { + int n = base - top; // non-owner callers must read base first + return (n >= 0) ? 0 : -n; // ignore transient negative + } + + /** + * Provides a more accurate estimate of whether this queue has + * any tasks than does queueSize, by checking whether a + * near-empty queue has at least one unclaimed task. + */ + final boolean isEmpty() { + ForkJoinTask[] a; int m, s; + int n = base - (s = top); + return (n >= 0 || + (n == -1 && + ((a = array) == null || + (m = a.length - 1) < 0 || + U.getObject + (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null))); + } + + /** + * Pushes a task. Call only by owner in unshared queues. (The + * shared-queue version is embedded in method externalPush.) * * @param task the task. Caller must ensure non-null. - * @throw RejectedExecutionException if array cannot be resized + * @throws RejectedExecutionException if array cannot be resized */ final void push(ForkJoinTask task) { ForkJoinTask[] a; ForkJoinPool p; - int s = top, m, n; + int s = top, n; if ((a = array) != null) { // ignore if queue removed - U.putOrderedObject - (a, (((m = a.length - 1) & s) << ASHIFT) + ABASE, task); - if ((n = (top = s + 1) - base) <= 1) { - if ((p = pool) != null) - p.signalWork(this, 0); - } + int m = a.length - 1; + U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); + if ((n = (top = s + 1) - base) <= 2) + (p = pool).signalWork(p.workQueues, this); else if (n >= m) growArray(); } } /** - * Pushes a task if lock is free and array is either big - * enough or can be resized to be big enough. - * - * @param task the task. Caller must ensure non-null. - * @return true if submitted - */ - final boolean trySharedPush(ForkJoinTask task) { - boolean submitted = false; - if (qlock == 0 && U.compareAndSwapInt(this, QLOCK, 0, 1)) { - ForkJoinTask[] a = array; ForkJoinPool p; - int s = top; - try { - if ((a != null && a.length > s + 1 - base) || - (a = growArray()) != null) { // must presize - int j = (((a.length - 1) & s) << ASHIFT) + ABASE; - U.putOrderedObject(a, j, task); - top = s + 1; - submitted = true; - } - } finally { - qlock = 0; // unlock - } - if (submitted && (p = pool) != null) - p.signalWork(this, 0); - } - return submitted; - } - - /** * Initializes or doubles the capacity of array. Call either * by owner or with lock held -- it is OK for base, but not * top, to move while resizings are in progress. @@ -764,9 +764,8 @@ public class ForkJoinPool extends Abstra if ((a = array) != null) { int j = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = (ForkJoinTask)U.getObjectVolatile(a, j)) != null && - base == b && - U.compareAndSwapObject(a, j, t, null)) { - base = b + 1; + base == b && U.compareAndSwapObject(a, j, t, null)) { + U.putOrderedInt(this, QBASE, b + 1); return t; } } @@ -782,9 +781,8 @@ public class ForkJoinPool extends Abstra int j = (((a.length - 1) & b) << ASHIFT) + ABASE; t = (ForkJoinTask)U.getObjectVolatile(a, j); if (t != null) { - if (base == b && - U.compareAndSwapObject(a, j, t, null)) { - base = b + 1; + if (U.compareAndSwapObject(a, j, t, null)) { + U.putOrderedInt(this, QBASE, b + 1); return t; } } @@ -841,78 +839,62 @@ public class ForkJoinPool extends Abstra ForkJoinTask.cancelIgnoringExceptions(t); } - /** - * Computes next value for random probes. Scans don't require - * a very high quality generator, but also not a crummy one. - * Marsaglia xor-shift is cheap and works well enough. Note: - * This is manually inlined in its usages in ForkJoinPool to - * avoid writes inside busy scan loops. - */ - final int nextSeed() { - int r = seed; - r ^= r << 13; - r ^= r >>> 17; - return seed = r ^= r << 5; - } + // Specialized execution methods /** - * Provides a more accurate estimate of size than (top - base) - * by ordering reads and checking whether a near-empty queue - * has at least one unclaimed task. + * Polls and runs tasks until empty. */ - final int queueSize() { - ForkJoinTask[] a; int k, s, n; - return ((n = base - (s = top)) < 0 && - (n != -1 || - ((a = array) != null && (k = a.length) > 0 && - U.getObject - (a, (long)((((k - 1) & (s - 1)) << ASHIFT) + ABASE)) != null))) ? - -n : 0; + final void pollAndExecAll() { + for (ForkJoinTask t; (t = poll()) != null;) + t.doExec(); } - // Specialized execution methods - /** - * Pops and runs tasks until empty. + * Executes a top-level task and any local tasks remaining + * after execution. */ - private void popAndExecAll() { - // A bit faster than repeated pop calls - ForkJoinTask[] a; int m, s; long j; ForkJoinTask t; - while ((a = array) != null && (m = a.length - 1) >= 0 && - (s = top - 1) - base >= 0 && - (t = ((ForkJoinTask) - U.getObject(a, j = ((m & s) << ASHIFT) + ABASE))) - != null) { - if (U.compareAndSwapObject(a, j, t, null)) { - top = s; - t.doExec(); + final void runTask(ForkJoinTask task) { + if ((currentSteal = task) != null) { + task.doExec(); + ForkJoinTask[] a = array; + int md = mode; + ++nsteals; + currentSteal = null; + if (md != 0) + pollAndExecAll(); + else if (a != null) { + int s, m = a.length - 1; + while ((s = top - 1) - base >= 0) { + long i = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = (ForkJoinTask)U.getObject(a, i); + if (t == null) + break; + if (U.compareAndSwapObject(a, i, t, null)) { + top = s; + t.doExec(); + } + } } } } /** - * Polls and runs tasks until empty. - */ - private void pollAndExecAll() { - for (ForkJoinTask t; (t = poll()) != null;) - t.doExec(); - } - - /** * If present, removes from queue and executes the given task, * or any other cancelled task. Returns (true) on any CAS * or consistency check failure so caller can retry. * - * @return false if no progress can be made, else true; + * @return false if no progress can be made, else true */ final boolean tryRemoveAndExec(ForkJoinTask task) { - boolean stat = true, removed = false, empty = true; + boolean stat; ForkJoinTask[] a; int m, s, b, n; - if ((a = array) != null && (m = a.length - 1) >= 0 && + if (task != null && (a = array) != null && (m = a.length - 1) >= 0 && (n = (s = top) - (b = base)) > 0) { + boolean removed = false, empty = true; + stat = true; for (ForkJoinTask t;;) { // traverse from s to b - int j = ((--s & m) << ASHIFT) + ABASE; - t = (ForkJoinTask)U.getObjectVolatile(a, j); + long j = ((--s & m) << ASHIFT) + ABASE; + t = (ForkJoinTask)U.getObject(a, j); if (t == null) // inconsistent length break; else if (t == task) { @@ -940,68 +922,95 @@ public class ForkJoinPool extends Abstra break; } } + if (removed) + task.doExec(); } - if (removed) - task.doExec(); + else + stat = false; return stat; } /** - * Polls for and executes the given task or any other task in - * its CountedCompleter computation + * Tries to poll for and execute the given task or any other + * task in its CountedCompleter computation. */ - final boolean pollAndExecCC(ForkJoinTask root) { - ForkJoinTask[] a; int b; Object o; - outer: while ((b = base) - top < 0 && (a = array) != null) { + final boolean pollAndExecCC(CountedCompleter root) { + ForkJoinTask[] a; int b; Object o; CountedCompleter t, r; + if ((b = base) - top < 0 && (a = array) != null) { long j = (((a.length - 1) & b) << ASHIFT) + ABASE; - if ((o = U.getObject(a, j)) == null || - !(o instanceof CountedCompleter)) - break; - for (CountedCompleter t = (CountedCompleter)o, r = t;;) { - if (r == root) { - if (base == b && - U.compareAndSwapObject(a, j, t, null)) { - base = b + 1; - t.doExec(); + if ((o = U.getObjectVolatile(a, j)) == null) + return true; // retry + if (o instanceof CountedCompleter) { + for (t = (CountedCompleter)o, r = t;;) { + if (r == root) { + if (base == b && + U.compareAndSwapObject(a, j, t, null)) { + U.putOrderedInt(this, QBASE, b + 1); + t.doExec(); + } return true; } - else - break; // restart + else if ((r = r.completer) == null) + break; // not part of root computation } - if ((r = r.completer) == null) - break outer; // not part of root computation } } return false; } /** - * Executes a top-level task and any local tasks remaining - * after execution. + * Tries to pop and execute the given task or any other task + * in its CountedCompleter computation. */ - final void runTask(ForkJoinTask t) { - if (t != null) { - (currentSteal = t).doExec(); - currentSteal = null; - ++nsteals; - if (top != base) { // process remaining local tasks - if (mode == 0) - popAndExecAll(); - else - pollAndExecAll(); + final boolean externalPopAndExecCC(CountedCompleter root) { + ForkJoinTask[] a; int s; Object o; CountedCompleter t, r; + if (base - (s = top) < 0 && (a = array) != null) { + long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; + if ((o = U.getObject(a, j)) instanceof CountedCompleter) { + for (t = (CountedCompleter)o, r = t;;) { + if (r == root) { + if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { + if (top == s && array == a && + U.compareAndSwapObject(a, j, t, null)) { + top = s - 1; + qlock = 0; + t.doExec(); + } + else + qlock = 0; + } + return true; + } + else if ((r = r.completer) == null) + break; + } } } + return false; } /** - * Executes a non-top-level (stolen) task. + * Internal version */ - final void runSubtask(ForkJoinTask t) { - if (t != null) { - ForkJoinTask ps = currentSteal; - (currentSteal = t).doExec(); - currentSteal = ps; + final boolean internalPopAndExecCC(CountedCompleter root) { + ForkJoinTask[] a; int s; Object o; CountedCompleter t, r; + if (base - (s = top) < 0 && (a = array) != null) { + long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; + if ((o = U.getObject(a, j)) instanceof CountedCompleter) { + for (t = (CountedCompleter)o, r = t;;) { + if (r == root) { + if (U.compareAndSwapObject(a, j, t, null)) { + top = s - 1; + t.doExec(); + } + return true; + } + else if ((r = r.completer) == null) + break; + } + } } + return false; } /** @@ -1016,76 +1025,35 @@ public class ForkJoinPool extends Abstra s != Thread.State.TIMED_WAITING); } - /** - * If this owned and is not already interrupted, try to - * interrupt and/or unpark, ignoring exceptions. - */ - final void interruptOwner() { - Thread wt, p; - if ((wt = owner) != null && !wt.isInterrupted()) { - try { - wt.interrupt(); - } catch (SecurityException ignore) { - } - } - if ((p = parker) != null) - U.unpark(p); - } - // Unsafe mechanics private static final sun.misc.Unsafe U; + private static final long QBASE; private static final long QLOCK; private static final int ABASE; private static final int ASHIFT; static { - int s; try { U = getUnsafe(); Class k = WorkQueue.class; Class ak = ForkJoinTask[].class; + QBASE = U.objectFieldOffset + (k.getDeclaredField("base")); QLOCK = U.objectFieldOffset (k.getDeclaredField("qlock")); ABASE = U.arrayBaseOffset(ak); - s = U.arrayIndexScale(ak); + int scale = U.arrayIndexScale(ak); + if ((scale & (scale - 1)) != 0) + throw new Error("data type scale not a power of two"); + ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); - ASHIFT = 31 - Integer.numberOfLeadingZeros(s); } } // static fields (initialized in static initializer below) /** - * Creates a new ForkJoinWorkerThread. This factory is used unless - * overridden in ForkJoinPool constructors. - */ - public static final ForkJoinWorkerThreadFactory - defaultForkJoinWorkerThreadFactory; - - /** - * Per-thread records for threads that submit to pools. Currently - * holds only pseudo-random seed / index that is used to choose - * submission queues in method externalPush. In the future, this may - * also incorporate a means to implement different task rejection - * and resubmission policies. - * - * Seeds for submitters and workers/workQueues work in basically - * the same way but are initialized and updated using slightly - * different mechanics. Both are initialized using the same - * approach as in class ThreadLocal, where successive values are - * unlikely to collide with previous values. Seeds are then - * randomly modified upon collisions using xorshifts, which - * requires a non-zero seed. - */ - static final class Submitter { - int seed; - Submitter(int s) { seed = s; } - } - - /** * Per-thread submission bookkeeping. Shared across all pools * to reduce ThreadLocal pollution and because random motion * to avoid contention in one pool is likely to hold for others. @@ -1095,12 +1063,11 @@ public class ForkJoinPool extends Abstra static final ThreadLocal submitters; /** - * Common (static) pool. Non-null for public use unless a static - * construction exception, but internal usages null-check on use - * to paranoically avoid potential initialization circularities - * as well as to simplify generated code. + * Creates a new ForkJoinWorkerThread. This factory is used unless + * overridden in ForkJoinPool constructors. */ - static final ForkJoinPool commonPool; + public static final ForkJoinWorkerThreadFactory + defaultForkJoinWorkerThreadFactory; /** * Permission required for callers of methods that may start or @@ -1109,9 +1076,20 @@ public class ForkJoinPool extends Abstra private static final RuntimePermission modifyThreadPermission; /** - * Common pool parallelism. Must equal commonPool.parallelism. + * Common (static) pool. Non-null for public use unless a static + * construction exception, but internal usages null-check on use + * to paranoically avoid potential initialization circularities + * as well as to simplify generated code. + */ + static final ForkJoinPool common; + + /** + * Common pool parallelism. To allow simpler use and management + * when common pool threads are disabled, we allow the underlying + * common.parallelism field to be zero, but in that case still report + * parallelism as 1 to reflect resulting caller-runs mechanics. */ - static final int commonPoolParallelism; + static final int commonParallelism; /** * Sequence number for creating workerNamePrefix. @@ -1119,8 +1097,8 @@ public class ForkJoinPool extends Abstra private static int poolNumberSequence; /** - * Return the next sequence number. We don't expect this to - * ever contend so use simple builtin sync. + * Returns the next sequence number. We don't expect this to + * ever contend, so use simple builtin sync. */ private static final synchronized int nextPoolId() { return ++poolNumberSequence; @@ -1144,6 +1122,11 @@ public class ForkJoinPool extends Abstra private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L; /** + * Tolerance for idle timeouts, to cope with timer undershoots + */ + private static final long TIMEOUT_SLOP = 2000000L; + + /** * The maximum stolen->joining link depth allowed in method * tryHelpStealer. Must be a power of two. Depths for legitimate * chains are unbounded, but we use a fixed constant to avoid @@ -1159,7 +1142,7 @@ public class ForkJoinPool extends Abstra */ private static final int SEED_INCREMENT = 0x61c88647; - /** + /* * Bits and masks for control variables * * Field ctl is a long packed with: @@ -1243,57 +1226,40 @@ public class ForkJoinPool extends Abstra static final int FIFO_QUEUE = 1; static final int SHARED_QUEUE = -1; - // bounds for #steps in scan loop -- must be power 2 minus 1 - private static final int MIN_SCAN = 0x1ff; // cover estimation slop - private static final int MAX_SCAN = 0x1ffff; // 4 * max workers + // Heuristic padding to ameliorate unfortunate memory placements + volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06; // Instance fields - - /* - * Field layout of this class tends to matter more than one would - * like. Runtime layout order is only loosely related to - * declaration order and may differ across JVMs, but the following - * empirically works OK on current JVMs. - */ volatile long stealCount; // collects worker counts volatile long ctl; // main pool control volatile int plock; // shutdown status and seqLock volatile int indexSeed; // worker/submitter index seed - final int config; // mode and parallelism level + final short parallelism; // parallelism level + final short mode; // LIFO/FIFO WorkQueue[] workQueues; // main registry final ForkJoinWorkerThreadFactory factory; - final Thread.UncaughtExceptionHandler ueh; // per-worker UEH + final UncaughtExceptionHandler ueh; // per-worker UEH final String workerNamePrefix; // to create worker name string - /* + volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17; + volatile Object pad18, pad19, pad1a, pad1b; + + /** * Acquires the plock lock to protect worker array and related * updates. This method is called only if an initial CAS on plock - * fails. This acts as a spinLock for normal cases, but falls back + * fails. This acts as a spinlock for normal cases, but falls back * to builtin monitor to block when (rarely) needed. This would be * a terrible idea for a highly contended lock, but works fine as - * a more conservative alternative to a pure spinlock. See - * internal ConcurrentHashMap documentation for further - * explanation of nearly the same construction. + * a more conservative alternative to a pure spinlock. */ private int acquirePlock() { - int spins = PL_SPINS, r = 0, ps, nps; + int spins = PL_SPINS, ps, nps; for (;;) { if (((ps = plock) & PL_LOCK) == 0 && U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK)) return nps; - else if (r == 0) { // randomize spins if possible - Thread t = Thread.currentThread(); WorkQueue w; Submitter z; - if ((t instanceof ForkJoinWorkerThread) && - (w = ((ForkJoinWorkerThread)t).workQueue) != null) - r = w.seed; - else if ((z = submitters.get()) != null) - r = z.seed; - else - r = 1; - } else if (spins >= 0) { - r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift - if (r >= 0) + if (ThreadLocalRandom.current().nextInt() >= 0) --spins; } else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) { @@ -1325,49 +1291,32 @@ public class ForkJoinPool extends Abstra } /** - * Tries to create and start a worker; adjusts counts etc on failure + * Tries to create and start one worker if fewer than target + * parallelism level exist. Adjusts counts etc on failure. */ - private void addWorker() { - ForkJoinWorkerThread wt = null; - try { - (wt = factory.newThread(this)).start(); - } catch (Throwable ex) { - deregisterWorker(wt, ex); // adjust on failure - } - } - - /** - * Performs secondary initialization, called when plock is zero. - * Creates workQueue array and sets plock to a valid value. The - * lock body must be exception-free (so no try/finally) so we - * optimistically allocate new array outside the lock and throw - * away if (very rarely) not needed. (A similar tactic is used in - * fullExternalPush.) Because the plock seq value can eventually - * wrap around zero, this method harmlessly fails to reinitialize - * if workQueues exists, while still advancing plock. - */ - private void initWorkQueuesArray() { - WorkQueue[] ws; int ps; - int p = config & SMASK; // find power of two table size - int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots - n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; - WorkQueue[] nws = new WorkQueue[(n + 1) << 1]; - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - if ((ws = workQueues) == null || ws.length == 0) - workQueues = nws; - int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); - long c; int u; - if ((u = (int)((c = ctl) >>> 32)) < 0 && (int)c == 0) { - long nc = (long)(((u + UTC_UNIT) & UTC_MASK) | - ((u + UAC_UNIT) & UAC_MASK)) << 32; - if (U.compareAndSwapLong(this, CTL, c, nc)) - addWorker(); + private void tryAddWorker() { + long c; int u, e; + while ((u = (int)((c = ctl) >>> 32)) < 0 && + (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) { + long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) | + ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e; + if (U.compareAndSwapLong(this, CTL, c, nc)) { + ForkJoinWorkerThreadFactory fac; + Throwable ex = null; + ForkJoinWorkerThread wt = null; + try { + if ((fac = factory) != null && + (wt = fac.newThread(this)) != null) { + wt.start(); + break; + } + } catch (Throwable rex) { + ex = rex; + } + deregisterWorker(wt, ex); + break; + } } - } // Registering and deregistering workers @@ -1380,42 +1329,46 @@ public class ForkJoinPool extends Abstra * expanding as needed. * * @param wt the worker thread + * @return the worker's queue */ - final void registerWorker(ForkJoinWorkerThread wt) { - if (wt != null && wt.workQueue == null) { - int s, ps; // generate a rarely colliding candidate index seed - do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed, - s += SEED_INCREMENT) || - s == 0); // skip 0 - WorkQueue w = new WorkQueue(this, wt, config >>> 16, s); - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); - try { - WorkQueue[] ws; - if ((ws = workQueues) != null && wt.workQueue == null) { - int n = ws.length, m = n - 1; - int r = (s << 1) | 1; // use odd-numbered indices - if (ws[r &= m] != null) { // collision - int probes = 0; // step by approx half size - int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; - while (ws[r = (r + step) & m] != null) { - if (++probes >= n) { - workQueues = ws = Arrays.copyOf(ws, n <<= 1); - m = n - 1; - probes = 0; - } + final WorkQueue registerWorker(ForkJoinWorkerThread wt) { + UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps; + wt.setDaemon(true); + if ((handler = ueh) != null) + wt.setUncaughtExceptionHandler(handler); + do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed, + s += SEED_INCREMENT) || + s == 0); // skip 0 + WorkQueue w = new WorkQueue(this, wt, mode, s); + if (((ps = plock) & PL_LOCK) != 0 || + !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + ps = acquirePlock(); + int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); + try { + if ((ws = workQueues) != null) { // skip if shutting down + int n = ws.length, m = n - 1; + int r = (s << 1) | 1; // use odd-numbered indices + if (ws[r &= m] != null) { // collision + int probes = 0; // step by approx half size + int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; + while (ws[r = (r + step) & m] != null) { + if (++probes >= n) { + workQueues = ws = Arrays.copyOf(ws, n <<= 1); + m = n - 1; + probes = 0; } } - w.eventCount = w.poolIndex = r; // volatile write orders - wt.workQueue = ws[r] = w; } - } finally { - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); + w.poolIndex = (short)r; + w.eventCount = r; // volatile write orders + ws[r] = w; } + } finally { + if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + releasePlock(nps); } + wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1))); + return w; } /** @@ -1424,17 +1377,17 @@ public class ForkJoinPool extends Abstra * array, and adjusts counts. If pool is shutting down, tries to * complete termination. * - * @param wt the worker thread or null if construction failed + * @param wt the worker thread, or null if construction failed * @param ex the exception causing failure, or null if none */ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { - int ps; + int ps; long sc; w.qlock = -1; // ensure set - long ns = w.nsteals, sc; // collect steal count do {} while (!U.compareAndSwapLong(this, STEALCOUNT, - sc = stealCount, sc + ns)); + sc = stealCount, + sc + w.nsteals)); if (((ps = plock) & PL_LOCK) != 0 || !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) ps = acquirePlock(); @@ -1450,27 +1403,68 @@ public class ForkJoinPool extends Abstra } } - long c; // adjust ctl counts + long c; // adjust ctl counts do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) | ((c - TC_UNIT) & TC_MASK) | (c & ~(AC_MASK|TC_MASK))))); - if (!tryTerminate(false, false) && w != null) { - w.cancelAll(); // cancel remaining tasks - if (w.array != null) // suppress signal if never ran - helpSignal(null, 0); // wake up or create replacement - if (ex == null) // help clean refs on way out - ForkJoinTask.helpExpungeStaleExceptions(); + if (!tryTerminate(false, false) && w != null && w.array != null) { + w.cancelAll(); // cancel remaining tasks + WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e; + while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) { + if (e > 0) { // activate or create replacement + if ((ws = workQueues) == null || + (i = e & SMASK) >= ws.length || + (v = ws[i]) == null) + break; + long nc = (((long)(v.nextWait & E_MASK)) | + ((long)(u + UAC_UNIT) << 32)); + if (v.eventCount != (e | INT_SIGN)) + break; + if (U.compareAndSwapLong(this, CTL, c, nc)) { + v.eventCount = (e + E_SEQ) & E_MASK; + if ((p = v.parker) != null) + U.unpark(p); + break; + } + } + else { + if ((short)u < 0) + tryAddWorker(); + break; + } + } } - - if (ex != null) // rethrow + if (ex == null) // help clean refs on way out + ForkJoinTask.helpExpungeStaleExceptions(); + else // rethrow ForkJoinTask.rethrow(ex); } // Submissions /** + * Per-thread records for threads that submit to pools. Currently + * holds only pseudo-random seed / index that is used to choose + * submission queues in method externalPush. In the future, this may + * also incorporate a means to implement different task rejection + * and resubmission policies. + * + * Seeds for submitters and workers/workQueues work in basically + * the same way but are initialized and updated using slightly + * different mechanics. Both are initialized using the same + * approach as in class ThreadLocal, where successive values are + * unlikely to collide with previous values. Seeds are then + * randomly modified upon collisions using xorshifts, which + * requires a non-zero seed. + */ + static final class Submitter { + int seed; + Submitter(int s) { seed = s; } + } + + /** * Unless shutting down, adds the given task to a submission queue * at submitter's current queue index (modulo submission * range). Only the most common path is directly handled in this @@ -1479,18 +1473,21 @@ public class ForkJoinPool extends Abstra * @param task the task. Caller must ensure non-null. */ final void externalPush(ForkJoinTask task) { - WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask[] a; - if ((z = submitters.get()) != null && plock > 0 && - (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && - (q = ws[m & z.seed & SQMASK]) != null && + Submitter z = submitters.get(); + WorkQueue q; int r, m, s, n, am; ForkJoinTask[] a; + int ps = plock; + WorkQueue[] ws = workQueues; + if (z != null && ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 && + (q = ws[m & (r = z.seed) & SQMASK]) != null && r != 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock - int b = q.base, s = q.top, n, an; - if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) { - U.putObject(a, (long)(((an - 1) & s) << ASHIFT) + ABASE, task); + if ((a = q.array) != null && + (am = a.length - 1) > (n = (s = q.top) - q.base)) { + int j = ((am & s) << ASHIFT) + ABASE; + U.putOrderedObject(a, j, task); q.top = s + 1; // push on to deque q.qlock = 0; - if (n <= 2) - signalWork(q, 0); + if (n <= 1) + signalWork(ws, q); return; } q.qlock = 0; @@ -1501,16 +1498,22 @@ public class ForkJoinPool extends Abstra /** * Full version of externalPush. This method is called, among * other times, upon the first submission of the first task to the - * pool, so must perform secondary initialization (via - * initWorkQueuesArray). It also detects first submission by an - * external thread by looking up its ThreadLocal, and creates a - * new shared queue if the one at index if empty or contended. The - * lock body must be exception-free (so no try/finally) so we - * optimistically allocate new queues outside the lock and throw - * them away if (very rarely) not needed. + * pool, so must perform secondary initialization. It also + * detects first submission by an external thread by looking up + * its ThreadLocal, and creates a new shared queue if the one at + * index if empty or contended. The plock lock body must be + * exception-free (so no try/finally) so we optimistically + * allocate new queues outside the lock and throw them away if + * (very rarely) not needed. + * + * Secondary initialization occurs when plock is zero, to create + * workQueue array and set plock to a valid value. This lock body + * must also be exception-free. Because the plock seq value can + * eventually wrap around zero, this method harmlessly fails to + * reinitialize if workQueues exists, while still advancing plock. */ private void fullExternalPush(ForkJoinTask task) { - int r = 0; + int r = 0; // random index seed for (Submitter z = submitters.get();;) { WorkQueue[] ws; WorkQueue q; int ps, m, k; if (z == null) { @@ -1518,25 +1521,57 @@ public class ForkJoinPool extends Abstra r += SEED_INCREMENT) && r != 0) submitters.set(z = new Submitter(r)); } - else if (r == 0) { // move to a different index + else if (r == 0) { // move to a different index r = z.seed; - r ^= r << 13; // same xorshift as WorkQueues + r ^= r << 13; // same xorshift as WorkQueues r ^= r >>> 17; - z.seed = r ^ (r << 5); + z.seed = r ^= (r << 5); } - else if ((ps = plock) < 0) + if ((ps = plock) < 0) throw new RejectedExecutionException(); else if (ps == 0 || (ws = workQueues) == null || - (m = ws.length - 1) < 0) - initWorkQueuesArray(); + (m = ws.length - 1) < 0) { // initialize workQueues + int p = parallelism; // find power of two table size + int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots + n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; + n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; + WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ? + new WorkQueue[n] : null); + if (((ps = plock) & PL_LOCK) != 0 || + !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + ps = acquirePlock(); + if (((ws = workQueues) == null || ws.length == 0) && nws != null) + workQueues = nws; + int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); + if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + releasePlock(nps); + } else if ((q = ws[k = r & m & SQMASK]) != null) { - if (q.trySharedPush(task)) - return; - else - r = 0; // move on contention + if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { + ForkJoinTask[] a = q.array; + int s = q.top; + boolean submitted = false; + try { // locked version of push + if ((a != null && a.length > s + 1 - q.base) || + (a = q.growArray()) != null) { // must presize + int j = (((a.length - 1) & s) << ASHIFT) + ABASE; + U.putOrderedObject(a, j, task); + q.top = s + 1; + submitted = true; + } + } finally { + q.qlock = 0; // unlock + } + if (submitted) { + signalWork(ws, q); + return; + } + } + r = 0; // move on failure } else if (((ps = plock) & PL_LOCK) == 0) { // create new queue q = new WorkQueue(this, null, SHARED_QUEUE, r); + q.poolIndex = (short)k; if (((ps = plock) & PL_LOCK) != 0 || !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) ps = acquirePlock(); @@ -1547,7 +1582,7 @@ public class ForkJoinPool extends Abstra releasePlock(nps); } else - r = 0; // try elsewhere while lock held + r = 0; } } @@ -1558,53 +1593,41 @@ public class ForkJoinPool extends Abstra */ final void incrementActiveCount() { long c; - do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT)); + do {} while (!U.compareAndSwapLong + (this, CTL, c = ctl, ((c & ~AC_MASK) | + ((c & AC_MASK) + AC_UNIT)))); } /** - * Tries to create (at most one) or activate (possibly several) - * workers if too few are active. On contention failure, continues - * until at least one worker is signalled or the given queue is - * empty or all workers are active. - * - * @param q if non-null, the queue holding tasks to be signalled - * @param signals the target number of signals (at least one -- - * if argument is zero also sets signallee hint if parked). - */ - final void signalWork(WorkQueue q, int signals) { - long c; int e, u, i, s; WorkQueue[] ws; WorkQueue w; Thread p; - while ((u = (int)((c = ctl) >>> 32)) < 0) { - if ((e = (int)c) > 0) { - if ((ws = workQueues) != null && ws.length > (i = e & SMASK) && - (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) { - long nc = (((long)(w.nextWait & E_MASK)) | - ((long)(u + UAC_UNIT) << 32)); - if (U.compareAndSwapLong(this, CTL, c, nc)) { - w.eventCount = (e + E_SEQ) & E_MASK; - if ((p = w.parker) != null) { - if (q != null && signals == 0) - w.hint = q.poolIndex; - U.unpark(p); - } - if (--signals <= 0) - break; - } - if (q != null && (s = q.queueSize()) <= signals && - (signals = s) <= 0) - break; - } - else - break; + * Tries to create or activate a worker if too few are active. + * + * @param ws the worker array to use to find signallees + * @param q if non-null, the queue holding tasks to be processed + */ + final void signalWork(WorkQueue[] ws, WorkQueue q) { + for (;;) { + long c; int e, u, i; WorkQueue w; Thread p; + if ((u = (int)((c = ctl) >>> 32)) >= 0) + break; + if ((e = (int)c) <= 0) { + if ((short)u < 0) + tryAddWorker(); + break; } - else if (e == 0 && (u & SHORT_SIGN) != 0) { - long nc = (long)(((u + UTC_UNIT) & UTC_MASK) | - ((u + UAC_UNIT) & UAC_MASK)) << 32; - if (U.compareAndSwapLong(this, CTL, c, nc)) { - addWorker(); - break; - } + if (ws == null || ws.length <= (i = e & SMASK) || + (w = ws[i]) == null) + break; + long nc = (((long)(w.nextWait & E_MASK)) | + ((long)(u + UAC_UNIT)) << 32); + int ne = (e + E_SEQ) & E_MASK; + if (w.eventCount == (e | INT_SIGN) && + U.compareAndSwapLong(this, CTL, c, nc)) { + w.eventCount = ne; + if ((p = w.parker) != null) + U.unpark(p); + break; } - else + if (q != null && q.base >= q.top) break; } } @@ -1615,179 +1638,157 @@ public class ForkJoinPool extends Abstra * Top-level runloop for workers, called by ForkJoinWorkerThread.run. */ final void runWorker(WorkQueue w) { - if (w != null) // skip on initialization failure - do { w.runTask(scan(w)); } while (w.qlock >= 0); + w.growArray(); // allocate queue + for (int r = w.hint; scan(w, r) == 0; ) { + r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift + } } /** - * Scans for and, if found, returns one task, else possibly + * Scans for and, if found, runs one task, else possibly * inactivates the worker. This method operates on single reads of * volatile state and is designed to be re-invoked continuously, * in part because it returns upon detecting inconsistencies, * contention, or state changes that indicate possible success on * re-invocation. * - * The scan searches for tasks across queues (starting at a random - * index, and relying on registerWorker to irregularly scatter - * them within array to avoid bias), checking each at least twice. - * The scan terminates upon either finding a non-empty queue, or - * completing the sweep. If the worker is not inactivated, it - * takes and returns a task from this queue. Otherwise, if not - * activated, it signals workers (that may include itself) and - * returns so caller can retry. Also returns for true if the - * worker array may have changed during an empty scan. On failure - * to find a task, we take one of the following actions, after - * which the caller will retry calling this method unless - * terminated. - * - * * If pool is terminating, terminate the worker. - * - * * If not already enqueued, try to inactivate and enqueue the - * worker on wait queue. Or, if inactivating has caused the pool - * to be quiescent, relay to idleAwaitWork to check for - * termination and possibly shrink pool. - * - * * If already enqueued and none of the above apply, possibly - * (with 1/2 probability) park awaiting signal, else lingering to - * help scan and signal. + * The scan searches for tasks across queues starting at a random + * index, checking each at least twice. The scan terminates upon + * either finding a non-empty queue, or completing the sweep. If + * the worker is not inactivated, it takes and runs a task from + * this queue. Otherwise, if not activated, it tries to activate + * itself or some other worker by signalling. On failure to find a + * task, returns (for retry) if pool state may have changed during + * an empty scan, or tries to inactivate if active, else possibly + * blocks or terminates via method awaitWork. * * @param w the worker (via its WorkQueue) - * @return a task or null if none found + * @param r a random seed + * @return worker qlock status if would have waited, else 0 */ - private final ForkJoinTask scan(WorkQueue w) { - WorkQueue[] ws; int m, hint; - int ps = plock; // read plock before ws - if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { - int ec = w.eventCount; // ec is negative if inactive - int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; - for (int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN; ; --j) { - WorkQueue q; ForkJoinTask[] a; int b; - if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 && - (a = q.array) != null) { // probably nonempty - int i = (((a.length - 1) & b) << ASHIFT) + ABASE; - ForkJoinTask t = (ForkJoinTask) - U.getObjectVolatile(a, i); - if (q.base == b && ec >= 0 && t != null && - U.compareAndSwapObject(a, i, t, null)) { - if ((q.base = b + 1) - q.top < 0) - signalWork(q, 0); - return t; // taken - } - else if (ec < 0 || j < m) { // cannot take or cannot rescan - w.hint = q.poolIndex; // use hint below - break; // let caller retry after signal - } - } - else if (j < 0) { // end of scan; in loop to simplify code - long c, sc; int e, ns; - if ((ns = w.nsteals) != 0) { - if (U.compareAndSwapLong(this, STEALCOUNT, - sc = stealCount, sc + ns)) - w.nsteals = 0; // collect steals + private final int scan(WorkQueue w, int r) { + WorkQueue[] ws; int m; + long c = ctl; // for consistency check + if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) { + for (int j = m + m + 1, ec = w.eventCount;;) { + WorkQueue q; int b, e; ForkJoinTask[] a; ForkJoinTask t; + if ((q = ws[(r - j) & m]) != null && + (b = q.base) - q.top < 0 && (a = q.array) != null) { + long i = (((a.length - 1) & b) << ASHIFT) + ABASE; + if ((t = ((ForkJoinTask) + U.getObjectVolatile(a, i))) != null) { + if (ec < 0) + helpRelease(c, ws, w, q, b); + else if (q.base == b && + U.compareAndSwapObject(a, i, t, null)) { + U.putOrderedInt(q, QBASE, b + 1); + if ((b + 1) - q.top < 0) + signalWork(ws, q); + w.runTask(t); + } } - else if (plock != ps) // ws may have changed - break; - else if ((e = (int)(c = ctl)) < 0) - w.qlock = -1; // pool is terminating - else if (ec >= 0) { // try to enqueue/inactivate - long nc = ((long)ec | - ((c - AC_UNIT) & (AC_MASK|TC_MASK))); - w.nextWait = e; // link and mark inactive - w.hint = -1; // use hint if set while parked + break; + } + else if (--j < 0) { + if ((ec | (e = (int)c)) < 0) // inactive or terminating + return awaitWork(w, c, ec); + else if (ctl == c) { // try to inactivate and enqueue + long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); + w.nextWait = e; w.eventCount = ec | INT_SIGN; - if (ctl != c || - !U.compareAndSwapLong(this, CTL, c, nc)) - w.eventCount = ec; // unmark on CAS failure - else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK)) - idleAwaitWork(w, nc, c); - } - else if (w.eventCount < 0) { // block - Thread wt = Thread.currentThread(); - Thread.interrupted(); // clear status - U.putObject(wt, PARKBLOCKER, this); - w.parker = wt; // emulate LockSupport.park - if (w.eventCount < 0) // recheck - U.park(false, 0L); - w.parker = null; - U.putObject(wt, PARKBLOCKER, null); + if (!U.compareAndSwapLong(this, CTL, c, nc)) + w.eventCount = ec; // back out } break; } } - if ((hint = w.hint) >= 0) { // help signal - WorkQueue[] vs; WorkQueue v; int k; - w.hint = -1; // suppress resignal - if ((vs = workQueues) != null && hint < vs.length && - (v = vs[hint]) != null && (k = v.base - v.top) < -1) - signalWork(v, 1 - k); - } } - return null; + return 0; } /** - * If inactivating worker w has caused the pool to become - * quiescent, checks for pool termination, and, so long as this is - * not the only worker, waits for event for up to a given - * duration. On timeout, if ctl has not changed, terminates the - * worker, which will in turn wake up another worker to possibly - * repeat this process. + * A continuation of scan(), possibly blocking or terminating + * worker w. Returns without blocking if pool state has apparently + * changed since last invocation. Also, if inactivating w has + * caused the pool to become quiescent, checks for pool + * termination, and, so long as this is not the only worker, waits + * for event for up to a given duration. On timeout, if ctl has + * not changed, terminates the worker, which will in turn wake up + * another worker to possibly repeat this process. * * @param w the calling worker - * @param currentCtl the ctl value triggering possible quiescence - * @param prevCtl the ctl value to restore if thread is terminated + * @param c the ctl value on entry to scan + * @param ec the worker's eventCount on entry to scan */ - private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { - if (w != null && w.eventCount < 0 && - !tryTerminate(false, false) && (int)prevCtl != 0) { - int dc = -(short)(currentCtl >>> TC_SHIFT); - long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; - long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop - Thread wt = Thread.currentThread(); - while (ctl == currentCtl) { - Thread.interrupted(); // timed variant of version in scan() - U.putObject(wt, PARKBLOCKER, this); - w.parker = wt; - if (ctl == currentCtl) - U.park(false, parkTime); - w.parker = null; - U.putObject(wt, PARKBLOCKER, null); - if (ctl != currentCtl) - break; - if (deadline - System.nanoTime() <= 0L && - U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { - w.eventCount = (w.eventCount + E_SEQ) | E_MASK; - w.qlock = -1; // shrink - w.hint = -1; // suppress helping - break; + private final int awaitWork(WorkQueue w, long c, int ec) { + int stat, ns; long parkTime, deadline; + if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c && + !Thread.interrupted()) { + int e = (int)c; + int u = (int)(c >>> 32); + int d = (u >> UAC_SHIFT) + parallelism; // active count + + if (e < 0 || (d <= 0 && tryTerminate(false, false))) + stat = w.qlock = -1; // pool is terminating + else if ((ns = w.nsteals) != 0) { // collect steals and retry + long sc; + w.nsteals = 0; + do {} while (!U.compareAndSwapLong(this, STEALCOUNT, + sc = stealCount, sc + ns)); + } + else { + long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L : + ((long)(w.nextWait & E_MASK)) | // ctl to restore + ((long)(u + UAC_UNIT)) << 32); + if (pc != 0L) { // timed wait if last waiter + int dc = -(short)(c >>> TC_SHIFT); + parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT: + (dc + 1) * IDLE_TIMEOUT); + deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; + } + else + parkTime = deadline = 0L; + if (w.eventCount == ec && ctl == c) { + Thread wt = Thread.currentThread(); + U.putObject(wt, PARKBLOCKER, this); + w.parker = wt; // emulate LockSupport.park + if (w.eventCount == ec && ctl == c) + U.park(false, parkTime); // must recheck before park + w.parker = null; + U.putObject(wt, PARKBLOCKER, null); + if (parkTime != 0L && ctl == c && + deadline - System.nanoTime() <= 0L && + U.compareAndSwapLong(this, CTL, c, pc)) + stat = w.qlock = -1; // shrink pool } } } + return stat; } /** - * Scans through queues looking for work (optionally, while - * joining a task); if any are present, signals. May return early - * if more signalling is detectably unneeded. - * - * @param task if non-null, return early if done - * @param origin an index to start scan - */ - final int helpSignal(ForkJoinTask task, int origin) { - WorkQueue[] ws; WorkQueue q; int m, n, s, u; - if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { - for (int i = 0; i <= m; ++i) { - if (task != null && (s = task.status) < 0) - return s; - if ((q = ws[(i + origin) & m]) != null && - (n = q.queueSize()) > 0) { - signalWork(q, n); - if ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0) - break; - } + * Possibly releases (signals) a worker. Called only from scan() + * when a worker with apparently inactive status finds a non-empty + * queue. This requires revalidating all of the associated state + * from caller. + */ + private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w, + WorkQueue q, int b) { + WorkQueue v; int e, i; Thread p; + if (w != null && w.eventCount < 0 && (e = (int)c) > 0 && + ws != null && ws.length > (i = e & SMASK) && + (v = ws[i]) != null && ctl == c) { + long nc = (((long)(v.nextWait & E_MASK)) | + ((long)((int)(c >>> 32) + UAC_UNIT)) << 32); + int ne = (e + E_SEQ) & E_MASK; + if (q != null && q.base == b && w.eventCount < 0 && + v.eventCount == (e | INT_SIGN) && + U.compareAndSwapLong(this, CTL, c, nc)) { + v.eventCount = ne; + if ((p = v.parker) != null) + U.unpark(p); } } - return 0; } /** @@ -1810,7 +1811,8 @@ public class ForkJoinPool extends Abstra */ private int tryHelpStealer(WorkQueue joiner, ForkJoinTask task) { int stat = 0, steps = 0; // bound to avoid cycles - if (joiner != null && task != null) { // hoist null checks + if (task != null && joiner != null && + joiner.base - joiner.top >= 0) { // hoist checks restart: for (;;) { ForkJoinTask subtask = task; // current target for (WorkQueue j = joiner, v;;) { // v is stealer of subtask @@ -1837,7 +1839,7 @@ public class ForkJoinPool extends Abstra } } for (;;) { // help stealer or descend to its stealer - ForkJoinTask[] a; int b; + ForkJoinTask[] a; int b; if (subtask.status < 0) // surround probes with continue restart; // consistency checks if ((b = v.base) - v.top < 0 && (a = v.array) != null) { @@ -1848,13 +1850,23 @@ public class ForkJoinPool extends Abstra v.currentSteal != subtask) continue restart; // stale stat = 1; // apparent progress - if (t != null && v.base == b && - U.compareAndSwapObject(a, i, t, null)) { - v.base = b + 1; // help stealer - joiner.runSubtask(t); + if (v.base == b) { + if (t == null) + break restart; + if (U.compareAndSwapObject(a, i, t, null)) { + U.putOrderedInt(v, QBASE, b + 1); + ForkJoinTask ps = joiner.currentSteal; + int jt = joiner.top; + do { + joiner.currentSteal = t; + t.doExec(); // clear local tasks too + } while (task.status >= 0 && + joiner.top != jt && + (t = joiner.pop()) != null); + joiner.currentSteal = ps; + break restart; + } } - else if (v.base == b && ++steps == MAX_HELP) - break restart; // v apparently stalled } else { // empty -- try to descend ForkJoinTask next = v.currentJoin; @@ -1881,28 +1893,33 @@ public class ForkJoinPool extends Abstra * and run tasks within the target's computation. * * @param task the task to join - * @param mode if shared, exit upon completing any task - * if all workers are active - * */ - private int helpComplete(ForkJoinTask task, int mode) { - WorkQueue[] ws; WorkQueue q; int m, n, s, u; - if (task != null && (ws = workQueues) != null && - (m = ws.length - 1) >= 0) { - for (int j = 1, origin = j;;) { + private int helpComplete(WorkQueue joiner, CountedCompleter task) { + WorkQueue[] ws; int m; + int s = 0; + if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && + joiner != null && task != null) { + int j = joiner.poolIndex; + int scans = m + m + 1; + long c = 0L; // for stability check + for (int k = scans; ; j += 2) { + WorkQueue q; if ((s = task.status) < 0) - return s; - if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) { - origin = j; - if (mode == SHARED_QUEUE && - ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)) + break; + else if (joiner.internalPopAndExecCC(task)) + k = scans; + else if ((s = task.status) < 0) + break; + else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) + k = scans; + else if (--k < 0) { + if (c == (c = ctl)) break; + k = scans; } - else if ((j = (j + 2) & m) == origin) - break; } } - return 0; + return s; } /** @@ -1911,17 +1928,22 @@ public class ForkJoinPool extends Abstra * for blocking. Fails on contention or termination. Otherwise, * adds a new thread if no idle workers are available and pool * may become starved. + * + * @param c the assumed ctl value */ - final boolean tryCompensate() { - int pc = config & SMASK, e, i, tc; long c; - WorkQueue[] ws; WorkQueue w; Thread p; - if ((ws = workQueues) != null && (e = (int)(c = ctl)) >= 0) { - if (e != 0 && (i = e & SMASK) < ws.length && - (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) { + final boolean tryCompensate(long c) { + WorkQueue[] ws = workQueues; + int pc = parallelism, e = (int)c, m, tc; + if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) { + WorkQueue w = ws[e & m]; + if (e != 0 && w != null) { + Thread p; long nc = ((long)(w.nextWait & E_MASK) | (c & (AC_MASK|TC_MASK))); - if (U.compareAndSwapLong(this, CTL, c, nc)) { - w.eventCount = (e + E_SEQ) & E_MASK; + int ne = (e + E_SEQ) & E_MASK; + if (w.eventCount == (e | INT_SIGN) && + U.compareAndSwapLong(this, CTL, c, nc)) { + w.eventCount = ne; if ((p = w.parker) != null) U.unpark(p); return true; // replace with idle worker @@ -1936,8 +1958,19 @@ public class ForkJoinPool extends Abstra else if (tc + pc < MAX_CAP) { long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); if (U.compareAndSwapLong(this, CTL, c, nc)) { - addWorker(); - return true; + ForkJoinWorkerThreadFactory fac; + Throwable ex = null; + ForkJoinWorkerThread wt = null; + try { + if ((fac = factory) != null && + (wt = fac.newThread(this)) != null) { + wt.start(); + return true; + } + } catch (Throwable rex) { + ex = rex; + } + deregisterWorker(wt, ex); // clean up and return false } } } @@ -1953,29 +1986,19 @@ public class ForkJoinPool extends Abstra */ final int awaitJoin(WorkQueue joiner, ForkJoinTask task) { int s = 0; - if (joiner != null && task != null && (s = task.status) >= 0) { + if (task != null && (s = task.status) >= 0 && joiner != null) { ForkJoinTask prevJoin = joiner.currentJoin; joiner.currentJoin = task; - do {} while ((s = task.status) >= 0 && - joiner.queueSize() > 0 && - joiner.tryRemoveAndExec(task)); // process local tasks - if (s >= 0 && (s = task.status) >= 0 && - (s = helpSignal(task, joiner.poolIndex)) >= 0 && - (task instanceof CountedCompleter)) - s = helpComplete(task, LIFO_QUEUE); - int k = 0; // to perform pre-block yield for politeness + do {} while (joiner.tryRemoveAndExec(task) && // process local tasks + (s = task.status) >= 0); + if (s >= 0 && (task instanceof CountedCompleter)) + s = helpComplete(joiner, (CountedCompleter)task); + long cc = 0; // for stability checks while (s >= 0 && (s = task.status) >= 0) { - if ((joiner.queueSize() > 0 || // try helping - (s = tryHelpStealer(joiner, task)) == 0) && + if ((s = tryHelpStealer(joiner, task)) == 0 && (s = task.status) >= 0) { - if (k < 3) { - if (++k < 3) - s = helpSignal(task, joiner.poolIndex); - else - Thread.yield(); - } - else if (!tryCompensate()) - k = 0; + if (!tryCompensate(cc)) + cc = ctl; else { if (task.trySetSignal() && (s = task.status) >= 0) { synchronized (task) { @@ -1989,9 +2012,11 @@ public class ForkJoinPool extends Abstra task.notifyAll(); } } - long c; // re-activate + long c; // reactivate do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c + AC_UNIT)); + (this, CTL, c = ctl, + ((c & ~AC_MASK) | + ((c & AC_MASK) + AC_UNIT)))); } } } @@ -2013,14 +2038,11 @@ public class ForkJoinPool extends Abstra if (joiner != null && task != null && (s = task.status) >= 0) { ForkJoinTask prevJoin = joiner.currentJoin; joiner.currentJoin = task; - do {} while ((s = task.status) >= 0 && - joiner.queueSize() > 0 && - joiner.tryRemoveAndExec(task)); - if (s >= 0 && (s = task.status) >= 0 && - (s = helpSignal(task, joiner.poolIndex)) >= 0 && - (task instanceof CountedCompleter)) - s = helpComplete(task, LIFO_QUEUE); - if (s >= 0 && joiner.queueSize() == 0) { + do {} while (joiner.tryRemoveAndExec(task) && // process local tasks + (s = task.status) >= 0); + if (s >= 0) { + if (task instanceof CountedCompleter) + helpComplete(joiner, (CountedCompleter)task); do {} while (task.status >= 0 && tryHelpStealer(joiner, task) > 0); } @@ -2030,29 +2052,22 @@ public class ForkJoinPool extends Abstra /** * Returns a (probably) non-empty steal queue, if one is found - * during a random, then cyclic scan, else null. This method must - * be retried by caller if, by the time it tries to use the queue, - * it is empty. - * @param r a (random) seed for scanning - */ - private WorkQueue findNonEmptyStealQueue(int r) { - for (WorkQueue[] ws;;) { - int ps = plock, m, n; - if ((ws = workQueues) == null || (m = ws.length - 1) < 1) - return null; - for (int j = (m + 1) << 2; ;) { - WorkQueue q = ws[(((r + j) << 1) | 1) & m]; - if (q != null && (n = q.queueSize()) > 0) { - if (n > 1) - signalWork(q, 0); - return q; - } - else if (--j < 0) { - if (plock == ps) - return null; - break; + * during a scan, else null. This method must be retried by + * caller if, by the time it tries to use the queue, it is empty. + */ + private WorkQueue findNonEmptyStealQueue() { + int r = ThreadLocalRandom.current().nextInt(); + for (;;) { + int ps = plock, m; WorkQueue[] ws; WorkQueue q; + if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { + for (int j = (m + 1) << 2; j >= 0; --j) { + if ((q = ws[(((r - j) << 1) | 1) & m]) != null && + q.base - q.top < 0) + return q; } } + if (plock == ps) + return null; } } @@ -2063,38 +2078,36 @@ public class ForkJoinPool extends Abstra * find tasks either. */ final void helpQuiescePool(WorkQueue w) { + ForkJoinTask ps = w.currentSteal; for (boolean active = true;;) { - ForkJoinTask localTask; // exhaust local queue - while ((localTask = w.nextLocalTask()) != null) - localTask.doExec(); - // Similar to loop in scan(), but ignoring submissions - WorkQueue q = findNonEmptyStealQueue(w.nextSeed()); - if (q != null) { - ForkJoinTask t; int b; + long c; WorkQueue q; ForkJoinTask t; int b; + while ((t = w.nextLocalTask()) != null) + t.doExec(); + if ((q = findNonEmptyStealQueue()) != null) { if (!active) { // re-establish active count - long c; active = true; do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c + AC_UNIT)); + (this, CTL, c = ctl, + ((c & ~AC_MASK) | + ((c & AC_MASK) + AC_UNIT)))); + } + if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { + (w.currentSteal = t).doExec(); + w.currentSteal = ps; } - if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) - w.runSubtask(t); } - else { - long c; - if (active) { // decrement active count without queuing + else if (active) { // decrement active count without queuing + long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT); + if ((int)(nc >> AC_SHIFT) + parallelism == 0) + break; // bypass decrement-then-increment + if (U.compareAndSwapLong(this, CTL, c, nc)) active = false; - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c -= AC_UNIT)); - } - else - c = ctl; // re-increment on exit - if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) { - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c + AC_UNIT)); - break; - } } + else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 && + U.compareAndSwapLong + (this, CTL, c, ((c & ~AC_MASK) | + ((c & AC_MASK) + AC_UNIT)))) + break; } } @@ -2108,7 +2121,7 @@ public class ForkJoinPool extends Abstra WorkQueue q; int b; if ((t = w.nextLocalTask()) != null) return t; - if ((q = findNonEmptyStealQueue(w.nextSeed())) == null) + if ((q = findNonEmptyStealQueue()) == null) return null; if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) return t; @@ -2140,7 +2153,7 @@ public class ForkJoinPool extends Abstra * producing extra tasks amortizes the uncertainty of progress and * diffusion assumptions. * - * So, users will want to use values larger, but not much larger + * So, users will want to use values larger (but not much larger) * than 1 to both smooth over transient shortages and hedge * against uneven progress; as traded off against the cost of * extra task overhead. We leave the user to pick a threshold @@ -2164,7 +2177,7 @@ public class ForkJoinPool extends Abstra static int getSurplusQueuedTaskCount() { Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) { - int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK; + int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism; int n = (q = wt.workQueue).top - q.base; int a = (int)(pool.ctl >> AC_SHIFT) + p; return n - (a > (p >>>= 1) ? 0 : @@ -2193,61 +2206,69 @@ public class ForkJoinPool extends Abstra * @return true if now terminating or terminated */ private boolean tryTerminate(boolean now, boolean enable) { - if (this == commonPool) // cannot shut down + int ps; + if (this == common) // cannot shut down return false; + if ((ps = plock) >= 0) { // enable by setting plock + if (!enable) + return false; + if ((ps & PL_LOCK) != 0 || + !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + ps = acquirePlock(); + int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN; + if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + releasePlock(nps); + } for (long c;;) { - if (((c = ctl) & STOP_BIT) != 0) { // already terminating - if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) { + if (((c = ctl) & STOP_BIT) != 0) { // already terminating + if ((short)(c >>> TC_SHIFT) + parallelism <= 0) { synchronized (this) { - notifyAll(); // signal when 0 workers + notifyAll(); // signal when 0 workers } } return true; } - if (plock >= 0) { // not yet enabled - int ps; - if (!enable) - return false; - if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) - ps = acquirePlock(); - int nps = SHUTDOWN; - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) - releasePlock(nps); - } - if (!now) { // check if idle & no tasks - if ((int)(c >> AC_SHIFT) != -(config & SMASK) || - hasQueuedSubmissions()) + if (!now) { // check if idle & no tasks + WorkQueue[] ws; WorkQueue w; + if ((int)(c >> AC_SHIFT) + parallelism > 0) return false; - // Check for unqueued inactive workers. One pass suffices. - WorkQueue[] ws = workQueues; WorkQueue w; - if (ws != null) { - for (int i = 1; i < ws.length; i += 2) { - if ((w = ws[i]) != null && w.eventCount >= 0) + if ((ws = workQueues) != null) { + for (int i = 0; i < ws.length; ++i) { + if ((w = ws[i]) != null && + (!w.isEmpty() || + ((i & 1) != 0 && w.eventCount >= 0))) { + signalWork(ws, w); return false; + } } } } if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { for (int pass = 0; pass < 3; ++pass) { - WorkQueue[] ws = workQueues; - if (ws != null) { - WorkQueue w; + WorkQueue[] ws; WorkQueue w; Thread wt; + if ((ws = workQueues) != null) { int n = ws.length; for (int i = 0; i < n; ++i) { if ((w = ws[i]) != null) { w.qlock = -1; if (pass > 0) { w.cancelAll(); - if (pass > 1) - w.interruptOwner(); + if (pass > 1 && (wt = w.owner) != null) { + if (!wt.isInterrupted()) { + try { + wt.interrupt(); + } catch (Throwable ignore) { + } + } + U.unpark(wt); + } } } } // Wake up workers parked on event queue int i, e; long cc; Thread p; while ((e = (int)(cc = ctl) & E_MASK) != 0 && - (i = e & SMASK) < n && + (i = e & SMASK) < n && i >= 0 && (w = ws[i]) != null) { long nc = ((long)(w.nextWait & E_MASK) | ((cc + AC_UNIT) & AC_MASK) | @@ -2273,9 +2294,9 @@ public class ForkJoinPool extends Abstra * least one task. */ static WorkQueue commonSubmitterQueue() { - ForkJoinPool p; WorkQueue[] ws; int m; Submitter z; + Submitter z; ForkJoinPool p; WorkQueue[] ws; int m, r; return ((z = submitters.get()) != null && - (p = commonPool) != null && + (p = common) != null && (ws = p.workQueues) != null && (m = ws.length - 1) >= 0) ? ws[m & z.seed & SQMASK] : null; @@ -2284,123 +2305,57 @@ public class ForkJoinPool extends Abstra /** * Tries to pop the given task from submitter's queue in common pool. */ - static boolean tryExternalUnpush(ForkJoinTask t) { - ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z; - ForkJoinTask[] a; int m, s; long j; - if ((z = submitters.get()) != null && - (p = commonPool) != null && - (ws = p.workQueues) != null && - (m = ws.length - 1) >= 0 && - (q = ws[m & z.seed & SQMASK]) != null && - (s = q.top) != q.base && - (a = q.array) != null && - U.getObjectVolatile - (a, j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE) == t && - U.compareAndSwapInt(q, QLOCK, 0, 1)) { - if (q.array == a && q.top == s && // recheck - U.compareAndSwapObject(a, j, t, null)) { - q.top = s - 1; - q.qlock = 0; - return true; - } - q.qlock = 0; - } - return false; - } - - /** - * Tries to pop and run local tasks within the same computation - * as the given root. On failure, tries to help complete from - * other queues via helpComplete. - */ - private void externalHelpComplete(WorkQueue q, ForkJoinTask root) { - ForkJoinTask[] a; int m; - if (q != null && (a = q.array) != null && (m = (a.length - 1)) >= 0 && - root != null && root.status >= 0) { - for (;;) { - int s, u; Object o; CountedCompleter task = null; - if ((s = q.top) - q.base > 0) { - long j = ((m & (s - 1)) << ASHIFT) + ABASE; - if ((o = U.getObject(a, j)) != null && - (o instanceof CountedCompleter)) { - CountedCompleter t = (CountedCompleter)o, r = t; - do { - if (r == root) { - if (U.compareAndSwapInt(q, QLOCK, 0, 1)) { - if (q.array == a && q.top == s && - U.compareAndSwapObject(a, j, t, null)) { - q.top = s - 1; - task = t; - } - q.qlock = 0; - } - break; - } - } while ((r = r.completer) != null); - } - } - if (task != null) - task.doExec(); - if (root.status < 0 || - (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0) + final boolean tryExternalUnpush(ForkJoinTask task) { + WorkQueue joiner; ForkJoinTask[] a; int m, s; + Submitter z = submitters.get(); + WorkQueue[] ws = workQueues; + boolean popped = false; + if (z != null && ws != null && (m = ws.length - 1) >= 0 && + (joiner = ws[z.seed & m & SQMASK]) != null && + joiner.base != (s = joiner.top) && + (a = joiner.array) != null) { + long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; + if (U.getObject(a, j) == task && + U.compareAndSwapInt(joiner, QLOCK, 0, 1)) { + if (joiner.top == s && joiner.array == a && + U.compareAndSwapObject(a, j, task, null)) { + joiner.top = s - 1; + popped = true; + } + joiner.qlock = 0; + } + } + return popped; + } + + final int externalHelpComplete(CountedCompleter task) { + WorkQueue joiner; int m, j; + Submitter z = submitters.get(); + WorkQueue[] ws = workQueues; + int s = 0; + if (z != null && ws != null && (m = ws.length - 1) >= 0 && + (joiner = ws[(j = z.seed) & m & SQMASK]) != null && task != null) { + int scans = m + m + 1; + long c = 0L; // for stability check + j |= 1; // poll odd queues + for (int k = scans; ; j += 2) { + WorkQueue q; + if ((s = task.status) < 0) break; - if (task == null) { - if (helpSignal(root, q.poolIndex) >= 0) - helpComplete(root, SHARED_QUEUE); + else if (joiner.externalPopAndExecCC(task)) + k = scans; + else if ((s = task.status) < 0) break; + else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) + k = scans; + else if (--k < 0) { + if (c == (c = ctl)) + break; + k = scans; } } } - } - - /** - * Tries to help execute or signal availability of the given task - * from submitter's queue in common pool. - */ - static void externalHelpJoin(ForkJoinTask t) { - // Some hard-to-avoid overlap with tryExternalUnpush - ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z; - ForkJoinTask[] a; int m, s, n; long j; - if (t != null && - (z = submitters.get()) != null && - (p = commonPool) != null && - (ws = p.workQueues) != null && - (m = ws.length - 1) >= 0 && - (q = ws[m & z.seed & SQMASK]) != null && - (a = q.array) != null && - t.status >= 0) { - if ((s = q.top) != q.base && - U.getObjectVolatile - (a, j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE) == t && - U.compareAndSwapInt(q, QLOCK, 0, 1)) { - if (q.array == a && q.top == s && - U.compareAndSwapObject(a, j, t, null)) { - q.top = s - 1; - q.qlock = 0; - t.doExec(); - } - else - q.qlock = 0; - } - if (t.status >= 0) { - if (t instanceof CountedCompleter) - p.externalHelpComplete(q, t); - else - p.helpSignal(t, q.poolIndex); - } - } - } - - /** - * Restricted version of helpQuiescePool for external callers - */ - static void externalHelpQuiescePool() { - ForkJoinPool p; ForkJoinTask t; WorkQueue q; int b; - if ((p = commonPool) != null && - (q = p.findNonEmptyStealQueue(1)) != null && - (b = q.base) - q.top < 0 && - (t = q.pollAt(b)) != null) - t.doExec(); + return s; } // Exported methods @@ -2419,7 +2374,7 @@ public class ForkJoinPool extends Abstra * java.lang.RuntimePermission}{@code ("modifyThread")} */ public ForkJoinPool() { - this(Runtime.getRuntime().availableProcessors(), + this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } @@ -2467,47 +2422,64 @@ public class ForkJoinPool extends Abstra */ public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, - Thread.UncaughtExceptionHandler handler, + UncaughtExceptionHandler handler, boolean asyncMode) { + this(checkParallelism(parallelism), + checkFactory(factory), + handler, + (asyncMode ? FIFO_QUEUE : LIFO_QUEUE), + "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); - if (factory == null) - throw new NullPointerException(); + } + + private static int checkParallelism(int parallelism) { if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); - this.factory = factory; - this.ueh = handler; - this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0); - long np = (long)(-parallelism); // offset ctl counts - this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); - int pn = nextPoolId(); - StringBuilder sb = new StringBuilder("ForkJoinPool-"); - sb.append(Integer.toString(pn)); - sb.append("-worker-"); - this.workerNamePrefix = sb.toString(); + return parallelism; + } + + private static ForkJoinWorkerThreadFactory checkFactory + (ForkJoinWorkerThreadFactory factory) { + if (factory == null) + throw new NullPointerException(); + return factory; } /** - * Constructor for common pool, suitable only for static initialization. - * Basically the same as above, but uses smallest possible initial footprint. - */ - ForkJoinPool(int parallelism, long ctl, - ForkJoinWorkerThreadFactory factory, - Thread.UncaughtExceptionHandler handler) { - this.config = parallelism; - this.ctl = ctl; + * Creates a {@code ForkJoinPool} with the given parameters, without + * any security checks or parameter validation. Invoked directly by + * makeCommonPool. + */ + private ForkJoinPool(int parallelism, + ForkJoinWorkerThreadFactory factory, + UncaughtExceptionHandler handler, + int mode, + String workerNamePrefix) { + this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; - this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; + this.mode = (short)mode; + this.parallelism = (short)parallelism; + long np = (long)(-parallelism); // offset ctl counts + this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); } /** - * Returns the common pool instance. + * Returns the common pool instance. This pool is statically + * constructed; its run state is unaffected by attempts to {@link + * #shutdown} or {@link #shutdownNow}. However this pool and any + * ongoing processing are automatically terminated upon program + * {@link System#exit}. Any program that relies on asynchronous + * task processing to complete before program termination should + * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, + * before exit. * * @return the common pool instance + * @since 1.8 */ public static ForkJoinPool commonPool() { - // assert commonPool != null : "static init error"; - return commonPool; + // assert common != null : "static init error"; + return common; } // Execution methods @@ -2563,7 +2535,7 @@ public class ForkJoinPool extends Abstra if (task instanceof ForkJoinTask) // avoid re-wrap job = (ForkJoinTask) task; else - job = new ForkJoinTask.AdaptedRunnableAction(task); + job = new ForkJoinTask.RunnableExecuteAction(task); externalPush(job); } @@ -2630,27 +2602,23 @@ public class ForkJoinPool extends Abstra // In previous versions of this class, this method constructed // a task to run ForkJoinTask.invokeAll, but now external // invocation of multiple tasks is at least as efficient. - List> fs = new ArrayList>(tasks.size()); - // Workaround needed because method wasn't declared with - // wildcards in return type but should have been. - @SuppressWarnings({"unchecked", "rawtypes"}) - List> futures = (List>) (List) fs; + ArrayList> futures = new ArrayList>(tasks.size()); boolean done = false; try { for (Callable t : tasks) { ForkJoinTask f = new ForkJoinTask.AdaptedCallable(t); + futures.add(f); externalPush(f); - fs.add(f); } - for (ForkJoinTask f : fs) - f.quietlyJoin(); + for (int i = 0, size = futures.size(); i < size; i++) + ((ForkJoinTask)futures.get(i)).quietlyJoin(); done = true; return futures; } finally { if (!done) - for (ForkJoinTask f : fs) - f.cancel(false); + for (int i = 0, size = futures.size(); i < size; i++) + futures.get(i).cancel(false); } } @@ -2669,7 +2637,7 @@ public class ForkJoinPool extends Abstra * * @return the handler, or {@code null} if none */ - public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { + public UncaughtExceptionHandler getUncaughtExceptionHandler() { return ueh; } @@ -2679,16 +2647,18 @@ public class ForkJoinPool extends Abstra * @return the targeted parallelism level of this pool */ public int getParallelism() { - return config & SMASK; + int par; + return ((par = parallelism) > 0) ? par : 1; } /** * Returns the targeted parallelism level of the common pool. * * @return the targeted parallelism level of the common pool + * @since 1.8 */ public static int getCommonPoolParallelism() { - return commonPoolParallelism; + return commonParallelism; } /** @@ -2700,7 +2670,7 @@ public class ForkJoinPool extends Abstra * @return the number of worker threads */ public int getPoolSize() { - return (config & SMASK) + (short)(ctl >>> TC_SHIFT); + return parallelism + (short)(ctl >>> TC_SHIFT); } /** @@ -2710,7 +2680,7 @@ public class ForkJoinPool extends Abstra * @return {@code true} if this pool uses async mode */ public boolean getAsyncMode() { - return (config >>> 16) == FIFO_QUEUE; + return mode == FIFO_QUEUE; } /** @@ -2741,7 +2711,7 @@ public class ForkJoinPool extends Abstra * @return the number of active threads */ public int getActiveThreadCount() { - int r = (config & SMASK) + (int)(ctl >> AC_SHIFT); + int r = parallelism + (int)(ctl >> AC_SHIFT); return (r <= 0) ? 0 : r; // suppress momentarily negative values } @@ -2757,7 +2727,7 @@ public class ForkJoinPool extends Abstra * @return {@code true} if all threads are currently idle */ public boolean isQuiescent() { - return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0; + return parallelism + (int)(ctl >> AC_SHIFT) <= 0; } /** @@ -2834,7 +2804,7 @@ public class ForkJoinPool extends Abstra WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { for (int i = 0; i < ws.length; i += 2) { - if ((w = ws[i]) != null && w.queueSize() != 0) + if ((w = ws[i]) != null && !w.isEmpty()) return true; } } @@ -2920,7 +2890,7 @@ public class ForkJoinPool extends Abstra } } } - int pc = (config & SMASK); + int pc = parallelism; int tc = pc + (short)(c >>> TC_SHIFT); int ac = pc + (int)(c >> AC_SHIFT); if (ac < 0) // ignore transient negative @@ -2946,7 +2916,7 @@ public class ForkJoinPool extends Abstra * Possibly initiates an orderly shutdown in which previously * submitted tasks are executed, but no new tasks will be * accepted. Invocation has no effect on execution state if this - * is the {@link #commonPool}, and no additional effect if + * is the {@link #commonPool()}, and no additional effect if * already shut down. Tasks that are in the process of being * submitted concurrently during the course of this method may or * may not be rejected. @@ -2964,7 +2934,7 @@ public class ForkJoinPool extends Abstra /** * Possibly attempts to cancel and/or stop all tasks, and reject * all subsequently submitted tasks. Invocation has no effect on - * execution state if this is the {@link #commonPool}, and no + * execution state if this is the {@link #commonPool()}, and no * additional effect if already shut down. Otherwise, tasks that * are in the process of being submitted or executed concurrently * during the course of this method may or may not be @@ -2993,7 +2963,7 @@ public class ForkJoinPool extends Abstra public boolean isTerminated() { long c = ctl; return ((c & STOP_BIT) != 0L && - (short)(c >>> TC_SHIFT) == -(config & SMASK)); + (short)(c >>> TC_SHIFT) + parallelism <= 0); } /** @@ -3001,7 +2971,7 @@ public class ForkJoinPool extends Abstra * commenced but not yet completed. This method may be useful for * debugging. A return of {@code true} reported a sufficient * period after shutdown may indicate that submitted tasks have - * ignored or suppressed interruption, or are waiting for IO, + * ignored or suppressed interruption, or are waiting for I/O, * causing this executor not to properly terminate. (See the * advisory notes for class {@link ForkJoinTask} stating that * tasks should not normally entail blocking operations. But if @@ -3012,7 +2982,7 @@ public class ForkJoinPool extends Abstra public boolean isTerminating() { long c = ctl; return ((c & STOP_BIT) != 0L && - (short)(c >>> TC_SHIFT) != -(config & SMASK)); + (short)(c >>> TC_SHIFT) + parallelism > 0); } /** @@ -3027,9 +2997,10 @@ public class ForkJoinPool extends Abstra /** * Blocks until all tasks have completed execution after a * shutdown request, or the timeout occurs, or the current thread - * is interrupted, whichever happens first. Note that the {@link - * #commonPool()} never terminates until program shutdown so - * this method will always time out. + * is interrupted, whichever happens first. Because the {@link + * #commonPool()} never terminates until program shutdown, when + * applied to the common pool, this method is equivalent to {@link + * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument @@ -3039,22 +3010,82 @@ public class ForkJoinPool extends Abstra */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (this == common) { + awaitQuiescence(timeout, unit); + return false; + } long nanos = unit.toNanos(timeout); if (isTerminated()) return true; - long startTime = System.nanoTime(); - boolean terminated = false; + if (nanos <= 0L) + return false; + long deadline = System.nanoTime() + nanos; synchronized (this) { - for (long waitTime = nanos, millis = 0L;;) { - if (terminated = isTerminated() || - waitTime <= 0L || - (millis = unit.toMillis(waitTime)) <= 0L) + for (;;) { + if (isTerminated()) + return true; + if (nanos <= 0L) + return false; + long millis = TimeUnit.NANOSECONDS.toMillis(nanos); + wait(millis > 0L ? millis : 1L); + nanos = deadline - System.nanoTime(); + } + } + } + + /** + * If called by a ForkJoinTask operating in this pool, equivalent + * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, + * waits and/or attempts to assist performing tasks until this + * pool {@link #isQuiescent} or the indicated timeout elapses. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return {@code true} if quiescent; {@code false} if the + * timeout elapsed. + */ + public boolean awaitQuiescence(long timeout, TimeUnit unit) { + long nanos = unit.toNanos(timeout); + ForkJoinWorkerThread wt; + Thread thread = Thread.currentThread(); + if ((thread instanceof ForkJoinWorkerThread) && + (wt = (ForkJoinWorkerThread)thread).pool == this) { + helpQuiescePool(wt.workQueue); + return true; + } + long startTime = System.nanoTime(); + WorkQueue[] ws; + int r = 0, m; + boolean found = true; + while (!isQuiescent() && (ws = workQueues) != null && + (m = ws.length - 1) >= 0) { + if (!found) { + if ((System.nanoTime() - startTime) > nanos) + return false; + Thread.yield(); // cannot block + } + found = false; + for (int j = (m + 1) << 2; j >= 0; --j) { + ForkJoinTask t; WorkQueue q; int b; + if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) { + found = true; + if ((t = q.pollAt(b)) != null) + t.doExec(); break; - wait(millis); - waitTime = nanos - (System.nanoTime() - startTime); + } } } - return terminated; + return true; + } + + /** + * Waits and/or attempts to assist performing tasks indefinitely + * until the {@link #commonPool()} {@link #isQuiescent}. + */ + static void quiesceCommonPool() { + common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } /** @@ -3066,9 +3097,9 @@ public class ForkJoinPool extends Abstra * not necessary. Method {@code block} blocks the current thread * if necessary (perhaps internally invoking {@code isReleasable} * before actually blocking). These actions are performed by any - * thread invoking {@link ForkJoinPool#managedBlock}. The - * unusual methods in this API accommodate synchronizers that may, - * but don't usually, block for long periods. Similarly, they + * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. + * The unusual methods in this API accommodate synchronizers that + * may, but don't usually, block for long periods. Similarly, they * allow more efficient internal handling of cases in which * additional workers may be, but usually are not, needed to * ensure sufficient parallelism. Toward this end, @@ -3126,6 +3157,7 @@ public class ForkJoinPool extends Abstra /** * Returns {@code true} if blocking is unnecessary. + * @return {@code true} if blocking is unnecessary */ boolean isReleasable(); } @@ -3155,21 +3187,8 @@ public class ForkJoinPool extends Abstra Thread t = Thread.currentThread(); if (t instanceof ForkJoinWorkerThread) { ForkJoinPool p = ((ForkJoinWorkerThread)t).pool; - while (!blocker.isReleasable()) { // variant of helpSignal - WorkQueue[] ws; WorkQueue q; int m, n, u; - if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) { - for (int i = 0; i <= m; ++i) { - if (blocker.isReleasable()) - return; - if ((q = ws[i]) != null && (n = q.queueSize()) > 0) { - p.signalWork(q, n); - if ((u = (int)(p.ctl >>> 32)) >= 0 || - (u >> UAC_SHIFT) >= 0) - break; - } - } - } - if (p.tryCompensate()) { + while (!blocker.isReleasable()) { + if (p.tryCompensate(p.ctl)) { try { do {} while (!blocker.isReleasable() && !blocker.block()); @@ -3207,10 +3226,11 @@ public class ForkJoinPool extends Abstra private static final long STEALCOUNT; private static final long PLOCK; private static final long INDEXSEED; + private static final long QBASE; private static final long QLOCK; static { - int s; // initialize field offsets for CAS etc + // initialize field offsets for CAS etc try { U = getUnsafe(); Class k = ForkJoinPool.class; @@ -3226,57 +3246,66 @@ public class ForkJoinPool extends Abstra PARKBLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); Class wk = WorkQueue.class; + QBASE = U.objectFieldOffset + (wk.getDeclaredField("base")); QLOCK = U.objectFieldOffset (wk.getDeclaredField("qlock")); Class ak = ForkJoinTask[].class; ABASE = U.arrayBaseOffset(ak); - s = U.arrayIndexScale(ak); - ASHIFT = 31 - Integer.numberOfLeadingZeros(s); + int scale = U.arrayIndexScale(ak); + if ((scale & (scale - 1)) != 0) + throw new Error("data type scale not a power of two"); + ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } - if ((s & (s-1)) != 0) - throw new Error("data type scale not a power of two"); submitters = new ThreadLocal(); - ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory = + defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); - /* - * Establish common pool parameters. For extra caution, - * computations to set up common pool state are here; the - * constructor just assigns these values to fields. - */ + modifyThreadPermission = new RuntimePermission("modifyThread"); - int par = 0; - Thread.UncaughtExceptionHandler handler = null; - try { // TBD: limit or report ignored exceptions? + common = java.security.AccessController.doPrivileged + (new java.security.PrivilegedAction() { + public ForkJoinPool run() { return makeCommonPool(); }}); + int par = common.parallelism; // report 1 even if threads disabled + commonParallelism = par > 0 ? par : 1; + } + + /** + * Creates and returns the common pool, respecting user settings + * specified via system properties. + */ + private static ForkJoinPool makeCommonPool() { + int parallelism = -1; + ForkJoinWorkerThreadFactory factory + = defaultForkJoinWorkerThreadFactory; + UncaughtExceptionHandler handler = null; + try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); - String hp = System.getProperty - ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); + String hp = System.getProperty + ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); + if (pp != null) + parallelism = Integer.parseInt(pp); if (fp != null) - fac = ((ForkJoinWorkerThreadFactory)ClassLoader. - getSystemClassLoader().loadClass(fp).newInstance()); + factory = ((ForkJoinWorkerThreadFactory)ClassLoader. + getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) - handler = ((Thread.UncaughtExceptionHandler)ClassLoader. + handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); - if (pp != null) - par = Integer.parseInt(pp); } catch (Exception ignore) { } - if (par <= 0) - par = Runtime.getRuntime().availableProcessors(); - if (par > MAX_CAP) - par = MAX_CAP; - commonPoolParallelism = par; - long np = (long)(-par); // precompute initial ctl value - long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); - - commonPool = new ForkJoinPool(par, ct, fac, handler); - modifyThreadPermission = new RuntimePermission("modifyThread"); + if (parallelism < 0 && // default 1 less than #cores + (parallelism = Runtime.getRuntime().availableProcessors() - 1) < 0) + parallelism = 0; + if (parallelism > MAX_CAP) + parallelism = MAX_CAP; + return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, + "ForkJoinPool.commonPool-worker-"); } /** @@ -3289,22 +3318,23 @@ public class ForkJoinPool extends Abstra private static sun.misc.Unsafe getUnsafe() { try { return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security - .PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - java.lang.reflect.Field f = sun.misc - .Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } + } catch (SecurityException tryReflectionInstead) {} + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + Class k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) + return k.cast(x); + } + throw new NoSuchFieldError("the Unsafe"); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); } } - }