--- jsr166/src/jsr166y/ForkJoinPool.java 2012/10/29 17:23:34 1.136 +++ jsr166/src/jsr166y/ForkJoinPool.java 2012/10/30 14:23:11 1.137 @@ -691,8 +691,7 @@ public class ForkJoinPool extends Abstra /** * Takes next task, if one exists, in LIFO order. Call only - * by owner in unshared queues. (We do not have a shared - * version of this method because it is never needed.) + * by owner in unshared queues. */ final ForkJoinTask pop() { ForkJoinTask[] a; ForkJoinTask t; int m; @@ -710,6 +709,90 @@ public class ForkJoinPool extends Abstra return null; } + final ForkJoinTask sharedPop() { + ForkJoinTask task = null; + if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + task = t; + break; + } + } + } + } finally { + runState = 0; + } + } + return task; + } + + /** + * Version of pop that takes top element only if it + * its root is the given CountedCompleter. + */ + final ForkJoinTask popCC(CountedCompleter root) { + ForkJoinTask[] a; int m; + if (root != null && (a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null || !(t instanceof CountedCompleter) || + ((CountedCompleter)t).getRoot() != root) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + return t; + } + if (root.status < 0) + break; + } + } + return null; + } + + /** + * Shared version of popCC + */ + final ForkJoinTask sharedPopCC(CountedCompleter root) { + ForkJoinTask task = null; + if (root != null && + runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null || !(t instanceof CountedCompleter) || + ((CountedCompleter)t).getRoot() != root) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + task = t; + break; + } + if (root.status < 0) + break; + } + } + } finally { + runState = 0; + } + } + return task; + } + /** * Takes a task in FIFO order if b is base of queue and a task * can be claimed without contention. Specialized versions @@ -1507,6 +1590,22 @@ public class ForkJoinPool extends Abstra } /** + * Returns true if caller is (or may be) submitter to the common + * pool, and not all workers are active, and there appear to be + * tasks in the associated submission queue. + */ + static boolean canHelpCommonPool() { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + int k = submitters.get().seed & SQMASK; + return ((p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null && + q.top - q.base > 0); + } + + /** * Returns true if the given task was submitted to common pool * and has not yet commenced execution, and is available for * removal according to execution policies; if so removing the @@ -1519,19 +1618,40 @@ public class ForkJoinPool extends Abstra // Peek, looking for task and eligibility before // using trySharedUnpush to actually take it under lock ForkJoinPool p; WorkQueue[] ws; WorkQueue q; - ForkJoinTask[] a; int t, s, n; + ForkJoinTask[] a; int s; int k = submitters.get().seed & SQMASK; return ((p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && (ws = p.workQueues) != null && ws.length > (k &= p.submitMask) && (q = ws[k]) != null && (a = q.array) != null && - (n = (t = q.top) - q.base) > 0 && - (n > 1 || (int)(p.ctl >> AC_SHIFT) < 0) && - (s = t - 1) >= 0 && s < a.length && a[s] == task && + (s = q.top - 1) - q.base >= 0 && + s >= 0 && s < a.length && + a[s] == task && q.trySharedUnpush(task)); } + /** + * Tries to pop a task from common pool with given root + */ + static ForkJoinTask popCCFromCommonPool(CountedCompleter root) { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q; + ForkJoinTask t; + int k = submitters.get().seed & SQMASK; + if (root != null && + (p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && + (ws = p.workQueues) != null && + ws.length > (k &= p.submitMask) && + (q = ws[k]) != null && q.top - q.base > 0 && + root.status < 0 && + (t = q.sharedPopCC(root)) != null) + return t; + return null; + } + + // Maintaining ctl counts /** @@ -2067,17 +2187,25 @@ public class ForkJoinPool extends Abstra * Restricted version of helpQuiescePool for non-FJ callers */ static void externalHelpQuiescePool() { - ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q; - ForkJoinTask t; int b; + ForkJoinPool p; WorkQueue[] ws; WorkQueue q, sq; + ForkJoinTask[] a; int b; + ForkJoinTask t = null; int k = submitters.get().seed & SQMASK; if ((p = commonPool) != null && + (int)(p.ctl >> AC_SHIFT) < 0 && (ws = p.workQueues) != null && ws.length > (k &= p.submitMask) && - (w = ws[k]) != null && - (q = p.findNonEmptyStealQueue(w)) != null && - (b = q.base) - q.top < 0 && - (t = q.pollAt(b)) != null) - t.doExec(); + (q = ws[k]) != null) { + while (q.top - q.base > 0) { + if ((t = q.sharedPop()) != null) + break; + } + if (t == null && (sq = p.findNonEmptyStealQueue(q)) != null && + (b = sq.base) - sq.top < 0) + t = sq.pollAt(b); + if (t != null) + t.doExec(); + } } /**