--- jsr166/src/jsr166y/ForkJoinPool.java 2012/02/21 00:44:53 1.126 +++ jsr166/src/jsr166y/ForkJoinPool.java 2012/03/04 15:52:45 1.127 @@ -721,12 +721,11 @@ public class ForkJoinPool extends Abstra * version of this method because it is never needed.) */ final ForkJoinTask pop() { - ForkJoinTask t; int m; - ForkJoinTask[] a = array; - if (a != null && (m = a.length - 1) >= 0) { + ForkJoinTask[] a; ForkJoinTask t; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { for (int s; (s = top - 1) - base >= 0;) { - int j = ((m & s) << ASHIFT) + ABASE; - if ((t = (ForkJoinTask)U.getObjectVolatile(a, j)) == null) + long j = ((m & s) << ASHIFT) + ABASE; + if ((t = (ForkJoinTask)U.getObject(a, j)) == null) break; if (U.compareAndSwapObject(a, j, t, null)) { top = s; @@ -830,54 +829,6 @@ public class ForkJoinPool extends Abstra } /** - * If present, removes from queue and executes the given task, or - * any other cancelled task. Returns (true) immediately on any CAS - * or consistency check failure so caller can retry. - * - * @return false if no progress can be made - */ - final boolean tryRemoveAndExec(ForkJoinTask task) { - boolean removed = false, empty = true, progress = true; - ForkJoinTask[] a; int m, s, b, n; - if ((a = array) != null && (m = a.length - 1) >= 0 && - (n = (s = top) - (b = base)) > 0) { - for (ForkJoinTask t;;) { // traverse from s to b - int j = ((--s & m) << ASHIFT) + ABASE; - t = (ForkJoinTask)U.getObjectVolatile(a, j); - if (t == null) // inconsistent length - break; - else if (t == task) { - if (s + 1 == top) { // pop - if (!U.compareAndSwapObject(a, j, task, null)) - break; - top = s; - removed = true; - } - else if (base == b) // replace with proxy - removed = U.compareAndSwapObject(a, j, task, - new EmptyTask()); - break; - } - else if (t.status >= 0) - empty = false; - else if (s + 1 == top) { // pop and throw away - if (U.compareAndSwapObject(a, j, t, null)) - top = s; - break; - } - if (--n == 0) { - if (!empty && base == b) - progress = false; - break; - } - } - } - if (removed) - task.doExec(); - return progress; - } - - /** * Initializes or doubles the capacity of array. Call either * by owner or with lock held -- it is OK for base, but not * top, to move while resizings are in progress. @@ -939,69 +890,96 @@ public class ForkJoinPool extends Abstra // Execution methods /** - * Removes and runs tasks until empty, using local mode - * ordering. Normally called only after checking for apparent - * non-emptiness. + * Pops and runs tasks until empty. */ - final void runLocalTasks() { - // hoist checks from repeated pop/poll - ForkJoinTask[] a; int m; - if ((a = array) != null && (m = a.length - 1) >= 0) { - if (mode == 0) { - for (int s; (s = top - 1) - base >= 0;) { - int j = ((m & s) << ASHIFT) + ABASE; - ForkJoinTask t = - (ForkJoinTask)U.getObjectVolatile(a, j); - if (t != null) { - if (U.compareAndSwapObject(a, j, t, null)) { - top = s; - t.doExec(); - } - } - else - break; - } + private void popAndExecAll() { + // A bit faster than repeated pop calls + ForkJoinTask[] a; int m, s; long j; ForkJoinTask t; + while ((a = array) != null && (m = a.length - 1) >= 0 && + (s = top - 1) - base >= 0 && + (t = ((ForkJoinTask) + U.getObject(a, j = ((m & s) << ASHIFT) + ABASE))) + != null) { + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + t.doExec(); } - else { - for (int b; (b = base) - top < 0;) { - int j = ((m & b) << ASHIFT) + ABASE; - ForkJoinTask t = - (ForkJoinTask)U.getObjectVolatile(a, j); - if (t != null) { - if (base == b && - U.compareAndSwapObject(a, j, t, null)) { - base = b + 1; - t.doExec(); - } - } else if (base == b) { - if (b + 1 == top) + } + } + + /** + * Polls and runs tasks until empty. + */ + private void pollAndExecAll() { + for (ForkJoinTask t; (t = poll()) != null;) + t.doExec(); + } + + /** + * If present, removes from queue and executes the given task, or + * any other cancelled task. Returns (true) immediately on any CAS + * or consistency check failure so caller can retry. + * + * @return false if no progress can be made + */ + final boolean tryRemoveAndExec(ForkJoinTask task) { + boolean removed = false, empty = true, progress = true; + ForkJoinTask[] a; int m, s, b, n; + if ((a = array) != null && (m = a.length - 1) >= 0 && + (n = (s = top) - (b = base)) > 0) { + for (ForkJoinTask t;;) { // traverse from s to b + int j = ((--s & m) << ASHIFT) + ABASE; + t = (ForkJoinTask)U.getObjectVolatile(a, j); + if (t == null) // inconsistent length + break; + else if (t == task) { + if (s + 1 == top) { // pop + if (!U.compareAndSwapObject(a, j, task, null)) break; - Thread.yield(); // wait for lagging update + top = s; + removed = true; } + else if (base == b) // replace with proxy + removed = U.compareAndSwapObject(a, j, task, + new EmptyTask()); + break; + } + else if (t.status >= 0) + empty = false; + else if (s + 1 == top) { // pop and throw away + if (U.compareAndSwapObject(a, j, t, null)) + top = s; + break; + } + if (--n == 0) { + if (!empty && base == b) + progress = false; + break; } } } + if (removed) + task.doExec(); + return progress; } /** * Executes a top-level task and any local tasks remaining * after execution. - * - * @return true unless terminating */ - final boolean runTask(ForkJoinTask t) { - boolean alive = true; + final void runTask(ForkJoinTask t) { if (t != null) { currentSteal = t; t.doExec(); - if (top != base) // conservative guard - runLocalTasks(); + if (top != base) { // process remaining local tasks + if (mode == 0) + popAndExecAll(); + else + pollAndExecAll(); + } ++nsteals; currentSteal = null; } - else if (runState < 0) // terminating - alive = false; - return alive; } /** @@ -1321,22 +1299,28 @@ public class ForkJoinPool extends Abstra * * @param w the worker's queue */ + final void registerWorker(WorkQueue w) { Mutex lock = this.lock; lock.lock(); try { WorkQueue[] ws = workQueues; if (w != null && ws != null) { // skip on shutdown/failure - int rs, n; - while ((n = ws.length) < // ensure can hold total - (parallelism + (short)(ctl >>> TC_SHIFT) << 1)) - workQueues = ws = Arrays.copyOf(ws, n << 1); - int m = n - 1; + int rs, n = ws.length, m = n - 1; int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence w.seed = (s == 0) ? 1 : s; // ensure non-zero seed int r = (s << 1) | 1; // use odd-numbered indices - while (ws[r &= m] != null) // step by approx half size - r += ((n >>> 1) & SQMASK) + 2; + if (ws[r &= m] != null) { // collision + int probes = 0; // step by approx half size + int step = (n <= 4) ? 2 : ((n >>> 1) & SQMASK) + 2; + while (ws[r = (r + step) & m] != null) { + if (++probes >= n) { + workQueues = ws = Arrays.copyOf(ws, n <<= 1); + m = n - 1; + probes = 0; + } + } + } w.eventCount = w.poolIndex = r; // establish before recording ws[r] = w; // also update seq runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN); @@ -1483,7 +1467,6 @@ public class ForkJoinPool extends Abstra } } - // Scanning for tasks /** @@ -1491,7 +1474,7 @@ public class ForkJoinPool extends Abstra */ final void runWorker(WorkQueue w) { w.growArray(false); // initialize queue array in this thread - do {} while (w.runTask(scan(w))); + do { w.runTask(scan(w)); } while (w.runState >= 0); } /** @@ -1554,8 +1537,7 @@ public class ForkJoinPool extends Abstra q.base = b + 1; // specialization of pollAt return t; } - else if ((t != null || b + 1 != q.top) && - (ec < 0 || j <= m)) { + else if (ec < 0 || j <= m) { rs = 0; // mark scan as imcomplete break; // caller can retry after release } @@ -1630,7 +1612,7 @@ public class ForkJoinPool extends Abstra */ private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { if (w.eventCount < 0 && !tryTerminate(false, false) && - (int)prevCtl != 0 && ctl == currentCtl) { + (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) { Thread wt = Thread.currentThread(); Thread.yield(); // yield before block while (ctl == currentCtl) { @@ -1828,44 +1810,49 @@ public class ForkJoinPool extends Abstra * @return task status on exit */ final int awaitJoin(WorkQueue joiner, ForkJoinTask task) { + int s; ForkJoinTask prevJoin = joiner.currentJoin; - joiner.currentJoin = task; - long startTime = 0L; - for (int k = 0, s; ; ++k) { - if ((joiner.isEmpty() ? // try to help - !tryHelpStealer(joiner, task) : - !joiner.tryRemoveAndExec(task))) { - if (k == 0) { - startTime = System.nanoTime(); - tryPollForAndExec(joiner, task); // check uncommon case - } - else if ((k & (MAX_HELP - 1)) == 0 && - System.nanoTime() - startTime >= COMPENSATION_DELAY && - tryCompensate(task, null)) { - if (task.trySetSignal() && task.status >= 0) { - synchronized (task) { - if (task.status >= 0) { - try { // see ForkJoinTask - task.wait(); // for explanation - } catch (InterruptedException ie) { + if ((s = task.status) >= 0) { + joiner.currentJoin = task; + long startTime = 0L; + for (int k = 0;;) { + if ((joiner.isEmpty() ? // try to help + !tryHelpStealer(joiner, task) : + !joiner.tryRemoveAndExec(task))) { + if (k == 0) { + startTime = System.nanoTime(); + tryPollForAndExec(joiner, task); // check uncommon case + } + else if ((k & (MAX_HELP - 1)) == 0 && + System.nanoTime() - startTime >= + COMPENSATION_DELAY && + tryCompensate(task, null)) { + if (task.trySetSignal() && task.status >= 0) { + synchronized (task) { + if (task.status >= 0) { + try { // see ForkJoinTask + task.wait(); // for explanation + } catch (InterruptedException ie) { + } } + else + task.notifyAll(); } - else - task.notifyAll(); } + long c; // re-activate + do {} while (!U.compareAndSwapLong + (this, CTL, c = ctl, c + AC_UNIT)); } - long c; // re-activate - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c + AC_UNIT)); } + if ((s = task.status) < 0) { + joiner.currentJoin = prevJoin; + break; + } + else if ((k++ & (MAX_HELP - 1)) == MAX_HELP >>> 1) + Thread.yield(); // for politeness } - if ((s = task.status) < 0) { - joiner.currentJoin = prevJoin; - return s; - } - else if ((k & (MAX_HELP - 1)) == MAX_HELP >>> 1) - Thread.yield(); // for politeness } + return s; } /** @@ -1922,8 +1909,9 @@ public class ForkJoinPool extends Abstra */ final void helpQuiescePool(WorkQueue w) { for (boolean active = true;;) { - if (w.base - w.top < 0) - w.runLocalTasks(); // exhaust local queue + ForkJoinTask localTask; // exhaust local queue + while ((localTask = w.nextLocalTask()) != null) + localTask.doExec(); WorkQueue q = findNonEmptyStealQueue(w); if (q != null) { ForkJoinTask t; int b;