ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.184 by jsr166, Mon May 20 16:16:42 2013 UTC vs.
Revision 1.185 by dl, Wed May 22 13:39:40 2013 UTC

# Line 635 | Line 635 | public class ForkJoinPool extends Abstra
635  
636          volatile int eventCount;   // encoded inactivation count; < 0 if inactive
637          int nextWait;              // encoded record of next event waiter
638        int hint;                  // stability hash or steal index hint
638          int nsteals;               // number of steals
639 <        int poolIndex;             // index of this queue in pool
640 <        final int mode;            // 0: lifo, > 0: fifo, < 0: shared
639 >        int hint;                  // steal index hint
640 >        short poolIndex;           // index of this queue in pool
641 >        final short mode;          // 0: lifo, > 0: fifo, < 0: shared
642          volatile int qlock;        // 1: locked, -1: terminate; else 0
643          volatile int base;         // index of next slot for poll
644          int top;                   // index of next slot for push
# Line 653 | Line 653 | public class ForkJoinPool extends Abstra
653                    int seed) {
654              this.pool = pool;
655              this.owner = owner;
656 <            this.mode = mode;
656 >            this.mode = (short)mode;
657              this.hint = seed; // store initial seed for runWorker
658              // Place indices in the center of array (that is not yet allocated)
659              base = top = INITIAL_QUEUE_CAPACITY >>> 1;
# Line 1071 | Line 1071 | public class ForkJoinPool extends Abstra
1071      /**
1072       * Common pool parallelism. To allow simpler use and management
1073       * when common pool threads are disabled, we allow the underlying
1074 <     * common.config field to be zero, but in that case still report
1074 >     * common.parallelism field to be zero, but in that case still report
1075       * parallelism as 1 to reflect resulting caller-runs mechanics.
1076       */
1077      static final int commonParallelism;
# Line 1216 | Line 1216 | public class ForkJoinPool extends Abstra
1216      volatile long ctl;                         // main pool control
1217      volatile int plock;                        // shutdown status and seqLock
1218      volatile int indexSeed;                    // worker/submitter index seed
1219 <    final int config;                          // mode and parallelism level
1219 >    final short parallelism;                   // parallelism level
1220 >    final short mode;                          // LIFO/FIFO
1221      WorkQueue[] workQueues;                    // main registry
1222      final ForkJoinWorkerThreadFactory factory;
1223      final UncaughtExceptionHandler ueh;        // per-worker UEH
# Line 1317 | Line 1318 | public class ForkJoinPool extends Abstra
1318          do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1319                                            s += SEED_INCREMENT) ||
1320                       s == 0); // skip 0
1321 <        WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
1321 >        WorkQueue w = new WorkQueue(this, wt, mode, s);
1322          if (((ps = plock) & PL_LOCK) != 0 ||
1323              !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1324              ps = acquirePlock();
# Line 1337 | Line 1338 | public class ForkJoinPool extends Abstra
1338                          }
1339                      }
1340                  }
1341 <                w.eventCount = w.poolIndex = r; // volatile write orders
1341 >                w.poolIndex = (short)r;
1342 >                w.eventCount = r; // volatile write orders
1343                  ws[r] = w;
1344              }
1345          } finally {
1346              if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1347                  releasePlock(nps);
1348          }
1349 <        wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
1349 >        wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
1350          return w;
1351      }
1352  
# Line 1480 | Line 1482 | public class ForkJoinPool extends Abstra
1482                  throw new RejectedExecutionException();
1483              else if (ps == 0 || (ws = workQueues) == null ||
1484                       (m = ws.length - 1) < 0) { // initialize workQueues
1485 <                int p = config & SMASK;         // find power of two table size
1485 >                int p = parallelism;            // find power of two table size
1486                  int n = (p > 1) ? p - 1 : 1;    // ensure at least 2 slots
1487                  n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
1488                  n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
# Line 1519 | Line 1521 | public class ForkJoinPool extends Abstra
1521                  move = true; // move on failure
1522              }
1523              else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
1524 <                (q = new WorkQueue(this, null, SHARED_QUEUE, r)).poolIndex = k;
1524 >                q = new WorkQueue(this, null, SHARED_QUEUE, r);
1525 >                q.poolIndex = (short)k;
1526                  if (((ps = plock) & PL_LOCK) != 0 ||
1527                      !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1528                      ps = acquirePlock();
# Line 1542 | Line 1545 | public class ForkJoinPool extends Abstra
1545       * Increments active count; mainly called upon return from blocking.
1546       */
1547      final void incrementActiveCount() {
1548 <        U.getAndAddLong(this, CTL, AC_UNIT);
1548 >        long c;
1549 >        do {} while(!U.compareAndSwapLong
1550 >                    (this, CTL, c = ctl, ((c & ~AC_MASK) |
1551 >                                          ((c & AC_MASK) + AC_UNIT))));
1552      }
1553  
1554      /**
# Line 1673 | Line 1679 | public class ForkJoinPool extends Abstra
1679              !Thread.interrupted()) {
1680              int e = (int)c;
1681              int u = (int)(c >>> 32);
1682 <            int d = (u >> UAC_SHIFT) + (config & SMASK); // 0 if quiescent
1682 >            int d = (u >> UAC_SHIFT) + parallelism; // active count
1683  
1684 <            if (e < 0 || (d == 0 && tryTerminate(false, false)))
1684 >            if (e < 0 || (d <= 0 && tryTerminate(false, false)))
1685                  stat = w.qlock = -1;          // pool is terminating
1686              else if ((ns = w.nsteals) != 0) { // collect steals and retry
1687                  w.nsteals = 0;
1688                  U.getAndAddLong(this, STEALCOUNT, (long)ns);
1689              }
1690              else {
1691 <                long pc = ((d != 0 || ec != (e | INT_SIGN)) ? 0L :
1691 >                long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
1692                             ((long)(w.nextWait & E_MASK)) | // ctl to restore
1693                             ((long)(u + UAC_UNIT)) << 32);
1694                  if (pc != 0L) {               // timed wait if last waiter
# Line 1873 | Line 1879 | public class ForkJoinPool extends Abstra
1879       */
1880      final boolean tryCompensate(long c) {
1881          WorkQueue[] ws = workQueues;
1882 <        int pc = config & SMASK, e = (int)c, m, tc;
1882 >        int pc = parallelism, e = (int)c, m, tc;
1883          if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
1884              WorkQueue w = ws[e & m];
1885              if (e != 0 && w != null) {
# Line 1952 | Line 1958 | public class ForkJoinPool extends Abstra
1958                                      task.notifyAll();
1959                              }
1960                          }
1961 <                        U.getAndAddLong(this, CTL, AC_UNIT); // reactivate
1961 >                        long c; // reactivate
1962 >                        do {} while(!U.compareAndSwapLong
1963 >                                    (this, CTL, c = ctl,
1964 >                                     ((c & ~AC_MASK) |
1965 >                                      ((c & AC_MASK) + AC_UNIT))));
1966                      }
1967                  }
1968              }
# Line 2022 | Line 2032 | public class ForkJoinPool extends Abstra
2032              if ((q = findNonEmptyStealQueue()) != null) {
2033                  if (!active) {      // re-establish active count
2034                      active = true;
2035 <                    U.getAndAddLong(this, CTL, AC_UNIT);
2035 >                    do {} while(!U.compareAndSwapLong
2036 >                                (this, CTL, c = ctl,
2037 >                                 ((c & ~AC_MASK) |
2038 >                                  ((c & AC_MASK) + AC_UNIT))));
2039                  }
2040                  if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2041                      (w.currentSteal = t).doExec();
# Line 2030 | Line 2043 | public class ForkJoinPool extends Abstra
2043                  }
2044              }
2045              else if (active) {       // decrement active count without queuing
2046 <                long nc = (c = ctl) - AC_UNIT;
2047 <                if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0)
2048 <                    return;          // bypass decrement-then-increment
2046 >                long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
2047 >                if ((int)(nc >> AC_SHIFT) + parallelism == 0)
2048 >                    break;          // bypass decrement-then-increment
2049                  if (U.compareAndSwapLong(this, CTL, c, nc))
2050                      active = false;
2051              }
2052 <            else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 &&
2053 <                     U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
2054 <                return;
2052 >            else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 &&
2053 >                     U.compareAndSwapLong
2054 >                     (this, CTL, c, ((c & ~AC_MASK) |
2055 >                                     ((c & AC_MASK) + AC_UNIT))))
2056 >                break;
2057          }
2058      }
2059  
# Line 2108 | Line 2123 | public class ForkJoinPool extends Abstra
2123      static int getSurplusQueuedTaskCount() {
2124          Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2125          if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
2126 <            int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
2126 >            int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism;
2127              int n = (q = wt.workQueue).top - q.base;
2128              int a = (int)(pool.ctl >> AC_SHIFT) + p;
2129              return n - (a > (p >>>= 1) ? 0 :
# Line 2152 | Line 2167 | public class ForkJoinPool extends Abstra
2167          }
2168          for (long c;;) {
2169              if (((c = ctl) & STOP_BIT) != 0) {     // already terminating
2170 <                if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
2170 >                if ((short)(c >>> TC_SHIFT) + parallelism <= 0) {
2171                      synchronized (this) {
2172                          notifyAll();               // signal when 0 workers
2173                      }
# Line 2161 | Line 2176 | public class ForkJoinPool extends Abstra
2176              }
2177              if (!now) {                            // check if idle & no tasks
2178                  WorkQueue[] ws; WorkQueue w;
2179 <                if ((int)(c >> AC_SHIFT) != -(config & SMASK))
2179 >                if ((int)(c >> AC_SHIFT) + parallelism > 0)
2180                      return false;
2181                  if ((ws = workQueues) != null) {
2182                      for (int i = 0; i < ws.length; ++i) {
2183 <                        if ((w = ws[i]) != null) {
2184 <                            if (!w.isEmpty())
2185 <                                return false;
2186 <                            if ((i & 1) != 0 && w.eventCount >= 0)
2187 <                                return false;      // unqueued inactive worker
2183 >                        if ((w = ws[i]) != null &&
2184 >                            (!w.isEmpty() ||
2185 >                             ((i & 1) != 0 && w.eventCount >= 0))) {
2186 >                            signalWork(ws, w);
2187 >                            return false;
2188                          }
2189                      }
2190                  }
# Line 2358 | Line 2373 | public class ForkJoinPool extends Abstra
2373          this(checkParallelism(parallelism),
2374               checkFactory(factory),
2375               handler,
2376 <             asyncMode,
2376 >             (asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
2377               "ForkJoinPool-" + nextPoolId() + "-worker-");
2378          checkPermission();
2379      }
# Line 2384 | Line 2399 | public class ForkJoinPool extends Abstra
2399      private ForkJoinPool(int parallelism,
2400                           ForkJoinWorkerThreadFactory factory,
2401                           UncaughtExceptionHandler handler,
2402 <                         boolean asyncMode,
2402 >                         int mode,
2403                           String workerNamePrefix) {
2404          this.workerNamePrefix = workerNamePrefix;
2405          this.factory = factory;
2406          this.ueh = handler;
2407 <        this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
2407 >        this.mode = (short)mode;
2408 >        this.parallelism = (short)parallelism;
2409          long np = (long)(-parallelism); // offset ctl counts
2410          this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2411      }
# Line 2577 | Line 2593 | public class ForkJoinPool extends Abstra
2593       * @return the targeted parallelism level of this pool
2594       */
2595      public int getParallelism() {
2596 <        int par = (config & SMASK);
2597 <        return (par > 0) ? par : 1;
2596 >        int par;
2597 >        return ((par = parallelism) > 0) ? par : 1;
2598      }
2599  
2600      /**
# Line 2600 | Line 2616 | public class ForkJoinPool extends Abstra
2616       * @return the number of worker threads
2617       */
2618      public int getPoolSize() {
2619 <        return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2619 >        return parallelism + (short)(ctl >>> TC_SHIFT);
2620      }
2621  
2622      /**
# Line 2610 | Line 2626 | public class ForkJoinPool extends Abstra
2626       * @return {@code true} if this pool uses async mode
2627       */
2628      public boolean getAsyncMode() {
2629 <        return (config >>> 16) == FIFO_QUEUE;
2629 >        return mode == FIFO_QUEUE;
2630      }
2631  
2632      /**
# Line 2641 | Line 2657 | public class ForkJoinPool extends Abstra
2657       * @return the number of active threads
2658       */
2659      public int getActiveThreadCount() {
2660 <        int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2660 >        int r = parallelism + (int)(ctl >> AC_SHIFT);
2661          return (r <= 0) ? 0 : r; // suppress momentarily negative values
2662      }
2663  
# Line 2657 | Line 2673 | public class ForkJoinPool extends Abstra
2673       * @return {@code true} if all threads are currently idle
2674       */
2675      public boolean isQuiescent() {
2676 <        return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0;
2676 >        return parallelism + (int)(ctl >> AC_SHIFT) <= 0;
2677      }
2678  
2679      /**
# Line 2820 | Line 2836 | public class ForkJoinPool extends Abstra
2836                  }
2837              }
2838          }
2839 <        int pc = (config & SMASK);
2839 >        int pc = parallelism;
2840          int tc = pc + (short)(c >>> TC_SHIFT);
2841          int ac = pc + (int)(c >> AC_SHIFT);
2842          if (ac < 0) // ignore transient negative
# Line 2893 | Line 2909 | public class ForkJoinPool extends Abstra
2909      public boolean isTerminated() {
2910          long c = ctl;
2911          return ((c & STOP_BIT) != 0L &&
2912 <                (short)(c >>> TC_SHIFT) == -(config & SMASK));
2912 >                (short)(c >>> TC_SHIFT) + parallelism <= 0);
2913      }
2914  
2915      /**
# Line 2912 | Line 2928 | public class ForkJoinPool extends Abstra
2928      public boolean isTerminating() {
2929          long c = ctl;
2930          return ((c & STOP_BIT) != 0L &&
2931 <                (short)(c >>> TC_SHIFT) != -(config & SMASK));
2931 >                (short)(c >>> TC_SHIFT) + parallelism > 0);
2932      }
2933  
2934      /**
# Line 3197 | Line 3213 | public class ForkJoinPool extends Abstra
3213          common = java.security.AccessController.doPrivileged
3214              (new java.security.PrivilegedAction<ForkJoinPool>() {
3215                  public ForkJoinPool run() { return makeCommonPool(); }});
3216 <        int par = common.config; // report 1 even if threads disabled
3216 >        int par = common.parallelism; // report 1 even if threads disabled
3217          commonParallelism = par > 0 ? par : 1;
3218      }
3219  
# Line 3233 | Line 3249 | public class ForkJoinPool extends Abstra
3249              parallelism = 0;
3250          if (parallelism > MAX_CAP)
3251              parallelism = MAX_CAP;
3252 <        return new ForkJoinPool(parallelism, factory, handler, false,
3252 >        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
3253                                  "ForkJoinPool.commonPool-worker-");
3254      }
3255  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines