ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.261 by jsr166, Sat Aug 8 18:00:51 2015 UTC vs.
Revision 1.262 by dl, Sun Aug 9 14:30:12 2015 UTC

# Line 15 | Line 15 | import java.util.Arrays;
15   import java.util.Collection;
16   import java.util.Collections;
17   import java.util.List;
18 < import java.util.concurrent.atomic.AtomicLong;
18 > import java.util.concurrent.locks.ReentrantLock;
19   import java.util.concurrent.locks.LockSupport;
20  
21   /**
# Line 290 | Line 290 | public class ForkJoinPool extends Abstra
290       * subfields.
291       *
292       * Field "runState" holds state bits (STARTED, STOP, etc).  After
293 <     * starting, the field is updated under a lock (only during
294 <     * shutdown); opportunistically using the "stealCounter" object as
295 <     * a monitor. However stealCounter is itself lazily initialized,
296 <     * so tryInitialize bootstraps this using a private static lock.
293 >     * starting, the field is updated (only during shutdown) under the
294 >     * auxState lock.
295 >     *
296 >     * Field "auxState" is a ReentrantLock subclass that also
297 >     * opportunistically holds some other bookkeeping fields accessed
298 >     * only when locked.  It is mainly used to lock (infrequent)
299 >     * updates to runState and workQueues.  The auxState instance is
300 >     * itself lazily constructed (see tryInitialize), requiring a
301 >     * double-check-style boostrapping use of field runState, and
302 >     * locking a private static.
303       *
304       * Field "workQueues" holds references to WorkQueues.  It is
305 <     * updated only under the lock, but is otherwise concurrently
306 <     * readable, and accessed directly. We also ensure that reads of
307 <     * the array reference itself never become too stale (for example,
308 <     * re-reading before each scan). To simplify index-based
309 <     * operations, the array size is always a power of two, and all
310 <     * readers must tolerate null slots. Worker queues are at odd
311 <     * indices. Shared (submission) queues are at even indices, up to
312 <     * a maximum of 64 slots, to limit growth even if array needs to
313 <     * expand to add more workers. Grouping them together in this way
314 <     * simplifies and speeds up task scanning.
305 >     * updated (only during worker creation and termination) under the
306 >     * lock, but is otherwise concurrently readable, and accessed
307 >     * directly. We also ensure that reads of the array reference
308 >     * itself never become too stale (for example, re-reading before
309 >     * each scan). To simplify index-based operations, the array size
310 >     * is always a power of two, and all readers must tolerate null
311 >     * slots. Worker queues are at odd indices. Shared (submission)
312 >     * queues are at even indices, up to a maximum of 64 slots, to
313 >     * limit growth even if array needs to expand to add more
314 >     * workers. Grouping them together in this way simplifies and
315 >     * speeds up task scanning.
316       *
317       * All worker thread creation is on-demand, triggered by task
318       * submissions, replacement of terminated workers, and/or
# Line 698 | Line 705 | public class ForkJoinPool extends Abstra
705          public final boolean exec() { return true; }
706      }
707  
708 +    /**
709 +     * Additional fields and lock created upon initialization.
710 +     */
711 +    static final class AuxState extends ReentrantLock {
712 +        private static final long serialVersionUID = -6001602636862214147L;
713 +        volatile long stealCount;     // cumulative steal count
714 +        long indexSeed;               // index bits for registerWorker
715 +        AuxState() {}
716 +    }
717 +
718      // Constants shared across ForkJoinPool and WorkQueue
719  
720      // Bounds
# Line 868 | Line 885 | public class ForkJoinPool extends Abstra
885              if ((a = array) != null && b != s && (al = a.length) > 0) {
886                  int index = (al - 1) & --s;
887                  long offset = ((long)index << ASHIFT) + ABASE;
888 <                ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, offset);
889 <                if (t != null && U.compareAndSwapObject(a, offset, t, null)) {
888 >                ForkJoinTask<?> t = (ForkJoinTask<?>)
889 >                    U.getObject(a, offset);
890 >                if (t != null &&
891 >                    U.compareAndSwapObject(a, offset, t, null)) {
892                      top = s;
893                      U.storeFence();
894                      return t;
# Line 1018 | Line 1037 | public class ForkJoinPool extends Abstra
1037              if ((a = array) != null && (al = a.length) > 0) {
1038                  int index = (al - 1) & s;
1039                  long offset = ((long)index << ASHIFT) + ABASE;
1040 <                ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, offset);
1040 >                ForkJoinTask<?> t = (ForkJoinTask<?>)
1041 >                    U.getObject(a, offset);
1042                  if (t == task &&
1043                      U.compareAndSwapInt(this, QLOCK, 0, 1)) {
1044                      if (U.compareAndSwapObject(a, offset, task, null)) {
# Line 1119 | Line 1139 | public class ForkJoinPool extends Abstra
1139          }
1140  
1141          /**
1142 <         * Adds steal count to pool stealCounter if it exists, and resets.
1142 >         * Adds steal count to pool steal count if it exists, and resets.
1143           */
1144          final void transferStealCount(ForkJoinPool p) {
1145 <            AtomicLong sc;
1146 <            if (p != null && (sc = p.stealCounter) != null) {
1145 >            AuxState aux;
1146 >            if (p != null && (aux = p.auxState) != null) {
1147                  int s = nsteals;
1148                  nsteals = 0;            // if negative, correct for overflow
1149 <                sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s));
1149 >                long inc = (long)(s < 0 ? Integer.MAX_VALUE : s);
1150 >                aux.lock();
1151 >                try {
1152 >                    aux.stealCount += inc;
1153 >                } finally {
1154 >                    aux.unlock();
1155 >                }
1156              }
1157          }
1158  
# Line 1441 | Line 1467 | public class ForkJoinPool extends Abstra
1467      volatile long ctl;                   // main pool control
1468      volatile int runState;
1469      final int config;                    // parallelism, mode
1470 <    int indexSeed;                       // to generate worker index
1445 <    volatile AtomicLong stealCounter;    // also used as sync monitor
1470 >    AuxState auxState;                   // lock, steal counts
1471      volatile WorkQueue[] workQueues;     // main registry
1472      final String workerNamePrefix;       // to create worker name string
1473      final ForkJoinWorkerThreadFactory factory;
# Line 1460 | Line 1485 | public class ForkJoinPool extends Abstra
1485              int n = (p > 1) ? p - 1 : 1;
1486              n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
1487              n |= n >>> 8; n |= n >>> 16; n = ((n + 1) << 1) & SMASK;
1488 <            AtomicLong sc = new AtomicLong();
1488 >            AuxState aux = new AuxState();
1489              WorkQueue[] ws = new WorkQueue[n];
1490              synchronized (modifyThreadPermission) { // double-check
1491                  if (((rs = runState) & STARTED) == 0) {
1492 +                    rs |= STARTED;
1493                      workQueues = ws;
1494 <                    runState = rs | STARTED;
1495 <                    stealCounter = sc;
1494 >                    auxState = aux;
1495 >                    runState = rs;
1496                  }
1497              }
1498          }
# Line 1533 | Line 1559 | public class ForkJoinPool extends Abstra
1559       */
1560      final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1561          UncaughtExceptionHandler handler;
1562 <        AtomicLong lock = stealCounter;
1562 >        AuxState aux;
1563          wt.setDaemon(true);                           // configure thread
1564          if ((handler = ueh) != null)
1565              wt.setUncaughtExceptionHandler(handler);
1566          WorkQueue w = new WorkQueue(this, wt);
1567          int i = 0;                                    // assign a pool index
1568          int mode = config & MODE_MASK;
1569 <        if (lock != null) {
1570 <            synchronized (lock) {
1569 >        int rs = runState;
1570 >        if ((aux = auxState) != null) {
1571 >            aux.lock();
1572 >            try {
1573 >                int s = (int)(aux.indexSeed += SEED_INCREMENT), n, m;
1574                  WorkQueue[] ws = workQueues;
1546                int s = indexSeed += SEED_INCREMENT, n, m;
1575                  if (ws != null && (n = ws.length) > 0) {
1576                      i = (m = n - 1) & ((s << 1) | 1); // odd-numbered indices
1577                      if (ws[i] != null) {              // collision
# Line 1562 | Line 1590 | public class ForkJoinPool extends Abstra
1590                      w.scanState = i | (s & 0x7fff0000); // random seq bits
1591                      ws[i] = w;
1592                  }
1593 +            } finally {
1594 +                aux.unlock();
1595              }
1596          }
1597          wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
# Line 1579 | Line 1609 | public class ForkJoinPool extends Abstra
1609       */
1610      final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1611          WorkQueue w = null;
1612 +        int rs = runState;
1613          if (wt != null && (w = wt.workQueue) != null) {
1614 <            AtomicLong lock; WorkQueue[] ws;          // remove index from array
1614 >            AuxState aux; WorkQueue[] ws;          // remove index from array
1615              int idx = w.config & SMASK;
1616 <            if ((lock = stealCounter) != null) {
1617 <                synchronized (lock) {
1616 >            int ns = w.nsteals;
1617 >            if ((aux = auxState) != null) {
1618 >                aux.lock();
1619 >                try {
1620                      if ((ws = workQueues) != null && ws.length > idx &&
1621                          ws[idx] == w)
1622                          ws[idx] = null;
1623 +                    aux.stealCount += ns;
1624 +                } finally {
1625 +                    aux.unlock();
1626                  }
1627              }
1628          }
# Line 1599 | Line 1635 | public class ForkJoinPool extends Abstra
1635          }
1636          if (w != null) {
1637              w.qlock = -1;                             // ensure set
1602            w.transferStealCount(this);
1638              w.cancelAll();                            // cancel remaining tasks
1639          }
1640          for (;;) {                                    // possibly replace
# Line 1700 | Line 1735 | public class ForkJoinPool extends Abstra
1735              (v = ws[m & sp]) != null) {
1736              long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + AC_UNIT));
1737              int ns = sp & ~UNSIGNALLED;
1738 <            if ((w == v || w.scanState < 0) &&
1738 >            if (w.scanState < 0 &&
1739                  v.scanState == sp &&
1740                  U.compareAndSwapLong(this, CTL, c, nc)) {
1741                  v.scanState = ns;
# Line 1775 | Line 1810 | public class ForkJoinPool extends Abstra
1810          long deadline = (((scale <= 0) ? 1 : scale) * IDLE_TIMEOUT_MS +
1811                           System.currentTimeMillis());
1812          if (w != null && w.scanState < 0) {
1813 <            int ss; AtomicLong lock;
1813 >            int ss; AuxState aux;
1814              if (runState < 0 && tryTerminate(false, false))
1815                  stat = w.qlock = -1;               // help terminate
1816              else if ((stat = w.qlock) >= 0 && w.scanState < 0) {
# Line 1785 | Line 1820 | public class ForkJoinPool extends Abstra
1820                  w.parker = null;
1821                  if ((stat = w.qlock) >= 0 && (ss = w.scanState) < 0 &&
1822                      !Thread.interrupted() && (int)c == ss &&
1823 <                    (lock = stealCounter) != null && ctl == c &&
1823 >                    (aux = auxState) != null && ctl == c &&
1824                      deadline - System.currentTimeMillis() <= TIMEOUT_SLOP_MS) {
1825 <                    synchronized (lock) {          // pre-deregister
1825 >                    aux.lock();
1826 >                    try {          // pre-deregister
1827                          WorkQueue[] ws;
1828                          int cfg = w.config, idx = cfg & SMASK;
1829                          long nc = ((UC_MASK & (c - TC_UNIT)) |
# Line 1799 | Line 1835 | public class ForkJoinPool extends Abstra
1835                              w.config = cfg | UNREGISTERED;
1836                              stat = w.qlock = -1;
1837                          }
1838 +                    } finally {
1839 +                        aux.unlock();
1840                      }
1841                  }
1842              }
# Line 1911 | Line 1949 | public class ForkJoinPool extends Abstra
1949                      long offset = ((long)index << ASHIFT) + ABASE;
1950                      ForkJoinTask<?> t = (ForkJoinTask<?>)
1951                          U.getObjectVolatile(a, offset);
1952 <                    if (t == null || b++ != q.base)
1953 <                        break;                     // busy or empty
1952 >                    if (t == null)
1953 >                        break;                     // empty or busy
1954 >                    else if (b++ != q.base)
1955 >                        break;                     // busy
1956                      else if (ss < 0) {
1957                          tryReactivate(w, ws, r);
1958                          break;                     // retry upon rescan
# Line 2057 | Line 2097 | public class ForkJoinPool extends Abstra
2097                          if ((a = v.array) != null && (al = a.length) > 0) {
2098                              int index = (al - 1) & b;
2099                              long offset = ((long)index << ASHIFT) + ABASE;
2100 <                            t = (ForkJoinTask<?>)U.getObjectVolatile(a, offset);
2100 >                            t = (ForkJoinTask<?>)
2101 >                                U.getObjectVolatile(a, offset);
2102                              if (t != null && b++ == v.base) {
2103                                  if (j.currentJoin != subtask ||
2104                                      v.currentSteal != subtask ||
# Line 2343 | Line 2384 | public class ForkJoinPool extends Abstra
2384       * @return true if now terminating or terminated
2385       */
2386      private boolean tryTerminate(boolean now, boolean enable) {
2387 <        AtomicLong lock; int rs;
2387 >        AuxState aux; int rs;
2388          if ((rs = runState) >= 0 && (!enable || this == common))
2389              return false;
2390 <        while ((lock = stealCounter) == null)
2390 >        while ((aux = auxState) == null)
2391              tryInitialize(false);
2392 <        synchronized (lock) {
2393 <            rs = runState = runState | SHUTDOWN;
2394 <        }
2354 <
2392 >        aux.lock();
2393 >        rs = runState = runState | SHUTDOWN;
2394 >        aux.unlock();
2395          if ((rs & STOP) == 0) {
2396              if (!now) {                           // check quiescence
2397                  for (long oldSum = 0L;;) {        // repeat until stable
# Line 2379 | Line 2419 | public class ForkJoinPool extends Abstra
2419                          break;
2420                  }
2421              }
2422 <            synchronized (lock) {
2423 <                rs = runState = runState | STOP;
2424 <            }
2422 >            aux.lock();
2423 >            rs = runState = runState | STOP;
2424 >            aux.unlock();
2425          }
2426  
2427          int pass = 0;                             // 3 passes to help terminate
# Line 2390 | Line 2430 | public class ForkJoinPool extends Abstra
2430              long checkSum = ctl;
2431              if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
2432                  (ws = workQueues) == null || (m = ws.length - 1) < 0) {
2433 <                synchronized (lock) {
2434 <                    rs = runState = runState | TERMINATED;
2435 <                }
2433 >                aux.lock();
2434 >                runState |= TERMINATED;
2435 >                aux.unlock();
2436                  synchronized (this) {
2437                      notifyAll();                  // for awaitTermination
2438                  }
# Line 2441 | Line 2481 | public class ForkJoinPool extends Abstra
2481       * @param index the index of the new queue
2482       */
2483      private void tryCreateExternalQueue(int index) {
2484 <        AtomicLong lock;
2485 <        if ((lock = stealCounter) != null && index >= 0) {
2484 >        AuxState aux;
2485 >        if ((aux = auxState) != null && index >= 0) {
2486              WorkQueue q = new WorkQueue(this, null);
2487              q.config = index;
2488              q.scanState = ~UNSIGNALLED;
2489              q.qlock = 1;                   // lock queue
2490              boolean installed = false;
2491 <            synchronized (lock) {          // lock pool to install
2491 >            aux.lock();
2492 >            try {          // lock pool to install
2493                  WorkQueue[] ws;
2494                  if ((ws = workQueues) != null && index < ws.length &&
2495                      ws[index] == null) {
2496                      ws[index] = q;         // else throw away
2497                      installed = true;
2498                  }
2499 +            } finally {
2500 +                aux.unlock();
2501              }
2502              if (installed) {
2503                  try {
# Line 2923 | Line 2966 | public class ForkJoinPool extends Abstra
2966       * @return the number of steals
2967       */
2968      public long getStealCount() {
2969 <        AtomicLong sc = stealCounter;
2970 <        long count = (sc == null) ? 0L : sc.get();
2969 >        AuxState sc = auxState;
2970 >        long count = (sc == null) ? 0L : sc.stealCount;
2971          WorkQueue[] ws; WorkQueue w;
2972          if ((ws = workQueues) != null) {
2973              for (int i = 1; i < ws.length; i += 2) {
# Line 3055 | Line 3098 | public class ForkJoinPool extends Abstra
3098      public String toString() {
3099          // Use a single pass through workQueues to collect counts
3100          long qt = 0L, qs = 0L; int rc = 0;
3101 <        AtomicLong sc = stealCounter;
3102 <        long st = (sc == null) ? 0L : sc.get();
3101 >        AuxState sc = auxState;
3102 >        long st = (sc == null) ? 0L : sc.stealCount;
3103          long c = ctl;
3104          WorkQueue[] ws; WorkQueue w;
3105          if ((ws = workQueues) != null) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines