--- jsr166/src/jsr166e/ForkJoinPool.java 2012/10/29 17:23:26 1.8 +++ jsr166/src/jsr166e/ForkJoinPool.java 2012/10/30 16:05:35 1.10 @@ -692,8 +692,7 @@ public class ForkJoinPool extends Abstra /** * Takes next task, if one exists, in LIFO order. Call only - * by owner in unshared queues. (We do not have a shared - * version of this method because it is never needed.) + * by owner in unshared queues. */ final ForkJoinTask pop() { ForkJoinTask[] a; ForkJoinTask t; int m; @@ -711,6 +710,90 @@ public class ForkJoinPool extends Abstra return null; } + final ForkJoinTask sharedPop() { + ForkJoinTask task = null; + if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + task = t; + break; + } + } + } + } finally { + runState = 0; + } + } + return task; + } + + /** + * Version of pop that takes top element only if it + * its root is the given CountedCompleter. + */ + final ForkJoinTask popCC(CountedCompleter root) { + ForkJoinTask[] a; int m; + if (root != null && (a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null || !(t instanceof CountedCompleter) || + ((CountedCompleter)t).getRoot() != root) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + return t; + } + if (root.status < 0) + break; + } + } + return null; + } + + /** + * Shared version of popCC + */ + final ForkJoinTask sharedPopCC(CountedCompleter root) { + ForkJoinTask task = null; + if (root != null && + runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null || !(t instanceof CountedCompleter) || + ((CountedCompleter)t).getRoot() != root) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + task = t; + break; + } + if (root.status < 0) + break; + } + } + } finally { + runState = 0; + } + } + return task; + } + /** * Takes a task in FIFO order if b is base of queue and a task * can be claimed without contention. Specialized versions @@ -1335,8 +1418,8 @@ public class ForkJoinPool extends Abstra */ final String nextWorkerName() { int n; - do {} while(!U.compareAndSwapInt(this, NEXTWORKERNUMBER, - n = nextWorkerNumber, ++n)); + do {} while (!U.compareAndSwapInt(this, NEXTWORKERNUMBER, + n = nextWorkerNumber, ++n)); return workerNamePrefix.concat(Integer.toString(n)); } @@ -1399,8 +1482,8 @@ public class ForkJoinPool extends Abstra if (wt != null && (w = wt.workQueue) != null) { w.runState = -1; // ensure runState is set long steals = w.totalSteals + w.nsteals, sc; - do {} while(!U.compareAndSwapLong(this, STEALCOUNT, - sc = stealCount, sc + steals)); + do {} while (!U.compareAndSwapLong(this, STEALCOUNT, + sc = stealCount, sc + steals)); int idx = w.poolIndex; while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) tryAwaitMainLock(); @@ -1508,6 +1591,22 @@ public class ForkJoinPool extends Abstra } /** + * Returns true if caller is (or may be) submitter to the common + * pool, and not all workers are active, and there appear to be + * tasks in the associated submission queue. + */ + static boolean canHelpCommonPool() { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + int k = submitters.get().seed & SQMASK; + return ((p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null && + q.top - q.base > 0); + } + + /** * Returns true if the given task was submitted to common pool * and has not yet commenced execution, and is available for * removal according to execution policies; if so removing the @@ -1520,19 +1619,40 @@ public class ForkJoinPool extends Abstra // Peek, looking for task and eligibility before // using trySharedUnpush to actually take it under lock ForkJoinPool p; WorkQueue[] ws; WorkQueue q; - ForkJoinTask[] a; int t, s, n; + ForkJoinTask[] a; int s; int k = submitters.get().seed & SQMASK; return ((p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && (ws = p.workQueues) != null && ws.length > (k &= p.submitMask) && (q = ws[k]) != null && (a = q.array) != null && - (n = (t = q.top) - q.base) > 0 && - (n > 1 || (int)(p.ctl >> AC_SHIFT) < 0) && - (s = t - 1) >= 0 && s < a.length && a[s] == task && + (s = q.top - 1) - q.base >= 0 && + s >= 0 && s < a.length && + a[s] == task && q.trySharedUnpush(task)); } + /** + * Tries to pop a task from common pool with given root + */ + static ForkJoinTask popCCFromCommonPool(CountedCompleter root) { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + ForkJoinTask t; + int k = submitters.get().seed & SQMASK; + if (root != null && + (p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null && q.top - q.base > 0 && + root.status < 0 && + (t = q.sharedPopCC(root)) != null) + return t; + return null; + } + + // Maintaining ctl counts /** @@ -2068,17 +2188,25 @@ public class ForkJoinPool extends Abstra * Restricted version of helpQuiescePool for non-FJ callers */ static void externalHelpQuiescePool() { - ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q; - ForkJoinTask t; int b; + ForkJoinPool p; WorkQueue[] ws; WorkQueue q, sq; + ForkJoinTask[] a; int b; + ForkJoinTask t = null; int k = submitters.get().seed & SQMASK; if ((p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && (ws = p.workQueues) != null && ws.length > (k &= p.submitMask) && - (w = ws[k]) != null && - (q = p.findNonEmptyStealQueue(w)) != null && - (b = q.base) - q.top < 0 && - (t = q.pollAt(b)) != null) - t.doExec(); + (q = ws[k]) != null) { + while (q.top - q.base > 0) { + if ((t = q.sharedPop()) != null) + break; + } + if (t == null && (sq = p.findNonEmptyStealQueue(q)) != null && + (b = sq.base) - sq.top < 0) + t = sq.pollAt(b); + if (t != null) + t.doExec(); + } } /** @@ -2148,7 +2276,7 @@ public class ForkJoinPool extends Abstra for (long c;;) { if (((c = ctl) & STOP_BIT) != 0) { // already terminating if ((short)(c >>> TC_SHIFT) == -parallelism) { - synchronized(this) { + synchronized (this) { notifyAll(); // signal when 0 workers } } @@ -2872,7 +3000,7 @@ public class ForkJoinPool extends Abstra return true; long startTime = System.nanoTime(); boolean terminated = false; - synchronized(this) { + synchronized (this) { for (long waitTime = nanos, millis = 0L;;) { if (terminated = isTerminated() || waitTime <= 0L || @@ -3057,7 +3185,7 @@ public class ForkJoinPool extends Abstra defaultForkJoinWorkerThreadFactory : ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); - Thread.UncaughtExceptionHandler ueh = (up == null)? null : + Thread.UncaughtExceptionHandler ueh = (up == null) ? null : ((Thread.UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(up).newInstance()); int par;