ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.210 by dl, Fri Jul 11 16:10:38 2014 UTC vs.
Revision 1.211 by dl, Sat Jul 12 14:28:03 2014 UTC

# Line 113 | Line 113 | import java.security.Permissions;
113   * - the class name of a {@link ForkJoinWorkerThreadFactory}
114   * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
115   * - the class name of a {@link UncaughtExceptionHandler}
116 * <!--
116   * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
117   * - the maximum number of alloed extra threads to maintain target
118   * parallelism (default 256).
120 * -->
119   * </ul>
120   * If a {@link SecurityManager} is present and no factory is
121   * specified, then the default pool uses a factory supplying
# Line 216 | Line 214 | public class ForkJoinPool extends Abstra
214       * but otherwise may err on the side of possibly making the queue
215       * appear nonempty when a push, pop, or poll have not fully
216       * committed. (Method isEmpty() checks the case of a partially
217 <     * completed removal of the last element.)  Note that this means
218 <     * that the poll operation, considered individually, is not
219 <     * wait-free. One thief cannot successfully continue until another
220 <     * in-progress one (or, if previously empty, a push) completes.
221 <     * However, in the aggregate, we ensure at least probabilistic
217 >     * completed removal of the last element.)  Because of this, the
218 >     * poll operation, considered individually, is not wait-free. One
219 >     * thief cannot successfully continue until another in-progress
220 >     * one (or, if previously empty, a push) completes.  However, in
221 >     * the aggregate, we ensure at least probabilistic
222       * non-blockingness.  If an attempted steal fails, a thief always
223       * chooses a different random victim target to try next. So, in
224       * order for one thief to progress, it suffices for any
# Line 262 | Line 260 | public class ForkJoinPool extends Abstra
260       * resizing) but we use only a simple spinlock (using field
261       * qlock), because submitters encountering a busy queue move on to
262       * try or create other queues -- they block only when creating and
263 <     * registering new queues.
263 >     * registering new queues. Additionally, "qlock" saturates to an
264 >     * unlockable value (-1) at shutdown. Unlocking still can be and
265 >     * is performed by cheaper ordered writes of "qlock" in successful
266 >     * cases, but uses CAS in unsuccessful cases.
267       *
268       * Management
269       * ==========
# Line 416 | Line 417 | public class ForkJoinPool extends Abstra
417       * cannot find a task to steal, it deactivates and enqueues. Very
418       * often, the lack of tasks is transient due to GC or OS
419       * scheduling. To reduce false-alarm deactivation, scanners
420 <     * compute checksums of queue states during sweeps. They give up
421 <     * and try to deactivate only after the sum is stable across
422 <     * scans. Further, to avoid missed signals, they repeat this
423 <     * scanning process after successful enqueuing until again stable.
424 <     * In this state, the worker cannot take/run a task it sees until
425 <     * it is released from the queue, so the worker itself eventually
426 <     * tries to release itself or any successor (see tryRelease).
427 <     * Otherwise, upon an empty scan, a deactivated worker uses an
428 <     * adaptive local spin construction (see awaitWork) before
429 <     * blocking (via park). Note the unusual conventions about
420 >     * compute checksums of queue states during sweeps.  (The
421 >     * stability checks used here and elsewhere are probablistic
422 >     * variants of snapshot techniques -- see Herlihy & Shavit.)
423 >     * Workers give up and try to deactivate only after the sum is
424 >     * stable across scans. Further, to avoid missed signals, they
425 >     * repeat this scanning process after successful enqueuing until
426 >     * again stable.  In this state, the worker cannot take/run a task
427 >     * it sees until it is released from the queue, so the worker
428 >     * itself eventually tries to release itself or any successor (see
429 >     * tryRelease).  Otherwise, upon an empty scan, a deactivated
430 >     * worker uses an adaptive local spin construction (see awaitWork)
431 >     * before blocking (via park). Note the unusual conventions about
432       * Thread.interrupts surrounding parking and other blocking:
433       * Because interrupts are used solely to alert threads to check
434       * termination, which is checked anyway upon blocking, we clear
# Line 445 | Line 448 | public class ForkJoinPool extends Abstra
448       * noticeably long, and the time between signalling a thread and
449       * it actually making progress can be very noticeably long, so it
450       * is worth offloading these delays from critical paths as much as
451 <     * possible. Also, because enqueued workers are often rescanning
451 >     * possible. Also, because inactive workers are often rescanning
452       * or spinning rather than blocking, we set and clear the "parker"
453       * field of WorkQueues to reduce unnecessary calls to unpark.
454       * (This requires a secondary recheck to avoid missed signals.)
# Line 457 | Line 460 | public class ForkJoinPool extends Abstra
460       * number of threads decreases, eventually removing all workers.
461       * Also, when more than two spare threads exist, excess threads
462       * are immediately terminated at the next quiescent point.
463 <     * (Padding by two avoids hysteresis).
463 >     * (Padding by two avoids hysteresis.)
464       *
465       * Shutdown and Termination. A call to shutdownNow invokes
466       * tryTerminate to atomically set a runState bit. The calling
# Line 468 | Line 471 | public class ForkJoinPool extends Abstra
471       * number of workers).  Calls to non-abrupt shutdown() preface
472       * this by checking whether termination should commence. This
473       * relies primarily on the active count bits of "ctl" maintaining
474 <     * consensus about quiescence. However, external submitters do not
475 <     * take part in this consensus.  So, tryTerminate sweeps through
476 <     * queues to ensure lack of in-flight submissions and workers
474 >     * consensus -- tryterminate is called from awaitWork whenever
475 >     * quiescent. However, external submitters do not take part in
476 >     * this consensus.  So, tryTerminate sweeps through queues (until
477 >     * stable) to ensure lack of in-flight submissions and workers
478       * about to process them before triggering the "STOP" phase of
479 <     * termination.
479 >     * termination. (Note: there is an intrinsic conflict if
480 >     * helpQuiescePool is called when shutdown is enabled. Both wait
481 >     * for quiesence, but tryTerminate is biased to not trigger until
482 >     * helpQuiescePool completes.)
483 >     *
484       *
485       * Joining Tasks
486       * =============
# Line 557 | Line 565 | public class ForkJoinPool extends Abstra
565       * continuation tasks) blocks on a join and there still remain
566       * enough threads to ensure liveness.
567       *
568 <     * Bounds. The compensation mechanism may be bounded.  Bounds for
569 <     * the commonPool (see commonMaxSpares) better enable JVMs to cope
568 >     * The compensation mechanism may be bounded.  Bounds for the
569 >     * commonPool (see commonMaxSpares) better enable JVMs to cope
570       * with programming errors and abuse before running out of
571       * resources to do so. In other cases, users may supply factories
572       * that limit thread construction. The effects of bounding in this
# Line 712 | Line 720 | public class ForkJoinPool extends Abstra
720      // Masks and units for WorkQueue.scanState and ctl sp subfield
721      static final int SCANNING     = 1;             // false when running tasks
722      static final int INACTIVE     = 1 << 31;       // must be negative
723 <    static final int SS_SHIFT     = 16;            // shift for version count
716 <    static final int SS_SEQ       = 1 << SS_SHIFT; // version number
717 <    static final int SS_MASK      = 0x7fffffff;    // mask on update
723 >    static final int SS_SEQ       = 1 << 16;       // version count
724  
725      // Mode bits for ForkJoinPool.config and WorkQueue.config
726 <    static final int MODE_MASK    = SMASK << 16;
726 >    static final int MODE_MASK    = 0xffff << 16;  // top half of int
727      static final int LIFO_QUEUE   = 0;
728      static final int FIFO_QUEUE   = 1 << 16;
729      static final int SHARED_QUEUE = 1 << 31;       // must be negative
# Line 824 | Line 830 | public class ForkJoinPool extends Abstra
830                      if ((p = pool) != null)
831                          p.signalWork(p.workQueues, this);
832                  }
833 <                else if (n == m)
833 >                else if (n >= m)
834                      growArray();
835              }
836          }
# Line 1160 | Line 1166 | public class ForkJoinPool extends Abstra
1166                      s != Thread.State.TIMED_WAITING);
1167          }
1168  
1169 <        // Unsafe mechanics
1169 >        // Unsafe mechanics. Note that some are (and must be) the same as in FJP
1170          private static final sun.misc.Unsafe U;
1171          private static final int  ABASE;
1172          private static final int  ASHIFT;
# Line 1278 | Line 1284 | public class ForkJoinPool extends Abstra
1284       * spins. If/when MWAIT-like intrinsics becomes available, they
1285       * may allow quieter spinning. The value of SPINS must be a power
1286       * of two, at least 4. The current value causes spinning for a
1287 <     * small fraction of context-switch times that is worthwhile given
1288 <     * the typical likelihoods that blocking is not necessary.
1287 >     * small fraction of typical context-switch times, well worthwhile
1288 >     * given the typical likelihoods that blocking is not necessary.
1289       */
1290      private static final int SPINS  = 1 << 11;
1291  
# Line 1577 | Line 1583 | public class ForkJoinPool extends Abstra
1583       */
1584      final void signalWork(WorkQueue[] ws, WorkQueue q) {
1585          long c; int sp, i; WorkQueue v; Thread p;
1586 <        while ((c = ctl) < 0L) {
1586 >        while ((c = ctl) < 0L) {                       // too few active
1587              if ((sp = (int)c) == 0) {                  // no idle workers
1588                  if ((c & ADD_WORKER) != 0L)            // too few workers
1589                      tryAddWorker(c);
# Line 1615 | Line 1621 | public class ForkJoinPool extends Abstra
1621       */
1622      private boolean tryRelease(long c, WorkQueue v, long inc) {
1623          int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
1624 <        if (v != null && v.scanState == sp) { // v is at top of stack
1624 >        if (v != null && v.scanState == sp) {          // v is at top of stack
1625              long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
1626              if (U.compareAndSwapLong(this, CTL, c, nc)) {
1627                  v.scanState = vs;
# Line 1777 | Line 1783 | public class ForkJoinPool extends Abstra
1783                      U.park(false, parkTime);
1784                  U.putOrderedObject(w, QPARKER, null);
1785                  U.putObject(wt, PARKBLOCKER, null);
1780                if (w.qlock < 0)                      // terminated while parked
1781                    return false;
1786                  if (w.scanState >= 0)
1787                      break;
1788                  if (parkTime != 0L && ctl == c &&
# Line 2027 | Line 2031 | public class ForkJoinPool extends Abstra
2031       * caller if, by the time it tries to use the queue, it is empty.
2032       */
2033      private WorkQueue findNonEmptyStealQueue() {
2034 <        int r = ThreadLocalRandom.nextSecondarySeed(), oldSum = 0, checkSum;
2035 <        do {
2036 <            checkSum = 0;
2037 <            WorkQueue[] ws; WorkQueue q; int m, k, b;
2038 <            if ((ws = workQueues) != null && (m = ws.length - 1) > 0) {
2039 <                for (int i = 0; i <= m; ++i) {
2040 <                    if ((k = (i + r + m) & m) <= m && k >= 0 &&
2041 <                        (q = ws[k]) != null) {
2042 <                        if ((b = q.base) - q.top < 0)
2043 <                            return q;
2044 <                        checkSum += b;
2045 <                    }
2034 >        WorkQueue[] ws; int m;  // one-shot version of scan loop
2035 >        int r = ThreadLocalRandom.nextSecondarySeed();
2036 >        if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
2037 >            for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
2038 >                WorkQueue q; int b;
2039 >                if ((q = ws[k]) != null) {
2040 >                    if ((b = q.base) - q.top < 0)
2041 >                        return q;
2042 >                    checkSum += b;
2043 >                }
2044 >                if ((k = (k + 1) & m) == origin) {
2045 >                    if (oldSum == (oldSum = checkSum))
2046 >                        break;
2047 >                    checkSum = 0;
2048                  }
2049              }
2050 <        } while (oldSum != (oldSum = checkSum));
2050 >        }
2051          return null;
2052      }
2053  
# Line 2052 | Line 2058 | public class ForkJoinPool extends Abstra
2058       * find tasks either.
2059       */
2060      final void helpQuiescePool(WorkQueue w) {
2061 +        ForkJoinTask<?> ps = w.currentSteal; // save context
2062          for (boolean active = true;;) {
2063              long c; WorkQueue q; ForkJoinTask<?> t; int b;
2064 <            while ((t = w.nextLocalTask()) != null)
2058 <                t.doExec();
2064 >            w.execLocalTasks();     // run locals before each scan
2065              if ((q = findNonEmptyStealQueue()) != null) {
2066                  if (!active) {      // re-establish active count
2067                      active = true;
2068                      U.getAndAddLong(this, CTL, AC_UNIT);
2069                  }
2070                  if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2065                    ForkJoinTask<?> ps = w.currentSteal;
2071                      U.putOrderedObject(w, QCURRENTSTEAL, t);
2072                      t.doExec();
2068                    U.putOrderedObject(w, QCURRENTSTEAL, ps);
2073                      ++w.nsteals;
2074                  }
2075              }
# Line 2080 | Line 2084 | public class ForkJoinPool extends Abstra
2084                       U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
2085                  break;
2086          }
2087 +        U.putOrderedObject(w, QCURRENTSTEAL, ps);
2088      }
2089  
2090      /**
# Line 2180 | Line 2185 | public class ForkJoinPool extends Abstra
2185  
2186          if ((rs & STOP) == 0) {
2187              if (!now) {                           // check quiescence
2188 <                outer: for (long oldSum = 0L;;) { // repeat until stable
2188 >                for (long oldSum = 0L;;) {        // repeat until stable
2189                      WorkQueue[] ws; WorkQueue w; int m, b; long c;
2190                      long checkSum = ctl;
2191                      if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
# Line 2191 | Line 2196 | public class ForkJoinPool extends Abstra
2196                          if ((w = ws[i]) != null) {
2197                              if ((b = w.base) != w.top || w.scanState >= 0 ||
2198                                  w.currentSteal != null) {
2194                                if ((runState & STOP) != 0)
2195                                    break outer;  // already stopping
2199                                  tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
2200 <                                return false;     // ensure recheck
2200 >                                return false;     // arrange for recheck
2201                              }
2202                              checkSum += b;
2203                              if ((i & 1) == 0)
# Line 3284 | Line 3287 | public class ForkJoinPool extends Abstra
3287      private static final int  ABASE;
3288      private static final int  ASHIFT;
3289      private static final long CTL;
3287    private static final long PARKBLOCKER;
3288    private static final long STEALCOUNT;
3290      private static final long RUNSTATE;
3291 <    private static final long QBASE;
3291 >    private static final long STEALCOUNT;
3292 >    private static final long PARKBLOCKER;
3293 >    private static final long QBASE;      // these must be same as in WorkQueue
3294      private static final long QTOP;
3295      private static final long QLOCK;
3296      private static final long QSCANSTATE;
# Line 3302 | Line 3305 | public class ForkJoinPool extends Abstra
3305              Class<?> k = ForkJoinPool.class;
3306              CTL = U.objectFieldOffset
3307                  (k.getDeclaredField("ctl"));
3305            STEALCOUNT = U.objectFieldOffset
3306                (k.getDeclaredField("stealCount"));
3308              RUNSTATE = U.objectFieldOffset
3309                  (k.getDeclaredField("runState"));
3310 +            STEALCOUNT = U.objectFieldOffset
3311 +                (k.getDeclaredField("stealCount"));
3312              Class<?> tk = Thread.class;
3313              PARKBLOCKER = U.objectFieldOffset
3314                  (tk.getDeclaredField("parkBlocker"));

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines