ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.93 by jsr166, Tue Feb 21 00:44:53 2012 UTC vs.
Revision 1.94 by dl, Sun Mar 4 15:52:41 2012 UTC

# Line 722 | Line 722 | public class ForkJoinPool extends Abstra
722           * version of this method because it is never needed.)
723           */
724          final ForkJoinTask<?> pop() {
725 <            ForkJoinTask<?> t; int m;
726 <            ForkJoinTask<?>[] a = array;
727 <            if (a != null && (m = a.length - 1) >= 0) {
725 >            ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
726 >            if ((a = array) != null && (m = a.length - 1) >= 0) {
727                  for (int s; (s = top - 1) - base >= 0;) {
728 <                    int j = ((m & s) << ASHIFT) + ABASE;
729 <                    if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) == null)
728 >                    long j = ((m & s) << ASHIFT) + ABASE;
729 >                    if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
730                          break;
731                      if (U.compareAndSwapObject(a, j, t, null)) {
732                          top = s;
# Line 831 | Line 830 | public class ForkJoinPool extends Abstra
830          }
831  
832          /**
834         * If present, removes from queue and executes the given task, or
835         * any other cancelled task. Returns (true) immediately on any CAS
836         * or consistency check failure so caller can retry.
837         *
838         * @return false if no progress can be made
839         */
840        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
841            boolean removed = false, empty = true, progress = true;
842            ForkJoinTask<?>[] a; int m, s, b, n;
843            if ((a = array) != null && (m = a.length - 1) >= 0 &&
844                (n = (s = top) - (b = base)) > 0) {
845                for (ForkJoinTask<?> t;;) {           // traverse from s to b
846                    int j = ((--s & m) << ASHIFT) + ABASE;
847                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
848                    if (t == null)                    // inconsistent length
849                        break;
850                    else if (t == task) {
851                        if (s + 1 == top) {           // pop
852                            if (!U.compareAndSwapObject(a, j, task, null))
853                                break;
854                            top = s;
855                            removed = true;
856                        }
857                        else if (base == b)           // replace with proxy
858                            removed = U.compareAndSwapObject(a, j, task,
859                                                             new EmptyTask());
860                        break;
861                    }
862                    else if (t.status >= 0)
863                        empty = false;
864                    else if (s + 1 == top) {          // pop and throw away
865                        if (U.compareAndSwapObject(a, j, t, null))
866                            top = s;
867                        break;
868                    }
869                    if (--n == 0) {
870                        if (!empty && base == b)
871                            progress = false;
872                        break;
873                    }
874                }
875            }
876            if (removed)
877                task.doExec();
878            return progress;
879        }
880
881        /**
833           * Initializes or doubles the capacity of array. Call either
834           * by owner or with lock held -- it is OK for base, but not
835           * top, to move while resizings are in progress.
# Line 940 | Line 891 | public class ForkJoinPool extends Abstra
891          // Execution methods
892  
893          /**
894 <         * Removes and runs tasks until empty, using local mode
944 <         * ordering. Normally called only after checking for apparent
945 <         * non-emptiness.
894 >         * Pops and runs tasks until empty.
895           */
896 <        final void runLocalTasks() {
897 <            // hoist checks from repeated pop/poll
898 <            ForkJoinTask<?>[] a; int m;
899 <            if ((a = array) != null && (m = a.length - 1) >= 0) {
900 <                if (mode == 0) {
901 <                    for (int s; (s = top - 1) - base >= 0;) {
902 <                        int j = ((m & s) << ASHIFT) + ABASE;
903 <                        ForkJoinTask<?> t =
904 <                            (ForkJoinTask<?>)U.getObjectVolatile(a, j);
905 <                        if (t != null) {
906 <                            if (U.compareAndSwapObject(a, j, t, null)) {
958 <                                top = s;
959 <                                t.doExec();
960 <                            }
961 <                        }
962 <                        else
963 <                            break;
964 <                    }
896 >        private void popAndExecAll() {
897 >            // A bit faster than repeated pop calls
898 >            ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t;
899 >            while ((a = array) != null && (m = a.length - 1) >= 0 &&
900 >                   (s = top - 1) - base >= 0 &&
901 >                   (t = ((ForkJoinTask<?>)
902 >                         U.getObject(a, j = ((m & s) << ASHIFT) + ABASE)))
903 >                   != null) {
904 >                if (U.compareAndSwapObject(a, j, t, null)) {
905 >                    top = s;
906 >                    t.doExec();
907                  }
908 <                else {
909 <                    for (int b; (b = base) - top < 0;) {
910 <                        int j = ((m & b) << ASHIFT) + ABASE;
911 <                        ForkJoinTask<?> t =
912 <                            (ForkJoinTask<?>)U.getObjectVolatile(a, j);
913 <                        if (t != null) {
914 <                            if (base == b &&
915 <                                U.compareAndSwapObject(a, j, t, null)) {
916 <                                base = b + 1;
917 <                                t.doExec();
918 <                            }
919 <                        } else if (base == b) {
920 <                            if (b + 1 == top)
908 >            }
909 >        }
910 >
911 >        /**
912 >         * Polls and runs tasks until empty.
913 >         */
914 >        private void pollAndExecAll() {
915 >            for (ForkJoinTask<?> t; (t = poll()) != null;)
916 >                t.doExec();
917 >        }
918 >
919 >        /**
920 >         * If present, removes from queue and executes the given task, or
921 >         * any other cancelled task. Returns (true) immediately on any CAS
922 >         * or consistency check failure so caller can retry.
923 >         *
924 >         * @return false if no progress can be made
925 >         */
926 >        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
927 >            boolean removed = false, empty = true, progress = true;
928 >            ForkJoinTask<?>[] a; int m, s, b, n;
929 >            if ((a = array) != null && (m = a.length - 1) >= 0 &&
930 >                (n = (s = top) - (b = base)) > 0) {
931 >                for (ForkJoinTask<?> t;;) {           // traverse from s to b
932 >                    int j = ((--s & m) << ASHIFT) + ABASE;
933 >                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
934 >                    if (t == null)                    // inconsistent length
935 >                        break;
936 >                    else if (t == task) {
937 >                        if (s + 1 == top) {           // pop
938 >                            if (!U.compareAndSwapObject(a, j, task, null))
939                                  break;
940 <                            Thread.yield(); // wait for lagging update
940 >                            top = s;
941 >                            removed = true;
942                          }
943 +                        else if (base == b)           // replace with proxy
944 +                            removed = U.compareAndSwapObject(a, j, task,
945 +                                                             new EmptyTask());
946 +                        break;
947 +                    }
948 +                    else if (t.status >= 0)
949 +                        empty = false;
950 +                    else if (s + 1 == top) {          // pop and throw away
951 +                        if (U.compareAndSwapObject(a, j, t, null))
952 +                            top = s;
953 +                        break;
954 +                    }
955 +                    if (--n == 0) {
956 +                        if (!empty && base == b)
957 +                            progress = false;
958 +                        break;
959                      }
960                  }
961              }
962 +            if (removed)
963 +                task.doExec();
964 +            return progress;
965          }
966  
967          /**
968           * Executes a top-level task and any local tasks remaining
969           * after execution.
990         *
991         * @return true unless terminating
970           */
971 <        final boolean runTask(ForkJoinTask<?> t) {
994 <            boolean alive = true;
971 >        final void runTask(ForkJoinTask<?> t) {
972              if (t != null) {
973                  currentSteal = t;
974                  t.doExec();
975 <                if (top != base)        // conservative guard
976 <                    runLocalTasks();
975 >                if (top != base) {       // process remaining local tasks
976 >                    if (mode == 0)
977 >                        popAndExecAll();
978 >                    else
979 >                        pollAndExecAll();
980 >                }
981                  ++nsteals;
982                  currentSteal = null;
983              }
1003            else if (runState < 0)      // terminating
1004                alive = false;
1005            return alive;
984          }
985  
986          /**
# Line 1322 | Line 1300 | public class ForkJoinPool extends Abstra
1300       *
1301       * @param w the worker's queue
1302       */
1303 +
1304      final void registerWorker(WorkQueue w) {
1305          Mutex lock = this.lock;
1306          lock.lock();
1307          try {
1308              WorkQueue[] ws = workQueues;
1309              if (w != null && ws != null) {          // skip on shutdown/failure
1310 <                int rs, n;
1332 <                while ((n = ws.length) <            // ensure can hold total
1333 <                       (parallelism + (short)(ctl >>> TC_SHIFT) << 1))
1334 <                    workQueues = ws = Arrays.copyOf(ws, n << 1);
1335 <                int m = n - 1;
1310 >                int rs, n =  ws.length, m = n - 1;
1311                  int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence
1312                  w.seed = (s == 0) ? 1 : s;          // ensure non-zero seed
1313                  int r = (s << 1) | 1;               // use odd-numbered indices
1314 <                while (ws[r &= m] != null)          // step by approx half size
1315 <                    r += ((n >>> 1) & SQMASK) + 2;
1314 >                if (ws[r &= m] != null) {           // collision
1315 >                    int probes = 0;                 // step by approx half size
1316 >                    int step = (n <= 4) ? 2 : ((n >>> 1) & SQMASK) + 2;
1317 >                    while (ws[r = (r + step) & m] != null) {
1318 >                        if (++probes >= n) {
1319 >                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1320 >                            m = n - 1;
1321 >                            probes = 0;
1322 >                        }
1323 >                    }
1324 >                }
1325                  w.eventCount = w.poolIndex = r;     // establish before recording
1326                  ws[r] = w;                          // also update seq
1327                  runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN);
# Line 1484 | Line 1468 | public class ForkJoinPool extends Abstra
1468          }
1469      }
1470  
1487
1471      // Scanning for tasks
1472  
1473      /**
# Line 1492 | Line 1475 | public class ForkJoinPool extends Abstra
1475       */
1476      final void runWorker(WorkQueue w) {
1477          w.growArray(false);         // initialize queue array in this thread
1478 <        do {} while (w.runTask(scan(w)));
1478 >        do { w.runTask(scan(w)); } while (w.runState >= 0);
1479      }
1480  
1481      /**
# Line 1555 | Line 1538 | public class ForkJoinPool extends Abstra
1538                          q.base = b + 1;       // specialization of pollAt
1539                          return t;
1540                      }
1541 <                    else if ((t != null || b + 1 != q.top) &&
1559 <                             (ec < 0 || j <= m)) {
1541 >                    else if (ec < 0 || j <= m) {
1542                          rs = 0;               // mark scan as imcomplete
1543                          break;                // caller can retry after release
1544                      }
# Line 1631 | Line 1613 | public class ForkJoinPool extends Abstra
1613       */
1614      private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1615          if (w.eventCount < 0 && !tryTerminate(false, false) &&
1616 <            (int)prevCtl != 0 && ctl == currentCtl) {
1616 >            (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) {
1617              Thread wt = Thread.currentThread();
1618              Thread.yield();            // yield before block
1619              while (ctl == currentCtl) {
# Line 1829 | Line 1811 | public class ForkJoinPool extends Abstra
1811       * @return task status on exit
1812       */
1813      final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
1814 +        int s;
1815          ForkJoinTask<?> prevJoin = joiner.currentJoin;
1816 <        joiner.currentJoin = task;
1817 <        long startTime = 0L;
1818 <        for (int k = 0, s; ; ++k) {
1819 <            if ((joiner.isEmpty() ?                  // try to help
1820 <                 !tryHelpStealer(joiner, task) :
1821 <                 !joiner.tryRemoveAndExec(task))) {
1822 <                if (k == 0) {
1823 <                    startTime = System.nanoTime();
1824 <                    tryPollForAndExec(joiner, task); // check uncommon case
1825 <                }
1826 <                else if ((k & (MAX_HELP - 1)) == 0 &&
1827 <                         System.nanoTime() - startTime >= COMPENSATION_DELAY &&
1828 <                         tryCompensate(task, null)) {
1829 <                    if (task.trySetSignal() && task.status >= 0) {
1830 <                        synchronized (task) {
1831 <                            if (task.status >= 0) {
1832 <                                try {                // see ForkJoinTask
1833 <                                    task.wait();     //  for explanation
1834 <                                } catch (InterruptedException ie) {
1816 >        if ((s = task.status) >= 0) {
1817 >            joiner.currentJoin = task;
1818 >            long startTime = 0L;
1819 >            for (int k = 0;;) {
1820 >                if ((joiner.isEmpty() ?                  // try to help
1821 >                     !tryHelpStealer(joiner, task) :
1822 >                     !joiner.tryRemoveAndExec(task))) {
1823 >                    if (k == 0) {
1824 >                        startTime = System.nanoTime();
1825 >                        tryPollForAndExec(joiner, task); // check uncommon case
1826 >                    }
1827 >                    else if ((k & (MAX_HELP - 1)) == 0 &&
1828 >                             System.nanoTime() - startTime >=
1829 >                             COMPENSATION_DELAY &&
1830 >                             tryCompensate(task, null)) {
1831 >                        if (task.trySetSignal() && task.status >= 0) {
1832 >                            synchronized (task) {
1833 >                                if (task.status >= 0) {
1834 >                                    try {                // see ForkJoinTask
1835 >                                        task.wait();     //  for explanation
1836 >                                    } catch (InterruptedException ie) {
1837 >                                    }
1838                                  }
1839 +                                else
1840 +                                    task.notifyAll();
1841                              }
1854                            else
1855                                task.notifyAll();
1842                          }
1843 +                        long c;                          // re-activate
1844 +                        do {} while (!U.compareAndSwapLong
1845 +                                     (this, CTL, c = ctl, c + AC_UNIT));
1846                      }
1858                    long c;                          // re-activate
1859                    do {} while (!U.compareAndSwapLong
1860                                 (this, CTL, c = ctl, c + AC_UNIT));
1847                  }
1848 +                if ((s = task.status) < 0) {
1849 +                    joiner.currentJoin = prevJoin;
1850 +                    break;
1851 +                }
1852 +                else if ((k++ & (MAX_HELP - 1)) == MAX_HELP >>> 1)
1853 +                    Thread.yield();                     // for politeness
1854              }
1863            if ((s = task.status) < 0) {
1864                joiner.currentJoin = prevJoin;
1865                return s;
1866            }
1867            else if ((k & (MAX_HELP - 1)) == MAX_HELP >>> 1)
1868                Thread.yield();                     // for politeness
1855          }
1856 +        return s;
1857      }
1858  
1859      /**
# Line 1923 | Line 1910 | public class ForkJoinPool extends Abstra
1910       */
1911      final void helpQuiescePool(WorkQueue w) {
1912          for (boolean active = true;;) {
1913 <            if (w.base - w.top < 0)
1914 <                w.runLocalTasks();  // exhaust local queue
1913 >            ForkJoinTask<?> localTask; // exhaust local queue
1914 >            while ((localTask = w.nextLocalTask()) != null)
1915 >                localTask.doExec();
1916              WorkQueue q = findNonEmptyStealQueue(w);
1917              if (q != null) {
1918                  ForkJoinTask<?> t; int b;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines