--- jsr166/src/jsr166y/ForkJoinPool.java 2012/10/30 16:05:35 1.138 +++ jsr166/src/jsr166y/ForkJoinPool.java 2012/10/31 12:49:24 1.139 @@ -735,63 +735,6 @@ public class ForkJoinPool extends Abstra return task; } - /** - * Version of pop that takes top element only if it - * its root is the given CountedCompleter. - */ - final ForkJoinTask popCC(CountedCompleter root) { - ForkJoinTask[] a; int m; - if (root != null && (a = array) != null && (m = a.length - 1) >= 0) { - for (int s; (s = top - 1) - base >= 0;) { - long j = ((m & s) << ASHIFT) + ABASE; - ForkJoinTask t = - (ForkJoinTask)U.getObject(a, j); - if (t == null || !(t instanceof CountedCompleter) || - ((CountedCompleter)t).getRoot() != root) - break; - if (U.compareAndSwapObject(a, j, t, null)) { - top = s; - return t; - } - if (root.status < 0) - break; - } - } - return null; - } - - /** - * Shared version of popCC - */ - final ForkJoinTask sharedPopCC(CountedCompleter root) { - ForkJoinTask task = null; - if (root != null && - runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { - try { - ForkJoinTask[] a; int m; - if ((a = array) != null && (m = a.length - 1) >= 0) { - for (int s; (s = top - 1) - base >= 0;) { - long j = ((m & s) << ASHIFT) + ABASE; - ForkJoinTask t = - (ForkJoinTask)U.getObject(a, j); - if (t == null || !(t instanceof CountedCompleter) || - ((CountedCompleter)t).getRoot() != root) - break; - if (U.compareAndSwapObject(a, j, t, null)) { - top = s; - task = t; - break; - } - if (root.status < 0) - break; - } - } - } finally { - runState = 0; - } - } - return task; - } /** * Takes a task in FIFO order if b is base of queue and a task @@ -966,7 +909,7 @@ public class ForkJoinPool extends Abstra return seed = r ^= r << 5; } - // Execution methods + // Specialized execution methods /** * Pops and runs tasks until empty. @@ -1045,6 +988,45 @@ public class ForkJoinPool extends Abstra } /** + * Version of shared pop that takes top element only if it + * its root is the given CountedCompleter. + */ + final CountedCompleter sharedPopCC(CountedCompleter root) { + CountedCompleter task = null; + if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + outer:for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null || !(t instanceof CountedCompleter)) + break; + CountedCompleter cc = (CountedCompleter)t; + for (CountedCompleter q = cc, p;;) { + if (q == root) { + if (U.compareAndSwapObject(a, j, cc, null)) { + top = s; + task = cc; + break outer; + } + break; + } + if ((p = q.completer) == null) + break outer; + q = p; + } + } + } + } finally { + runState = 0; + } + } + return task; + } + + /** * Executes a top-level task and any local tasks remaining * after execution. */ @@ -1165,7 +1147,6 @@ public class ForkJoinPool extends Abstra public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; - /** Property prefix for constructing common pool */ private static final String propPrefix = "java.util.concurrent.ForkJoinPool.common."; @@ -1379,7 +1360,10 @@ public class ForkJoinPool extends Abstra try { wait(); } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); + try { + Thread.currentThread().interrupt(); + } catch (SecurityException ignore) { + } } } else @@ -1464,7 +1448,6 @@ public class ForkJoinPool extends Abstra synchronized (this) { notifyAll(); }; } } - } /** @@ -1513,7 +1496,7 @@ public class ForkJoinPool extends Abstra } if (ex != null) // rethrow - U.throwException(ex); + ForkJoinTask.rethrow(ex); } // Submissions @@ -1590,22 +1573,6 @@ public class ForkJoinPool extends Abstra } /** - * Returns true if caller is (or may be) submitter to the common - * pool, and not all workers are active, and there appear to be - * tasks in the associated submission queue. - */ - static boolean canHelpCommonPool() { - ForkJoinPool p; WorkQueue[] ws; WorkQueue q; - int k = submitters.get().seed & SQMASK; - return ((p = commonPool) != null && - (int)(p.ctl >> AC_SHIFT) < 0 && - (ws = p.workQueues) != null && - ws.length > (k &= p.submitMask) && - (q = ws[k]) != null && - q.top - q.base > 0); - } - - /** * Returns true if the given task was submitted to common pool * and has not yet commenced execution, and is available for * removal according to execution policies; if so removing the @@ -1615,43 +1582,60 @@ public class ForkJoinPool extends Abstra * @return true if successful */ static boolean tryUnsubmitFromCommonPool(ForkJoinTask task) { - // Peek, looking for task and eligibility before - // using trySharedUnpush to actually take it under lock - ForkJoinPool p; WorkQueue[] ws; WorkQueue q; - ForkJoinTask[] a; int s; - int k = submitters.get().seed & SQMASK; - return ((p = commonPool) != null && - (int)(p.ctl >> AC_SHIFT) < 0 && - (ws = p.workQueues) != null && - ws.length > (k &= p.submitMask) && - (q = ws[k]) != null && - (a = q.array) != null && - (s = q.top - 1) - q.base >= 0 && - s >= 0 && s < a.length && - a[s] == task && - q.trySharedUnpush(task)); + // If not oversaturating platform, peek, looking for task and + // eligibility before using trySharedUnpush to actually take + // it under lock + ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q; + ForkJoinTask[] a; int ac, s, m; + if ((p = commonPool) != null && (ws = p.workQueues) != null) { + int k = submitters.get().seed & p.submitMask & SQMASK; + if ((m = ws.length - 1) >= k && (q = ws[k]) != null && + (ac = (int)(p.ctl >> AC_SHIFT)) <= 0) { + if (ac == 0) { // double check if all workers active + for (int i = 1; i <= m; i += 2) { + if ((w = ws[i]) != null && w.parker != null) { + ac = -1; + break; + } + } + } + return (ac < 0 && (a = q.array) != null && + (s = q.top - 1) - q.base >= 0 && + s >= 0 && s < a.length && + a[s] == task && + q.trySharedUnpush(task)); + } + } + return false; } /** - * Tries to pop a task from common pool with given root + * Tries to pop and run a task within same computation from common pool */ - static ForkJoinTask popCCFromCommonPool(CountedCompleter root) { - ForkJoinPool p; WorkQueue[] ws; WorkQueue q; - ForkJoinTask t; - int k = submitters.get().seed & SQMASK; - if (root != null && - (p = commonPool) != null && - (int)(p.ctl >> AC_SHIFT) < 0 && - (ws = p.workQueues) != null && - ws.length > (k &= p.submitMask) && - (q = ws[k]) != null && q.top - q.base > 0 && - root.status < 0 && - (t = q.sharedPopCC(root)) != null) - return t; - return null; + static void popAndExecCCFromCommonPool(CountedCompleter cc) { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; int m, ac; + CountedCompleter par, task; + if ((p = commonPool) != null && (ws = p.workQueues) != null) { + while ((par = cc.completer) != null) // find root + cc = par; + int k = submitters.get().seed & p.submitMask & SQMASK; + if ((m = ws.length - 1) >= k && (q = ws[k]) != null && + (ac = (int)(p.ctl >> AC_SHIFT)) <= 0) { + if (ac == 0) { + for (int i = 1; i <= m; i += 2) { + if ((w = ws[i]) != null && w.parker != null) { + ac = -1; + break; + } + } + } + if (ac < 0 && q.top - q.base > 0 && + (task = q.sharedPopCC(cc)) != null) + task.exec(); + } + } } - // Maintaining ctl counts /** @@ -2192,7 +2176,6 @@ public class ForkJoinPool extends Abstra ForkJoinTask t = null; int k = submitters.get().seed & SQMASK; if ((p = commonPool) != null && - (int)(p.ctl >> AC_SHIFT) < 0 && (ws = p.workQueues) != null && ws.length > (k &= p.submitMask) && (q = ws[k]) != null) { @@ -2247,9 +2230,7 @@ public class ForkJoinPool extends Abstra static int getEstimatedSubmitterQueueLength() { ForkJoinPool p; WorkQueue[] ws; WorkQueue q; int k = submitters.get().seed & SQMASK; - return ((p = commonPool) != null && - p.runState >= 0 && - (ws = p.workQueues) != null && + return ((p = commonPool) != null && (ws = p.workQueues) != null && ws.length > (k &= p.submitMask) && (q = ws[k]) != null) ? q.queueSize() : 0;