ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.126 by jsr166, Tue Feb 21 00:44:53 2012 UTC vs.
Revision 1.127 by dl, Sun Mar 4 15:52:45 2012 UTC

# Line 721 | Line 721 | public class ForkJoinPool extends Abstra
721           * version of this method because it is never needed.)
722           */
723          final ForkJoinTask<?> pop() {
724 <            ForkJoinTask<?> t; int m;
725 <            ForkJoinTask<?>[] a = array;
726 <            if (a != null && (m = a.length - 1) >= 0) {
724 >            ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
725 >            if ((a = array) != null && (m = a.length - 1) >= 0) {
726                  for (int s; (s = top - 1) - base >= 0;) {
727 <                    int j = ((m & s) << ASHIFT) + ABASE;
728 <                    if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) == null)
727 >                    long j = ((m & s) << ASHIFT) + ABASE;
728 >                    if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
729                          break;
730                      if (U.compareAndSwapObject(a, j, t, null)) {
731                          top = s;
# Line 830 | Line 829 | public class ForkJoinPool extends Abstra
829          }
830  
831          /**
833         * If present, removes from queue and executes the given task, or
834         * any other cancelled task. Returns (true) immediately on any CAS
835         * or consistency check failure so caller can retry.
836         *
837         * @return false if no progress can be made
838         */
839        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
840            boolean removed = false, empty = true, progress = true;
841            ForkJoinTask<?>[] a; int m, s, b, n;
842            if ((a = array) != null && (m = a.length - 1) >= 0 &&
843                (n = (s = top) - (b = base)) > 0) {
844                for (ForkJoinTask<?> t;;) {           // traverse from s to b
845                    int j = ((--s & m) << ASHIFT) + ABASE;
846                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
847                    if (t == null)                    // inconsistent length
848                        break;
849                    else if (t == task) {
850                        if (s + 1 == top) {           // pop
851                            if (!U.compareAndSwapObject(a, j, task, null))
852                                break;
853                            top = s;
854                            removed = true;
855                        }
856                        else if (base == b)           // replace with proxy
857                            removed = U.compareAndSwapObject(a, j, task,
858                                                             new EmptyTask());
859                        break;
860                    }
861                    else if (t.status >= 0)
862                        empty = false;
863                    else if (s + 1 == top) {          // pop and throw away
864                        if (U.compareAndSwapObject(a, j, t, null))
865                            top = s;
866                        break;
867                    }
868                    if (--n == 0) {
869                        if (!empty && base == b)
870                            progress = false;
871                        break;
872                    }
873                }
874            }
875            if (removed)
876                task.doExec();
877            return progress;
878        }
879
880        /**
832           * Initializes or doubles the capacity of array. Call either
833           * by owner or with lock held -- it is OK for base, but not
834           * top, to move while resizings are in progress.
# Line 939 | Line 890 | public class ForkJoinPool extends Abstra
890          // Execution methods
891  
892          /**
893 <         * Removes and runs tasks until empty, using local mode
943 <         * ordering. Normally called only after checking for apparent
944 <         * non-emptiness.
893 >         * Pops and runs tasks until empty.
894           */
895 <        final void runLocalTasks() {
896 <            // hoist checks from repeated pop/poll
897 <            ForkJoinTask<?>[] a; int m;
898 <            if ((a = array) != null && (m = a.length - 1) >= 0) {
899 <                if (mode == 0) {
900 <                    for (int s; (s = top - 1) - base >= 0;) {
901 <                        int j = ((m & s) << ASHIFT) + ABASE;
902 <                        ForkJoinTask<?> t =
903 <                            (ForkJoinTask<?>)U.getObjectVolatile(a, j);
904 <                        if (t != null) {
905 <                            if (U.compareAndSwapObject(a, j, t, null)) {
957 <                                top = s;
958 <                                t.doExec();
959 <                            }
960 <                        }
961 <                        else
962 <                            break;
963 <                    }
895 >        private void popAndExecAll() {
896 >            // A bit faster than repeated pop calls
897 >            ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t;
898 >            while ((a = array) != null && (m = a.length - 1) >= 0 &&
899 >                   (s = top - 1) - base >= 0 &&
900 >                   (t = ((ForkJoinTask<?>)
901 >                         U.getObject(a, j = ((m & s) << ASHIFT) + ABASE)))
902 >                   != null) {
903 >                if (U.compareAndSwapObject(a, j, t, null)) {
904 >                    top = s;
905 >                    t.doExec();
906                  }
907 <                else {
908 <                    for (int b; (b = base) - top < 0;) {
909 <                        int j = ((m & b) << ASHIFT) + ABASE;
910 <                        ForkJoinTask<?> t =
911 <                            (ForkJoinTask<?>)U.getObjectVolatile(a, j);
912 <                        if (t != null) {
913 <                            if (base == b &&
914 <                                U.compareAndSwapObject(a, j, t, null)) {
915 <                                base = b + 1;
916 <                                t.doExec();
917 <                            }
918 <                        } else if (base == b) {
919 <                            if (b + 1 == top)
907 >            }
908 >        }
909 >
910 >        /**
911 >         * Polls and runs tasks until empty.
912 >         */
913 >        private void pollAndExecAll() {
914 >            for (ForkJoinTask<?> t; (t = poll()) != null;)
915 >                t.doExec();
916 >        }
917 >
918 >        /**
919 >         * If present, removes from queue and executes the given task, or
920 >         * any other cancelled task. Returns (true) immediately on any CAS
921 >         * or consistency check failure so caller can retry.
922 >         *
923 >         * @return false if no progress can be made
924 >         */
925 >        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
926 >            boolean removed = false, empty = true, progress = true;
927 >            ForkJoinTask<?>[] a; int m, s, b, n;
928 >            if ((a = array) != null && (m = a.length - 1) >= 0 &&
929 >                (n = (s = top) - (b = base)) > 0) {
930 >                for (ForkJoinTask<?> t;;) {           // traverse from s to b
931 >                    int j = ((--s & m) << ASHIFT) + ABASE;
932 >                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
933 >                    if (t == null)                    // inconsistent length
934 >                        break;
935 >                    else if (t == task) {
936 >                        if (s + 1 == top) {           // pop
937 >                            if (!U.compareAndSwapObject(a, j, task, null))
938                                  break;
939 <                            Thread.yield(); // wait for lagging update
939 >                            top = s;
940 >                            removed = true;
941                          }
942 +                        else if (base == b)           // replace with proxy
943 +                            removed = U.compareAndSwapObject(a, j, task,
944 +                                                             new EmptyTask());
945 +                        break;
946 +                    }
947 +                    else if (t.status >= 0)
948 +                        empty = false;
949 +                    else if (s + 1 == top) {          // pop and throw away
950 +                        if (U.compareAndSwapObject(a, j, t, null))
951 +                            top = s;
952 +                        break;
953 +                    }
954 +                    if (--n == 0) {
955 +                        if (!empty && base == b)
956 +                            progress = false;
957 +                        break;
958                      }
959                  }
960              }
961 +            if (removed)
962 +                task.doExec();
963 +            return progress;
964          }
965  
966          /**
967           * Executes a top-level task and any local tasks remaining
968           * after execution.
989         *
990         * @return true unless terminating
969           */
970 <        final boolean runTask(ForkJoinTask<?> t) {
993 <            boolean alive = true;
970 >        final void runTask(ForkJoinTask<?> t) {
971              if (t != null) {
972                  currentSteal = t;
973                  t.doExec();
974 <                if (top != base)        // conservative guard
975 <                    runLocalTasks();
974 >                if (top != base) {       // process remaining local tasks
975 >                    if (mode == 0)
976 >                        popAndExecAll();
977 >                    else
978 >                        pollAndExecAll();
979 >                }
980                  ++nsteals;
981                  currentSteal = null;
982              }
1002            else if (runState < 0)      // terminating
1003                alive = false;
1004            return alive;
983          }
984  
985          /**
# Line 1321 | Line 1299 | public class ForkJoinPool extends Abstra
1299       *
1300       * @param w the worker's queue
1301       */
1302 +
1303      final void registerWorker(WorkQueue w) {
1304          Mutex lock = this.lock;
1305          lock.lock();
1306          try {
1307              WorkQueue[] ws = workQueues;
1308              if (w != null && ws != null) {          // skip on shutdown/failure
1309 <                int rs, n;
1331 <                while ((n = ws.length) <            // ensure can hold total
1332 <                       (parallelism + (short)(ctl >>> TC_SHIFT) << 1))
1333 <                    workQueues = ws = Arrays.copyOf(ws, n << 1);
1334 <                int m = n - 1;
1309 >                int rs, n =  ws.length, m = n - 1;
1310                  int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence
1311                  w.seed = (s == 0) ? 1 : s;          // ensure non-zero seed
1312                  int r = (s << 1) | 1;               // use odd-numbered indices
1313 <                while (ws[r &= m] != null)          // step by approx half size
1314 <                    r += ((n >>> 1) & SQMASK) + 2;
1313 >                if (ws[r &= m] != null) {           // collision
1314 >                    int probes = 0;                 // step by approx half size
1315 >                    int step = (n <= 4) ? 2 : ((n >>> 1) & SQMASK) + 2;
1316 >                    while (ws[r = (r + step) & m] != null) {
1317 >                        if (++probes >= n) {
1318 >                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1319 >                            m = n - 1;
1320 >                            probes = 0;
1321 >                        }
1322 >                    }
1323 >                }
1324                  w.eventCount = w.poolIndex = r;     // establish before recording
1325                  ws[r] = w;                          // also update seq
1326                  runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN);
# Line 1483 | Line 1467 | public class ForkJoinPool extends Abstra
1467          }
1468      }
1469  
1486
1470      // Scanning for tasks
1471  
1472      /**
# Line 1491 | Line 1474 | public class ForkJoinPool extends Abstra
1474       */
1475      final void runWorker(WorkQueue w) {
1476          w.growArray(false);         // initialize queue array in this thread
1477 <        do {} while (w.runTask(scan(w)));
1477 >        do { w.runTask(scan(w)); } while (w.runState >= 0);
1478      }
1479  
1480      /**
# Line 1554 | Line 1537 | public class ForkJoinPool extends Abstra
1537                          q.base = b + 1;       // specialization of pollAt
1538                          return t;
1539                      }
1540 <                    else if ((t != null || b + 1 != q.top) &&
1558 <                             (ec < 0 || j <= m)) {
1540 >                    else if (ec < 0 || j <= m) {
1541                          rs = 0;               // mark scan as imcomplete
1542                          break;                // caller can retry after release
1543                      }
# Line 1630 | Line 1612 | public class ForkJoinPool extends Abstra
1612       */
1613      private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1614          if (w.eventCount < 0 && !tryTerminate(false, false) &&
1615 <            (int)prevCtl != 0 && ctl == currentCtl) {
1615 >            (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) {
1616              Thread wt = Thread.currentThread();
1617              Thread.yield();            // yield before block
1618              while (ctl == currentCtl) {
# Line 1828 | Line 1810 | public class ForkJoinPool extends Abstra
1810       * @return task status on exit
1811       */
1812      final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
1813 +        int s;
1814          ForkJoinTask<?> prevJoin = joiner.currentJoin;
1815 <        joiner.currentJoin = task;
1816 <        long startTime = 0L;
1817 <        for (int k = 0, s; ; ++k) {
1818 <            if ((joiner.isEmpty() ?                  // try to help
1819 <                 !tryHelpStealer(joiner, task) :
1820 <                 !joiner.tryRemoveAndExec(task))) {
1821 <                if (k == 0) {
1822 <                    startTime = System.nanoTime();
1823 <                    tryPollForAndExec(joiner, task); // check uncommon case
1824 <                }
1825 <                else if ((k & (MAX_HELP - 1)) == 0 &&
1826 <                         System.nanoTime() - startTime >= COMPENSATION_DELAY &&
1827 <                         tryCompensate(task, null)) {
1828 <                    if (task.trySetSignal() && task.status >= 0) {
1829 <                        synchronized (task) {
1830 <                            if (task.status >= 0) {
1831 <                                try {                // see ForkJoinTask
1832 <                                    task.wait();     //  for explanation
1833 <                                } catch (InterruptedException ie) {
1815 >        if ((s = task.status) >= 0) {
1816 >            joiner.currentJoin = task;
1817 >            long startTime = 0L;
1818 >            for (int k = 0;;) {
1819 >                if ((joiner.isEmpty() ?                  // try to help
1820 >                     !tryHelpStealer(joiner, task) :
1821 >                     !joiner.tryRemoveAndExec(task))) {
1822 >                    if (k == 0) {
1823 >                        startTime = System.nanoTime();
1824 >                        tryPollForAndExec(joiner, task); // check uncommon case
1825 >                    }
1826 >                    else if ((k & (MAX_HELP - 1)) == 0 &&
1827 >                             System.nanoTime() - startTime >=
1828 >                             COMPENSATION_DELAY &&
1829 >                             tryCompensate(task, null)) {
1830 >                        if (task.trySetSignal() && task.status >= 0) {
1831 >                            synchronized (task) {
1832 >                                if (task.status >= 0) {
1833 >                                    try {                // see ForkJoinTask
1834 >                                        task.wait();     //  for explanation
1835 >                                    } catch (InterruptedException ie) {
1836 >                                    }
1837                                  }
1838 +                                else
1839 +                                    task.notifyAll();
1840                              }
1853                            else
1854                                task.notifyAll();
1841                          }
1842 +                        long c;                          // re-activate
1843 +                        do {} while (!U.compareAndSwapLong
1844 +                                     (this, CTL, c = ctl, c + AC_UNIT));
1845                      }
1857                    long c;                          // re-activate
1858                    do {} while (!U.compareAndSwapLong
1859                                 (this, CTL, c = ctl, c + AC_UNIT));
1846                  }
1847 +                if ((s = task.status) < 0) {
1848 +                    joiner.currentJoin = prevJoin;
1849 +                    break;
1850 +                }
1851 +                else if ((k++ & (MAX_HELP - 1)) == MAX_HELP >>> 1)
1852 +                    Thread.yield();                     // for politeness
1853              }
1862            if ((s = task.status) < 0) {
1863                joiner.currentJoin = prevJoin;
1864                return s;
1865            }
1866            else if ((k & (MAX_HELP - 1)) == MAX_HELP >>> 1)
1867                Thread.yield();                     // for politeness
1854          }
1855 +        return s;
1856      }
1857  
1858      /**
# Line 1922 | Line 1909 | public class ForkJoinPool extends Abstra
1909       */
1910      final void helpQuiescePool(WorkQueue w) {
1911          for (boolean active = true;;) {
1912 <            if (w.base - w.top < 0)
1913 <                w.runLocalTasks();  // exhaust local queue
1912 >            ForkJoinTask<?> localTask; // exhaust local queue
1913 >            while ((localTask = w.nextLocalTask()) != null)
1914 >                localTask.doExec();
1915              WorkQueue q = findNonEmptyStealQueue(w);
1916              if (q != null) {
1917                  ForkJoinTask<?> t; int b;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines