--- jsr166/src/jsr166y/ForkJoinPool.java 2012/01/31 01:51:13 1.122 +++ jsr166/src/jsr166y/ForkJoinPool.java 2012/02/20 18:20:06 1.123 @@ -5,7 +5,6 @@ */ package jsr166y; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -177,7 +176,10 @@ public class ForkJoinPool extends Abstra * If an attempted steal fails, a thief always chooses a different * random victim target to try next. So, in order for one thief to * progress, it suffices for any in-progress poll or new push on - * any empty queue to complete. + * any empty queue to complete. (This is why we normally use + * method pollAt and its variants that try once at the apparent + * base index, else consider alternative actions, rather than + * method poll.) * * This approach also enables support of a user mode in which local * task processing is in FIFO, not LIFO order, simply by using @@ -207,7 +209,8 @@ public class ForkJoinPool extends Abstra * lock (mainly to protect in the case of resizing) but we use * only a simple spinlock (using bits in field runState), because * submitters encountering a busy queue move on to try or create - * other queues, so never block. + * other queues -- they block only when creating and registering + * new queues. * * Management * ========== @@ -233,10 +236,7 @@ public class ForkJoinPool extends Abstra * deregister WorkQueues, as well as to enable shutdown. It is * only modified under a lock (normally briefly held, but * occasionally protecting allocations and resizings) but even - * when locked remains available to check consistency. An - * auxiliary field "growHints", also only modified under lock, - * contains a candidate index for the next WorkQueue and - * a mask for submission queue indices. + * when locked remains available to check consistency. * * Recording WorkQueues. 
WorkQueues are recorded in the * "workQueues" array that is created upon pool construction and @@ -248,12 +248,7 @@ public class ForkJoinPool extends Abstra * readers must tolerate null slots. Shared (submission) queues * are at even indices, worker queues at odd indices. Grouping * them together in this way simplifies and speeds up task - * scanning. To avoid flailing during start-up, the array is - * presized to hold twice #parallelism workers (which is unlikely - * to need further resizing during execution). But to avoid - * dealing with so many null slots, variable runState includes a - * mask for the nearest power of two that contains all currently - * used indices. + * scanning. * * All worker thread creation is on-demand, triggered by task * submissions, replacement of terminated workers, and/or @@ -385,8 +380,8 @@ public class ForkJoinPool extends Abstra * (http://portal.acm.org/citation.cfm?id=155354). It differs in * that: (1) We only maintain dependency links across workers upon * steals, rather than use per-task bookkeeping. This sometimes - * requires a linear scan of workQueues array to locate stealers, but - * often doesn't because stealers leave hints (that may become + * requires a linear scan of workQueues array to locate stealers, + * but often doesn't because stealers leave hints (that may become * stale/wrong) of where to locate them. A stealHint is only a * hint because a worker might have had multiple steals and the * hint records only one of them (usually the most current). @@ -397,22 +392,43 @@ public class ForkJoinPool extends Abstra * which means that we miss links in the chain during long-lived * tasks, GC stalls etc (which is OK since blocking in such cases * is usually a good idea). (4) We bound the number of attempts - * to find work (see MAX_HELP_DEPTH) and fall back to suspending - * the worker and if necessary replacing it with another. 
+ * to find work (see MAX_HELP) and fall back to suspending the + * worker and if necessary replacing it with another. * * It is impossible to keep exactly the target parallelism number * of threads running at any given time. Determining the * existence of conservatively safe helping targets, the * availability of already-created spares, and the apparent need * to create new spares are all racy, so we rely on multiple - * retries of each. Currently, in keeping with on-demand - * signalling policy, we compensate only if blocking would leave - * less than one active (non-waiting, non-blocked) worker. - * Additionally, to avoid some false alarms due to GC, lagging - * counters, system activity, etc, compensated blocking for joins - * is only attempted after rechecks stabilize in - * ForkJoinTask.awaitJoin. (Retries are interspersed with - * Thread.yield, for good citizenship.) + * retries of each. Compensation in the apparent absence of + * helping opportunities is challenging to control on JVMs, where + * GC and other activities can stall progress of tasks that in + * turn stall out many other dependent tasks, without us being + * able to determine whether they will ever require compensation. + * Even though work-stealing otherwise encounters little + * degradation in the presence of more threads than cores, + * aggressively adding new threads in such cases entails risk of + * unwanted positive feedback control loops in which more threads + * cause more dependent stalls (as well as delayed progress of + * unblocked threads to the point that we know they are available) + * leading to more situations requiring more threads, and so + * on. This aspect of control can be seen as an (analytically + * intractable) game with an opponent that may choose the worst + * (for us) active thread to stall at any time. 
We take several + * precautions to bound losses (and thus bound gains), mainly in + * methods tryCompensate and awaitJoin: (1) We only try + * compensation after attempting enough helping steps (measured + * via counting and timing) that we have already consumed the + * estimated cost of creating and activating a new thread. (2) We + * allow up to 50% of threads to be blocked before initially + * adding any others, and unless completely saturated, check that + * some work is available for a new worker before adding. Also, we + * create up to only 50% more threads until entering a mode that + * only adds a thread if all others are possibly blocked. All + * together, this means that we might be half as fast to react, + * and create half as many threads as possible in the ideal case, + * but present vastly fewer anomalies in all other cases compared + * to both more aggressive and more conservative alternatives. * * Style notes: There is a lot of representation-level coupling * among classes ForkJoinPool, ForkJoinWorkerThread, and @@ -449,19 +465,6 @@ public class ForkJoinPool extends Abstra // Static utilities /** - * Computes an initial hash code (also serving as a non-zero - * random seed) for a thread id. This method is expected to - * provide higher-quality hash codes than using method hashCode(). - */ - static final int hashId(long id) { - int h = (int)id ^ (int)(id >>> 32); // Use MurmurHash of thread id - h ^= h >>> 16; h *= 0x85ebca6b; - h ^= h >>> 13; h *= 0xc2b2ae35; - h ^= h >>> 16; - return (h == 0) ? 1 : h; // ensure nonzero - } - - /** * If there is a security manager, makes sure caller has * permission to modify threads. */ @@ -592,10 +595,14 @@ public class ForkJoinPool extends Abstra static final class WorkQueue { /** * Capacity of work-stealing queue array upon initialization. - * Must be a power of two; at least 4, but set larger to - * reduce cacheline sharing among queues. 
+ * Must be a power of two; at least 4, but should be larger to + * reduce or eliminate cacheline sharing among queues. + * Currently, it is much larger, as a partial workaround for + * the fact that JVMs often place arrays in locations that + * share GC bookkeeping (especially cardmarks) such that + * per-write accesses encounter serious memory contention. */ - static final int INITIAL_QUEUE_CAPACITY = 1 << 8; + static final int INITIAL_QUEUE_CAPACITY = 1 << 13; /** * Maximum size for queue arrays. Must be a power of two less @@ -619,43 +626,61 @@ public class ForkJoinPool extends Abstra volatile int base; // index of next slot for poll int top; // index of next slot for push ForkJoinTask[] array; // the elements (initially unallocated) + final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared volatile Thread parker; // == owner during call to park; else null ForkJoinTask currentJoin; // task being joined in awaitJoin ForkJoinTask currentSteal; // current non-local task being executed // Heuristic padding to ameliorate unfortunate memory placements - Object p00, p01, p02, p03, p04, p05, p06, p07, p08, p09, p0a; + Object p00, p01, p02, p03, p04, p05, p06, p07; + Object p08, p09, p0a, p0b, p0c, p0d, p0e; - WorkQueue(ForkJoinWorkerThread owner, int mode) { - this.owner = owner; + WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode) { this.mode = mode; + this.pool = pool; + this.owner = owner; // Place indices in the center of array (that is not yet allocated) base = top = INITIAL_QUEUE_CAPACITY >>> 1; } /** - * Returns number of tasks in the queue. + * Returns the approximate number of tasks in the queue. */ final int queueSize() { - int n = base - top; // non-owner callers must read base first - return (n >= 0) ? 0 : -n; + int n = base - top; // non-owner callers must read base first + return (n >= 0) ? 
0 : -n; // ignore transient negative + } + + /** + * Provides a more accurate estimate of whether this queue has + * any tasks than does queueSize, by checking whether a + * near-empty queue has at least one unclaimed task. + */ + final boolean isEmpty() { + ForkJoinTask[] a; int m, s; + int n = base - (s = top); + return (n >= 0 || + (n == -1 && + ((a = array) == null || + (m = a.length - 1) < 0 || + U.getObjectVolatile + (a, ((m & (s - 1)) << ASHIFT) + ABASE) == null))); } /** * Pushes a task. Call only by owner in unshared queues. * * @param task the task. Caller must ensure non-null. - * @param p if non-null, pool to signal if necessary * @throw RejectedExecutionException if array cannot be resized */ - final void push(ForkJoinTask task, ForkJoinPool p) { - ForkJoinTask[] a; + final void push(ForkJoinTask task) { + ForkJoinTask[] a; ForkJoinPool p; int s = top, m, n; if ((a = array) != null) { // ignore if queue removed U.putOrderedObject (a, (((m = a.length - 1) & s) << ASHIFT) + ABASE, task); if ((n = (top = s + 1) - base) <= 2) { - if (p != null) + if ((p = pool) != null) p.signalWork(); } else if (n >= m) @@ -691,23 +716,6 @@ public class ForkJoinPool extends Abstra } /** - * Takes next task, if one exists, in FIFO order. - */ - final ForkJoinTask poll() { - ForkJoinTask[] a; int b; ForkJoinTask t; - while ((b = base) - top < 0 && (a = array) != null) { - int j = (((a.length - 1) & b) << ASHIFT) + ABASE; - if ((t = (ForkJoinTask)U.getObjectVolatile(a, j)) != null && - base == b && - U.compareAndSwapObject(a, j, t, null)) { - base = b + 1; - return t; - } - } - return null; - } - - /** * Takes next task, if one exists, in LIFO order. Call only * by owner in unshared queues. (We do not have a shared * version of this method because it is never needed.) @@ -730,6 +738,49 @@ public class ForkJoinPool extends Abstra } /** + * Takes a task in FIFO order if b is base of queue and a task + * can be claimed without contention. 
Specialized versions + * appear in ForkJoinPool methods scan and tryHelpStealer. + */ + final ForkJoinTask pollAt(int b) { + ForkJoinTask t; ForkJoinTask[] a; + if ((a = array) != null) { + int j = (((a.length - 1) & b) << ASHIFT) + ABASE; + if ((t = (ForkJoinTask)U.getObjectVolatile(a, j)) != null && + base == b && + U.compareAndSwapObject(a, j, t, null)) { + base = b + 1; + return t; + } + } + return null; + } + + /** + * Takes next task, if one exists, in FIFO order. + */ + final ForkJoinTask poll() { + ForkJoinTask[] a; int b; ForkJoinTask t; + while ((b = base) - top < 0 && (a = array) != null) { + int j = (((a.length - 1) & b) << ASHIFT) + ABASE; + t = (ForkJoinTask)U.getObjectVolatile(a, j); + if (t != null) { + if (base == b && + U.compareAndSwapObject(a, j, t, null)) { + base = b + 1; + return t; + } + } + else if (base == b) { + if (b + 1 == top) + break; + Thread.yield(); // wait for lagging update + } + } + return null; + } + + /** * Takes next task, if one exists, in order specified by mode. */ final ForkJoinTask nextLocalTask() { @@ -749,23 +800,6 @@ public class ForkJoinPool extends Abstra } /** - * Returns task at index b if b is current base of queue. - */ - final ForkJoinTask pollAt(int b) { - ForkJoinTask t; ForkJoinTask[] a; - if ((a = array) != null) { - int j = (((a.length - 1) & b) << ASHIFT) + ABASE; - if ((t = (ForkJoinTask)U.getObjectVolatile(a, j)) != null && - base == b && - U.compareAndSwapObject(a, j, t, null)) { - base = b + 1; - return t; - } - } - return null; - } - - /** * Pops the given task only if it is at the current top. */ final boolean tryUnpush(ForkJoinTask t) { @@ -892,8 +926,8 @@ public class ForkJoinPool extends Abstra * Computes next value for random probes. Scans don't require * a very high quality generator, but also not a crummy one. * Marsaglia xor-shift is cheap and works well enough. Note: - * This is manually inlined in several usages in ForkJoinPool - * to avoid writes inside busy scan loops. 
+ * This is manually inlined in its usages in ForkJoinPool to + * avoid writes inside busy scan loops. */ final int nextSeed() { int r = seed; @@ -906,12 +940,46 @@ public class ForkJoinPool extends Abstra /** * Removes and runs tasks until empty, using local mode - * ordering. + * ordering. Normally called only after checking for apparent + * non-emptiness. */ final void runLocalTasks() { - if (base - top < 0) { - for (ForkJoinTask t; (t = nextLocalTask()) != null; ) - t.doExec(); + // hoist checks from repeated pop/poll + ForkJoinTask[] a; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + if (mode == 0) { + for (int s; (s = top - 1) - base >= 0;) { + int j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObjectVolatile(a, j); + if (t != null) { + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + t.doExec(); + } + } + else + break; + } + } + else { + for (int b; (b = base) - top < 0;) { + int j = ((m & b) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObjectVolatile(a, j); + if (t != null) { + if (base == b && + U.compareAndSwapObject(a, j, t, null)) { + base = b + 1; + t.doExec(); + } + } else if (base == b) { + if (b + 1 == top) + break; + Thread.yield(); // wait for lagging update + } + } + } } } @@ -926,11 +994,12 @@ public class ForkJoinPool extends Abstra if (t != null) { currentSteal = t; t.doExec(); - runLocalTasks(); + if (top != base) // conservative guard + runLocalTasks(); ++nsteals; currentSteal = null; } - else if (runState < 0) // terminating + else if (runState < 0) // terminating alive = false; return alive; } @@ -1005,10 +1074,22 @@ public class ForkJoinPool extends Abstra * submission queues in method doSubmit. In the future, this may * also incorporate a means to implement different task rejection * and resubmission policies. + * + * Seeds for submitters and workers/workQueues work in basically + * the same way but are initialized and updated using slightly + * different mechanics. 
Both are initialized using the same + * approach as in class ThreadLocal, where successive values are + * unlikely to collide with previous values. This is done during + * registration for workers, but requires a separate AtomicInteger + * for submitters. Seeds are then randomly modified upon + * collisions using xorshifts, which requires a non-zero seed. */ static final class Submitter { int seed; - Submitter() { seed = hashId(Thread.currentThread().getId()); } + Submitter() { + int s = nextSubmitterSeed.getAndAdd(SEED_INCREMENT); + seed = (s == 0) ? 1 : s; // ensure non-zero + } } /** ThreadLocal class for Submitters */ @@ -1031,6 +1112,12 @@ public class ForkJoinPool extends Abstra private static final AtomicInteger poolNumberGenerator; /** + * Generator for initial hashes/seeds for submitters. Accessed by + * Submitter class constructor. + */ + static final AtomicInteger nextSubmitterSeed; + + /** * Permission required for callers of methods that may start or * kill threads. */ @@ -1063,13 +1150,33 @@ public class ForkJoinPool extends Abstra private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10); /** - * The maximum stolen->joining link depth allowed in tryHelpStealer. - * Depths for legitimate chains are unbounded, but we use a fixed - * constant to avoid (otherwise unchecked) cycles and to bound - * staleness of traversal parameters at the expense of sometimes - * blocking when we could be helping. + * The maximum stolen->joining link depth allowed in method + * tryHelpStealer. Must be a power of two. This value also + * controls the maximum number of times to try to help join a task + * without any apparent progress or change in pool state before + * giving up and blocking (see awaitJoin). Depths for legitimate + * chains are unbounded, but we use a fixed constant to avoid + * (otherwise unchecked) cycles and to bound staleness of + * traversal parameters at the expense of sometimes blocking when + * we could be helping. 
+ */ + private static final int MAX_HELP = 32; + + /** + * Secondary time-based bound (in nanosecs) for helping attempts + * before trying compensated blocking in awaitJoin. Used in + * conjunction with MAX_HELP to reduce variance due to different + * polling rates associated with different helping options. The + * value should roughly approximate the time required to create + * and/or activate a worker thread. + */ + private static final long COMPENSATION_DELAY = 100L * 1000L; // 0.1 millisec + + /** + * Increment for seed generators. See class ThreadLocal for + * explanation. */ - private static final int MAX_HELP_DEPTH = 16; + private static final int SEED_INCREMENT = 0x61c88647; /** * Bits and masks for control variables @@ -1101,15 +1208,12 @@ public class ForkJoinPool extends Abstra * * Field runState is an int packed with: * SHUTDOWN: true if shutdown is enabled (1 bit) - * SEQ: a sequence number updated upon (de)registering workers (15 bits) - * MASK: mask (power of 2 - 1) covering all registered poolIndexes (16 bits) + * SEQ: a sequence number updated upon (de)registering workers (30 bits) + * INIT: set true after workQueues array construction (1 bit) * - * The combination of mask and sequence number enables simple - * consistency checks: Staleness of read-only operations on the - * workQueues array can be checked by comparing runState before vs - * after the reads. The low 16 bits (i.e, anding with SMASK) hold - * the smallest power of two covering all indices, minus - * one. + * The sequence number enables simple consistency checks: + * Staleness of read-only operations on the workQueues array can + * be checked by comparing runState before vs after the reads. 
*/ // bit positions/shifts for fields @@ -1119,8 +1223,8 @@ public class ForkJoinPool extends Abstra private static final int EC_SHIFT = 16; // bounds - private static final int POOL_MAX = 0x7fff; // max #workers - 1 private static final int SMASK = 0xffff; // short bits + private static final int MAX_CAP = 0x7fff; // max #workers - 1 private static final int SQMASK = 0xfffe; // even short bits private static final int SHORT_SIGN = 1 << 15; private static final int INT_SIGN = 1 << 31; @@ -1148,8 +1252,6 @@ public class ForkJoinPool extends Abstra // runState bits private static final int SHUTDOWN = 1 << 31; - private static final int RS_SEQ = 1 << 16; - private static final int RS_SEQ_MASK = 0x7fff0000; // access mode for WorkQueue static final int LIFO_QUEUE = 0; @@ -1168,8 +1270,9 @@ public class ForkJoinPool extends Abstra volatile long ctl; // main pool control final int parallelism; // parallelism level final int localMode; // per-worker scheduling mode - int growHints; // for expanding indices/ranges - volatile int runState; // shutdown status, seq, and mask + final int submitMask; // submit queue index bound + int nextSeed; // for initializing worker seeds + volatile int runState; // shutdown status and seq WorkQueue[] workQueues; // main registry final Mutex lock; // for registration final Condition termination; // for awaitTermination @@ -1179,7 +1282,7 @@ public class ForkJoinPool extends Abstra final AtomicInteger nextWorkerNumber; // to create worker name string final String workerNamePrefix; // to create worker name string - // Creating, registering, deregistering and running workers + // Creating, registering, and deregistering workers /** * Tries to create and start a worker @@ -1210,34 +1313,33 @@ public class ForkJoinPool extends Abstra } /** - * Callback from ForkJoinWorkerThread constructor to establish and - * record its WorkQueue. + * Callback from ForkJoinWorkerThread constructor to establish its + * poolIndex and record its WorkQueue. 
To avoid scanning bias due + * to packing entries in front of the workQueues array, we treat + * the array as a simple power-of-two hash table using per-thread + * seed as hash, expanding as needed. * - * @param wt the worker thread + * @param w the worker's queue */ - final void registerWorker(ForkJoinWorkerThread wt) { - WorkQueue w = wt.workQueue; + final void registerWorker(WorkQueue w) { Mutex lock = this.lock; lock.lock(); try { - int g = growHints, k = g & SMASK; WorkQueue[] ws = workQueues; - if (ws != null) { // ignore on shutdown - int n = ws.length; - if ((k & 1) == 0 || k >= n || ws[k] != null) { - for (k = 1; k < n && ws[k] != null; k += 2) - ; // workers are at odd indices - if (k >= n) // resize - workQueues = ws = Arrays.copyOf(ws, n << 1); - } - w.eventCount = w.poolIndex = k; // establish before recording - ws[k] = w; - growHints = (g & ~SMASK) | ((k + 2) & SMASK); - int rs = runState; - int m = rs & SMASK; // recalculate runState mask - if (k > m) - m = (m << 1) + 1; - runState = (rs & SHUTDOWN) | ((rs + RS_SEQ) & RS_SEQ_MASK) | m; + if (w != null && ws != null) { // skip on shutdown/failure + int rs, n; + while ((n = ws.length) < // ensure can hold total + (parallelism + (short)(ctl >>> TC_SHIFT) << 1)) + workQueues = ws = Arrays.copyOf(ws, n << 1); + int m = n - 1; + int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence + w.seed = (s == 0) ? 
1 : s; // ensure non-zero seed + int r = (s << 1) | 1; // use odd-numbered indices + while (ws[r &= m] != null) // step by approx half size + r += ((n >>> 1) & SQMASK) + 2; + w.eventCount = w.poolIndex = r; // establish before recording + ws[r] = w; // also update seq + runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN); } } finally { lock.unlock(); @@ -1254,19 +1356,17 @@ public class ForkJoinPool extends Abstra * @param ex the exception causing failure, or null if none */ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { + Mutex lock = this.lock; WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { w.runState = -1; // ensure runState is set stealCount.getAndAdd(w.totalSteals + w.nsteals); int idx = w.poolIndex; - Mutex lock = this.lock; lock.lock(); try { // remove record from array WorkQueue[] ws = workQueues; - if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) { + if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) ws[idx] = null; - growHints = (growHints & ~SMASK) | idx; - } } finally { lock.unlock(); } @@ -1290,73 +1390,51 @@ public class ForkJoinPool extends Abstra U.throwException(ex); } - /** - * Top-level runloop for workers, called by ForkJoinWorkerThread.run. - */ - final void runWorker(ForkJoinWorkerThread wt) { - // Initialize queue array and seed in this thread - WorkQueue w = wt.workQueue; - w.growArray(false); - w.seed = hashId(Thread.currentThread().getId()); - - do {} while (w.runTask(scan(w))); - } // Submissions /** * Unless shutting down, adds the given task to a submission queue * at submitter's current queue index (modulo submission - * range). If no queue exists at the index, one is created unless - * pool lock is busy. If the queue and/or lock are busy, another - * index is randomly chosen. The mask in growHints controls the - * effective index range of queues considered. 
The mask is - * expanded, up to the current workerQueue mask, upon any detected - * contention but otherwise remains small to avoid needlessly - * creating queues when there is no contention. + * range). If no queue exists at the index, one is created. If + * the queue is busy, another index is randomly chosen. The + * submitMask bounds the effective number of queues to the + * (nearest power of two for) parallelism level. + * + * @param task the task. Caller must ensure non-null. */ private void doSubmit(ForkJoinTask task) { - if (task == null) - throw new NullPointerException(); Submitter s = submitters.get(); - for (int r = s.seed, m = growHints >>> 16;;) { - WorkQueue[] ws; WorkQueue q; Mutex lk; + for (int r = s.seed, m = submitMask;;) { + WorkQueue[] ws; WorkQueue q; int k = r & m & SQMASK; // use only even indices if (runState < 0 || (ws = workQueues) == null || ws.length <= k) throw new RejectedExecutionException(); // shutting down - if ((q = ws[k]) == null && (lk = lock).tryAcquire(0)) { - try { // try to create new queue - if (ws == workQueues && (q = ws[k]) == null) { - int rs; // update runState seq - ws[k] = q = new WorkQueue(null, SHARED_QUEUE); - runState = (((rs = runState) & SHUTDOWN) | - ((rs + RS_SEQ) & ~SHUTDOWN)); + else if ((q = ws[k]) == null) { // create new queue + WorkQueue nq = new WorkQueue(this, null, SHARED_QUEUE); + Mutex lock = this.lock; // construct outside lock + lock.lock(); + try { // recheck under lock + int rs = runState; // to update seq + if (ws == workQueues && ws[k] == null) { + ws[k] = nq; + runState = ((rs & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN)); } } finally { - lk.unlock(); + lock.unlock(); } } - if (q != null) { - if (q.trySharedPush(task)) { - signalWork(); - return; - } - else if (m < parallelism - 1 && m < (runState & SMASK)) { - Mutex lock = this.lock; - lock.lock(); // block until lock free - int g = growHints; - if (g >>> 16 == m) // expand range - growHints = (((m << 1) + 1) << 16) | (g & SMASK); - 
lock.unlock(); // no need for try/finally - } - else if ((r & m) == 0) - Thread.yield(); // occasionally yield if busy - } - if (m == (m = growHints >>> 16)) { - r ^= r << 13; // update seed unless new range - r ^= r >>> 17; // same xorshift as WorkQueues + else if (q.trySharedPush(task)) { + signalWork(); + return; + } + else if (m > 1) { // move to a different index + r ^= r << 13; // same xorshift as WorkQueues + r ^= r >>> 17; s.seed = r ^= r << 5; } + else + Thread.yield(); // yield if no alternatives } } @@ -1405,66 +1483,33 @@ public class ForkJoinPool extends Abstra } } + + // Scanning for tasks + /** - * Tries to decrement active count (sometimes implicitly) and - * possibly release or create a compensating worker in preparation - * for blocking. Fails on contention or termination. - * - * @return true if the caller can block, else should recheck and retry + * Top-level runloop for workers, called by ForkJoinWorkerThread.run. */ - final boolean tryCompensate() { - WorkQueue w; Thread p; - int pc = parallelism, e, u, ac, tc, i; - long c = ctl; - WorkQueue[] ws = workQueues; - if ((e = (int)c) >= 0) { - if ((ac = ((u = (int)(c >>> 32)) >> UAC_SHIFT)) <= 0 && - e != 0 && ws != null && (i = e & SMASK) < ws.length && - (w = ws[i]) != null) { - long nc = (long)(w.nextWait & E_MASK) | (c & (AC_MASK|TC_MASK)); - if (w.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, c, nc)) { - w.eventCount = (e + E_SEQ) & E_MASK; - if ((p = w.parker) != null) - U.unpark(p); - return true; // release an idle worker - } - } - else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) { - long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK); - if (U.compareAndSwapLong(this, CTL, c, nc)) - return true; // no compensation needed - } - else if (tc + pc < POOL_MAX) { - long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); - if (U.compareAndSwapLong(this, CTL, c, nc)) { - addWorker(); - return true; // create replacement - } - } - } - return false; + final void 
runWorker(WorkQueue w) { + w.growArray(false); // initialize queue array in this thread + do {} while (w.runTask(scan(w))); } - // Scanning for tasks - /** * Scans for and, if found, returns one task, else possibly * inactivates the worker. This method operates on single reads of - * volatile state and is designed to be re-invoked continuously in - * part because it returns upon detecting inconsistencies, + * volatile state and is designed to be re-invoked continuously, + * in part because it returns upon detecting inconsistencies, * contention, or state changes that indicate possible success on * re-invocation. * - * The scan searches for tasks across queues, randomly selecting - * the first #queues probes, favoring steals over submissions - * (by exploiting even/odd indexing), and then performing a - * circular sweep of all queues. The scan terminates upon either - * finding a non-empty queue, or completing a full sweep. If the - * worker is not inactivated, it takes and returns a task from - * this queue. On failure to find a task, we take one of the - * following actions, after which the caller will retry calling - * this method unless terminated. + * The scan searches for tasks across a random permutation of + * queues (starting at a random index and stepping by a random + * relative prime, checking each at least once). The scan + * terminates upon either finding a non-empty queue, or completing + * the sweep. If the worker is not inactivated, it takes and + * returns a task from this queue. On failure to find a task, we + * take one of the following actions, after which the caller will + * retry calling this method unless terminated. * * * If pool is terminating, terminate the worker. * @@ -1475,94 +1520,94 @@ public class ForkJoinPool extends Abstra * another worker, but with same net effect. Releasing in other * cases as well ensures that we have enough workers running. 
* - * * If the caller has run a task since the last empty scan, - * return (to allow rescan) if other workers are not also yet - * enqueued. Field WorkQueue.rescans counts down on each scan to - * ensure eventual inactivation and blocking. - * * * If not already enqueued, try to inactivate and enqueue the - * worker on wait queue. + * worker on wait queue. Or, if inactivating has caused the pool + * to be quiescent, relay to idleAwaitWork to check for + * termination and possibly shrink pool. + * + * * If already inactive, and the caller has run a task since the + * last empty scan, return (to allow rescan) unless others are + * also inactivated. Field WorkQueue.rescans counts down on each + * scan to ensure eventual inactivation and blocking. * - * * If already enqueued and none of the above apply, either park - * awaiting signal, or if this is the most recent waiter and pool - * is quiescent, relay to idleAwaitWork to check for termination - * and possibly shrink pool. + * * If already enqueued and none of the above apply, park + * awaiting signal, * * @param w the worker (via its WorkQueue) * @return a task or null of none found */ private final ForkJoinTask scan(WorkQueue w) { - boolean swept = false; // true after full empty scan - WorkQueue[] ws; // volatile read order matters - int r = w.seed, ec = w.eventCount; // ec is negative if inactive - int rs = runState, m = rs & SMASK; - if ((ws = workQueues) != null && ws.length > m) { // consistency check - for (int k = 0, j = -1 - m; ; ++j) { - WorkQueue q; int b; - if (j < 0) { // random probes while j negative - r ^= r << 13; r ^= r >>> 17; k = (r ^= r << 5) | (j & 1); - } // worker (not submit) for odd j - else // cyclic scan when j >= 0 - k += 7; // step 7 reduces array packing bias - if ((q = ws[k & m]) != null && (b = q.base) - q.top < 0) { - ForkJoinTask t = (ec >= 0) ? 
q.pollAt(b) : null; - w.seed = r; // save seed for next scan - if (t != null) + WorkQueue[] ws; // first update random seed + int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; + int rs = runState, m; // volatile read order matters + if ((ws = workQueues) != null && (m = ws.length - 1) > 0) { + int ec = w.eventCount; // ec is negative if inactive + int step = (r >>> 16) | 1; // relative prime + for (int j = (m + 1) << 2; ; r += step) { + WorkQueue q; ForkJoinTask t; ForkJoinTask[] a; int b; + if ((q = ws[r & m]) != null && (b = q.base) - q.top < 0 && + (a = q.array) != null) { // probably nonempty + int i = (((a.length - 1) & b) << ASHIFT) + ABASE; + t = (ForkJoinTask)U.getObjectVolatile(a, i); + if (q.base == b && ec >= 0 && t != null && + U.compareAndSwapObject(a, i, t, null)) { + q.base = b + 1; // specialization of pollAt return t; - break; + } + else if ((t != null || b + 1 != q.top) && + (ec < 0 || j <= m)) { + rs = 0; // mark scan as imcomplete + break; // caller can retry after release + } } - else if (j - m > m) { - if (rs == runState) // staleness check - swept = true; + if (--j < 0) break; - } } - - // Decode ctl on empty scan long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns; - if (e < 0) // pool is terminating - w.runState = -1; - else if (!swept) { // try to release a waiter - WorkQueue v; Thread p; - if (e > 0 && a < 0 && (v = ws[e & m]) != null && - v.eventCount == (e | INT_SIGN)) { + if (e < 0) // decode ctl on empty scan + w.runState = -1; // pool is terminating + else if (rs == 0 || rs != runState) { // incomplete scan + WorkQueue v; Thread p; // try to release a waiter + if (e > 0 && a < 0 && w.eventCount == ec && + (v = ws[e & m]) != null && v.eventCount == (e | INT_SIGN)) { long nc = ((long)(v.nextWait & E_MASK) | ((c + AC_UNIT) & (AC_MASK|TC_MASK))); - if (U.compareAndSwapLong(this, CTL, c, nc)) { + if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) { v.eventCount = (e + E_SEQ) & E_MASK; if ((p = v.parker) != 
null) U.unpark(p); } } } - else if ((nr = w.rescans) > 0) { // continue rescanning - int ac = a + parallelism; - if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0 && - w.eventCount == ec) - Thread.yield(); // occasionally yield - } - else if (ec >= 0) { // try to enqueue + else if (ec >= 0) { // try to enqueue/inactivate long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); w.nextWait = e; - w.eventCount = ec | INT_SIGN;// mark as inactive - if (!U.compareAndSwapLong(this, CTL, c, nc)) - w.eventCount = ec; // unmark on CAS failure - else if ((ns = w.nsteals) != 0) { - w.nsteals = 0; // set rescans if ran task - w.rescans = a + parallelism; - w.totalSteals += ns; + w.eventCount = ec | INT_SIGN; // mark as inactive + if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc)) + w.eventCount = ec; // unmark on CAS failure + else { + if ((ns = w.nsteals) != 0) { + w.nsteals = 0; // set rescans if ran task + w.rescans = (a > 0)? 0 : a + parallelism; + w.totalSteals += ns; + } + if (a == 1 - parallelism) // quiescent + idleAwaitWork(w, nc, c); } } - else { // already queued - if (parallelism == -a) - idleAwaitWork(w); // quiescent - if (w.eventCount == ec) { - Thread.interrupted(); // clear status - ForkJoinWorkerThread wt = w.owner; + else if (w.eventCount < 0) { // already queued + if ((nr = w.rescans) > 0) { // continue rescanning + int ac = a + parallelism; + if (((w.rescans = (ac < nr) ? 
ac : nr - 1) & 3) == 0) + Thread.yield(); // yield before block + } + else { + Thread.interrupted(); // clear status + Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); - w.parker = wt; // emulate LockSupport.park - if (w.eventCount == ec) // recheck - U.park(false, 0L); // block + w.parker = wt; // emulate LockSupport.park + if (w.eventCount < 0) // recheck + U.park(false, 0L); w.parker = null; U.putObject(wt, PARKBLOCKER, null); } @@ -1572,39 +1617,37 @@ public class ForkJoinPool extends Abstra } /** - * If inactivating worker w has caused pool to become quiescent, - * checks for pool termination, and, so long as this is not the - * only worker, waits for event for up to SHRINK_RATE nanosecs. - * On timeout, if ctl has not changed, terminates the worker, - * which will in turn wake up another worker to possibly repeat - * this process. + * If inactivating worker w has caused the pool to become + * quiescent, checks for pool termination, and, so long as this is + * not the only worker, waits for event for up to SHRINK_RATE + * nanosecs. On timeout, if ctl has not changed, terminates the + * worker, which will in turn wake up another worker to possibly + * repeat this process. 
* * @param w the calling worker + * @param currentCtl the ctl value triggering possible quiescence + * @param prevCtl the ctl value to restore if thread is terminated */ - private void idleAwaitWork(WorkQueue w) { - long c; int nw, ec; - if (!tryTerminate(false, false) && - (int)((c = ctl) >> AC_SHIFT) + parallelism == 0 && - (ec = w.eventCount) == ((int)c | INT_SIGN) && - (nw = w.nextWait) != 0) { - long nc = ((long)(nw & E_MASK) | // ctl to restore on timeout - ((c + AC_UNIT) & AC_MASK) | (c & TC_MASK)); - ForkJoinWorkerThread wt = w.owner; - while (ctl == c) { + private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { + if (w.eventCount < 0 && !tryTerminate(false, false) && + (int)prevCtl != 0 && ctl == currentCtl) { + Thread wt = Thread.currentThread(); + Thread.yield(); // yield before block + while (ctl == currentCtl) { long startTime = System.nanoTime(); Thread.interrupted(); // timed variant of version in scan() U.putObject(wt, PARKBLOCKER, this); w.parker = wt; - if (ctl == c) + if (ctl == currentCtl) U.park(false, SHRINK_RATE); w.parker = null; U.putObject(wt, PARKBLOCKER, null); - if (ctl != c) + if (ctl != currentCtl) break; if (System.nanoTime() - startTime >= SHRINK_TIMEOUT && - U.compareAndSwapLong(this, CTL, c, nc)) { - w.eventCount = (ec + E_SEQ) | E_MASK; - w.runState = -1; // shrink + U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { + w.eventCount = (w.eventCount + E_SEQ) | E_MASK; + w.runState = -1; // shrink break; } } @@ -1622,32 +1665,31 @@ public class ForkJoinPool extends Abstra * leaves hints in workers to speed up subsequent calls. The * implementation is very branchy to cope with potential * inconsistencies or loops encountering chains that are stale, - * unknown, or of length greater than MAX_HELP_DEPTH links. All - * of these cases are dealt with by just retrying by caller. + * unknown, or so long that they are likely cyclic. All of these + * cases are dealt with by just retrying by caller. 
* * @param joiner the joining worker * @param task the task to join * @return true if found or ran a task (and so is immediately retryable) */ - final boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask task) { - ForkJoinTask subtask; // current target + private boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask task) { + WorkQueue[] ws; + int m, depth = MAX_HELP; // remaining chain depth boolean progress = false; - int depth = 0; // current chain depth - int m = runState & SMASK; - WorkQueue[] ws = workQueues; - - if (ws != null && ws.length > m && (subtask = task).status >= 0) { - outer:for (WorkQueue j = joiner;;) { - // Try to find the stealer of subtask, by first using hint - WorkQueue stealer = null; - WorkQueue v = ws[j.stealHint & m]; + if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && + task.status >= 0) { + ForkJoinTask subtask = task; // current target + outer: for (WorkQueue j = joiner;;) { + WorkQueue stealer = null; // find stealer of subtask + WorkQueue v = ws[j.stealHint & m]; // try hint if (v != null && v.currentSteal == subtask) stealer = v; - else { + else { // scan for (int i = 1; i <= m; i += 2) { - if ((v = ws[i]) != null && v.currentSteal == subtask) { + if ((v = ws[i]) != null && v.currentSteal == subtask && + v != joiner) { stealer = v; - j.stealHint = i; // save hint + j.stealHint = i; // save hint break; } } @@ -1655,24 +1697,30 @@ public class ForkJoinPool extends Abstra break; } - for (WorkQueue q = stealer;;) { // Try to help stealer - ForkJoinTask t; int b; + for (WorkQueue q = stealer;;) { // try to help stealer + ForkJoinTask[] a; ForkJoinTask t; int b; if (task.status < 0) break outer; - if ((b = q.base) - q.top < 0) { + if ((b = q.base) - q.top < 0 && (a = q.array) != null) { progress = true; - if (subtask.status < 0) - break outer; // stale - if ((t = q.pollAt(b)) != null) { - stealer.stealHint = joiner.poolIndex; + int i = (((a.length - 1) & b) << ASHIFT) + ABASE; + t = (ForkJoinTask)U.getObjectVolatile(a, i); + 
if (subtask.status < 0) // must recheck before taking + break outer; + if (t != null && + q.base == b && + U.compareAndSwapObject(a, i, t, null)) { + q.base = b + 1; joiner.runSubtask(t); } + else if (q.base == b) + break outer; // possibly stalled } - else { // empty - try to descend to find stealer's stealer + else { // descend ForkJoinTask next = stealer.currentJoin; - if (++depth == MAX_HELP_DEPTH || subtask.status < 0 || + if (--depth <= 0 || subtask.status < 0 || next == null || next == subtask) - break outer; // max depth, stale, dead-end, cyclic + break outer; // stale, dead-end, or cyclic subtask = next; j = stealer; break; @@ -1689,11 +1737,10 @@ public class ForkJoinPool extends Abstra * @param joiner the joining worker * @param task the task */ - final void tryPollForAndExec(WorkQueue joiner, ForkJoinTask task) { + private void tryPollForAndExec(WorkQueue joiner, ForkJoinTask task) { WorkQueue[] ws; - int m = runState & SMASK; - if ((ws = workQueues) != null && ws.length > m) { - for (int j = 1; j <= m && task.status >= 0; j += 2) { + if ((ws = workQueues) != null) { + for (int j = 1; j < ws.length && task.status >= 0; j += 2) { WorkQueue q = ws[j]; if (q != null && q.pollFor(task)) { joiner.runSubtask(task); @@ -1704,30 +1751,164 @@ public class ForkJoinPool extends Abstra } /** - * Returns a non-empty steal queue, if one is found during a random, - * then cyclic scan, else null. This method must be retried by - * caller if, by the time it tries to use the queue, it is empty. + * Tries to decrement active count (sometimes implicitly) and + * possibly release or create a compensating worker in preparation + * for blocking. Fails on contention or termination. Otherwise, + * adds a new thread if no idle workers are available and either + * pool would become completely starved or: (at least half + * starved, and fewer than 50% spares exist, and there is at least + * one task apparently available). 
Even though the availability
task status on exit + */ + final int awaitJoin(WorkQueue joiner, ForkJoinTask task) { + ForkJoinTask prevJoin = joiner.currentJoin; + joiner.currentJoin = task; + long startTime = 0L; + for (int k = 0, s; ; ++k) { + if ((joiner.isEmpty() ? // try to help + !tryHelpStealer(joiner, task) : + !joiner.tryRemoveAndExec(task))) { + if (k == 0) { + startTime = System.nanoTime(); + tryPollForAndExec(joiner, task); // check uncommon case + } + else if ((k & (MAX_HELP - 1)) == 0 && + System.nanoTime() - startTime >= COMPENSATION_DELAY && + tryCompensate(task, null)) { + if (task.trySetSignal() && task.status >= 0) { + synchronized (task) { + if (task.status >= 0) { + try { // see ForkJoinTask + task.wait(); // for explanation + } catch (InterruptedException ie) { + } + } + else + task.notifyAll(); + } + } + long c; // re-activate + do {} while (!U.compareAndSwapLong + (this, CTL, c = ctl, c + AC_UNIT)); + } + } + if ((s = task.status) < 0) { + joiner.currentJoin = prevJoin; + return s; + } + else if ((k & (MAX_HELP - 1)) == MAX_HELP >>> 1) + Thread.yield(); // for politeness + } + } + + /** + * Stripped-down variant of awaitJoin used by timed joins. Tries + * to help join only while there is continuous progress. (Caller + * will then enter a timed wait.) + * + * @param joiner the joining worker + * @param task the task + * @return task status on exit + */ + final int helpJoinOnce(WorkQueue joiner, ForkJoinTask task) { + int s; + while ((s = task.status) >= 0 && + (joiner.isEmpty() ? + tryHelpStealer(joiner, task) : + joiner.tryRemoveAndExec(task))) + ; + return s; + } + + /** + * Returns a (probably) non-empty steal queue, if one is found + * during a random, then cyclic scan, else null. This method must + * be retried by caller if, by the time it tries to use the queue, + * it is empty. 
*/ private WorkQueue findNonEmptyStealQueue(WorkQueue w) { - int r = w.seed; // Same idea as scan(), but ignoring submissions + // Similar to loop in scan(), but ignoring submissions + int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; + int step = (r >>> 16) | 1; for (WorkQueue[] ws;;) { - int m = runState & SMASK; - if ((ws = workQueues) == null) + int rs = runState, m; + if ((ws = workQueues) == null || (m = ws.length - 1) < 1) return null; - if (ws.length > m) { - WorkQueue q; - for (int k = 0, j = -1 - m;; ++j) { - if (j < 0) { - r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5; - } - else - k += 7; - if ((q = ws[(k | 1) & m]) != null && q.base - q.top < 0) { - w.seed = r; - return q; - } - else if (j - m > m) + for (int j = (m + 1) << 2; ; r += step) { + WorkQueue q = ws[((r << 1) | 1) & m]; + if (q != null && !q.isEmpty()) + return q; + else if (--j < 0) { + if (runState == rs) return null; + break; } } } @@ -1741,17 +1922,18 @@ public class ForkJoinPool extends Abstra */ final void helpQuiescePool(WorkQueue w) { for (boolean active = true;;) { - w.runLocalTasks(); // exhaust local queue + if (w.base - w.top < 0) + w.runLocalTasks(); // exhaust local queue WorkQueue q = findNonEmptyStealQueue(w); if (q != null) { - ForkJoinTask t; + ForkJoinTask t; int b; if (!active) { // re-establish active count long c; active = true; do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, c + AC_UNIT)); } - if ((t = q.poll()) != null) + if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) w.runSubtask(t); } else { @@ -1779,12 +1961,12 @@ public class ForkJoinPool extends Abstra */ final ForkJoinTask nextTaskFor(WorkQueue w) { for (ForkJoinTask t;;) { - WorkQueue q; + WorkQueue q; int b; if ((t = w.nextLocalTask()) != null) return t; if ((q = findNonEmptyStealQueue(w)) == null) return null; - if ((t = q.poll()) != null) + if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) return t; } } @@ -1959,30 +2141,31 @@ public class ForkJoinPool extends 
Abstra checkPermission(); if (factory == null) throw new NullPointerException(); - if (parallelism <= 0 || parallelism > POOL_MAX) + if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); this.parallelism = parallelism; this.factory = factory; this.ueh = handler; this.localMode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE; - this.growHints = 1; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); - // initialize workQueues array with room for 2*parallelism if possible - int n = parallelism << 1; - if (n >= POOL_MAX) - n = POOL_MAX; - else { // See Hackers Delight, sec 3.2, where n < (1 << 16) - n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; - } - this.workQueues = new WorkQueue[(n + 1) << 1]; // #slots = 2 * #workers + // Use nearest power 2 for workQueues size. See Hackers Delight sec 3.2. + int n = parallelism - 1; + n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; + int size = (n + 1) << 1; // #slots = 2*#workers + this.submitMask = size - 1; // room for max # of submit queues + this.workQueues = new WorkQueue[size]; this.termination = (this.lock = new Mutex()).newCondition(); this.stealCount = new AtomicLong(); this.nextWorkerNumber = new AtomicInteger(); + int pn = poolNumberGenerator.incrementAndGet(); StringBuilder sb = new StringBuilder("ForkJoinPool-"); - sb.append(poolNumberGenerator.incrementAndGet()); + sb.append(Integer.toString(pn)); sb.append("-worker-"); this.workerNamePrefix = sb.toString(); + lock.lock(); + this.runState = 1; // set init flag + lock.unlock(); } // Execution methods @@ -2004,6 +2187,8 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public T invoke(ForkJoinTask task) { + if (task == null) + throw new NullPointerException(); doSubmit(task); return task.join(); } @@ -2017,6 +2202,8 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public void execute(ForkJoinTask 
task) { + if (task == null) + throw new NullPointerException(); doSubmit(task); } @@ -2034,7 +2221,7 @@ public class ForkJoinPool extends Abstra if (task instanceof ForkJoinTask) // avoid re-wrap job = (ForkJoinTask) task; else - job = ForkJoinTask.adapt(task, null); + job = new ForkJoinTask.AdaptedRunnableAction(task); doSubmit(job); } @@ -2048,6 +2235,8 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(ForkJoinTask task) { + if (task == null) + throw new NullPointerException(); doSubmit(task); return task; } @@ -2058,9 +2247,7 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(Callable task) { - if (task == null) - throw new NullPointerException(); - ForkJoinTask job = ForkJoinTask.adapt(task); + ForkJoinTask job = new ForkJoinTask.AdaptedCallable(task); doSubmit(job); return job; } @@ -2071,9 +2258,7 @@ public class ForkJoinPool extends Abstra * scheduled for execution */ public ForkJoinTask submit(Runnable task, T result) { - if (task == null) - throw new NullPointerException(); - ForkJoinTask job = ForkJoinTask.adapt(task, result); + ForkJoinTask job = new ForkJoinTask.AdaptedRunnable(task, result); doSubmit(job); return job; } @@ -2090,7 +2275,7 @@ public class ForkJoinPool extends Abstra if (task instanceof ForkJoinTask) // avoid re-wrap job = (ForkJoinTask) task; else - job = ForkJoinTask.adapt(task, null); + job = new ForkJoinTask.AdaptedRunnableAction(task); doSubmit(job); return job; } @@ -2112,7 +2297,7 @@ public class ForkJoinPool extends Abstra boolean done = false; try { for (Callable t : tasks) { - ForkJoinTask f = ForkJoinTask.adapt(t); + ForkJoinTask f = new ForkJoinTask.AdaptedCallable(t); doSubmit(f); fs.add(f); } @@ -2298,7 +2483,7 @@ public class ForkJoinPool extends Abstra WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { for (int i = 0; i < ws.length; i += 2) { - if ((w = ws[i]) != null && w.queueSize() != 0) + if ((w = ws[i]) != 
null && !w.isEmpty()) return true; } } @@ -2612,7 +2797,7 @@ public class ForkJoinPool extends Abstra ForkJoinPool p = ((t instanceof ForkJoinWorkerThread) ? ((ForkJoinWorkerThread)t).pool : null); while (!blocker.isReleasable()) { - if (p == null || p.tryCompensate()) { + if (p == null || p.tryCompensate(null, blocker)) { try { do {} while (!blocker.isReleasable() && !blocker.block()); } finally { @@ -2629,35 +2814,45 @@ public class ForkJoinPool extends Abstra // implement RunnableFuture. protected RunnableFuture newTaskFor(Runnable runnable, T value) { - return (RunnableFuture) ForkJoinTask.adapt(runnable, value); + return new ForkJoinTask.AdaptedRunnable(runnable, value); } protected RunnableFuture newTaskFor(Callable callable) { - return (RunnableFuture) ForkJoinTask.adapt(callable); + return new ForkJoinTask.AdaptedCallable(callable); } // Unsafe mechanics private static final sun.misc.Unsafe U; private static final long CTL; private static final long PARKBLOCKER; + private static final int ABASE; + private static final int ASHIFT; static { poolNumberGenerator = new AtomicInteger(); + nextSubmitterSeed = new AtomicInteger(0x55555555); modifyThreadPermission = new RuntimePermission("modifyThread"); defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); submitters = new ThreadSubmitter(); + int s; try { U = getUnsafe(); Class k = ForkJoinPool.class; + Class ak = ForkJoinTask[].class; CTL = U.objectFieldOffset (k.getDeclaredField("ctl")); Class tk = Thread.class; PARKBLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); + ABASE = U.arrayBaseOffset(ak); + s = U.arrayIndexScale(ak); } catch (Exception e) { throw new Error(e); } + if ((s & (s-1)) != 0) + throw new Error("data type scale not a power of two"); + ASHIFT = 31 - Integer.numberOfLeadingZeros(s); } /**