ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.137 by dl, Tue Oct 30 14:23:11 2012 UTC vs.
Revision 1.139 by dl, Wed Oct 31 12:49:24 2012 UTC

# Line 735 | Line 735 | public class ForkJoinPool extends Abstra
735              return task;
736          }
737  
738        /**
739         * Version of pop that takes top element only if it
740         * its root is the given CountedCompleter.
741         */
742        final ForkJoinTask<?> popCC(CountedCompleter<?> root) {
743            ForkJoinTask<?>[] a; int m;
744            if (root != null && (a = array) != null && (m = a.length - 1) >= 0) {
745                for (int s; (s = top - 1) - base >= 0;) {
746                    long j = ((m & s) << ASHIFT) + ABASE;
747                    ForkJoinTask<?> t =
748                        (ForkJoinTask<?>)U.getObject(a, j);
749                    if (t == null || !(t instanceof CountedCompleter) ||
750                        ((CountedCompleter<?>)t).getRoot() != root)
751                        break;
752                    if (U.compareAndSwapObject(a, j, t, null)) {
753                        top = s;
754                        return t;
755                    }
756                    if (root.status < 0)
757                        break;
758                }
759            }
760            return null;
761        }
762
763        /**
764         * Shared version of popCC
765         */
766        final ForkJoinTask<?> sharedPopCC(CountedCompleter<?> root) {
767            ForkJoinTask<?> task = null;
768            if (root != null &&
769                runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
770                try {
771                    ForkJoinTask<?>[] a; int m;
772                    if ((a = array) != null && (m = a.length - 1) >= 0) {
773                        for (int s; (s = top - 1) - base >= 0;) {
774                            long j = ((m & s) << ASHIFT) + ABASE;
775                            ForkJoinTask<?> t =
776                                (ForkJoinTask<?>)U.getObject(a, j);
777                            if (t == null || !(t instanceof CountedCompleter) ||
778                                ((CountedCompleter<?>)t).getRoot() != root)
779                                break;
780                            if (U.compareAndSwapObject(a, j, t, null)) {
781                                top = s;
782                                task = t;
783                                break;
784                            }
785                            if (root.status < 0)
786                                break;
787                        }
788                    }
789                } finally {
790                    runState = 0;
791                }
792            }
793            return task;
794        }
738  
739          /**
740           * Takes a task in FIFO order if b is base of queue and a task
# Line 966 | Line 909 | public class ForkJoinPool extends Abstra
909              return seed = r ^= r << 5;
910          }
911  
912 <        // Execution methods
912 >        // Specialized execution methods
913  
914          /**
915           * Pops and runs tasks until empty.
# Line 1045 | Line 988 | public class ForkJoinPool extends Abstra
988          }
989  
990          /**
991 +         * Version of shared pop that takes top element only if it
992 +         * its root is the given CountedCompleter.
993 +         */
994 +        final CountedCompleter<?> sharedPopCC(CountedCompleter<?> root) {
995 +            CountedCompleter<?> task = null;
996 +            if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
997 +                try {
998 +                    ForkJoinTask<?>[] a; int m;
999 +                    if ((a = array) != null && (m = a.length - 1) >= 0) {
1000 +                        outer:for (int s; (s = top - 1) - base >= 0;) {
1001 +                            long j = ((m & s) << ASHIFT) + ABASE;
1002 +                            ForkJoinTask<?> t =
1003 +                                (ForkJoinTask<?>)U.getObject(a, j);
1004 +                            if (t == null || !(t instanceof CountedCompleter))
1005 +                                break;
1006 +                            CountedCompleter<?> cc = (CountedCompleter<?>)t;
1007 +                            for (CountedCompleter<?> q = cc, p;;) {
1008 +                                if (q == root) {
1009 +                                    if (U.compareAndSwapObject(a, j, cc, null)) {
1010 +                                        top = s;
1011 +                                        task = cc;
1012 +                                        break outer;
1013 +                                    }
1014 +                                    break;
1015 +                                }
1016 +                                if ((p = q.completer) == null)
1017 +                                    break outer;
1018 +                                q = p;
1019 +                            }
1020 +                        }
1021 +                    }
1022 +                } finally {
1023 +                    runState = 0;
1024 +                }
1025 +            }
1026 +            return task;
1027 +        }
1028 +
1029 +        /**
1030           * Executes a top-level task and any local tasks remaining
1031           * after execution.
1032           */
# Line 1165 | Line 1147 | public class ForkJoinPool extends Abstra
1147      public static final ForkJoinWorkerThreadFactory
1148          defaultForkJoinWorkerThreadFactory;
1149  
1168
1150      /** Property prefix for constructing common pool */
1151      private static final String propPrefix =
1152          "java.util.concurrent.ForkJoinPool.common.";
# Line 1379 | Line 1360 | public class ForkJoinPool extends Abstra
1360                          try {
1361                              wait();
1362                          } catch (InterruptedException ie) {
1363 <                            Thread.currentThread().interrupt();
1363 >                            try {
1364 >                                Thread.currentThread().interrupt();
1365 >                            } catch (SecurityException ignore) {
1366 >                            }
1367                          }
1368                      }
1369                      else
# Line 1417 | Line 1401 | public class ForkJoinPool extends Abstra
1401       */
1402      final String nextWorkerName() {
1403          int n;
1404 <        do {} while(!U.compareAndSwapInt(this, NEXTWORKERNUMBER,
1405 <                                         n = nextWorkerNumber, ++n));
1404 >        do {} while (!U.compareAndSwapInt(this, NEXTWORKERNUMBER,
1405 >                                          n = nextWorkerNumber, ++n));
1406          return workerNamePrefix.concat(Integer.toString(n));
1407      }
1408  
# Line 1464 | Line 1448 | public class ForkJoinPool extends Abstra
1448                  synchronized (this) { notifyAll(); };
1449              }
1450          }
1467
1451      }
1452  
1453      /**
# Line 1481 | Line 1464 | public class ForkJoinPool extends Abstra
1464          if (wt != null && (w = wt.workQueue) != null) {
1465              w.runState = -1;                // ensure runState is set
1466              long steals = w.totalSteals + w.nsteals, sc;
1467 <            do {} while(!U.compareAndSwapLong(this, STEALCOUNT,
1468 <                                              sc = stealCount, sc + steals));
1467 >            do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1468 >                                               sc = stealCount, sc + steals));
1469              int idx = w.poolIndex;
1470              while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1))
1471                  tryAwaitMainLock();
# Line 1513 | Line 1496 | public class ForkJoinPool extends Abstra
1496          }
1497  
1498          if (ex != null)                     // rethrow
1499 <            U.throwException(ex);
1499 >            ForkJoinTask.rethrow(ex);
1500      }
1501  
1502      // Submissions
# Line 1590 | Line 1573 | public class ForkJoinPool extends Abstra
1573      }
1574  
1575      /**
1593     * Returns true if caller is (or may be) submitter to the common
1594     * pool, and not all workers are active, and there appear to be
1595     * tasks in the associated submission queue.
1596     */
1597    static boolean canHelpCommonPool() {
1598        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1599        int k = submitters.get().seed & SQMASK;
1600        return ((p = commonPool) != null &&
1601                (int)(p.ctl >> AC_SHIFT) < 0 &&
1602                (ws = p.workQueues) != null &&
1603                ws.length > (k &= p.submitMask) &&
1604                (q = ws[k]) != null &&
1605                q.top - q.base > 0);
1606    }
1607
1608    /**
1576       * Returns true if the given task was submitted to common pool
1577       * and has not yet commenced execution, and is available for
1578       * removal according to execution policies; if so removing the
# Line 1615 | Line 1582 | public class ForkJoinPool extends Abstra
1582       * @return true if successful
1583       */
1584      static boolean tryUnsubmitFromCommonPool(ForkJoinTask<?> task) {
1585 <        // Peek, looking for task and eligibility before
1586 <        // using trySharedUnpush to actually take it under lock
1587 <        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1588 <        ForkJoinTask<?>[] a; int s;
1589 <        int k = submitters.get().seed & SQMASK;
1590 <        return ((p = commonPool) != null &&
1591 <                (int)(p.ctl >> AC_SHIFT) < 0 &&
1592 <                (ws = p.workQueues) != null &&
1593 <                ws.length > (k &= p.submitMask) &&
1594 <                (q = ws[k]) != null &&
1595 <                (a = q.array) != null &&
1596 <                (s = q.top - 1) - q.base >= 0 &&
1597 <                s >= 0 && s < a.length &&
1598 <                a[s] == task &&
1599 <                q.trySharedUnpush(task));
1585 >        // If not oversaturating platform, peek, looking for task and
1586 >        // eligibility before using trySharedUnpush to actually take
1587 >        // it under lock
1588 >        ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q;
1589 >        ForkJoinTask<?>[] a; int ac, s, m;
1590 >        if ((p = commonPool) != null && (ws = p.workQueues) != null) {
1591 >            int k = submitters.get().seed & p.submitMask & SQMASK;
1592 >            if ((m = ws.length - 1) >= k && (q = ws[k]) != null &&
1593 >                (ac = (int)(p.ctl >> AC_SHIFT)) <= 0) {
1594 >                if (ac == 0) { // double check if all workers active
1595 >                    for (int i = 1; i <= m; i += 2) {
1596 >                        if ((w = ws[i]) != null && w.parker != null) {
1597 >                            ac = -1;
1598 >                            break;
1599 >                        }
1600 >                    }
1601 >                }
1602 >                return (ac < 0 && (a = q.array) != null &&
1603 >                        (s = q.top - 1) - q.base >= 0 &&
1604 >                        s >= 0 && s < a.length &&
1605 >                        a[s] == task &&
1606 >                        q.trySharedUnpush(task));
1607 >            }
1608 >        }
1609 >        return false;
1610      }
1611  
1612      /**
1613 <     * Tries to pop a task from common pool with given root
1613 >     * Tries to pop and run a task within same computation from common pool
1614       */
1615 <    static ForkJoinTask<?> popCCFromCommonPool(CountedCompleter<?> root) {
1616 <        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1617 <        ForkJoinTask<?> t;
1618 <        int k = submitters.get().seed & SQMASK;
1619 <        if (root != null &&
1620 <            (p = commonPool) != null &&
1621 <            (int)(p.ctl >> AC_SHIFT) < 0 &&
1622 <            (ws = p.workQueues) != null &&
1623 <            ws.length > (k &= p.submitMask) &&
1624 <            (q = ws[k]) != null && q.top - q.base > 0 &&
1625 <            root.status < 0 &&
1626 <            (t = q.sharedPopCC(root)) != null)
1627 <            return t;
1628 <        return null;
1615 >    static void popAndExecCCFromCommonPool(CountedCompleter<?> cc) {
1616 >        ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; int m, ac;
1617 >        CountedCompleter<?> par, task;
1618 >        if ((p = commonPool) != null && (ws = p.workQueues) != null) {
1619 >            while ((par = cc.completer) != null) // find root
1620 >                cc = par;
1621 >            int k = submitters.get().seed & p.submitMask & SQMASK;
1622 >            if ((m = ws.length - 1) >= k && (q = ws[k]) != null &&
1623 >                (ac = (int)(p.ctl >> AC_SHIFT)) <= 0) {
1624 >                if (ac == 0) {
1625 >                    for (int i = 1; i <= m; i += 2) {
1626 >                        if ((w = ws[i]) != null && w.parker != null) {
1627 >                            ac = -1;
1628 >                            break;
1629 >                        }
1630 >                    }
1631 >                }
1632 >                if (ac < 0 && q.top - q.base > 0 &&
1633 >                    (task = q.sharedPopCC(cc)) != null)
1634 >                    task.exec();
1635 >            }
1636 >        }
1637      }
1638  
1654
1639      // Maintaining ctl counts
1640  
1641      /**
# Line 2192 | Line 2176 | public class ForkJoinPool extends Abstra
2176          ForkJoinTask<?> t = null;
2177          int k = submitters.get().seed & SQMASK;
2178          if ((p = commonPool) != null &&
2195            (int)(p.ctl >> AC_SHIFT) < 0 &&
2179              (ws = p.workQueues) != null &&
2180              ws.length > (k &= p.submitMask) &&
2181              (q = ws[k]) != null) {
# Line 2247 | Line 2230 | public class ForkJoinPool extends Abstra
2230      static int getEstimatedSubmitterQueueLength() {
2231          ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2232          int k = submitters.get().seed & SQMASK;
2233 <        return ((p = commonPool) != null &&
2251 <                p.runState >= 0 &&
2252 <                (ws = p.workQueues) != null &&
2233 >        return ((p = commonPool) != null && (ws = p.workQueues) != null &&
2234                  ws.length > (k &= p.submitMask) &&
2235                  (q = ws[k]) != null) ?
2236              q.queueSize() : 0;
# Line 2275 | Line 2256 | public class ForkJoinPool extends Abstra
2256          for (long c;;) {
2257              if (((c = ctl) & STOP_BIT) != 0) {      // already terminating
2258                  if ((short)(c >>> TC_SHIFT) == -parallelism) {
2259 <                    synchronized(this) {
2259 >                    synchronized (this) {
2260                          notifyAll();                // signal when 0 workers
2261                      }
2262                  }
# Line 2999 | Line 2980 | public class ForkJoinPool extends Abstra
2980              return true;
2981          long startTime = System.nanoTime();
2982          boolean terminated = false;
2983 <        synchronized(this) {
2983 >        synchronized (this) {
2984              for (long waitTime = nanos, millis = 0L;;) {
2985                  if (terminated = isTerminated() ||
2986                      waitTime <= 0L ||
# Line 3184 | Line 3165 | public class ForkJoinPool extends Abstra
3165                  defaultForkJoinWorkerThreadFactory :
3166                  ((ForkJoinWorkerThreadFactory)ClassLoader.
3167                   getSystemClassLoader().loadClass(fp).newInstance());
3168 <            Thread.UncaughtExceptionHandler ueh = (up == null)? null :
3168 >            Thread.UncaughtExceptionHandler ueh = (up == null) ? null :
3169                  ((Thread.UncaughtExceptionHandler)ClassLoader.
3170                   getSystemClassLoader().loadClass(up).newInstance());
3171              int par;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines