--- jsr166/src/jsr166e/ForkJoinPool.java 2012/10/29 17:23:26 1.8 +++ jsr166/src/jsr166e/ForkJoinPool.java 2012/10/31 12:49:13 1.11 @@ -692,8 +692,7 @@ public class ForkJoinPool extends Abstra /** * Takes next task, if one exists, in LIFO order. Call only - * by owner in unshared queues. (We do not have a shared - * version of this method because it is never needed.) + * by owner in unshared queues. */ final ForkJoinTask pop() { ForkJoinTask[] a; ForkJoinTask t; int m; @@ -711,6 +710,33 @@ public class ForkJoinPool extends Abstra return null; } + final ForkJoinTask sharedPop() { + ForkJoinTask task = null; + if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null) + break; + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + task = t; + break; + } + } + } + } finally { + runState = 0; + } + } + return task; + } + + /** * Takes a task in FIFO order if b is base of queue and a task * can be claimed without contention. Specialized versions @@ -884,7 +910,7 @@ public class ForkJoinPool extends Abstra return seed = r ^= r << 5; } - // Execution methods + // Specialized execution methods /** * Pops and runs tasks until empty. @@ -963,6 +989,45 @@ public class ForkJoinPool extends Abstra } /** + * Version of shared pop that takes top element only if it + * its root is the given CountedCompleter. + */ + final CountedCompleter sharedPopCC(CountedCompleter root) { + CountedCompleter task = null; + if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) { + try { + ForkJoinTask[] a; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { + outer:for (int s; (s = top - 1) - base >= 0;) { + long j = ((m & s) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObject(a, j); + if (t == null || !(t instanceof CountedCompleter)) + break; + CountedCompleter cc = (CountedCompleter)t; + for (CountedCompleter q = cc, p;;) { + if (q == root) { + if (U.compareAndSwapObject(a, j, cc, null)) { + top = s; + task = cc; + break outer; + } + break; + } + if ((p = q.completer) == null) + break outer; + q = p; + } + } + } + } finally { + runState = 0; + } + } + return task; + } + + /** * Executes a top-level task and any local tasks remaining * after execution. */ @@ -1083,7 +1148,6 @@ public class ForkJoinPool extends Abstra public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; - /** Property prefix for constructing common pool */ private static final String propPrefix = "java.util.concurrent.ForkJoinPool.common."; @@ -1297,7 +1361,10 @@ public class ForkJoinPool extends Abstra try { wait(); } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); + try { + Thread.currentThread().interrupt(); + } catch (SecurityException ignore) { + } } } else @@ -1335,8 +1402,8 @@ public class ForkJoinPool extends Abstra */ final String nextWorkerName() { int n; - do {} while(!U.compareAndSwapInt(this, NEXTWORKERNUMBER, - n = nextWorkerNumber, ++n)); + do {} while (!U.compareAndSwapInt(this, NEXTWORKERNUMBER, + n = nextWorkerNumber, ++n)); return workerNamePrefix.concat(Integer.toString(n)); } @@ -1382,7 +1449,6 @@ public class ForkJoinPool extends Abstra synchronized (this) { notifyAll(); }; } } - } /** @@ -1399,8 +1465,8 @@ public class ForkJoinPool extends Abstra if (wt != null && (w = wt.workQueue) != null) { w.runState = -1; // ensure runState is set long steals = w.totalSteals + w.nsteals, sc; - do {} while(!U.compareAndSwapLong(this, STEALCOUNT, - sc = stealCount, sc + steals)); + do {} while (!U.compareAndSwapLong(this, STEALCOUNT, + sc = stealCount, sc + steals)); int idx = w.poolIndex; while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1)) tryAwaitMainLock(); @@ -1431,7 +1497,7 @@ public class ForkJoinPool extends Abstra } if (ex != null) // rethrow - U.throwException(ex); + ForkJoinTask.rethrow(ex); } // Submissions @@ -1517,20 +1583,58 @@ public class ForkJoinPool extends Abstra * @return true if successful */ static boolean tryUnsubmitFromCommonPool(ForkJoinTask task) { - // Peek, looking for task and eligibility before - // using trySharedUnpush to actually take it under lock - ForkJoinPool p; WorkQueue[] ws; WorkQueue q; - ForkJoinTask[] a; int t, s, n; - int k = submitters.get().seed & SQMASK; - return ((p = commonPool) != null && - (ws = p.workQueues) != null && - ws.length > (k &= p.submitMask) && - (q = ws[k]) != null && - (a = q.array) != null && - (n = (t = q.top) - q.base) > 0 && - (n > 1 || (int)(p.ctl >> AC_SHIFT) < 0) && - (s = t - 1) >= 0 && s < a.length && a[s] == task && - q.trySharedUnpush(task)); + // If not oversaturating platform, peek, looking for task and + // eligibility before using trySharedUnpush to actually take + // it under lock + ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q; + ForkJoinTask[] a; int ac, s, m; + if ((p = commonPool) != null && (ws = p.workQueues) != null) { + int k = submitters.get().seed & p.submitMask & SQMASK; + if ((m = ws.length - 1) >= k && (q = ws[k]) != null && + (ac = (int)(p.ctl >> AC_SHIFT)) <= 0) { + if (ac == 0) { // double check if all workers active + for (int i = 1; i <= m; i += 2) { + if ((w = ws[i]) != null && w.parker != null) { + ac = -1; + break; + } + } + } + return (ac < 0 && (a = q.array) != null && + (s = q.top - 1) - q.base >= 0 && + s >= 0 && s < a.length && + a[s] == task && + q.trySharedUnpush(task)); + } + } + return false; + } + + /** + * Tries to pop and run a task within same computation from common pool + */ + static void popAndExecCCFromCommonPool(CountedCompleter cc) { + ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; int m, ac; + CountedCompleter par, task; + if ((p = commonPool) != null && (ws = p.workQueues) != null) { + while ((par = cc.completer) != null) // find root + cc = par; + int k = submitters.get().seed & p.submitMask & SQMASK; + if ((m = ws.length - 1) >= k && (q = ws[k]) != null && + (ac = (int)(p.ctl >> AC_SHIFT)) <= 0) { + if (ac == 0) { + for (int i = 1; i <= m; i += 2) { + if ((w = ws[i]) != null && w.parker != null) { + ac = -1; + break; + } + } + } + if (ac < 0 && q.top - q.base > 0 && + (task = q.sharedPopCC(cc)) != null) + task.exec(); + } + } } // Maintaining ctl counts @@ -2068,17 +2172,24 @@ public class ForkJoinPool extends Abstra * Restricted version of helpQuiescePool for non-FJ callers */ static void externalHelpQuiescePool() { - ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q; - ForkJoinTask t; int b; + ForkJoinPool p; WorkQueue[] ws; WorkQueue q, sq; + ForkJoinTask[] a; int b; + ForkJoinTask t = null; int k = submitters.get().seed & SQMASK; if ((p = commonPool) != null && (ws = p.workQueues) != null && ws.length > (k &= p.submitMask) && - (w = ws[k]) != null && - (q = p.findNonEmptyStealQueue(w)) != null && - (b = q.base) - q.top < 0 && - (t = q.pollAt(b)) != null) - t.doExec(); + (q = ws[k]) != null) { + while (q.top - q.base > 0) { + if ((t = q.sharedPop()) != null) + break; + } + if (t == null && (sq = p.findNonEmptyStealQueue(q)) != null && + (b = sq.base) - sq.top < 0) + t = sq.pollAt(b); + if (t != null) + t.doExec(); + } } /** @@ -2120,9 +2231,7 @@ public class ForkJoinPool extends Abstra static int getEstimatedSubmitterQueueLength() { ForkJoinPool p; WorkQueue[] ws; WorkQueue q; int k = submitters.get().seed & SQMASK; - return ((p = commonPool) != null && - p.runState >= 0 && - (ws = p.workQueues) != null && + return ((p = commonPool) != null && (ws = p.workQueues) != null && ws.length > (k &= p.submitMask) && (q = ws[k]) != null) ? q.queueSize() : 0; @@ -2148,7 +2257,7 @@ public class ForkJoinPool extends Abstra for (long c;;) { if (((c = ctl) & STOP_BIT) != 0) { // already terminating if ((short)(c >>> TC_SHIFT) == -parallelism) { - synchronized(this) { + synchronized (this) { notifyAll(); // signal when 0 workers } } @@ -2872,7 +2981,7 @@ public class ForkJoinPool extends Abstra return true; long startTime = System.nanoTime(); boolean terminated = false; - synchronized(this) { + synchronized (this) { for (long waitTime = nanos, millis = 0L;;) { if (terminated = isTerminated() || waitTime <= 0L || @@ -3057,7 +3166,7 @@ public class ForkJoinPool extends Abstra defaultForkJoinWorkerThreadFactory : ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); - Thread.UncaughtExceptionHandler ueh = (up == null)? null : + Thread.UncaughtExceptionHandler ueh = (up == null) ? null : ((Thread.UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(up).newInstance()); int par;