ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.214 by jsr166, Sun Jul 13 18:42:13 2014 UTC vs.
Revision 1.215 by dl, Sun Jul 13 22:43:54 2014 UTC

# Line 20 | Line 20 | import java.util.concurrent.RejectedExec
20   import java.util.concurrent.RunnableFuture;
21   import java.util.concurrent.ThreadLocalRandom;
22   import java.util.concurrent.TimeUnit;
23 + import java.util.concurrent.atomic.AtomicLong;
24   import java.security.AccessControlContext;
25   import java.security.ProtectionDomain;
26   import java.security.Permissions;
# Line 279 | Line 280 | public class ForkJoinPool extends Abstra
280       * often maintaining atomicity without blocking or locking.
281       * Nearly all essentially atomic control state is held in two
282       * volatile variables that are by far most often read (not
283 <     * written) as status and consistency checks.
283 >     * written) as status and consistency checks. (Also, field
284 >     * "config" holds unchanging configuration state.)
285       *
286       * Field "ctl" contains 64 bits holding information needed to
287       * atomically decide to add, inactivate, enqueue (on an event
# Line 287 | Line 289 | public class ForkJoinPool extends Abstra
289       * packing, we restrict maximum parallelism to (1<<15)-1 (which is
290       * far in excess of normal operating range) to allow ids, counts,
291       * and their negations (used for thresholding) to fit into 16bit
292 <     * subfields.  Field "runState" holds lockable state bits
293 <     * (STARTED, STOP, etc) also protecting updates to the workQueues
294 <     * array.  When used as a lock, it is normally held only for a few
295 <     * instructions (the only exceptions are one-time array
296 <     * initialization and uncommon resizing), so is nearly always
297 <     * available after at most a brief spin. But to be extra-cautious,
298 <     * we use a monitor-based backup strategy to block when needed
299 <     * (see awaitRunStateLock).  Usages of "runState" vs "ctl"
300 <     * interact in only one case: deciding to add a worker thread (see
301 <     * tryAddWorker), in which case the ctl CAS is performed while the
302 <     * lock is held.  Field "config" holds unchanging configuration
303 <     * state.
292 >     * subfields.
293 >     *
294 >     * Field "runState" holds lockable state bits (STARTED, STOP, etc)
295 >     * also protecting updates to the workQueues array.  When used as
296 >     * a lock, it is normally held only for a few instructions (the
297 >     * only exceptions are one-time array initialization and uncommon
298 >     * resizing), so is nearly always available after at most a brief
299 >     * spin. But to be extra-cautious, after spinning, method
300 >     * awaitRunStateLock (called only if an initial CAS fails), uses a
301 >     * wait/notify mechanics on a builtin monitor to block when
302 >     * (rarely) needed. This would be a terrible idea for a highly
303 >     * contended lock, but most pools run without the lock ever
304 >     * contending after the spin limit, so this works fine as a more
305 >     * conservative alternative. Because we don't otherwise have an
306 >     * internal Object to use as a monitor, the "stealCounter" (an
307 >     * AtomicLong) is used when available (it too must be lazily
308 >     * initialized; see externalSubmit).
309 >
310 >     * Usages of "runState" vs "ctl" interact in only one case:
311 >     * deciding to add a worker thread (see tryAddWorker), in which
312 >     * case the ctl CAS is performed while the lock is held.
313       *
314       * Recording WorkQueues.  WorkQueues are recorded in the
315       * "workQueues" array. The array is created upon first use (see
# Line 1020 | Line 1031 | public class ForkJoinPool extends Abstra
1031                  U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
1032                  execLocalTasks();
1033                  ForkJoinWorkerThread thread = owner;
1034 <                ++nsteals;
1034 >                if (++nsteals < 0)      // collect on overflow
1035 >                    transferStealCount(pool);
1036                  scanState |= SCANNING;
1037                  if (thread != null)
1038                      thread.afterTopLevelExec();
# Line 1028 | Line 1040 | public class ForkJoinPool extends Abstra
1040          }
1041  
1042          /**
1043 +         * Adds steal count to pool stealCounter if it exists, and resets.
1044 +         */
1045 +        final void transferStealCount(ForkJoinPool p) {
1046 +            AtomicLong sc;
1047 +            if (p != null && (sc = p.stealCounter) != null) {
1048 +                int s = nsteals;
1049 +                nsteals = 0;            // if negative, correct for overflow
1050 +                sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s));
1051 +            }
1052 +        }
1053 +
1054 +        /**
1055           * If present, removes from queue and executes the given task,
1056           * or any other cancelled task. Used only by awaitJoin.
1057           *
# Line 1342 | Line 1366 | public class ForkJoinPool extends Abstra
1366      private static final int  SHUTDOWN   = 1 << 31;
1367  
1368      // Instance fields
1345    volatile long stealCount;            // collects worker counts
1369      volatile long ctl;                   // main pool control
1370      volatile int runState;               // lockable status
1371      final int config;                    // parallelism, mode
# Line 1351 | Line 1374 | public class ForkJoinPool extends Abstra
1374      final ForkJoinWorkerThreadFactory factory;
1375      final UncaughtExceptionHandler ueh;  // per-worker UEH
1376      final String workerNamePrefix;       // to create worker name string
1377 +    volatile AtomicLong stealCounter;    // also used as sync monitor
1378  
1379      /**
1380       * Acquires the runState lock; returns current (locked) runState.
# Line 1363 | Line 1387 | public class ForkJoinPool extends Abstra
1387      }
1388  
1389      /**
1390 <     * Spins and/or blocks until runstate lock is available.  This
1391 <     * method is called only if an initial CAS fails. This acts as a
1368 <     * spinlock for normal cases, but falls back to builtin monitor to
1369 <     * block when (rarely) needed. This would be a terrible idea for a
1370 <     * highly contended lock, but most pools run without the lock ever
1371 <     * contending after the spin limit, so this works fine as a more
1372 <     * conservative alternative to a pure spinlock.
1390 >     * Spins and/or blocks until runstate lock is available.  See
1391 >     * above for explanation.
1392       */
1393      private int awaitRunStateLock() {
1394 +        Object lock;
1395          boolean wasInterrupted = false;
1396          for (int spins = SPINS, r = 0, rs, ns;;) {
1397              if (((rs = runState) & RSLOCK) == 0) {
# Line 1392 | Line 1412 | public class ForkJoinPool extends Abstra
1412                  if (r >= 0)
1413                      --spins;
1414              }
1415 +            else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
1416 +                Thread.yield();   // initialization race
1417              else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
1418 <                synchronized (this) {
1418 >                synchronized (lock) {
1419                      if ((runState & RSIGNAL) != 0) {
1420                          try {
1421 <                            wait();
1421 >                            lock.wait();
1422                          } catch (InterruptedException ie) {
1423                              if (!(Thread.currentThread() instanceof
1424                                    ForkJoinWorkerThread))
# Line 1404 | Line 1426 | public class ForkJoinPool extends Abstra
1426                          }
1427                      }
1428                      else
1429 <                        notifyAll();
1429 >                        lock.notifyAll();
1430                  }
1431              }
1432          }
# Line 1418 | Line 1440 | public class ForkJoinPool extends Abstra
1440       */
1441      private void unlockRunState(int oldRunState, int newRunState) {
1442          if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
1443 +            Object lock = stealCounter;
1444              runState = newRunState;              // clears RSIGNAL bit
1445 <            synchronized (this) { notifyAll(); }
1445 >            if (lock != null)
1446 >                synchronized (lock) { lock.notifyAll(); }
1447          }
1448      }
1449  
# Line 1547 | Line 1571 | public class ForkJoinPool extends Abstra
1571                                             (SP_MASK & c))));
1572          if (w != null) {
1573              w.qlock = -1;                             // ensure set
1574 +            w.transferStealCount(this);
1575              w.cancelAll();                            // cancel remaining tasks
1551            U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
1576          }
1577          for (;;) {                                    // possibly replace
1578              WorkQueue[] ws; int m, sp;
# Line 1746 | Line 1770 | public class ForkJoinPool extends Abstra
1770              else if (spins > 0) {
1771                  r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
1772                  if (r >= 0 && --spins == 0) {         // randomize spins
1773 <                    WorkQueue v; WorkQueue[] ws; int s, j;
1773 >                    WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
1774                      if (pred != 0 && (ws = workQueues) != null &&
1775                          (j = pred & SMASK) < ws.length &&
1776                          (v = ws[j]) != null &&        // see if pred parking
1777                          (v.parker == null || v.scanState >= 0))
1778                          spins = SPINS;                // continue spinning
1755                    else if ((s = w.nsteals) != 0) {
1756                        w.nsteals = 0;                // collect steals
1757                        U.getAndAddLong(this, STEALCOUNT, s);
1758                    }
1779                  }
1780              }
1781              else if (w.qlock < 0)                     // recheck after spins
# Line 2070 | Line 2090 | public class ForkJoinPool extends Abstra
2090                  if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2091                      U.putOrderedObject(w, QCURRENTSTEAL, t);
2092                      t.doExec();
2093 <                    ++w.nsteals;
2093 >                    if (++w.nsteals < 0)
2094 >                        w.transferStealCount(this);
2095                  }
2096              }
2097              else if (active) {      // decrement active count without queuing
# Line 2285 | Line 2306 | public class ForkJoinPool extends Abstra
2306                  tryTerminate(false, false);     // help terminate
2307                  throw new RejectedExecutionException();
2308              }
2309 <            else if ((rs & STARTED) == 0 ||     // initialize workQueues array
2309 >            else if ((rs & STARTED) == 0 ||     // initialize
2310                       ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
2311                  int ns = 0;
2312                  rs = lockRunState();
2313                  try {
2314 <                    if ((rs & STARTED) == 0) {  // find power of two table size
2314 >                    if ((rs & STARTED) == 0) {
2315 >                        U.compareAndSwapObject(this, STEALCOUNTER, null,
2316 >                                               new AtomicLong());
2317 >                        // create workQueues array with size a power of two
2318                          int p = config & SMASK; // ensure at least 2 slots
2319                          int n = (p > 1) ? p - 1 : 1;
2320                          n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
# Line 2807 | Line 2831 | public class ForkJoinPool extends Abstra
2831       * @return the number of steals
2832       */
2833      public long getStealCount() {
2834 <        long count = stealCount;
2834 >        AtomicLong sc = stealCounter;
2835 >        long count = (sc == null) ? 0L : sc.get();
2836          WorkQueue[] ws; WorkQueue w;
2837          if ((ws = workQueues) != null) {
2838              for (int i = 1; i < ws.length; i += 2) {
# Line 2937 | Line 2962 | public class ForkJoinPool extends Abstra
2962      public String toString() {
2963          // Use a single pass through workQueues to collect counts
2964          long qt = 0L, qs = 0L; int rc = 0;
2965 <        long st = stealCount;
2965 >        AtomicLong sc = stealCounter;
2966 >        long st = (sc == null) ? 0L : sc.get();
2967          long c = ctl;
2968          WorkQueue[] ws; WorkQueue w;
2969          if ((ws = workQueues) != null) {
# Line 3288 | Line 3314 | public class ForkJoinPool extends Abstra
3314      private static final int  ASHIFT;
3315      private static final long CTL;
3316      private static final long RUNSTATE;
3317 <    private static final long STEALCOUNT;
3317 >    private static final long STEALCOUNTER;
3318      private static final long PARKBLOCKER;
3319      private static final long QBASE;      // these must be same as in WorkQueue
3320      private static final long QTOP;
# Line 3307 | Line 3333 | public class ForkJoinPool extends Abstra
3333                  (k.getDeclaredField("ctl"));
3334              RUNSTATE = U.objectFieldOffset
3335                  (k.getDeclaredField("runState"));
3336 <            STEALCOUNT = U.objectFieldOffset
3337 <                (k.getDeclaredField("stealCount"));
3336 >            STEALCOUNTER = U.objectFieldOffset
3337 >                (k.getDeclaredField("stealCounter"));
3338              Class<?> tk = Thread.class;
3339              PARKBLOCKER = U.objectFieldOffset
3340                  (tk.getDeclaredField("parkBlocker"));

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines