ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.138 by jsr166, Tue Oct 30 16:05:35 2012 UTC vs.
Revision 1.139 by dl, Wed Oct 31 12:49:24 2012 UTC

# Line 735 | Line 735 | public class ForkJoinPool extends Abstra
735              return task;
736          }
737  
738        /**
739         * Version of pop that takes top element only if it
740         * its root is the given CountedCompleter.
741         */
742        final ForkJoinTask<?> popCC(CountedCompleter<?> root) {
743            ForkJoinTask<?>[] a; int m;
744            if (root != null && (a = array) != null && (m = a.length - 1) >= 0) {
745                for (int s; (s = top - 1) - base >= 0;) {
746                    long j = ((m & s) << ASHIFT) + ABASE;
747                    ForkJoinTask<?> t =
748                        (ForkJoinTask<?>)U.getObject(a, j);
749                    if (t == null || !(t instanceof CountedCompleter) ||
750                        ((CountedCompleter<?>)t).getRoot() != root)
751                        break;
752                    if (U.compareAndSwapObject(a, j, t, null)) {
753                        top = s;
754                        return t;
755                    }
756                    if (root.status < 0)
757                        break;
758                }
759            }
760            return null;
761        }
762
763        /**
764         * Shared version of popCC
765         */
766        final ForkJoinTask<?> sharedPopCC(CountedCompleter<?> root) {
767            ForkJoinTask<?> task = null;
768            if (root != null &&
769                runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
770                try {
771                    ForkJoinTask<?>[] a; int m;
772                    if ((a = array) != null && (m = a.length - 1) >= 0) {
773                        for (int s; (s = top - 1) - base >= 0;) {
774                            long j = ((m & s) << ASHIFT) + ABASE;
775                            ForkJoinTask<?> t =
776                                (ForkJoinTask<?>)U.getObject(a, j);
777                            if (t == null || !(t instanceof CountedCompleter) ||
778                                ((CountedCompleter<?>)t).getRoot() != root)
779                                break;
780                            if (U.compareAndSwapObject(a, j, t, null)) {
781                                top = s;
782                                task = t;
783                                break;
784                            }
785                            if (root.status < 0)
786                                break;
787                        }
788                    }
789                } finally {
790                    runState = 0;
791                }
792            }
793            return task;
794        }
738  
739          /**
740           * Takes a task in FIFO order if b is base of queue and a task
# Line 966 | Line 909 | public class ForkJoinPool extends Abstra
909              return seed = r ^= r << 5;
910          }
911  
912 <        // Execution methods
912 >        // Specialized execution methods
913  
914          /**
915           * Pops and runs tasks until empty.
# Line 1045 | Line 988 | public class ForkJoinPool extends Abstra
988          }
989  
990          /**
991 +         * Version of shared pop that takes top element only if it
992 +         * its root is the given CountedCompleter.
993 +         */
994 +        final CountedCompleter<?> sharedPopCC(CountedCompleter<?> root) {
995 +            CountedCompleter<?> task = null;
996 +            if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
997 +                try {
998 +                    ForkJoinTask<?>[] a; int m;
999 +                    if ((a = array) != null && (m = a.length - 1) >= 0) {
1000 +                        outer:for (int s; (s = top - 1) - base >= 0;) {
1001 +                            long j = ((m & s) << ASHIFT) + ABASE;
1002 +                            ForkJoinTask<?> t =
1003 +                                (ForkJoinTask<?>)U.getObject(a, j);
1004 +                            if (t == null || !(t instanceof CountedCompleter))
1005 +                                break;
1006 +                            CountedCompleter<?> cc = (CountedCompleter<?>)t;
1007 +                            for (CountedCompleter<?> q = cc, p;;) {
1008 +                                if (q == root) {
1009 +                                    if (U.compareAndSwapObject(a, j, cc, null)) {
1010 +                                        top = s;
1011 +                                        task = cc;
1012 +                                        break outer;
1013 +                                    }
1014 +                                    break;
1015 +                                }
1016 +                                if ((p = q.completer) == null)
1017 +                                    break outer;
1018 +                                q = p;
1019 +                            }
1020 +                        }
1021 +                    }
1022 +                } finally {
1023 +                    runState = 0;
1024 +                }
1025 +            }
1026 +            return task;
1027 +        }
1028 +
1029 +        /**
1030           * Executes a top-level task and any local tasks remaining
1031           * after execution.
1032           */
# Line 1165 | Line 1147 | public class ForkJoinPool extends Abstra
1147      public static final ForkJoinWorkerThreadFactory
1148          defaultForkJoinWorkerThreadFactory;
1149  
1168
1150      /** Property prefix for constructing common pool */
1151      private static final String propPrefix =
1152          "java.util.concurrent.ForkJoinPool.common.";
# Line 1379 | Line 1360 | public class ForkJoinPool extends Abstra
1360                          try {
1361                              wait();
1362                          } catch (InterruptedException ie) {
1363 <                            Thread.currentThread().interrupt();
1363 >                            try {
1364 >                                Thread.currentThread().interrupt();
1365 >                            } catch (SecurityException ignore) {
1366 >                            }
1367                          }
1368                      }
1369                      else
# Line 1464 | Line 1448 | public class ForkJoinPool extends Abstra
1448                  synchronized (this) { notifyAll(); };
1449              }
1450          }
1467
1451      }
1452  
1453      /**
# Line 1513 | Line 1496 | public class ForkJoinPool extends Abstra
1496          }
1497  
1498          if (ex != null)                     // rethrow
1499 <            U.throwException(ex);
1499 >            ForkJoinTask.rethrow(ex);
1500      }
1501  
1502      // Submissions
# Line 1590 | Line 1573 | public class ForkJoinPool extends Abstra
1573      }
1574  
1575      /**
1593     * Returns true if caller is (or may be) submitter to the common
1594     * pool, and not all workers are active, and there appear to be
1595     * tasks in the associated submission queue.
1596     */
1597    static boolean canHelpCommonPool() {
1598        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1599        int k = submitters.get().seed & SQMASK;
1600        return ((p = commonPool) != null &&
1601                (int)(p.ctl >> AC_SHIFT) < 0 &&
1602                (ws = p.workQueues) != null &&
1603                ws.length > (k &= p.submitMask) &&
1604                (q = ws[k]) != null &&
1605                q.top - q.base > 0);
1606    }
1607
1608    /**
1576       * Returns true if the given task was submitted to common pool
1577       * and has not yet commenced execution, and is available for
1578       * removal according to execution policies; if so removing the
# Line 1615 | Line 1582 | public class ForkJoinPool extends Abstra
1582       * @return true if successful
1583       */
1584      static boolean tryUnsubmitFromCommonPool(ForkJoinTask<?> task) {
1585 <        // Peek, looking for task and eligibility before
1586 <        // using trySharedUnpush to actually take it under lock
1587 <        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1588 <        ForkJoinTask<?>[] a; int s;
1589 <        int k = submitters.get().seed & SQMASK;
1590 <        return ((p = commonPool) != null &&
1591 <                (int)(p.ctl >> AC_SHIFT) < 0 &&
1592 <                (ws = p.workQueues) != null &&
1593 <                ws.length > (k &= p.submitMask) &&
1594 <                (q = ws[k]) != null &&
1595 <                (a = q.array) != null &&
1596 <                (s = q.top - 1) - q.base >= 0 &&
1597 <                s >= 0 && s < a.length &&
1598 <                a[s] == task &&
1599 <                q.trySharedUnpush(task));
1585 >        // If not oversaturating platform, peek, looking for task and
1586 >        // eligibility before using trySharedUnpush to actually take
1587 >        // it under lock
1588 >        ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q;
1589 >        ForkJoinTask<?>[] a; int ac, s, m;
1590 >        if ((p = commonPool) != null && (ws = p.workQueues) != null) {
1591 >            int k = submitters.get().seed & p.submitMask & SQMASK;
1592 >            if ((m = ws.length - 1) >= k && (q = ws[k]) != null &&
1593 >                (ac = (int)(p.ctl >> AC_SHIFT)) <= 0) {
1594 >                if (ac == 0) { // double check if all workers active
1595 >                    for (int i = 1; i <= m; i += 2) {
1596 >                        if ((w = ws[i]) != null && w.parker != null) {
1597 >                            ac = -1;
1598 >                            break;
1599 >                        }
1600 >                    }
1601 >                }
1602 >                return (ac < 0 && (a = q.array) != null &&
1603 >                        (s = q.top - 1) - q.base >= 0 &&
1604 >                        s >= 0 && s < a.length &&
1605 >                        a[s] == task &&
1606 >                        q.trySharedUnpush(task));
1607 >            }
1608 >        }
1609 >        return false;
1610      }
1611  
1612      /**
1613 <     * Tries to pop a task from common pool with given root
1613 >     * Tries to pop and run a task within same computation from common pool
1614       */
1615 <    static ForkJoinTask<?> popCCFromCommonPool(CountedCompleter<?> root) {
1616 <        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1617 <        ForkJoinTask<?> t;
1618 <        int k = submitters.get().seed & SQMASK;
1619 <        if (root != null &&
1620 <            (p = commonPool) != null &&
1621 <            (int)(p.ctl >> AC_SHIFT) < 0 &&
1622 <            (ws = p.workQueues) != null &&
1623 <            ws.length > (k &= p.submitMask) &&
1624 <            (q = ws[k]) != null && q.top - q.base > 0 &&
1625 <            root.status < 0 &&
1626 <            (t = q.sharedPopCC(root)) != null)
1627 <            return t;
1628 <        return null;
1615 >    static void popAndExecCCFromCommonPool(CountedCompleter<?> cc) {
1616 >        ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; int m, ac;
1617 >        CountedCompleter<?> par, task;
1618 >        if ((p = commonPool) != null && (ws = p.workQueues) != null) {
1619 >            while ((par = cc.completer) != null) // find root
1620 >                cc = par;
1621 >            int k = submitters.get().seed & p.submitMask & SQMASK;
1622 >            if ((m = ws.length - 1) >= k && (q = ws[k]) != null &&
1623 >                (ac = (int)(p.ctl >> AC_SHIFT)) <= 0) {
1624 >                if (ac == 0) {
1625 >                    for (int i = 1; i <= m; i += 2) {
1626 >                        if ((w = ws[i]) != null && w.parker != null) {
1627 >                            ac = -1;
1628 >                            break;
1629 >                        }
1630 >                    }
1631 >                }
1632 >                if (ac < 0 && q.top - q.base > 0 &&
1633 >                    (task = q.sharedPopCC(cc)) != null)
1634 >                    task.exec();
1635 >            }
1636 >        }
1637      }
1638  
1654
1639      // Maintaining ctl counts
1640  
1641      /**
# Line 2192 | Line 2176 | public class ForkJoinPool extends Abstra
2176          ForkJoinTask<?> t = null;
2177          int k = submitters.get().seed & SQMASK;
2178          if ((p = commonPool) != null &&
2195            (int)(p.ctl >> AC_SHIFT) < 0 &&
2179              (ws = p.workQueues) != null &&
2180              ws.length > (k &= p.submitMask) &&
2181              (q = ws[k]) != null) {
# Line 2247 | Line 2230 | public class ForkJoinPool extends Abstra
2230      static int getEstimatedSubmitterQueueLength() {
2231          ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2232          int k = submitters.get().seed & SQMASK;
2233 <        return ((p = commonPool) != null &&
2251 <                p.runState >= 0 &&
2252 <                (ws = p.workQueues) != null &&
2233 >        return ((p = commonPool) != null && (ws = p.workQueues) != null &&
2234                  ws.length > (k &= p.submitMask) &&
2235                  (q = ws[k]) != null) ?
2236              q.queueSize() : 0;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines