--- jsr166/src/jsr166y/ForkJoinPool.java 2012/01/26 19:09:03 1.114 +++ jsr166/src/jsr166y/ForkJoinPool.java 2012/01/27 17:27:28 1.116 @@ -195,20 +195,22 @@ public class ForkJoinPool extends Abstra * WorkQueues are also used in a similar way for tasks submitted * to the pool. We cannot mix these tasks in the same queues used * for work-stealing (this would contaminate lifo/fifo - * processing). Instead, we loosely associate (via hashing) - * submission queues with submitting threads, and randomly scan - * these queues as well when looking for work. In essence, - * submitters act like workers except that they never take tasks, - * and they are multiplexed on to a finite number of shared work - * queues. However, classes are set up so that future extensions - * could allow submitters to optionally help perform tasks as - * well. Pool submissions from internal workers are also allowed, - * but use randomized rather than thread-hashed queue indices to - * avoid imbalance. Insertion of tasks in shared mode requires a - * lock (mainly to protect in the case of resizing) but we use - * only a simple spinlock (using bits in field runState), because - * submitters encountering a busy queue try or create others so - * never block. + * processing). Instead, we loosely associate submission queues + * with submitting threads, using a form of hashing. The + * ThreadLocal Submitter class contains a value initially used as + * a hash code for choosing existing queues, but may be randomly + * repositioned upon contention with other submitters. In + * essence, submitters act like workers except that they never + * take tasks, and they are multiplexed on to a finite number of + * shared work queues. However, classes are set up so that future + * extensions could allow submitters to optionally help perform + * tasks as well. Pool submissions from internal workers are also + * allowed, but use randomized rather than thread-hashed queue + * indices to avoid imbalance. Insertion of tasks in shared mode + * requires a lock (mainly to protect in the case of resizing) but + * we use only a simple spinlock (using bits in field runState), + * because submitters encountering a busy queue try or create + * others so never block. * * Management. * ========== @@ -1085,27 +1087,58 @@ public class ForkJoinPool extends Abstra } /** - * Computes a hash code for the given thread. This method is - * expected to provide higher-quality hash codes than those using - * method hashCode(). - */ - static final int hashThread(Thread t) { - long id = (t == null) ? 0L : t.getId(); // Use MurmurHash of thread id - int h = (int)id ^ (int)(id >>> 32); - h ^= h >>> 16; - h *= 0x85ebca6b; - h ^= h >>> 13; - h *= 0xc2b2ae35; - return h ^ (h >>> 16); +<<<<<<< ForkJoinPool.java + * Per-thread records for (typically non-FJ) threads that submit + * to pools. Cureently holds only psuedo-random seed / index that + * is used to chose submission queues in method doSubmit. In the + * future, this may incorporate a means to implement different + * task rejection and resubmission policies. + */ + static final class Submitter { + int seed; // seed for random submission queue selection + + // Heuristic padding to ameliorate unfortunate memory placements + int p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; + + Submitter() { + // Use identityHashCode, forced negative, for seed + seed = System.identityHashCode(Thread.currentThread()) | (1 << 31); + } + + /** + * Computes next value for random probes. Like method + * WorkQueue.nextSeed, this is manually inlined in several + * usages to avoid writes inside busy loops. + */ + final int nextSeed() { + int r = seed; + r ^= r << 13; + r ^= r >>> 17; + return seed = r ^= r << 5; + } + } + + /** ThreadLocal class for Submitters */ + static final class ThreadSubmitter extends ThreadLocal { + public Submitter initialValue() { return new Submitter(); } } /** + * Per-thread submission bookeeping. Shared across all pools + * to reduce ThreadLocal pollution and because random motion + * to avoid contention in one pool is likely to hold for others. + */ + static final ThreadSubmitter submitters = new ThreadSubmitter(); + + /** * Top-level runloop for workers */ final void runWorker(ForkJoinWorkerThread wt) { + // Initialize queue array and seed in this thread WorkQueue w = wt.workQueue; - w.growArray(false); // Initialize queue array and seed in this thread - w.seed = hashThread(Thread.currentThread()) | (1 << 31); // force < 0 + w.growArray(false); + // Same initial hash as Submitters + w.seed = System.identityHashCode(Thread.currentThread()) | (1 << 31); do {} while (w.runTask(scan(w))); } @@ -1218,6 +1251,37 @@ public class ForkJoinPool extends Abstra U.throwException(ex); } + /** + * Tries to add and register a new queue at the given index. + * + * @param idx the workQueues array index to register the queue + * @return the queue, or null if could not add because could + * not acquire lock or idx is unusable + */ + private WorkQueue tryAddSharedQueue(int idx) { + WorkQueue q = null; + ReentrantLock lock = this.lock; + if (idx >= 0 && (idx & 1) == 0 && !lock.isLocked()) { + // create queue outside of lock but only if apparently free + WorkQueue nq = new WorkQueue(null, SHARED_QUEUE); + if (lock.tryLock()) { + try { + WorkQueue[] ws = workQueues; + if (ws != null && idx < ws.length) { + if ((q = ws[idx]) == null) { + int rs; // update runState seq + ws[idx] = q = nq; + runState = (((rs = runState) & SHUTDOWN) | + ((rs + RS_SEQ) & ~SHUTDOWN)); + } + } + } finally { + lock.unlock(); + } + } + } + return q; + } // Maintaining ctl counts @@ -1320,73 +1384,35 @@ public class ForkJoinPool extends Abstra // Submissions /** - * Unless shutting down, adds the given task to some submission - * queue; using a randomly chosen queue index if the caller is a - * ForkJoinWorkerThread, else one based on caller thread's hash - * code. If no queue exists at the index, one is created. If the - * queue is busy, another is chosen by sweeping through the queues - * array. + * Unless shutting down, adds the given task to a submission queue + * at submitter's current queue index. If no queue exists at the + * index, one is created unless pool lock is busy. If the queue + * and/or lock are busy, another index is randomly chosen. */ private void doSubmit(ForkJoinTask task) { if (task == null) throw new NullPointerException(); - Thread t = Thread.currentThread(); - int r = ((t instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)t).workQueue.nextSeed() : hashThread(t)); - for (;;) { + Submitter s = submitters.get(); + for (int r = s.seed;;) { + WorkQueue q; int k; int rs = runState, m = rs & SMASK; - int j = r &= (m & ~1); // even numbered queues WorkQueue[] ws = workQueues; - if (rs < 0 || ws == null) - throw new RejectedExecutionException(); // shutting down - if (ws.length > m) { // consistency check - for (WorkQueue q;;) { // circular sweep - if (((q = ws[j]) != null || - (q = tryAddSharedQueue(j)) != null) && - q.trySharedPush(task)) { - signalWork(); - return; - } - if ((j = (j + 2) & m) == r) { - Thread.yield(); // all queues busy - break; - } - } + if (rs < 0 || ws == null) // shutting down + throw new RejectedExecutionException(); + if (ws.length > m && // k must be at index + ((q = ws[k = (r << 1) & m]) != null || + (q = tryAddSharedQueue(k)) != null) && + q.trySharedPush(task)) { + signalWork(); + return; } + r ^= r << 13; // xorshift seed to new position + r ^= r >>> 17; + if (((s.seed = r ^= r << 5) & m) == 0) + Thread.yield(); // occasionally yield if busy } } - /** - * Tries to add and register a new queue at the given index. - * - * @param idx the workQueues array index to register the queue - * @return the queue, or null if could not add because could - * not acquire lock or idx is unusable - */ - private WorkQueue tryAddSharedQueue(int idx) { - WorkQueue q = null; - ReentrantLock lock = this.lock; - if (idx >= 0 && (idx & 1) == 0 && !lock.isLocked()) { - // create queue outside of lock but only if apparently free - WorkQueue nq = new WorkQueue(null, SHARED_QUEUE); - if (lock.tryLock()) { - try { - WorkQueue[] ws = workQueues; - if (ws != null && idx < ws.length) { - if ((q = ws[idx]) == null) { - int rs; // update runState seq - ws[idx] = q = nq; - runState = (((rs = runState) & SHUTDOWN) | - ((rs + RS_SEQ) & ~SHUTDOWN)); - } - } - } finally { - lock.unlock(); - } - } - } - return q; - } // Scanning for tasks @@ -2553,7 +2579,7 @@ public class ForkJoinPool extends Abstra * *

If the caller is not a {@link ForkJoinTask}, this method is * behaviorally equivalent to -a *

 {@code
+     *  
 {@code
      * while (!blocker.isReleasable())
      *   if (blocker.block())
      *     return;