ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166e/ForkJoinPool.java (file contents):
Revision 1.8 by dl, Mon Oct 29 17:23:26 2012 UTC vs.
Revision 1.9 by dl, Tue Oct 30 14:23:03 2012 UTC

# Line 692 | Line 692 | public class ForkJoinPool extends Abstra
692  
693          /**
694           * Takes next task, if one exists, in LIFO order.  Call only
695 <         * by owner in unshared queues. (We do not have a shared
696 <         * version of this method because it is never needed.)
695 >         * by owner in unshared queues.
696           */
697          final ForkJoinTask<?> pop() {
698              ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
# Line 711 | Line 710 | public class ForkJoinPool extends Abstra
710              return null;
711          }
712  
713 +        final ForkJoinTask<?> sharedPop() {
714 +            ForkJoinTask<?> task = null;
715 +            if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
716 +                try {
717 +                    ForkJoinTask<?>[] a; int m;
718 +                    if ((a = array) != null && (m = a.length - 1) >= 0) {
719 +                        for (int s; (s = top - 1) - base >= 0;) {
720 +                            long j = ((m & s) << ASHIFT) + ABASE;
721 +                            ForkJoinTask<?> t =
722 +                                (ForkJoinTask<?>)U.getObject(a, j);
723 +                            if (t == null)
724 +                                break;
725 +                            if (U.compareAndSwapObject(a, j, t, null)) {
726 +                                top = s;
727 +                                task = t;
728 +                                break;
729 +                            }
730 +                        }
731 +                    }
732 +                } finally {
733 +                    runState = 0;
734 +                }
735 +            }
736 +            return task;
737 +        }
738 +
739 +        /**
740 +         * Version of pop that takes top element only if it
741 +         * its root is the given CountedCompleter.
742 +         */
743 +        final ForkJoinTask<?> popCC(CountedCompleter<?> root) {
744 +            ForkJoinTask<?>[] a; int m;
745 +            if (root != null && (a = array) != null && (m = a.length - 1) >= 0) {
746 +                for (int s; (s = top - 1) - base >= 0;) {
747 +                    long j = ((m & s) << ASHIFT) + ABASE;
748 +                    ForkJoinTask<?> t =
749 +                        (ForkJoinTask<?>)U.getObject(a, j);
750 +                    if (t == null || !(t instanceof CountedCompleter) ||
751 +                        ((CountedCompleter<?>)t).getRoot() != root)
752 +                        break;
753 +                    if (U.compareAndSwapObject(a, j, t, null)) {
754 +                        top = s;
755 +                        return t;
756 +                    }
757 +                    if (root.status < 0)
758 +                        break;
759 +                }
760 +            }
761 +            return null;
762 +        }
763 +
764 +        /**
765 +         * Shared version of popCC
766 +         */
767 +        final ForkJoinTask<?> sharedPopCC(CountedCompleter<?> root) {
768 +            ForkJoinTask<?> task = null;
769 +            if (root != null &&
770 +                runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
771 +                try {
772 +                    ForkJoinTask<?>[] a; int m;
773 +                    if ((a = array) != null && (m = a.length - 1) >= 0) {
774 +                        for (int s; (s = top - 1) - base >= 0;) {
775 +                            long j = ((m & s) << ASHIFT) + ABASE;
776 +                            ForkJoinTask<?> t =
777 +                                (ForkJoinTask<?>)U.getObject(a, j);
778 +                            if (t == null || !(t instanceof CountedCompleter) ||
779 +                                ((CountedCompleter<?>)t).getRoot() != root)
780 +                                break;
781 +                            if (U.compareAndSwapObject(a, j, t, null)) {
782 +                                top = s;
783 +                                task = t;
784 +                                break;
785 +                            }
786 +                            if (root.status < 0)
787 +                                break;
788 +                        }
789 +                    }
790 +                } finally {
791 +                    runState = 0;
792 +                }
793 +            }
794 +            return task;
795 +        }
796 +
797          /**
798           * Takes a task in FIFO order if b is base of queue and a task
799           * can be claimed without contention. Specialized versions
# Line 1508 | Line 1591 | public class ForkJoinPool extends Abstra
1591      }
1592  
1593      /**
1594 +     * Returns true if caller is (or may be) submitter to the common
1595 +     * pool, and not all workers are active, and there appear to be
1596 +     * tasks in the associated submission queue.
1597 +     */
1598 +    static boolean canHelpCommonPool() {
1599 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1600 +        int k = submitters.get().seed & SQMASK;
1601 +        return ((p = commonPool) != null &&
1602 +                (int)(p.ctl >> AC_SHIFT) < 0 &&
1603 +                (ws = p.workQueues) != null &&
1604 +                ws.length > (k &= p.submitMask) &&
1605 +                (q = ws[k]) != null &&
1606 +                q.top - q.base > 0);
1607 +    }
1608 +
1609 +    /**
1610       * Returns true if the given task was submitted to common pool
1611       * and has not yet commenced execution, and is available for
1612       * removal according to execution policies; if so removing the
# Line 1520 | Line 1619 | public class ForkJoinPool extends Abstra
1619          // Peek, looking for task and eligibility before
1620          // using trySharedUnpush to actually take it under lock
1621          ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1622 <        ForkJoinTask<?>[] a; int t, s, n;
1622 >        ForkJoinTask<?>[] a; int s;
1623          int k = submitters.get().seed & SQMASK;
1624          return ((p = commonPool) != null &&
1625 +                (int)(p.ctl >> AC_SHIFT) < 0 &&
1626                  (ws = p.workQueues) != null &&
1627                  ws.length > (k &= p.submitMask) &&
1628                  (q = ws[k]) != null &&
1629                  (a = q.array) != null &&
1630 <                (n = (t = q.top) - q.base) > 0 &&
1631 <                (n > 1 || (int)(p.ctl >> AC_SHIFT) < 0) &&
1632 <                (s = t - 1) >= 0 && s < a.length && a[s] == task &&
1630 >                (s = q.top - 1) - q.base >= 0 &&
1631 >                s >= 0 && s < a.length &&
1632 >                a[s] == task &&
1633                  q.trySharedUnpush(task));
1634      }
1635  
1636 +    /**
1637 +     * Tries to pop a task from common pool with given root
1638 +     */
1639 +    static ForkJoinTask<?> popCCFromCommonPool(CountedCompleter<?> root) {
1640 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1641 +        ForkJoinTask<?> t;
1642 +        int k = submitters.get().seed & SQMASK;
1643 +        if (root != null &&
1644 +            (p = commonPool) != null &&
1645 +            (int)(p.ctl >> AC_SHIFT) < 0 &&
1646 +            (ws = p.workQueues) != null &&
1647 +            ws.length > (k &= p.submitMask) &&
1648 +            (q = ws[k]) != null && q.top - q.base > 0 &&
1649 +            root.status < 0 &&
1650 +            (t = q.sharedPopCC(root)) != null)
1651 +            return t;
1652 +        return null;
1653 +    }
1654 +
1655 +
1656      // Maintaining ctl counts
1657  
1658      /**
# Line 2068 | Line 2188 | public class ForkJoinPool extends Abstra
2188       * Restricted version of helpQuiescePool for non-FJ callers
2189       */
2190      static void externalHelpQuiescePool() {
2191 <        ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q;
2192 <        ForkJoinTask<?> t; int b;
2191 >        ForkJoinPool p; WorkQueue[] ws; WorkQueue q, sq;
2192 >        ForkJoinTask<?>[] a; int b;
2193 >        ForkJoinTask<?> t = null;
2194          int k = submitters.get().seed & SQMASK;
2195          if ((p = commonPool) != null &&
2196 +            (int)(p.ctl >> AC_SHIFT) < 0 &&
2197              (ws = p.workQueues) != null &&
2198              ws.length > (k &= p.submitMask) &&
2199 <            (w = ws[k]) != null &&
2200 <            (q = p.findNonEmptyStealQueue(w)) != null &&
2201 <            (b = q.base) - q.top < 0 &&
2202 <            (t = q.pollAt(b)) != null)
2203 <            t.doExec();
2199 >            (q = ws[k]) != null) {
2200 >            while (q.top - q.base > 0) {
2201 >                if ((t = q.sharedPop()) != null)
2202 >                    break;
2203 >            }
2204 >            if (t == null && (sq = p.findNonEmptyStealQueue(q)) != null &&
2205 >                (b = sq.base) - sq.top < 0)
2206 >                t = sq.pollAt(b);
2207 >            if (t != null)
2208 >                t.doExec();
2209 >        }
2210      }
2211  
2212      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines