ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.257 by jsr166, Thu Aug 6 23:59:34 2015 UTC vs.
Revision 1.258 by dl, Fri Aug 7 17:20:22 2015 UTC

# Line 289 | Line 289 | public class ForkJoinPool extends Abstra
289       * and their negations (used for thresholding) to fit into 16bit
290       * subfields.
291       *
292 <     * Field "runState" holds CASed state bits (STARTED, STOP, etc).
293 <     *
294 <     * Field "workQueues" holds references to WorkQueues.  We need a
295 <     * lock to create, resize, and install workers in this array. We
296 <     * opportunistically use the "stealCounter" object as a monitor
297 <     * protecting access; although it too is lazily initialized (see
298 <     * tryInitialize) and only exists when runstate is STARTED.  The
299 <     * workQueues array is otherwise concurrently readable, and
300 <     * accessed directly. We also ensure that reads of the array
301 <     * reference itself never become too stale (for example,
292 >     * Field "runState" holds state bits (STARTED, STOP, etc).  After
293 >     * starting, the field is updated under a lock (only during
294 >     * shutdown); opportunistically using the "stealCounter" object as
295 >     * a monitor. However stealCounter is itself lazily initialized,
296 >     * so tryInitialize bootstraps this using a private static lock.
297 >     *
298 >     * Field "workQueues" holds references to WorkQueues.  It is
299 >     * updated only under the lock, but is otherwise concurrently
300 >     * readable, and accessed directly. We also ensure that reads of
301 >     * the array reference itself never become too stale (for example,
302       * re-reading before each scan). To simplify index-based
303       * operations, the array size is always a power of two, and all
304       * readers must tolerate null slots. Worker queues are at odd
# Line 1158 | Line 1158 | public class ForkJoinPool extends Abstra
1158                                  }
1159                              }
1160                              else if (base == b)      // replace with proxy
1161 <                                removed =
1162 <                                    U.compareAndSwapObject(a, offset, t,
1163 <                                                           new EmptyTask());
1161 >                                removed = U.compareAndSwapObject(a, offset, t,
1162 >                                                                 new EmptyTask());
1163                              if (removed) {
1164                                  ForkJoinTask<?> ps = currentSteal;
1165                                  (currentSteal = task).doExec();
# Line 1450 | Line 1449 | public class ForkJoinPool extends Abstra
1449      final UncaughtExceptionHandler ueh;  // per-worker UEH
1450  
1451      /**
1452 <     * Instantiates fields upon first submission, and/or throws
1453 <     * exception if terminating. Called only by externalPush.
1452 >     * Instantiates fields upon first submission, or upon shutdown if
1453 >     * no submissions. If checkTermination true, also responds to
1454 >     * termination by external calls submitting tasks.
1455       */
1456 <    private void tryInitialize() {
1456 >    private void tryInitialize(boolean checkTermination) {
1457          if ((runState & STARTED) == 0) { // bootstrap by locking static field
1458              int rs; // create workQueues array with size a power of two
1459              int p = config & SMASK; // ensure at least 2 slots
1460              int n = (p > 1) ? p - 1 : 1;
1461              n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
1462 <            n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
1463 <            synchronized (modifyThreadPermission) {
1464 <                if ((runState & STARTED) == 0) {
1465 <                    stealCounter = new AtomicLong();
1466 <                    workQueues = new WorkQueue[n];
1467 <                    do {} while (!U.compareAndSwapInt(this, RUNSTATE,
1468 <                                                      rs = runState,
1469 <                                                      rs | STARTED));
1462 >            n |= n >>> 8; n |= n >>> 16; n = ((n + 1) << 1) & SMASK;
1463 >            AtomicLong sc = new AtomicLong();
1464 >            WorkQueue[] ws = new WorkQueue[n];
1465 >            synchronized(modifyThreadPermission) { // double-check
1466 >                if (((rs = runState) & STARTED) == 0) {
1467 >                    workQueues = ws;
1468 >                    runState = rs | STARTED;
1469 >                    stealCounter = sc;
1470                  }
1471              }
1472          }
1473 <        if (runState < 0) {
1473 >        if (checkTermination && runState < 0) {
1474              tryTerminate(false, false); // help terminate
1475              throw new RejectedExecutionException();
1476          }
# Line 1533 | Line 1533 | public class ForkJoinPool extends Abstra
1533       */
1534      final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1535          UncaughtExceptionHandler handler;
1536 <        Object lock = stealCounter;
1536 >        AtomicLong lock = stealCounter;
1537          wt.setDaemon(true);                           // configure thread
1538          if ((handler = ueh) != null)
1539              wt.setUncaughtExceptionHandler(handler);
# Line 1580 | Line 1580 | public class ForkJoinPool extends Abstra
1580      final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1581          WorkQueue w = null;
1582          if (wt != null && (w = wt.workQueue) != null) {
1583 <            Object lock; WorkQueue[] ws;              // remove index from array
1583 >            AtomicLong lock; WorkQueue[] ws;          // remove index from array
1584              int idx = w.config & SMASK;
1585              if ((lock = stealCounter) != null) {
1586                  synchronized (lock) {
# Line 1775 | Line 1775 | public class ForkJoinPool extends Abstra
1775          long deadline = (((scale <= 0) ? 1 : scale) * IDLE_TIMEOUT_MS +
1776                           System.currentTimeMillis());
1777          if (w != null && w.scanState < 0) {
1778 <            int ss; Object lock;
1778 >            int ss; AtomicLong lock;
1779              if (runState < 0 && tryTerminate(false, false))
1780                  stat = w.qlock = -1;               // help terminate
1781              else if ((stat = w.qlock) >= 0 && w.scanState < 0) {
# Line 1914 | Line 1914 | public class ForkJoinPool extends Abstra
1914                      if (t == null || b++ != q.base)
1915                          break;                     // busy or empty
1916                      else if (ss < 0) {
1917 <                        if ((ss = w.scanState) >= 0) {
1918 <                            tryReactivate(w, ws, r);
1919 <                            break;                 // retry upon rescan
1920 <                        }
1921 <                        r |= (1 << 31);            // ensure full scan
1917 >                        tryReactivate(w, ws, r);
1918 >                        break;                     // retry upon rescan
1919                      }
1920                      else if (!U.compareAndSwapObject(a, offset, t, null))
1921                          break;                     // contended
# Line 2346 | Line 2343 | public class ForkJoinPool extends Abstra
2343       * @return true if now terminating or terminated
2344       */
2345      private boolean tryTerminate(boolean now, boolean enable) {
2346 <        int rs;
2347 <        for (;;) {
2348 <            if ((rs = runState) < 0)              // already shut down
2349 <                break;
2350 <            if (!enable || this == common)
2351 <                return false;
2352 <            if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | SHUTDOWN))
2356 <                break;
2346 >        AtomicLong lock; int rs;
2347 >        if ((rs = runState) >= 0 && (!enable || this == common))
2348 >            return false;
2349 >        while ((lock = stealCounter) == null)
2350 >            tryInitialize(false);
2351 >        synchronized(lock) {
2352 >            rs = runState = runState | SHUTDOWN;
2353          }
2354  
2355          if ((rs & STOP) == 0) {
# Line 2383 | Line 2379 | public class ForkJoinPool extends Abstra
2379                          break;
2380                  }
2381              }
2382 <            while (((rs = runState) & STOP) == 0) // enter STOP phase
2383 <                U.compareAndSwapInt(this, RUNSTATE, rs, rs | STOP);
2382 >            synchronized(lock) {
2383 >                rs = runState = runState | STOP;
2384 >            }
2385          }
2386  
2387          int pass = 0;                             // 3 passes to help terminate
# Line 2393 | Line 2390 | public class ForkJoinPool extends Abstra
2390              long checkSum = ctl;
2391              if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
2392                  (ws = workQueues) == null || (m = ws.length - 1) < 0) {
2393 <                while (((rs = runState) & TERMINATED) == 0) {
2394 <                    if (U.compareAndSwapInt(this, RUNSTATE, rs,
2395 <                                            rs | TERMINATED)) {
2396 <                        synchronized (this) {
2397 <                            notifyAll();           // for awaitTermination
2401 <                        }
2402 <                        break;
2403 <                    }
2393 >                synchronized(lock) {
2394 >                    rs = runState = runState | TERMINATED;
2395 >                }
2396 >                synchronized (this) {
2397 >                    notifyAll();                   // for awaitTermination
2398                  }
2399                  break;
2400              }
# Line 2447 | Line 2441 | public class ForkJoinPool extends Abstra
2441       * @param index the index of the new queue
2442       */
2443      private void tryCreateExternalQueue(int index) {
2444 <        Object lock;
2444 >        AtomicLong lock;
2445          if ((lock = stealCounter) != null && index >= 0) {
2446              WorkQueue q = new WorkQueue(this, null);
2447              q.config = index;
# Line 2492 | Line 2486 | public class ForkJoinPool extends Abstra
2486              int rs = runState;
2487              WorkQueue[] ws = workQueues;
2488              if (rs <= 0 || ws == null || (wl = ws.length) <= 0)
2489 <                tryInitialize();
2489 >                tryInitialize(true);
2490              else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null)
2491                  tryCreateExternalQueue(k);
2492              else if ((stat = q.sharedPush(task)) < 0)
# Line 3418 | Line 3412 | public class ForkJoinPool extends Abstra
3412      // Unsafe mechanics
3413      private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
3414      private static final long CTL;
3421    private static final long RUNSTATE;
3415      private static final int  ABASE;
3416      private static final int  ASHIFT;
3417  
# Line 3426 | Line 3419 | public class ForkJoinPool extends Abstra
3419          try {
3420              CTL = U.objectFieldOffset
3421                  (ForkJoinPool.class.getDeclaredField("ctl"));
3429            RUNSTATE = U.objectFieldOffset
3430                (ForkJoinPool.class.getDeclaredField("runState"));
3431
3422              ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
3423              int scale = U.arrayIndexScale(ForkJoinTask[].class);
3424              if ((scale & (scale - 1)) != 0)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines