ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.288 by dl, Tue Oct 6 23:01:52 2015 UTC vs.
Revision 1.289 by dl, Sat Oct 10 12:12:00 2015 UTC

# Line 841 | Line 841 | public class ForkJoinPool extends Abstra
841                  a[(al - 1) & s] = task;  // relaxed writes OK
842                  top = s + 1;
843                  ForkJoinPool p = pool;
844 <                U.storeFence();          // ensure fields written
845 <                if (((d = b - s) == 0 || b != base) && p != null) {
844 >                if ((d = b - s) == 0 && p != null) {
845                      U.fullFence();
846                      p.signalWork();
847                  }
# Line 1636 | Line 1635 | public class ForkJoinPool extends Abstra
1635          while (tryTerminate(false, false) >= 0) {     // possibly replace
1636              WorkQueue[] ws; int wl, sp; long c;
1637              if (w == null || w.array == null ||
1638 <                (runState & STOP) != 0 || (ws = workQueues) == null ||
1640 <                (wl = ws.length) <= 0)                // already terminating
1638 >                (ws = workQueues) == null || (wl = ws.length) <= 0)
1639                  break;
1640              else if ((sp = (int)(c = ctl)) != 0) {    // wake up replacement
1641                  if (tryRelease(c, ws[(wl - 1) & sp], AC_UNIT))
# Line 1775 | Line 1773 | public class ForkJoinPool extends Abstra
1773                  stat = timedAwaitWork(w, c);     // possibly quiescent
1774              else if ((runState & STOP) != 0)
1775                  stat = w.qlock = -1;             // pool terminating
1776 <            else if ((stat = w.qlock) >= 0 && w.scanState < 0) {
1776 >            else if (w.scanState < 0) {
1777                  w.parker = Thread.currentThread();
1778 <                if (w.scanState < 0) {           // recheck after write
1778 >                if (w.scanState < 0)             // recheck after write
1779                      LockSupport.park(this);
1782                    if ((stat = w.qlock) >= 0 && w.scanState < 0) {
1783                        Thread.interrupted();    // clear status and retry once
1784                        if ((runState & STOP) != 0)
1785                            stat = w.qlock = -1;
1786                        else
1787                            LockSupport.park(this);
1788                    }
1789                }
1780                  w.parker = null;
1781 +                if ((runState & STOP) != 0)
1782 +                    stat = w.qlock = -1;         // recheck
1783 +                else if (w.scanState < 0)
1784 +                    Thread.interrupted();        // clear status
1785              }
1786          }
1787          return stat;
# Line 1808 | Line 1802 | public class ForkJoinPool extends Abstra
1802          int scale = 1 - (short)(c >>> TC_SHIFT);
1803          long deadline = (((scale <= 0) ? 1 : scale) * IDLE_TIMEOUT_MS +
1804                           System.currentTimeMillis());
1805 <        if (w != null && w.scanState < 0) {
1805 >        if ((runState >= 0 || (stat = tryTerminate(false, false)) > 0) &&
1806 >            w != null && w.scanState < 0) {
1807              int ss; AuxState aux;
1808 <            if ((runState >= 0 ||
1809 <                 (stat = tryTerminate(false, false)) > 0) &&
1810 <                ((stat = w.qlock) >= 0 && w.scanState < 0)) {
1811 <                w.parker = Thread.currentThread();
1812 <                if (w.scanState < 0)
1813 <                    LockSupport.parkUntil(this, deadline);
1814 <                w.parker = null;
1815 <                if ((stat = w.qlock) >= 0 && (ss = w.scanState) < 0 &&
1816 <                    !Thread.interrupted() && (int)c == ss &&
1817 <                    (aux = auxState) != null && ctl == c &&
1818 <                    deadline - System.currentTimeMillis() <= TIMEOUT_SLOP_MS) {
1819 <                    aux.lock();
1820 <                    try {          // pre-deregister
1821 <                        WorkQueue[] ws;
1822 <                        int cfg = w.config, idx = cfg & SMASK;
1823 <                        long nc = ((UC_MASK & (c - TC_UNIT)) |
1824 <                                   (SP_MASK & w.stackPred));
1825 <                        if ((runState & STOP) == 0 &&
1826 <                            (ws = workQueues) != null &&
1827 <                            idx < ws.length && idx >= 0 && ws[idx] == w &&
1828 <                            U.compareAndSwapLong(this, CTL, c, nc)) {
1829 <                            ws[idx] = null;
1835 <                            w.config = cfg | UNREGISTERED;
1836 <                            stat = w.qlock = -1;
1837 <                        }
1838 <                    } finally {
1839 <                        aux.unlock();
1808 >            w.parker = Thread.currentThread();
1809 >            if (w.scanState < 0)
1810 >                LockSupport.parkUntil(this, deadline);
1811 >            w.parker = null;
1812 >            if ((runState & STOP) != 0)
1813 >                stat = w.qlock = -1;         // pool terminating
1814 >            else if ((ss = w.scanState) < 0 && !Thread.interrupted() &&
1815 >                     (int)c == ss && (aux = auxState) != null && ctl == c &&
1816 >                     deadline - System.currentTimeMillis() <= TIMEOUT_SLOP_MS) {
1817 >                aux.lock();
1818 >                try {                        // pre-deregister
1819 >                    WorkQueue[] ws;
1820 >                    int cfg = w.config, idx = cfg & SMASK;
1821 >                    long nc = ((UC_MASK & (c - TC_UNIT)) |
1822 >                               (SP_MASK & w.stackPred));
1823 >                    if ((runState & STOP) == 0 &&
1824 >                        (ws = workQueues) != null &&
1825 >                        idx < ws.length && idx >= 0 && ws[idx] == w &&
1826 >                        U.compareAndSwapLong(this, CTL, c, nc)) {
1827 >                        ws[idx] = null;
1828 >                        w.config = cfg | UNREGISTERED;
1829 >                        stat = w.qlock = -1;
1830                      }
1831 +                } finally {
1832 +                    aux.unlock();
1833                  }
1834              }
1835          }
# Line 1907 | Line 1899 | public class ForkJoinPool extends Abstra
1899          w.growArray();                                  // allocate queue
1900          int bound = (w.config & SPARE_WORKER) != 0 ? 0 : POLL_LIMIT;
1901          long seed = w.hint * 0xdaba0b6eb09322e3L;       // initial random seed
1902 <        for (long r = (seed == 0L) ? 1L : seed;;) {     // ensure nonzero
1903 <            if (bound == 0 && tryDropSpare(w))
1904 <                break;
1905 <            // high bits of prev seed for step; current low bits for idx
1906 <            int step = (int)(r >>> 48) | 1;
1907 <            r ^= r >>> 12; r ^= r << 25; r ^= r >>> 27; // xorshift
1908 <            if (scan(w, bound, step, (int)r) < 0 && awaitWork(w) < 0)
1909 <                break;
1902 >        if ((runState & STOP) == 0) {
1903 >            for (long r = (seed == 0L) ? 1L : seed;;) { // ensure nonzero
1904 >                if (bound == 0 && tryDropSpare(w))
1905 >                    break;
1906 >                // high bits of prev seed for step; current low bits for idx
1907 >                int step = (int)(r >>> 48) | 1;
1908 >                r ^= r >>> 12; r ^= r << 25; r ^= r >>> 27; // xorshift
1909 >                if (scan(w, bound, step, (int)r) < 0 && awaitWork(w) < 0)
1910 >                    break;
1911 >            }
1912          }
1913      }
1914  
# Line 1942 | Line 1936 | public class ForkJoinPool extends Abstra
1936                       origin = m & r, idx = origin,
1937                       npolls = 0,
1938                       ss = w.scanState;;) {         // negative if inactive
1939 <                WorkQueue q; ForkJoinTask<?>[] a; int b, d, al;
1940 <                if ((q = ws[idx]) != null && (d = (b = q.base) - q.top) < 0 &&
1939 >                WorkQueue q; ForkJoinTask<?>[] a; int b, al;
1940 >                if ((q = ws[idx]) != null && (b = q.base) - q.top < 0 &&
1941                      (a = q.array) != null && (al = a.length) > 0) {
1942                      int index = (al - 1) & b;
1943                      long offset = ((long)index << ASHIFT) + ABASE;
# Line 1962 | Line 1956 | public class ForkJoinPool extends Abstra
1956                      else {
1957                          q.base = b;
1958                          w.currentSteal = t;
1959 <                        if (d != -1 || b != q.top) // propagate signal
1959 >                        if (b != q.top)            // propagate signal
1960                              signalWork();
1961                          w.runTask(t);
1962                          if (++npolls > bound)
# Line 2384 | Line 2378 | public class ForkJoinPool extends Abstra
2378       * @return -1 : terminating or terminated, 0: retry if internal caller, else 1
2379       */
2380      private int tryTerminate(boolean now, boolean enable) {
2381 <        AuxState aux; int rs;
2388 <        if ((rs = runState) >= 0 && (!enable || this == common))
2389 <            return 1;
2381 >        AuxState aux;
2382          while ((aux = auxState) == null)
2383 <            tryInitialize(false);
2384 <        aux.lock();
2385 <        rs = runState = runState | SHUTDOWN;
2386 <        aux.unlock();
2387 <        if ((rs & STOP) == 0) {
2383 >            tryInitialize(false);                 // ensure initialized
2384 >
2385 >        if ((runState & SHUTDOWN) == 0) {
2386 >            if (!enable || this == common)
2387 >                return 1;
2388 >            aux.lock();
2389 >            runState = runState | SHUTDOWN;
2390 >            aux.unlock();
2391 >        }
2392 >
2393 >        if ((runState & STOP) == 0) {
2394              if (!now) {                           // check quiescence
2395                  for (long oldSum = 0L;;) {        // repeat until stable
2396 <                    WorkQueue[] ws; WorkQueue w; int m, b;
2396 >                    WorkQueue[] ws; WorkQueue w; int b;
2397                      long checkSum = ctl;
2398                      if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
2399                          return 0;                 // still active workers
2400 <                    if ((ws = workQueues) == null || (m = ws.length - 1) < 0)
2401 <                        break;                    // check queues
2402 <                    for (int i = 0; i <= m; ++i) {
2403 <                        if ((w = ws[i]) != null) {
2404 <                            checkSum += (b = w.base);
2405 <                            if (w.currentSteal != null || b != w.top)
2406 <                                return 0;         // retry if internal caller
2400 >                    if ((ws = workQueues) != null) {
2401 >                        for (int i = 0; i < ws.length; ++i) {
2402 >                            if ((w = ws[i]) != null) {
2403 >                                checkSum += (b = w.base);
2404 >                                if (w.currentSteal != null || b != w.top)
2405 >                                    return 0;     // retry if internal caller
2406 >                            }
2407                          }
2408                      }
2409                      if (oldSum == (oldSum = checkSum))
# Line 2413 | Line 2411 | public class ForkJoinPool extends Abstra
2411                  }
2412              }
2413              aux.lock();
2414 <            rs = runState = runState | STOP;
2414 >            runState = runState | STOP;
2415              aux.unlock();
2416          }
2417  
2418 <        int pass = 0;                             // 3 passes to help terminate
2419 <        for (long oldSum = 0L;;) {                // or until done or stable
2422 <            WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
2418 >        for (long oldSum = 0L;;) {                // repeat until stable
2419 >            WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt;
2420              long checkSum = ctl;
2421 <            if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
2422 <                (ws = workQueues) == null || (m = ws.length - 1) < 0) {
2423 <                aux.lock();
2424 <                runState |= TERMINATED;
2425 <                aux.unlock();
2426 <                synchronized (this) {
2427 <                    notifyAll();                  // for awaitTermination
2428 <                }
2429 <                break;
2433 <            }
2434 <            for (int i = 0; i <= m; ++i) {
2435 <                if ((w = ws[i]) != null) {
2436 <                    checkSum += w.base;
2437 <                    w.qlock = -1;                 // try to disable
2438 <                    if (pass > 0) {
2439 <                        w.cancelAll();            // clear queue
2440 <                        if (pass > 1 && (wt = w.owner) != null) {
2441 <                            if (!wt.isInterrupted()) {
2442 <                                try {             // unblock join
2421 >            if ((ws = workQueues) != null) {      // help terminate others
2422 >                for (int i = 0; i < ws.length; ++i) {
2423 >                    if ((w = ws[i]) != null) {
2424 >                        w.cancelAll();            // clear queues
2425 >                        checkSum += w.base;
2426 >                        if (w.qlock >= 0) {
2427 >                            w.qlock = -1;         // racy set OK
2428 >                            if ((wt = w.owner) != null) {
2429 >                                try {             // unblock join or park
2430                                      wt.interrupt();
2431                                  } catch (Throwable ignore) {
2432                                  }
2433                              }
2447                            LockSupport.unpark(wt);
2434                          }
2435                      }
2436                  }
2437              }
2438 <            if (checkSum != oldSum) {             // unstable
2453 <                oldSum = checkSum;
2454 <                pass = 0;
2455 <            }
2456 <            else if (pass > 3 && pass > m)        // can't further help
2438 >            if (oldSum == (oldSum = checkSum))
2439                  break;
2440 <            else if (++pass > 1) {                // try to dequeue
2441 <                long c; int j = 0, sp;            // bound attempts
2442 <                while (j++ <= m && (sp = (int)(c = ctl)) != 0)
2443 <                    tryRelease(c, ws[sp & m], AC_UNIT);
2440 >        }
2441 >
2442 >        if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) {
2443 >            aux.lock();
2444 >            runState |= TERMINATED;
2445 >            aux.unlock();
2446 >            synchronized (this) {
2447 >                notifyAll();                      // for awaitTermination
2448              }
2449          }
2450 +
2451          return -1;
2452      }
2453  
# Line 2525 | Line 2512 | public class ForkJoinPool extends Abstra
2512                  tryInitialize(true);
2513              else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null)
2514                  tryCreateExternalQueue(k);
2515 <            else if ((stat = q.sharedPush(task)) < 0)
2516 <                break;
2517 <            else if (stat == 0) {
2518 <                signalWork();
2515 >            else if ((stat = q.sharedPush(task)) <= 0) {
2516 >                if ((runState & STOP) != 0)
2517 >                    tryTerminate(false, false);
2518 >                else if (stat == 0)
2519 >                    signalWork();
2520                  break;
2521              }
2522              else                          // move if busy

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines