ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.293 by jsr166, Sun Oct 11 20:47:31 2015 UTC vs.
Revision 1.294 by dl, Mon Oct 12 13:34:03 2015 UTC

# Line 289 | Line 289 | public class ForkJoinPool extends Abstra
289       * and their negations (used for thresholding) to fit into 16bit
290       * subfields.
291       *
292 <     * Field "runState" holds state bits (STARTED, STOP, etc).  After
293 <     * starting, the field is updated (only during shutdown) under the
294 <     * auxState lock.
292 >     * Field "runState" holds lifetime status, atomically and
293 >     * monotonically setting STARTED, SHUTDOWN, STOP, and finally
294 >     * TERMINATED bits.
295       *
296       * Field "auxState" is a ReentrantLock subclass that also
297       * opportunistically holds some other bookkeeping fields accessed
298       * only when locked.  It is mainly used to lock (infrequent)
299 <     * updates to runState and workQueues.  The auxState instance is
300 <     * itself lazily constructed (see tryInitialize), requiring a
301 <     * double-check-style bootstrapping use of field runState, and
302 <     * locking a private static.
299 >     * updates to workQueues.  The auxState instance is itself lazily
300 >     * constructed (see tryInitialize), requiring a double-check-style
301 >     * bootstrapping use of field runState, and locking a private
302 >     * static.
303       *
304       * Field "workQueues" holds references to WorkQueues.  It is
305       * updated (only during worker creation and termination) under the
# Line 460 | Line 460 | public class ForkJoinPool extends Abstra
460       * thread, as well as every other worker thereafter terminating,
461       * helps terminate others by setting their (qlock) status,
462       * cancelling their unprocessed tasks, and waking them up, doing
463 <     * so repeatedly until stable (but with a loop bounded by the
464 <     * number of workers).  Calls to non-abrupt shutdown() preface
465 <     * this by checking whether termination should commence. This
466 <     * relies primarily on the active count bits of "ctl" maintaining
467 <     * consensus -- tryTerminate is called from awaitWork whenever
468 <     * quiescent. However, external submitters do not take part in
469 <     * this consensus.  So, tryTerminate sweeps through queues (until
470 <     * stable) to ensure lack of in-flight submissions and workers
471 <     * about to process them before triggering the "STOP" phase of
472 <     * termination. (Note: there is an intrinsic conflict if
473 <     * helpQuiescePool is called when shutdown is enabled. Both wait
474 <     * for quiescence, but tryTerminate is biased to not trigger until
475 <     * helpQuiescePool completes.)
463 >     * so repeatedly until stable. Calls to non-abrupt shutdown()
464 >     * preface this by checking whether termination should
465 >     * commence. This relies primarily on the active count bits of
466 >     * "ctl" maintaining consensus -- tryTerminate is called from
467 >     * awaitWork whenever quiescent. However, external submitters do
468 >     * not take part in this consensus.  So, tryTerminate sweeps
469 >     * through queues (until stable) to ensure lack of in-flight
470 >     * submissions and workers about to process them before triggering
471 >     * the "STOP" phase of termination. (Note: there is an intrinsic
472 >     * conflict if helpQuiescePool is called when shutdown is
473 >     * enabled. Both wait for quiescence, but tryTerminate is biased
474 >     * to not trigger until helpQuiescePool completes.)
475       *
476       * Joining Tasks
477       * =============
# Line 1469 | Line 1468 | public class ForkJoinPool extends Abstra
1468       * termination by external calls submitting tasks.
1469       */
1470      private void tryInitialize(boolean checkTermination) {
1471 <        if ((runState & STARTED) == 0) { // bootstrap by locking static field
1472 <            int rs; // create workQueues array with size a power of two
1473 <            int p = config & SMASK; // ensure at least 2 slots
1474 <            int n = (p > 1) ? p - 1 : 1;
1476 <            n |= n >>> 1;
1471 >        if (runState == 0) { // bootstrap by locking static field
1472 >            int p = config & SMASK;
1473 >            int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
1474 >            n |= n >>> 1;    // create workQueues array with size a power of two
1475              n |= n >>> 2;
1476              n |= n >>> 4;
1477              n |= n >>> 8;
# Line 1482 | Line 1480 | public class ForkJoinPool extends Abstra
1480              AuxState aux = new AuxState();
1481              WorkQueue[] ws = new WorkQueue[n];
1482              synchronized (modifyThreadPermission) { // double-check
1483 <                if (((rs = runState) & STARTED) == 0) {
1486 <                    rs |= STARTED;
1483 >                if (runState == 0) {
1484                      workQueues = ws;
1485                      auxState = aux;
1486 <                    runState = rs;
1486 >                    runState = STARTED;
1487                  }
1488              }
1489          }
# Line 1560 | Line 1557 | public class ForkJoinPool extends Abstra
1557          WorkQueue w = new WorkQueue(this, wt);
1558          int i = 0;                                    // assign a pool index
1559          int mode = config & MODE_MASK;
1563        int rs = runState;
1560          if ((aux = auxState) != null) {
1561              aux.lock();
1562              try {
# Line 1603 | Line 1599 | public class ForkJoinPool extends Abstra
1599       */
1600      final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1601          WorkQueue w = null;
1606        int rs = runState;
1602          if (wt != null && (w = wt.workQueue) != null) {
1603              AuxState aux; WorkQueue[] ws;          // remove index from array
1604              int idx = w.config & SMASK;
# Line 2375 | Line 2370 | public class ForkJoinPool extends Abstra
2370       * @param now if true, unconditionally terminate, else only
2371       * if no work and no active workers
2372       * @param enable if true, terminate when next possible
2373 <     * @return -1 : terminating or terminated, 0: retry if internal caller, else 1
2373 >     * @return -1: terminating/terminated, 0: retry if internal caller, else 1
2374       */
2375      private int tryTerminate(boolean now, boolean enable) {
2376 <        AuxState aux;
2382 <        while ((aux = auxState) == null)
2383 <            tryInitialize(false);                 // ensure initialized
2376 >        int rs; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
2377  
2378 <        if ((runState & SHUTDOWN) == 0) {
2379 <            if (!enable || this == common)
2378 >        while ((rs = runState) >= 0) {
2379 >            if (!enable || this == common)        // cannot shutdown
2380                  return 1;
2381 <            aux.lock();
2382 <            runState = runState | SHUTDOWN;
2383 <            aux.unlock();
2381 >            else if (rs == 0)
2382 >                tryInitialize(false);             // ensure initialized
2383 >            else
2384 >                U.compareAndSwapInt(this, RUNSTATE, rs, rs | SHUTDOWN);
2385          }
2386  
2387 <        if ((runState & STOP) == 0) {
2387 >        if ((rs & STOP) == 0) {                   // try to initiate termination
2388              if (!now) {                           // check quiescence
2389                  for (long oldSum = 0L;;) {        // repeat until stable
2390                      WorkQueue[] ws; WorkQueue w; int b;
# Line 2410 | Line 2404 | public class ForkJoinPool extends Abstra
2404                          break;
2405                  }
2406              }
2407 <            aux.lock();
2408 <            runState = runState | STOP;
2415 <            aux.unlock();
2407 >            do {} while (!U.compareAndSwapInt(this, RUNSTATE,
2408 >                                              rs = runState, rs | STOP));
2409          }
2410  
2411          for (long oldSum = 0L;;) {                // repeat until stable
# Line 2440 | Line 2433 | public class ForkJoinPool extends Abstra
2433          }
2434  
2435          if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) {
2436 <            aux.lock();
2444 <            runState |= TERMINATED;
2445 <            aux.unlock();
2436 >            runState = (STARTED | SHUTDOWN | STOP | TERMINATED); // final write
2437              synchronized (this) {
2438                  notifyAll();                      // for awaitTermination
2439              }
# Line 2469 | Line 2460 | public class ForkJoinPool extends Abstra
2460              q.qlock = 1;                   // lock queue
2461              boolean installed = false;
2462              aux.lock();
2463 <            try {          // lock pool to install
2463 >            try {                          // lock pool to install
2464                  WorkQueue[] ws;
2465                  if ((ws = workQueues) != null && index < ws.length &&
2466                      ws[index] == null) {
# Line 2512 | Line 2503 | public class ForkJoinPool extends Abstra
2503                  tryInitialize(true);
2504              else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null)
2505                  tryCreateExternalQueue(k);
2506 <            else if ((stat = q.sharedPush(task)) <= 0) {
2507 <                if ((runState & STOP) != 0)
2508 <                    tryTerminate(false, false);
2509 <                else if (stat == 0)
2519 <                    signalWork();
2506 >            else if ((stat = q.sharedPush(task)) < 0)
2507 >                break;
2508 >            else if (stat == 0) {
2509 >                signalWork();
2510                  break;
2511              }
2512              else                          // move if busy
# Line 3435 | Line 3425 | public class ForkJoinPool extends Abstra
3425      // Unsafe mechanics
3426      private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
3427      private static final long CTL;
3428 +    private static final long RUNSTATE;
3429      private static final int ABASE;
3430      private static final int ASHIFT;
3431  
# Line 3442 | Line 3433 | public class ForkJoinPool extends Abstra
3433          try {
3434              CTL = U.objectFieldOffset
3435                  (ForkJoinPool.class.getDeclaredField("ctl"));
3436 +            RUNSTATE = U.objectFieldOffset
3437 +                (ForkJoinPool.class.getDeclaredField("runState"));
3438              ABASE = U.arrayBaseOffset(ForkJoinTask[].class);
3439              int scale = U.arrayIndexScale(ForkJoinTask[].class);
3440              if ((scale & (scale - 1)) != 0)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines