ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166e/ForkJoinPool.java (file contents):
Revision 1.10 by jsr166, Tue Oct 30 16:05:35 2012 UTC vs.
Revision 1.11 by dl, Wed Oct 31 12:49:13 2012 UTC

# Line 736 | Line 736 | public class ForkJoinPool extends Abstra
736              return task;
737          }
738  
739        /**
740         * Version of pop that takes top element only if it
741         * its root is the given CountedCompleter.
742         */
743        final ForkJoinTask<?> popCC(CountedCompleter<?> root) {
744            ForkJoinTask<?>[] a; int m;
745            if (root != null && (a = array) != null && (m = a.length - 1) >= 0) {
746                for (int s; (s = top - 1) - base >= 0;) {
747                    long j = ((m & s) << ASHIFT) + ABASE;
748                    ForkJoinTask<?> t =
749                        (ForkJoinTask<?>)U.getObject(a, j);
750                    if (t == null || !(t instanceof CountedCompleter) ||
751                        ((CountedCompleter<?>)t).getRoot() != root)
752                        break;
753                    if (U.compareAndSwapObject(a, j, t, null)) {
754                        top = s;
755                        return t;
756                    }
757                    if (root.status < 0)
758                        break;
759                }
760            }
761            return null;
762        }
763
764        /**
765         * Shared version of popCC
766         */
767        final ForkJoinTask<?> sharedPopCC(CountedCompleter<?> root) {
768            ForkJoinTask<?> task = null;
769            if (root != null &&
770                runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
771                try {
772                    ForkJoinTask<?>[] a; int m;
773                    if ((a = array) != null && (m = a.length - 1) >= 0) {
774                        for (int s; (s = top - 1) - base >= 0;) {
775                            long j = ((m & s) << ASHIFT) + ABASE;
776                            ForkJoinTask<?> t =
777                                (ForkJoinTask<?>)U.getObject(a, j);
778                            if (t == null || !(t instanceof CountedCompleter) ||
779                                ((CountedCompleter<?>)t).getRoot() != root)
780                                break;
781                            if (U.compareAndSwapObject(a, j, t, null)) {
782                                top = s;
783                                task = t;
784                                break;
785                            }
786                            if (root.status < 0)
787                                break;
788                        }
789                    }
790                } finally {
791                    runState = 0;
792                }
793            }
794            return task;
795        }
739  
740          /**
741           * Takes a task in FIFO order if b is base of queue and a task
# Line 967 | Line 910 | public class ForkJoinPool extends Abstra
910              return seed = r ^= r << 5;
911          }
912  
913 <        // Execution methods
913 >        // Specialized execution methods
914  
915          /**
916           * Pops and runs tasks until empty.
# Line 1046 | Line 989 | public class ForkJoinPool extends Abstra
989          }
990  
991          /**
992 +         * Version of shared pop that takes top element only if it
993 +         * its root is the given CountedCompleter.
994 +         */
995 +        final CountedCompleter<?> sharedPopCC(CountedCompleter<?> root) {
996 +            CountedCompleter<?> task = null;
997 +            if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
998 +                try {
999 +                    ForkJoinTask<?>[] a; int m;
1000 +                    if ((a = array) != null && (m = a.length - 1) >= 0) {
1001 +                        outer:for (int s; (s = top - 1) - base >= 0;) {
1002 +                            long j = ((m & s) << ASHIFT) + ABASE;
1003 +                            ForkJoinTask<?> t =
1004 +                                (ForkJoinTask<?>)U.getObject(a, j);
1005 +                            if (t == null || !(t instanceof CountedCompleter))
1006 +                                break;
1007 +                            CountedCompleter<?> cc = (CountedCompleter<?>)t;
1008 +                            for (CountedCompleter<?> q = cc, p;;) {
1009 +                                if (q == root) {
1010 +                                    if (U.compareAndSwapObject(a, j, cc, null)) {
1011 +                                        top = s;
1012 +                                        task = cc;
1013 +                                        break outer;
1014 +                                    }
1015 +                                    break;
1016 +                                }
1017 +                                if ((p = q.completer) == null)
1018 +                                    break outer;
1019 +                                q = p;
1020 +                            }
1021 +                        }
1022 +                    }
1023 +                } finally {
1024 +                    runState = 0;
1025 +                }
1026 +            }
1027 +            return task;
1028 +        }
1029 +
1030 +        /**
1031           * Executes a top-level task and any local tasks remaining
1032           * after execution.
1033           */
# Line 1166 | Line 1148 | public class ForkJoinPool extends Abstra
1148      public static final ForkJoinWorkerThreadFactory
1149          defaultForkJoinWorkerThreadFactory;
1150  
1169
1151      /** Property prefix for constructing common pool */
1152      private static final String propPrefix =
1153          "java.util.concurrent.ForkJoinPool.common.";
# Line 1380 | Line 1361 | public class ForkJoinPool extends Abstra
1361                          try {
1362                              wait();
1363                          } catch (InterruptedException ie) {
1364 <                            Thread.currentThread().interrupt();
1364 >                            try {
1365 >                                Thread.currentThread().interrupt();
1366 >                            } catch (SecurityException ignore) {
1367 >                            }
1368                          }
1369                      }
1370                      else
# Line 1465 | Line 1449 | public class ForkJoinPool extends Abstra
1449                  synchronized (this) { notifyAll(); };
1450              }
1451          }
1468
1452      }
1453  
1454      /**
# Line 1514 | Line 1497 | public class ForkJoinPool extends Abstra
1497          }
1498  
1499          if (ex != null)                     // rethrow
1500 <            U.throwException(ex);
1500 >            ForkJoinTask.rethrow(ex);
1501      }
1502  
1503      // Submissions
# Line 1591 | Line 1574 | public class ForkJoinPool extends Abstra
1574      }
1575  
1576      /**
1594     * Returns true if caller is (or may be) submitter to the common
1595     * pool, and not all workers are active, and there appear to be
1596     * tasks in the associated submission queue.
1597     */
1598    static boolean canHelpCommonPool() {
1599        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1600        int k = submitters.get().seed & SQMASK;
1601        return ((p = commonPool) != null &&
1602                (int)(p.ctl >> AC_SHIFT) < 0 &&
1603                (ws = p.workQueues) != null &&
1604                ws.length > (k &= p.submitMask) &&
1605                (q = ws[k]) != null &&
1606                q.top - q.base > 0);
1607    }
1608
1609    /**
1577       * Returns true if the given task was submitted to common pool
1578       * and has not yet commenced execution, and is available for
1579       * removal according to execution policies; if so removing the
# Line 1616 | Line 1583 | public class ForkJoinPool extends Abstra
1583       * @return true if successful
1584       */
1585      static boolean tryUnsubmitFromCommonPool(ForkJoinTask<?> task) {
1586 <        // Peek, looking for task and eligibility before
1587 <        // using trySharedUnpush to actually take it under lock
1588 <        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1589 <        ForkJoinTask<?>[] a; int s;
1590 <        int k = submitters.get().seed & SQMASK;
1591 <        return ((p = commonPool) != null &&
1592 <                (int)(p.ctl >> AC_SHIFT) < 0 &&
1593 <                (ws = p.workQueues) != null &&
1594 <                ws.length > (k &= p.submitMask) &&
1595 <                (q = ws[k]) != null &&
1596 <                (a = q.array) != null &&
1597 <                (s = q.top - 1) - q.base >= 0 &&
1598 <                s >= 0 && s < a.length &&
1599 <                a[s] == task &&
1600 <                q.trySharedUnpush(task));
1586 >        // If not oversaturating platform, peek, looking for task and
1587 >        // eligibility before using trySharedUnpush to actually take
1588 >        // it under lock
1589 >        ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q;
1590 >        ForkJoinTask<?>[] a; int ac, s, m;
1591 >        if ((p = commonPool) != null && (ws = p.workQueues) != null) {
1592 >            int k = submitters.get().seed & p.submitMask & SQMASK;
1593 >            if ((m = ws.length - 1) >= k && (q = ws[k]) != null &&
1594 >                (ac = (int)(p.ctl >> AC_SHIFT)) <= 0) {
1595 >                if (ac == 0) { // double check if all workers active
1596 >                    for (int i = 1; i <= m; i += 2) {
1597 >                        if ((w = ws[i]) != null && w.parker != null) {
1598 >                            ac = -1;
1599 >                            break;
1600 >                        }
1601 >                    }
1602 >                }
1603 >                return (ac < 0 && (a = q.array) != null &&
1604 >                        (s = q.top - 1) - q.base >= 0 &&
1605 >                        s >= 0 && s < a.length &&
1606 >                        a[s] == task &&
1607 >                        q.trySharedUnpush(task));
1608 >            }
1609 >        }
1610 >        return false;
1611      }
1612  
1613      /**
1614 <     * Tries to pop a task from common pool with given root
1614 >     * Tries to pop and run a task within same computation from common pool
1615       */
1616 <    static ForkJoinTask<?> popCCFromCommonPool(CountedCompleter<?> root) {
1617 <        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1618 <        ForkJoinTask<?> t;
1619 <        int k = submitters.get().seed & SQMASK;
1620 <        if (root != null &&
1621 <            (p = commonPool) != null &&
1622 <            (int)(p.ctl >> AC_SHIFT) < 0 &&
1623 <            (ws = p.workQueues) != null &&
1624 <            ws.length > (k &= p.submitMask) &&
1625 <            (q = ws[k]) != null && q.top - q.base > 0 &&
1626 <            root.status < 0 &&
1627 <            (t = q.sharedPopCC(root)) != null)
1628 <            return t;
1629 <        return null;
1616 >    static void popAndExecCCFromCommonPool(CountedCompleter<?> cc) {
1617 >        ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; int m, ac;
1618 >        CountedCompleter<?> par, task;
1619 >        if ((p = commonPool) != null && (ws = p.workQueues) != null) {
1620 >            while ((par = cc.completer) != null) // find root
1621 >                cc = par;
1622 >            int k = submitters.get().seed & p.submitMask & SQMASK;
1623 >            if ((m = ws.length - 1) >= k && (q = ws[k]) != null &&
1624 >                (ac = (int)(p.ctl >> AC_SHIFT)) <= 0) {
1625 >                if (ac == 0) {
1626 >                    for (int i = 1; i <= m; i += 2) {
1627 >                        if ((w = ws[i]) != null && w.parker != null) {
1628 >                            ac = -1;
1629 >                            break;
1630 >                        }
1631 >                    }
1632 >                }
1633 >                if (ac < 0 && q.top - q.base > 0 &&
1634 >                    (task = q.sharedPopCC(cc)) != null)
1635 >                    task.exec();
1636 >            }
1637 >        }
1638      }
1639  
1655
1640      // Maintaining ctl counts
1641  
1642      /**
# Line 2193 | Line 2177 | public class ForkJoinPool extends Abstra
2177          ForkJoinTask<?> t = null;
2178          int k = submitters.get().seed & SQMASK;
2179          if ((p = commonPool) != null &&
2196            (int)(p.ctl >> AC_SHIFT) < 0 &&
2180              (ws = p.workQueues) != null &&
2181              ws.length > (k &= p.submitMask) &&
2182              (q = ws[k]) != null) {
# Line 2248 | Line 2231 | public class ForkJoinPool extends Abstra
2231      static int getEstimatedSubmitterQueueLength() {
2232          ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2233          int k = submitters.get().seed & SQMASK;
2234 <        return ((p = commonPool) != null &&
2252 <                p.runState >= 0 &&
2253 <                (ws = p.workQueues) != null &&
2234 >        return ((p = commonPool) != null && (ws = p.workQueues) != null &&
2235                  ws.length > (k &= p.submitMask) &&
2236                  (q = ws[k]) != null) ?
2237              q.queueSize() : 0;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines