Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.142 by jsr166, Wed Nov 14 19:05:03 2012 UTC vs.
Revision 1.148 by jsr166, Tue Nov 20 06:18:39 2012 UTC

# Line 37 | Line 37 | import java.util.concurrent.TimeUnit;
37   * ForkJoinPool}s may also be appropriate for use with event-style
38   * tasks that are never joined.
39   *
40 < * <p>A static {@link #commonPool} is available and appropriate for
40 > * <p>A static {@link #commonPool()} is available and appropriate for
41   * most applications. The common pool is used by any ForkJoinTask that
42   * is not explicitly submitted to a specified pool. Using the common
43   * pool normally reduces resource usage (its threads are slowly
# Line 62 | Line 62 | import java.util.concurrent.TimeUnit;
62   * {@link #toString} returns indications of pool state in a
63   * convenient form for informal monitoring.
64   *
65 < * <p> As is the case with other ExecutorServices, there are three
65 > * <p>As is the case with other ExecutorServices, there are three
66   * main task execution methods summarized in the following table.
67   * These are designed to be used primarily by clients not already
68   * engaged in fork/join computations in the current pool.  The main
# Line 104 | Line 104 | import java.util.concurrent.TimeUnit;
104   * an integer greater than zero, {@code threadFactory} -- the class
105   * name of a {@link ForkJoinWorkerThreadFactory}, and {@code
106   * exceptionHandler} -- the class name of a {@link
107 + * java.lang.Thread.UncaughtExceptionHandler
108   * Thread.UncaughtExceptionHandler}. Upon any error in establishing
109   * these settings, default parameters are used.
110   *
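The three common-pool settings described in the hunk above can be exercised from application code. What follows is a minimal sketch and not part of this diff. It assumes the JDK's java.util.concurrent.ForkJoinPool (Java 8 or later) rather than the jsr166y package, and the class and method names are hypothetical. The property prefix is the one that appears as propPrefix in revision 1.142 of this file. A property set after the common pool has already been initialized is expected to have no effect, because the pool is constructed once in a static initializer.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolConfigSketch {
    // Prefix taken from the propPrefix constant shown in this diff.
    static final String PREFIX = "java.util.concurrent.ForkJoinPool.common.";

    /**
     * Requests a common-pool parallelism level and reports the level
     * actually in effect. The request is honored only if this runs
     * before anything else has initialized the common pool.
     */
    static int requestParallelism(int requested) {
        System.setProperty(PREFIX + "parallelism", Integer.toString(requested));
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + requestParallelism(3));
    }
}
```

The threadFactory and exceptionHandler properties take class names in the same way; per the Javadoc above, any error in establishing these settings falls back to default parameters.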
# Line 238 | Line 239 | public class ForkJoinPool extends Abstra
239       * enable shutdown.  When used as a lock, it is normally only very
240       * briefly held, so is nearly always available after at most a
241       * brief spin, but we use a monitor-based backup strategy to
242 <     * blocking when needed.
242 >     * block when needed.
243       *
244       * Recording WorkQueues.  WorkQueues are recorded in the
245       * "workQueues" array that is created upon first use and expanded
# Line 247 | Line 248 | public class ForkJoinPool extends Abstra
248       * by a lock but the array is otherwise concurrently readable, and
249       * accessed directly.  To simplify index-based operations, the
250       * array size is always a power of two, and all readers must
251 <     * tolerate null slots. Worker queues are at odd indices Shared
251 >     * tolerate null slots. Worker queues are at odd indices. Shared
252       * (submission) queues are at even indices, up to a maximum of 64
253       * slots, to limit growth even if array needs to expand to add
254       * more workers. Grouping them together in this way simplifies and
# Line 317 | Line 318 | public class ForkJoinPool extends Abstra
318       * general, pools will be over-signalled.  When a submission is
319       * added or another worker adds a task to a queue that is
320       * apparently empty, they signal waiting workers (or trigger
321 <     * creation of new ones if fewer than the given parallelism level
322 <     * -- see signalWork).  These primary signals are buttressed by
323 <     * signals whenever other threads scan for work or do not have a
324 <     * task to process. On most platforms, signalling (unpark)
325 <     * overhead time is noticeably long, and the time between
326 <     * signalling a thread and it actually making progress can be very
327 <     * noticeably long, so it is worth offloading these delays from
328 <     * critical paths as much as possible.
321 >     * creation of new ones if fewer than the given parallelism
322 >     * level).  These primary signals are buttressed by signals
323 >     * whenever other threads scan for work or do not have a task to
324 >     * process (including the case of leaving a hint to unparked
325 >     * threads to help signal others upon wakeup).  On most platforms,
326 >     * signalling (unpark) overhead time is noticeably long, and the
327 >     * time between signalling a thread and it actually making
328 >     * progress can be very noticeably long, so it is worth offloading
329 >     * these delays from critical paths as much as possible.
330       *
331       * Trimming workers. To release resources after periods of lack of
332       * use, a worker starting to wait when the pool is quiescent will
# Line 392 | Line 394 | public class ForkJoinPool extends Abstra
394       * steals, rather than use per-task bookkeeping.  This sometimes
395       * requires a linear scan of workQueues array to locate stealers,
396       * but often doesn't because stealers leave hints (that may become
397 <     * stale/wrong) of where to locate them.  A stealHint is only a
398 <     * hint because a worker might have had multiple steals and the
399 <     * hint records only one of them (usually the most current).
400 <     * Hinting isolates cost to when it is needed, rather than adding
401 <     * to per-task overhead.  (2) It is "shallow", ignoring nesting
402 <     * and potentially cyclic mutual steals.  (3) It is intentionally
397 >     * stale/wrong) of where to locate them.  It is only a hint
398 >     * because a worker might have had multiple steals and the hint
399 >     * records only one of them (usually the most current).  Hinting
400 >     * isolates cost to when it is needed, rather than adding to
401 >     * per-task overhead.  (2) It is "shallow", ignoring nesting and
402 >     * potentially cyclic mutual steals.  (3) It is intentionally
403       * racy: field currentJoin is updated only while actively joining,
404       * which means that we miss links in the chain during long-lived
405       * tasks, GC stalls etc (which is OK since blocking in such cases
# Line 524 | Line 526 | public class ForkJoinPool extends Abstra
526       * Default ForkJoinWorkerThreadFactory implementation; creates a
527       * new ForkJoinWorkerThread.
528       */
529 <    static class DefaultForkJoinWorkerThreadFactory
529 >    static final class DefaultForkJoinWorkerThreadFactory
530          implements ForkJoinWorkerThreadFactory {
531 <        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
531 >        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
532              return new ForkJoinWorkerThread(pool);
533          }
534      }
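For comparison with the default factory in the hunk above, here is a hedged sketch of a user-supplied factory. It is written against java.util.concurrent (Java 8 or later), not jsr166y, and every name in it (NamedFactorySketch, NamedWorker, NamedFactory, ranOnCustomWorker) is hypothetical. A subclass is needed because the ForkJoinWorkerThread constructor is protected.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedFactorySketch {
    static final AtomicInteger IDS = new AtomicInteger();

    /** Worker subclass that only assigns a recognizable thread name. */
    static final class NamedWorker extends ForkJoinWorkerThread {
        NamedWorker(ForkJoinPool pool) {
            super(pool);
            setName("sketch-worker-" + IDS.incrementAndGet());
        }
    }

    /** Same shape as the default factory, but returns the subclass. */
    static final class NamedFactory
        implements ForkJoinPool.ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new NamedWorker(pool);
        }
    }

    /** Runs one task and reports whether it executed on a NamedWorker. */
    static boolean ranOnCustomWorker() {
        ForkJoinPool pool = new ForkJoinPool(2, new NamedFactory(), null, false);
        try {
            Callable<Boolean> probe =
                () -> Thread.currentThread() instanceof NamedWorker;
            return pool.submit(probe).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on custom worker: " + ranOnCustomWorker());
    }
}
```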
# Line 597 | Line 599 | public class ForkJoinPool extends Abstra
599       * trades off slightly slower average field access for the sake of
600       * avoiding really bad worst-case access. (Until better JVM
601       * support is in place, this padding is dependent on transient
602 <     * properties of JVM field layout rules.)  We also take care in
601 <     * allocating, sizing and resizing the array. Non-shared queue
602 <     * arrays are initialized by workers before use. Others are
603 <     * allocated on first use.
602 >     * properties of JVM field layout rules.)
603       */
604      static final class WorkQueue {
605          /**
# Line 626 | Line 625 | public class ForkJoinPool extends Abstra
625          int seed;                  // for random scanning; initialize nonzero
626          volatile int eventCount;   // encoded inactivation count; < 0 if inactive
627          int nextWait;              // encoded record of next event waiter
628 <        final int mode;            // lifo, fifo, or shared
630 <        int nsteals;               // cumulative number of steals
628 >        int hint;                  // steal or signal hint (index)
629          int poolIndex;             // index of this queue in pool (or 0)
630 <        int stealHint;             // index of most recent known stealer
630 >        final int mode;            // 0: lifo, > 0: fifo, < 0: shared
631 >        int nsteals;               // number of steals
632          volatile int qlock;        // 1: locked, -1: terminate; else 0
633          volatile int base;         // index of next slot for poll
634          int top;                   // index of next slot for push
# Line 639 | Line 638 | public class ForkJoinPool extends Abstra
638          volatile Thread parker;    // == owner during call to park; else null
639          volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
640          ForkJoinTask<?> currentSteal; // current non-local task being executed
641 +
642          // Heuristic padding to ameliorate unfortunate memory placements
643          Object p00, p01, p02, p03, p04, p05, p06, p07;
644 <        Object p08, p09, p0a, p0b, p0c, p0d, p0e;
644 >        Object p08, p09, p0a, p0b, p0c;
645  
646 <        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode) {
647 <            this.mode = mode;
646 >        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
647 >                  int seed) {
648 >            this.array = new ForkJoinTask<?>[WorkQueue.INITIAL_QUEUE_CAPACITY];
649              this.pool = pool;
650              this.owner = owner;
651 <            // Place indices in the center of array (that is not yet allocated)
651 >            this.mode = mode;
652 >            this.seed = seed;
653 >            // Place indices in the center of array
654              base = top = INITIAL_QUEUE_CAPACITY >>> 1;
655          }
656  
# Line 660 | Line 663 | public class ForkJoinPool extends Abstra
663           * @throw RejectedExecutionException if array cannot be resized
664           */
665          final void push(ForkJoinTask<?> task) {
666 <            ForkJoinPool p; ForkJoinTask<?>[] a;
667 <            int s = top, n;
668 <            if ((a = array) != null && a.length > (n = s + 1 - base)) {
666 >            ForkJoinTask<?>[] a; ForkJoinPool p;
667 >            int s = top, m, n;
668 >            if ((a = array) != null) {    // ignore if queue removed
669                  U.putOrderedObject
670 <                    (a, (((a.length - 1) & s) << ASHIFT) + ABASE, task);
671 <                top = s + 1;
672 <                if (n <= 1 && (p = pool) != null)
673 <                    p.signalWork(this, 1);
670 >                    (a, (((m = a.length - 1) & s) << ASHIFT) + ABASE, task);
671 >                if ((n = (top = s + 1) - base) <= 1) {
672 >                    if ((p = pool) != null)
673 >                        p.signalWork(this, 0);
674 >                }
675 >                else if (n >= m)
676 >                    growArray();
677              }
672 -            else
673 -                fullPush(task, true);
678          }
679  
680          /**
681           * Pushes a task if lock is free and array is either big
682 <         * enough or can be resized to be big enough. Note: a
679 <         * specialization of a common fast path of this method is in
680 <         * ForkJoinPool.externalPush. When called from a FJWT queue,
681 <         * this can fail only if the pool has been shut down or
682 <         * an out of memory error.
682 >         * enough or can be resized to be big enough.
683           *
684           * @param task the task. Caller must ensure non-null.
685 <         * @param owned if true, throw RJE on failure
685 >         * @return true if submitted
686           */
687 <        final boolean fullPush(ForkJoinTask<?> task, boolean owned) {
688 <            ForkJoinPool p; ForkJoinTask<?>[] a;
689 <            if (owned) {
690 <                if (qlock < 0) // must be shutting down
691 <                    throw new RejectedExecutionException();
692 <            }
693 <            else if (!U.compareAndSwapInt(this, QLOCK, 0, 1))
694 <                return false;
695 <            try {
696 <                int s = top, oldLen, len;
697 <                if ((a = array) == null)
698 <                    a = array = new ForkJoinTask<?>[len=INITIAL_QUEUE_CAPACITY];
699 <                else if ((oldLen = a.length) > s + 1 - base)
700 <                    len = oldLen;
701 <                else if ((len = oldLen << 1) > MAXIMUM_QUEUE_CAPACITY)
702 <                    throw new RejectedExecutionException("Capacity exceeded");
703 <                else {
704 <                    int oldMask, b;
705 <                    ForkJoinTask<?>[] oldA = a;
706 <                    a = array = new ForkJoinTask<?>[len];
707 <                    if ((oldMask = oldLen - 1) >= 0 && s - (b = base) > 0) {
708 <                        int mask = len - 1;
709 <                        do {
710 <                            ForkJoinTask<?> x;
711 <                            int oldj = ((b & oldMask) << ASHIFT) + ABASE;
712 <                            int j    = ((b &    mask) << ASHIFT) + ABASE;
713 <                            x = (ForkJoinTask<?>)
714 <                                U.getObjectVolatile(oldA, oldj);
715 <                            if (x != null &&
716 <                                U.compareAndSwapObject(oldA, oldj, x, null))
717 <                                U.putObjectVolatile(a, j, x);
718 <                        } while (++b != s);
719 <                    }
720 <                }
721 <                U.putOrderedObject
722 <                    (a, (((len - 1) & s) << ASHIFT) + ABASE, task);
723 <                top = s + 1;
724 <            } finally {
725 <                if (!owned)
726 <                    qlock = 0;
687 >        final boolean trySharedPush(ForkJoinTask<?> task) {
688 >            boolean submitted = false;
689 >            if (qlock == 0 && U.compareAndSwapInt(this, QLOCK, 0, 1)) {
690 >                ForkJoinTask<?>[] a = array;  ForkJoinPool p;
691 >                int s = top;
692 >                try {
693 >                    if ((a != null && a.length > s + 1 - base) ||
694 >                        (a = growArray()) != null) {   // must presize
695 >                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
696 >                        U.putOrderedObject(a, j, task);
697 >                        top = s + 1;
698 >                        submitted = true;
699 >                    }
700 >                } finally {
701 >                    qlock = 0;                         // unlock
702 >                }
703 >                if (submitted && (p = pool) != null)
704 >                    p.signalWork(this, 0);
705 >            }
706 >            return submitted;
707 >        }
708 >
709 >       /**
710 >         * Initializes or doubles the capacity of array. Call either
711 >         * by owner or with lock held -- it is OK for base, but not
712 >         * top, to move while resizings are in progress.
713 >         */
714 >        final ForkJoinTask<?>[] growArray() {
715 >            ForkJoinTask<?>[] oldA = array;
716 >            int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
717 >            if (size > MAXIMUM_QUEUE_CAPACITY)
718 >                throw new RejectedExecutionException("Queue capacity exceeded");
719 >            int oldMask, t, b;
720 >            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
721 >            if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
722 >                (t = top) - (b = base) > 0) {
723 >                int mask = size - 1;
724 >                do {
725 >                    ForkJoinTask<?> x;
726 >                    int oldj = ((b & oldMask) << ASHIFT) + ABASE;
727 >                    int j    = ((b &    mask) << ASHIFT) + ABASE;
728 >                    x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
729 >                    if (x != null &&
730 >                        U.compareAndSwapObject(oldA, oldj, x, null))
731 >                        U.putObjectVolatile(a, j, x);
732 >                } while (++b != t);
733              }
734 <            if ((p = pool) != null)
729 <                p.signalWork(this, 1);
730 <            return true;
734 >            return a;
735          }
736  
737          /**
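The push and growArray changes above rely on a power-of-two array indexed by masking, with base and top as free-running counters. The following single-threaded sketch illustrates only that indexing and the copy on resize. It deliberately omits the Unsafe ordered writes, the CAS on each slot, the qlock, and signalling, so it is not the pool's algorithm. The class name, the capacities, and the exception type are assumptions made for illustration.

```java
public class RingDequeSketch {
    static final int INITIAL_CAPACITY = 8;        // hypothetical; power of two
    static final int MAXIMUM_CAPACITY = 1 << 16;  // hypothetical bound

    Object[] array = new Object[INITIAL_CAPACITY];
    int base = INITIAL_CAPACITY >>> 1;  // index of next slot for poll
    int top = INITIAL_CAPACITY >>> 1;   // index of next slot for push

    /** Stores at (top & mask), then grows once size reaches length - 1. */
    void push(Object task) {
        int m = array.length - 1;
        array[m & top] = task;
        int n = ++top - base;
        if (n >= m)
            grow();
    }

    /** Doubles the array and re-masks every live index from base to top. */
    void grow() {
        Object[] oldA = array;
        int size = oldA.length << 1;
        if (size > MAXIMUM_CAPACITY)
            throw new IllegalStateException("Queue capacity exceeded");
        Object[] a = new Object[size];
        int oldMask = oldA.length - 1, mask = size - 1;
        for (int b = base; b != top; ++b)
            a[b & mask] = oldA[b & oldMask];
        array = a;
    }

    /** Takes the oldest element (FIFO end), or null if empty. */
    Object poll() {
        if (base == top) return null;
        int i = (array.length - 1) & base;
        Object t = array[i];
        array[i] = null;
        base++;
        return t;
    }

    /** Takes the newest element (LIFO end), or null if empty. */
    Object pop() {
        if (base == top) return null;
        int i = (array.length - 1) & (top - 1);
        Object t = array[i];
        array[i] = null;
        top--;
        return t;
    }

    int size() { return top - base; }

    public static void main(String[] args) {
        RingDequeSketch q = new RingDequeSketch();
        for (int i = 0; i < 20; i++) q.push(i);
        System.out.println("size=" + q.size() + " capacity=" + q.array.length);
        System.out.println("poll=" + q.poll() + " pop=" + q.pop());
    }
}
```

Because indices are only masked on access, the same base and top values remain valid after a resize; only the mask changes.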
# Line 979 | Line 983 | public class ForkJoinPool extends Abstra
983              if (t != null) {
984                  (currentSteal = t).doExec();
985                  currentSteal = null;
986 <                if (++nsteals < 0) {     // spill on overflow
983 <                    ForkJoinPool p;
984 <                    if ((p = pool) != null)
985 <                        p.collectStealCount(this);
986 <                }
986 >                ++nsteals;
987                  if (top != base) {       // process remaining local tasks
988                      if (mode == 0)
989                          popAndExecAll();
# Line 1056 | Line 1056 | public class ForkJoinPool extends Abstra
1056          }
1057      }
1058  
1059 +    // static fields (initialized in static initializer below)
1060 +
1061 +    /**
1062 +     * Creates a new ForkJoinWorkerThread. This factory is used unless
1063 +     * overridden in ForkJoinPool constructors.
1064 +     */
1065 +    public static final ForkJoinWorkerThreadFactory
1066 +        defaultForkJoinWorkerThreadFactory;
1067 +
1068      /**
1069       * Per-thread records for threads that submit to pools. Currently
1070       * holds only pseudo-random seed / index that is used to choose
# Line 1076 | Line 1085 | public class ForkJoinPool extends Abstra
1085          Submitter(int s) { seed = s; }
1086      }
1087  
1079 -    /** Property prefix for constructing common pool */
1080 -    private static final String propPrefix =
1081 -        "java.util.concurrent.ForkJoinPool.common.";
1082 -
1083 -    // static fields (initialized in static initializer below)
1084 -
1088      /**
1089 <     * Creates a new ForkJoinWorkerThread. This factory is used unless
1090 <     * overridden in ForkJoinPool constructors.
1089 >     * Per-thread submission bookkeeping. Shared across all pools
1090 >     * to reduce ThreadLocal pollution and because random motion
1091 >     * to avoid contention in one pool is likely to hold for others.
1092 >     * Lazily initialized on first submission (but null-checked
1093 >     * in other contexts to avoid unnecessary initialization).
1094       */
1095 <    public static final ForkJoinWorkerThreadFactory
1090 <        defaultForkJoinWorkerThreadFactory;
1095 >    static final ThreadLocal<Submitter> submitters;
1096  
1097      /**
1098       * Common (static) pool. Non-null for public use unless a static
# Line 1104 | Line 1109 | public class ForkJoinPool extends Abstra
1109      private static final RuntimePermission modifyThreadPermission;
1110  
1111      /**
1107 -     * Per-thread submission bookkeeping. Shared across all pools
1108 -     * to reduce ThreadLocal pollution and because random motion
1109 -     * to avoid contention in one pool is likely to hold for others.
1110 -     * Lazily initialized on first submission (but null-checked
1111 -     * in other contexts to avoid unnecessary initialization).
1112 -     */
1113 -    static final ThreadLocal<Submitter> submitters;
1114 -
1115 -    /**
1112       * Common pool parallelism. Must equal commonPool.parallelism.
1113       */
1114      static final int commonPoolParallelism;
# Line 1247 | Line 1243 | public class ForkJoinPool extends Abstra
1243      static final int FIFO_QUEUE          =  1;
1244      static final int SHARED_QUEUE        = -1;
1245  
1246 +    // bounds for #steps in scan loop -- must be power 2 minus 1
1247 +    private static final int MIN_SCAN    = 0x1ff;   // cover estimation slop
1248 +    private static final int MAX_SCAN    = 0x1ffff; // 4 * max workers
1249 +
1250      // Instance fields
1251  
1252      /*
1253 <     * Field layout order in this class tends to matter more than one
1254 <     * would like. Runtime layout order is only loosely related to
1253 >     * Field layout of this class tends to matter more than one would
1254 >     * like. Runtime layout order is only loosely related to
1255       * declaration order and may differ across JVMs, but the following
1256       * empirically works OK on current JVMs.
1257       */
1258      volatile long stealCount;                  // collects worker counts
1259      volatile long ctl;                         // main pool control
1260 -    final int parallelism;                     // parallelism level
1261 -    final int localMode;                       // per-worker scheduling mode
1262 -    volatile int indexSeed;                    // worker/submitter index seed
1260      volatile int plock;                        // shutdown status and seqLock
1261 +    volatile int indexSeed;                    // worker/submitter index seed
1262 +    final int config;                          // mode and parallelism level
1263      WorkQueue[] workQueues;                    // main registry
1264 <    final ForkJoinWorkerThreadFactory factory; // factory for new workers
1264 >    final ForkJoinWorkerThreadFactory factory;
1265      final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
1266      final String workerNamePrefix;             // to create worker name string
1267  
# Line 1282 | Line 1281 | public class ForkJoinPool extends Abstra
1281              if (((ps = plock) & PL_LOCK) == 0 &&
1282                  U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
1283                  return nps;
1284 <            else if (r == 0)
1285 <                r = ThreadLocalRandom.current().nextInt(); // randomize spins
1284 >            else if (r == 0) { // randomize spins if possible
1285 >                Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
1286 >                if ((t instanceof ForkJoinWorkerThread) &&
1287 >                    (w = ((ForkJoinWorkerThread)t).workQueue) != null)
1288 >                    r = w.seed;
1289 >                else if ((z = submitters.get()) != null)
1290 >                    r = z.seed;
1291 >                else
1292 >                    r = 1;
1293 >            }
1294              else if (spins >= 0) {
1295                  r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
1296                  if (r >= 0)
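The spin loop in the hunk above advances its seed with three xorshift steps using the shifts 1, 3 and 10. The sketch below isolates that step so its behavior can be inspected; the class and method names are hypothetical. Each of the three operations is invertible, so a nonzero seed never becomes zero, which is consistent with the fallback seed of 1 chosen in revision 1.148 when no per-thread seed is available.

```java
public class XorShiftSketch {
    /** One xorshift step with the shift triple (1, 3, 10) from acquirePlock. */
    static int next(int r) {
        r ^= r << 1;
        r ^= r >>> 3;
        r ^= r << 10;
        return r;
    }

    public static void main(String[] args) {
        int r = 1;  // same fallback seed as in the diff
        for (int i = 0; i < 5; i++) {
            r = next(r);
            System.out.println(r + (r >= 0 ? " (non-negative)" : " (negative)"));
        }
    }
}
```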
# Line 1317 | Line 1324 | public class ForkJoinPool extends Abstra
1324          synchronized (this) { notifyAll(); }
1325      }
1326  
1327 <    //  Registering and deregistering workers
1327 >    /**
1328 >     * Tries to create and start a worker; adjusts counts etc on failure
1329 >     */
1330 >    private void addWorker() {
1331 >        ForkJoinWorkerThread wt = null;
1332 >        try {
1333 >            (wt = factory.newThread(this)).start();
1334 >        } catch (Throwable ex) {
1335 >            deregisterWorker(wt, ex); // adjust on failure
1336 >        }
1337 >    }
1338  
1339      /**
1340 <     * Callback from ForkJoinWorkerThread constructor to establish its
1341 <     * poolIndex and record its WorkQueue. To avoid scanning bias due
1342 <     * to packing entries in front of the workQueues array, we treat
1343 <     * the array as a simple power-of-two hash table using per-thread
1344 <     * seed as hash, expanding as needed.
1345 <     *
1346 <     * @param w the worker's queue
1347 <     */
1348 <    final void registerWorker(WorkQueue w) {
1349 <        int s, ps; // generate a rarely colliding candidate index seed
1350 <        do {} while (!U.compareAndSwapInt(this, INDEXSEED,
1351 <                                          s = indexSeed, s += SEED_INCREMENT) ||
1352 <                     s == 0); // skip 0
1340 >     * Performs secondary initialization, called when plock is zero.
1341 >     * Creates workQueue array and sets plock to a valid value.  The
1342 >     * lock body must be exception-free (so no try/finally) so we
1343 >     * optimistically allocate new array outside the lock and throw
1344 >     * away if (very rarely) not needed. (A similar tactic is used in
1345 >     * fullExternalPush.)  Because the plock seq value can eventually
1346 >     * wrap around zero, this method harmlessly fails to reinitialize
1347 >     * if workQueues exists, while still advancing plock.
1348 >     */
1349 >    private void initWorkQueuesArray() {
1350 >        WorkQueue[] ws; int ps;
1351 >        int p = config & SMASK;        // find power of two table size
1352 >        int n = (p > 1) ? p - 1 : 1;   // ensure at least 2 slots
1353 >        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
1354 >        WorkQueue[] nws = new WorkQueue[(n + 1) << 1];
1355          if (((ps = plock) & PL_LOCK) != 0 ||
1356              !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1357              ps = acquirePlock();
1358 +        if ((ws = workQueues) == null || ws.length == 0)
1359 +            workQueues = nws;
1360          int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1361 <        try {
1362 <            WorkQueue[] ws;
1363 <            if (w != null && (ws = workQueues) != null) {
1364 <                w.seed = s;
1365 <                int n = ws.length, m = n - 1;
1366 <                int r = (s << 1) | 1;               // use odd-numbered indices
1367 <                if (ws[r &= m] != null) {           // collision
1368 <                    int probes = 0;                 // step by approx half size
1369 <                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1370 <                    while (ws[r = (r + step) & m] != null) {
1371 <                        if (++probes >= n) {
1372 <                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1373 <                            m = n - 1;
1374 <                            probes = 0;
1361 >        if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1362 >            releasePlock(nps);
1363 >        long c; int u;
1364 >        if ((u = (int)((c = ctl) >>> 32)) < 0 && (int)c == 0) {
1365 >            long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1366 >                             ((u + UAC_UNIT) & UAC_MASK)) << 32;
1367 >            if (U.compareAndSwapLong(this, CTL, c, nc))
1368 >                addWorker();
1369 >        }
1370 >
1371 >    }
1372 >
1373 >    //  Registering and deregistering workers
1374 >
1375 >    /**
1376 >     * Callback from ForkJoinWorkerThread to establish and record its
1377 >     * WorkQueue. To avoid scanning bias due to packing entries in
1378 >     * front of the workQueues array, we treat the array as a simple
1379 >     * power-of-two hash table using per-thread seed as hash,
1380 >     * expanding as needed.
1381 >     *
1382 >     * @param wt the worker thread
1383 >     */
1384 >    final void registerWorker(ForkJoinWorkerThread wt) {
1385 >        if (wt != null && wt.workQueue == null) {
1386 >            int s, ps;    // generate a rarely colliding candidate index seed
1387 >            do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1388 >                                              s += SEED_INCREMENT) ||
1389 >                         s == 0); // skip 0
1390 >            WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
1391 >            if (((ps = plock) & PL_LOCK) != 0 ||
1392 >                !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1393 >                ps = acquirePlock();
1394 >            int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1395 >            try {
1396 >                WorkQueue[] ws;
1397 >                if ((ws = workQueues) != null && wt.workQueue == null) {
1398 >                    int n = ws.length, m = n - 1;
1399 >                    int r = (s << 1) | 1;           // use odd-numbered indices
1400 >                    if (ws[r &= m] != null) {       // collision
1401 >                        int probes = 0;             // step by approx half size
1402 >                        int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1403 >                        while (ws[r = (r + step) & m] != null) {
1404 >                            if (++probes >= n) {
1405 >                                workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1406 >                                m = n - 1;
1407 >                                probes = 0;
1408 >                            }
1409                          }
1410                      }
1411 +                    w.eventCount = w.poolIndex = r; // volatile write orders
1412 +                    wt.workQueue = ws[r] = w;
1413                  }
1414 <                w.eventCount = w.poolIndex = r;     // establish before recording
1415 <                ws[r] = w;
1414 >            } finally {
1415 >                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1416 >                    releasePlock(nps);
1417              }
1360 -        } finally {
1361 -            if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1362 -                releasePlock(nps);
1418          }
1419      }
1420  
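The table sizing in initWorkQueuesArray and the odd-slot probing in registerWorker can be tried in isolation. This is a single-threaded sketch with no plock handling. The values of EVENMASK and SQMASK are not shown in this diff, so the ones below are assumptions, chosen to match the comment that shared queues occupy at most 64 even slots. The class name, the Object placeholders, and the seed multiplier in main are hypothetical.

```java
import java.util.Arrays;

public class WorkQueueIndexSketch {
    static final int EVENMASK = 0xfffe;  // assumption: not shown in this diff
    static final int SQMASK   = 0x007e;  // assumption: even indices, 64 slots

    Object[] ws;

    WorkQueueIndexSketch(int parallelism) {
        ws = new Object[tableSize(parallelism)];
    }

    /** Power-of-two table size as computed in initWorkQueuesArray. */
    static int tableSize(int p) {
        int n = (p > 1) ? p - 1 : 1;   // ensure at least 2 slots
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        return (n + 1) << 1;
    }

    /** Even slot used by an external submitter with the given seed. */
    static int sharedIndex(int seed, int mask) {
        return mask & seed & SQMASK;
    }

    /** Claims a free odd slot for seed s, doubling the table if needed. */
    int register(int s, Object w) {
        int n = ws.length, m = n - 1;
        int r = ((s << 1) | 1) & m;          // use odd-numbered indices
        if (ws[r] != null) {                 // collision
            int probes = 0;                  // step by approx half size
            int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
            while (ws[r = (r + step) & m] != null) {
                if (++probes >= n) {
                    ws = Arrays.copyOf(ws, n <<= 1);
                    m = n - 1;
                    probes = 0;
                }
            }
        }
        ws[r] = w;
        return r;
    }

    public static void main(String[] args) {
        WorkQueueIndexSketch t = new WorkQueueIndexSketch(4);
        for (int i = 1; i <= 6; i++)
            System.out.println("worker " + i + " -> slot "
                               + t.register(i * 0x61c88647, "w" + i));
        System.out.println("table length: " + t.ws.length);
    }
}
```

Since the step is always even and the starting index is odd, every probe stays on an odd slot, which keeps worker queues out of the even slots reserved for submissions.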
# Line 1376 | Line 1431 | public class ForkJoinPool extends Abstra
1431          WorkQueue w = null;
1432          if (wt != null && (w = wt.workQueue) != null) {
1433              int ps;
1379 -            collectStealCount(w);
1434              w.qlock = -1;                // ensure set
1435 +            long ns = w.nsteals, sc;     // collect steal count
1436 +            do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1437 +                                               sc = stealCount, sc + ns));
1438              if (((ps = plock) & PL_LOCK) != 0 ||
1439                  !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1440                  ps = acquirePlock();
# Line 1402 | Line 1459 | public class ForkJoinPool extends Abstra
1459          if (!tryTerminate(false, false) && w != null) {
1460              w.cancelAll();                  // cancel remaining tasks
1461              if (w.array != null)            // suppress signal if never ran
1462 <                signalWork(null, 1);        // wake up or create replacement
1462 >                helpSignal(null, 0);        // wake up or create replacement
1463              if (ex == null)                 // help clean refs on way out
1464                  ForkJoinTask.helpExpungeStaleExceptions();
1465          }
# Line 1411 | Line 1468 | public class ForkJoinPool extends Abstra
1468              ForkJoinTask.rethrow(ex);
1469      }
1470  
1414 -    /**
1415 -     * Collect worker steal count into total. Called on termination
1416 -     * and upon int overflow of local count. (There is a possible race
1417 -     * in the latter case vs any caller of getStealCount, which can
1418 -     * make its results less accurate than usual.)
1419 -     */
1420 -    final void collectStealCount(WorkQueue w) {
1421 -        if (w != null) {
1422 -            long sc;
1423 -            int ns = w.nsteals;
1424 -            w.nsteals = 0; // handle overflow
1425 -            long steals = (ns >= 0) ? ns : 1L + (long)(Integer.MAX_VALUE);
1426 -            do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1427 -                                               sc = stealCount, sc + steals));
1428 -        }
1429 -    }
1430 -
1471      // Submissions
1472  
1473      /**
# Line 1444 | Line 1484 | public class ForkJoinPool extends Abstra
1484              (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1485              (q = ws[m & z.seed & SQMASK]) != null &&
1486              U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1487 <            int s = q.top, n;
1488 <            if ((a = q.array) != null && a.length > (n = s + 1 - q.base)) {
1489 <                U.putObject(a, (long)(((a.length - 1) & s) << ASHIFT) + ABASE,
1450 -                             task);
1487 >            int b = q.base, s = q.top, n, an;
1488 >            if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
1489 >                U.putObject(a, (long)(((an - 1) & s) << ASHIFT) + ABASE, task);
1490                  q.top = s + 1;                     // push on to deque
1491                  q.qlock = 0;
1492 <                if (n <= 1)
1493 <                    signalWork(q, 1);
1492 >                if (n <= 2)
1493 >                    signalWork(q, 0);
1494                  return;
1495              }
1496              q.qlock = 0;
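The fast path in this hunk stores the task at slot `(an - 1) & s` of a power-of-two array and then advances `top`, guarded by `an > s + 1 - b`. A minimal standalone sketch of that index arithmetic follows. The class `RingSlots` and its methods are hypothetical names used only for illustration; the sketch is single-threaded and leaves out the `qlock` CAS and the `Unsafe` array access used by the real code.

```java
// Hypothetical illustration only; not the ForkJoinPool implementation.
final class RingSlots {
    private final Object[] array;   // length must be a power of two
    private int base;               // index of the next slot to poll
    private int top;                // index of the next slot to push

    RingSlots(int capacity) {
        if (capacity <= 0 || (capacity & (capacity - 1)) != 0)
            throw new IllegalArgumentException("capacity must be a power of two");
        array = new Object[capacity];
    }

    /** Returns false when full, mirroring the an > (s + 1 - b) guard. */
    boolean push(Object task) {
        int b = base, s = top, an = array.length;
        if (an > s + 1 - b) {
            array[(an - 1) & s] = task;  // mask wraps the ever-growing index
            top = s + 1;
            return true;
        }
        return false;
    }

    /** Takes from the base end, or returns null if empty. */
    Object poll() {
        int b = base;
        if (b - top >= 0)
            return null;
        int i = (array.length - 1) & b;
        Object t = array[i];
        array[i] = null;
        base = b + 1;
        return t;
    }

    int size() { return top - base; }
}
```

Because `top` and `base` only ever increase, the mask is what maps them back into the array; with this guard, an array of length 4 holds at most 3 tasks at once.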
# Line 1462 | Line 1501 | public class ForkJoinPool extends Abstra
1501      /**
1502       * Full version of externalPush. This method is called, among
1503       * other times, upon the first submission of the first task to the
1504 <     * pool, so must perform secondary initialization: creating
1505 <     * workQueue array and setting plock to a valid value. It also
1506 <     * detects first submission by an external thread by looking up
1507 <     * its ThreadLocal, and creates a new shared queue if the one at
1508 <     * index is empty or contended. The lock bodies must be
1509 <     * exception-free (so no try/finally) so we optimistically
1510 <     * allocate new queues/arrays outside the locks and throw them
1472 <     * away if (very rarely) not needed. Note that the plock seq value
1473 <     * can eventually wrap around zero, but if so harmlessly fails to
1474 <     * reinitialize.
1504 >     * pool, so must perform secondary initialization (via
1505 >     * initWorkQueuesArray). It also detects first submission by an
1506 >     * external thread by looking up its ThreadLocal, and creates a
1507 >     * new shared queue if the one at index is empty or contended. The
1508 >     * lock body must be exception-free (so no try/finally) so we
1509 >     * optimistically allocate new queues outside the lock and throw
1510 >     * them away if (very rarely) not needed.
1511       */
1512      private void fullExternalPush(ForkJoinTask<?> task) {
1513 <        for (Submitter z = null;;) {
1514 <            WorkQueue[] ws; WorkQueue q; int ps, m, r, s;
1515 <            if ((ps = plock) < 0)
1513 >        int r = 0;
1514 >        for (Submitter z = submitters.get();;) {
1515 >            WorkQueue[] ws; WorkQueue q; int ps, m, k;
1516 >            if (z == null) {
1517 >                if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
1518 >                                        r += SEED_INCREMENT) && r != 0)
1519 >                    submitters.set(z = new Submitter(r));
1520 >            }
1521 >            else if (r == 0) {               // move to a different index
1522 >                r = z.seed;
1523 >                r ^= r << 13;                // same xorshift as WorkQueues
1524 >                r ^= r >>> 17;
1525 >                z.seed = r ^ (r << 5);
1526 >            }
1527 >            else if ((ps = plock) < 0)
1528                  throw new RejectedExecutionException();
1529 <            else if ((ws = workQueues) == null || (m = ws.length - 1) < 0) {
1530 <                int n = parallelism - 1; n |= n >>> 1; n |= n >>> 2;
1531 <                n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
1532 <                WorkQueue[] nws = new WorkQueue[(n + 1) << 1]; // power of two
1533 <                if ((ps & PL_LOCK) != 0 ||
1529 >            else if (ps == 0 || (ws = workQueues) == null ||
1530 >                     (m = ws.length - 1) < 0)
1531 >                initWorkQueuesArray();
1532 >            else if ((q = ws[k = r & m & SQMASK]) != null) {
1533 >                if (q.trySharedPush(task))
1534 >                    return;
1535 >                else
1536 >                    r = 0; // move on contention
1537 >            }
1538 >            else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
1539 >                q = new WorkQueue(this, null, SHARED_QUEUE, r);
1540 >                if (((ps = plock) & PL_LOCK) != 0 ||
1541                      !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1542                      ps = acquirePlock();
1543 <                if ((ws = workQueues) == null)
1544 <                    workQueues = nws;
1543 >                if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
1544 >                    ws[k] = q;
1545                  int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1546                  if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1547                      releasePlock(nps);
1548              }
1549 <            else if (z == null && (z = submitters.get()) == null) {
1550 <                if (U.compareAndSwapInt(this, INDEXSEED,
1496 <                                        s = indexSeed, s += SEED_INCREMENT) &&
1497 <                    s != 0) // skip 0
1498 <                    submitters.set(z = new Submitter(s));
1499 <            }
1500 <            else {
1501 <                int k = (r = z.seed) & m & SQMASK;
1502 <                if ((q = ws[k]) == null && (ps & PL_LOCK) == 0) {
1503 <                    (q = new WorkQueue(this, null, SHARED_QUEUE)).poolIndex = k;
1504 <                    if (((ps = plock) & PL_LOCK) != 0 ||
1505 <                        !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1506 <                        ps = acquirePlock();
1507 <                    WorkQueue w = null;
1508 <                    if ((ws = workQueues) != null && k < ws.length &&
1509 <                        (w = ws[k]) == null)
1510 <                        ws[k] = q;
1511 <                    else
1512 <                        q = w;
1513 <                    int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1514 <                    if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1515 <                        releasePlock(nps);
1516 <                }
1517 <                if (q != null && q.qlock == 0 && q.fullPush(task, false))
1518 <                    return;
1519 <                r ^= r << 13;                // same xorshift as WorkQueues
1520 <                r ^= r >>> 17;
1521 <                z.seed = r ^= r << 5;        // move to a different index
1522 <            }
1549 >            else
1550 >                r = 0; // try elsewhere while lock held
1551          }
1552      }
1553  
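The new fullExternalPush moves a submitter to a different queue index by advancing its seed with the 13/17/5 xorshift, then masking with `m & SQMASK`. The sketch below isolates that step. `XorShiftDemo` is a hypothetical class, and the `SQMASK` value is an assumption (this diff does not show its definition); it is chosen here so that only even slots are selected.

```java
// Hypothetical illustration only; not the ForkJoinPool implementation.
final class XorShiftDemo {
    // Assumed value: the diff does not show SQMASK's definition.
    static final int SQMASK = 0xfffe;

    /** One xorshift step with the shifts used in the diff (13, 17, 5). */
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    /** Slot index for a seed in an array of length m + 1 (a power of two). */
    static int slot(int seed, int m) {
        return seed & m & SQMASK;
    }

    public static void main(String[] args) {
        int seed = 1;
        for (int i = 0; i < 5; i++) {
            seed = next(seed);
            System.out.println(seed + " -> slot " + slot(seed, 15));
        }
    }
}
```

A seed of zero maps to zero under xorshift, which is consistent with the `r != 0` check when the Submitter is created.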
# Line 1540 | Line 1568 | public class ForkJoinPool extends Abstra
1568       * empty or all workers are active.
1569       *
1570       * @param q if non-null, the queue holding tasks to be signalled
1571 <     * @param signals the target number of signals.
1571 >     * @param signals the target number of signals (at least one --
1572 >     * if argument is zero also sets signallee hint if parked).
1573       */
1574      final void signalWork(WorkQueue q, int signals) {
1575 <        long c; int e, u, i; WorkQueue[] ws; WorkQueue w; Thread p;
1575 >        long c; int e, u, i, s; WorkQueue[] ws; WorkQueue w; Thread p;
1576          while ((u = (int)((c = ctl) >>> 32)) < 0) {
1577              if ((e = (int)c) > 0) {
1578                  if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
# Line 1552 | Line 1581 | public class ForkJoinPool extends Abstra
1581                                 ((long)(u + UAC_UNIT) << 32));
1582                      if (U.compareAndSwapLong(this, CTL, c, nc)) {
1583                          w.eventCount = (e + E_SEQ) & E_MASK;
1584 <                        if ((p = w.parker) != null)
1584 >                        if ((p = w.parker) != null) {
1585 >                            if (q != null && signals == 0)
1586 >                                w.hint = q.poolIndex;
1587                              U.unpark(p);
1588 +                        }
1589                          if (--signals <= 0)
1590                              break;
1591                      }
1592 <                    else
1593 <                        signals = 1;
1562 <                    if ((q != null && q.queueSize() == 0))
1592 >                    if (q != null && (s = q.queueSize()) <= signals &&
1593 >                         (signals = s) <= 0)
1594                          break;
1595                  }
1596                  else
# Line 1569 | Line 1600 | public class ForkJoinPool extends Abstra
1600                  long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1601                                   ((u + UAC_UNIT) & UAC_MASK)) << 32;
1602                  if (U.compareAndSwapLong(this, CTL, c, nc)) {
1603 <                    ForkJoinWorkerThread wt = null;
1573 <                    Throwable ex = null;
1574 <                    boolean started = false;
1575 <                    try {
1576 <                        ForkJoinWorkerThreadFactory fac;
1577 <                        if ((fac = factory) != null &&
1578 <                            (wt = fac.newThread(this)) != null) {
1579 <                            wt.start();
1580 <                            started = true;
1581 <                        }
1582 <                    } catch (Throwable rex) {
1583 <                        ex = rex;
1584 <                    }
1585 <                    if (!started)
1586 <                        deregisterWorker(wt, ex); // adjust counts on failure
1603 >                    addWorker();
1604                      break;
1605                  }
1606              }
# Line 1598 | Line 1615 | public class ForkJoinPool extends Abstra
1615       * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1616       */
1617      final void runWorker(WorkQueue w) {
1618 <        // initialize queue array in this thread
1619 <        w.array = new ForkJoinTask<?>[WorkQueue.INITIAL_QUEUE_CAPACITY];
1603 <        do { w.runTask(scan(w)); } while (w.qlock >= 0);
1618 >        if (w != null) // skip on initialization failure
1619 >            do { w.runTask(scan(w)); } while (w.qlock >= 0);
1620      }
1621  
1622      /**
# Line 1611 | Line 1627 | public class ForkJoinPool extends Abstra
1627       * contention, or state changes that indicate possible success on
1628       * re-invocation.
1629       *
1630 <     * The scan searches for tasks across a random permutation of
1631 <     * queues (starting at a random index and stepping by a random
1632 <     * relative prime, checking each at least once).  The scan
1633 <     * terminates upon either finding a non-empty queue, or completing
1634 <     * the sweep. If the worker is not inactivated, it takes and
1635 <     * returns a task from this queue. Otherwise, if not activated, it
1636 <     * signals workers (that may include itself) and returns so caller
1637 <     * can retry. Also returns for trtry if the worker array may have
1638 <     * can retry. Also returns for retry if the worker array may have
1639 <     * take one of the following actions, after which the caller will
1640 <     * retry calling this method unless terminated.
1630 >     * The scan searches for tasks across queues (starting at a random
1631 >     * index, and relying on registerWorker to irregularly scatter
1632 >     * them within array to avoid bias), checking each at least twice.
1633 >     * The scan terminates upon either finding a non-empty queue, or
1634 >     * completing the sweep. If the worker is not inactivated, it
1635 >     * takes and returns a task from this queue. Otherwise, if not
1636 >     * activated, it signals workers (that may include itself) and
1637 >     * returns so caller can retry. Also returns for retry if the
1638 >     * worker array may have changed during an empty scan.  On failure
1639 >     * to find a task, we take one of the following actions, after
1640 >     * which the caller will retry calling this method unless
1641 >     * terminated.
1642       *
1643       * * If pool is terminating, terminate the worker.
1644       *
# Line 1638 | Line 1655 | public class ForkJoinPool extends Abstra
1655       * @return a task or null if none found
1656       */
1657      private final ForkJoinTask<?> scan(WorkQueue w) {
1658 <        WorkQueue[] ws; WorkQueue q;           // first update random seed
1659 <        int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1660 <        int ps = plock, m;                     // volatile read order matters
1661 <        if ((ws = workQueues) != null && (m = ws.length - 1) > 0) {
1662 <            int ec = w.eventCount;             // ec is negative if inactive
1663 <            int step = (r >>> 16) | 1;         // relatively prime
1664 <            for (int j = (m + 1) << 2;  ; --j, r += step) {
1665 <                ForkJoinTask<?> t; ForkJoinTask<?>[] a; int b, n;
1666 <                if ((q = ws[r & m]) != null && (b = q.base) - q.top < 0 &&
1650 <                    (a = q.array) != null) {   // probably nonempty
1658 >        WorkQueue[] ws; int m, hint;
1659 >        int ps = plock;                          // read plock before ws
1660 >        if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1661 >            int ec = w.eventCount;               // ec is negative if inactive
1662 >            int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1663 >            for (int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN; ; --j) {
1664 >                WorkQueue q; ForkJoinTask<?>[] a; int b;
1665 >                if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
1666 >                    (a = q.array) != null) {     // probably nonempty
1667                      int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1668 <                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1668 >                    ForkJoinTask<?> t = (ForkJoinTask<?>)
1669 >                        U.getObjectVolatile(a, i);
1670                      if (q.base == b && ec >= 0 && t != null &&
1671                          U.compareAndSwapObject(a, i, t, null)) {
1672 <                        if ((n = q.top - (q.base = b + 1)) > 0)
1673 <                            signalWork(q, n);
1674 <                        return t;              // taken
1675 <                    }
1676 <                    if (j < m || (ec < 0 && (ec = w.eventCount) < 0)) {
1677 <                        if ((n = q.queueSize() - 1) > 0)
1678 <                            signalWork(q, n);
1679 <                        break;                 // let caller retry after signal
1672 >                        if ((q.base = b + 1) - q.top < 0)
1673 >                            signalWork(q, 0);
1674 >                        return t;                // taken
1675 >                    }
1676 >                    else if (ec < 0 || j < m) {  // cannot take or cannot rescan
1677 >                        w.hint = q.poolIndex;    // use hint below
1678 >                        break;                   // let caller retry after signal
1679 >                    }
1680 >                }
1681 >                else if (j < 0) { // end of scan; in loop to simplify code
1682 >                    long c, sc; int e, ns;
1683 >                    if ((ns = w.nsteals) != 0) {
1684 >                        if (U.compareAndSwapLong(this, STEALCOUNT,
1685 >                                                 sc = stealCount, sc + ns))
1686 >                            w.nsteals = 0;       // collect steals
1687                      }
1688 <                }
1665 <                else if (j < 0) {              // end of scan
1666 <                    long c = ctl; int e;
1667 <                    if (plock != ps)           // incomplete sweep
1688 >                    else if (plock != ps)        // ws may have changed
1689                          break;
1690 <                    if ((e = (int)c) < 0)      // pool is terminating
1691 <                        w.qlock = -1;
1692 <                    else if (ec >= 0) {        // try to enqueue/inactivate
1690 >                    else if ((e = (int)(c = ctl)) < 0)
1691 >                        w.qlock = -1;            // pool is terminating
1692 >                    else if (ec >= 0) {          // try to enqueue/inactivate
1693                          long nc = ((long)ec |
1694                                     ((c - AC_UNIT) & (AC_MASK|TC_MASK)));
1695 <                        w.nextWait = e;
1696 <                        w.eventCount = ec | INT_SIGN; // mark as inactive
1695 >                        w.nextWait = e;          // link and mark inactive
1696 >                        w.hint = -1;             // use hint if set while parked
1697 >                        w.eventCount = ec | INT_SIGN;
1698                          if (ctl != c ||
1699                              !U.compareAndSwapLong(this, CTL, c, nc))
1700 <                            w.eventCount = ec; // unmark on CAS failure
1701 <                        else if ((int)(c >> AC_SHIFT) == 1 - parallelism)
1702 <                            idleAwaitWork(w, nc, c);  // quiescent
1700 >                            w.eventCount = ec;  // unmark on CAS failure
1701 >                        else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1702 >                            idleAwaitWork(w, nc, c);
1703                      }
1704 <                    else if (w.seed >= 0 && w.eventCount < 0) {
1704 >                    else if (w.eventCount < 0) { // block
1705                          Thread wt = Thread.currentThread();
1706 <                        Thread.interrupted();  // clear status
1706 >                        Thread.interrupted();    // clear status
1707                          U.putObject(wt, PARKBLOCKER, this);
1708 <                        w.parker = wt;         // emulate LockSupport.park
1709 <                        if (w.eventCount < 0)  // recheck
1708 >                        w.parker = wt;           // emulate LockSupport.park
1709 >                        if (w.eventCount < 0)    // recheck
1710                              U.park(false, 0L);
1711                          w.parker = null;
1712                          U.putObject(wt, PARKBLOCKER, null);
# Line 1692 | Line 1714 | public class ForkJoinPool extends Abstra
1714                      break;
1715                  }
1716              }
1717 +            if ((hint = w.hint) >= 0) {          // help signal
1718 +                WorkQueue[] vs; WorkQueue v; int k;
1719 +                w.hint = -1;                     // suppress resignal
1720 +                if ((vs = workQueues) != null && hint < vs.length &&
1721 +                    (v = vs[hint]) != null && (k = v.base - v.top) < -1)
1722 +                    signalWork(v, 1 - k);
1723 +            }
1724          }
1725          return null;
1726      }
# Line 1709 | Line 1738 | public class ForkJoinPool extends Abstra
1738       * @param prevCtl the ctl value to restore if thread is terminated
1739       */
1740      private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1741 <        if (w.eventCount < 0 &&
1742 <            (this == commonPool || !tryTerminate(false, false)) &&
1714 <            (int)prevCtl != 0) {
1741 >        if (w != null && w.eventCount < 0 &&
1742 >            !tryTerminate(false, false) && (int)prevCtl != 0) {
1743              int dc = -(short)(currentCtl >>> TC_SHIFT);
1744              long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1745              long deadline = System.nanoTime() + parkTime - 100000L; // 0.1ms slop
# Line 1730 | Line 1758 | public class ForkJoinPool extends Abstra
1758                      U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1759                      w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1760                      w.qlock = -1;   // shrink
1761 +                    w.hint = -1;    // suppress helping
1762                      break;
1763                  }
1764              }
# Line 1737 | Line 1766 | public class ForkJoinPool extends Abstra
1766      }
1767  
1768      /**
1769 <     * Scans through queues looking for work while joining a task;
1770 <     * if any are present, signals.
1769 >     * Scans through queues looking for work (optionally, while
1770 >     * joining a task); if any are present, signals. May return early
1771 >     * if more signalling is detectably unneeded.
1772       *
1773 <     * @param task to return early if done
1773 >     * @param task if non-null, return early if done
1774       * @param origin an index to start scan
1775       */
1776      final int helpSignal(ForkJoinTask<?> task, int origin) {
1777 <        WorkQueue[] ws; WorkQueue q; int m, n, s;
1778 <        if (task != null && (ws = workQueues) != null &&
1749 <            (m = ws.length - 1) >= 0) {
1777 >        WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1778 >        if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1779              for (int i = 0; i <= m; ++i) {
1780 <                if ((s = task.status) < 0)
1780 >                if (task != null && (s = task.status) < 0)
1781                      return s;
1782                  if ((q = ws[(i + origin) & m]) != null &&
1783                      (n = q.queueSize()) > 0) {
1784                      signalWork(q, n);
1785 <                    if ((int)(ctl >> AC_SHIFT) >= 0)
1785 >                    if ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
1786                          break;
1787                  }
1788              }
# Line 1792 | Line 1821 | public class ForkJoinPool extends Abstra
1821                      }
1822                      if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
1823                          break restart;              // shutting down
1824 <                    if ((v = ws[h = (j.stealHint | 1) & m]) == null ||
1824 >                    if ((v = ws[h = (j.hint | 1) & m]) == null ||
1825                          v.currentSteal != subtask) {
1826                          for (int origin = h;;) {    // find stealer
1827                              if (((h = (h + 2) & m) & 15) == 1 &&
# Line 1800 | Line 1829 | public class ForkJoinPool extends Abstra
1829                                  continue restart;   // occasional staleness check
1830                              if ((v = ws[h]) != null &&
1831                                  v.currentSteal == subtask) {
1832 <                                j.stealHint = h;    // save hint
1832 >                                j.hint = h;        // save hint
1833                                  break;
1834                              }
1835                              if (h == origin)
# Line 1849 | Line 1878 | public class ForkJoinPool extends Abstra
1878  
1879      /**
1880       * Analog of tryHelpStealer for CountedCompleters. Tries to steal
1881 <     * and run tasks within the target's computation
1881 >     * and run tasks within the target's computation.
1882       *
1883       * @param task the task to join
1884       * @param mode if shared, exit upon completing any task
# Line 1857 | Line 1886 | public class ForkJoinPool extends Abstra
1886       *
1887       */
1888      private int helpComplete(ForkJoinTask<?> task, int mode) {
1889 <        WorkQueue[] ws; WorkQueue q; int m, n, s;
1889 >        WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1890          if (task != null && (ws = workQueues) != null &&
1891              (m = ws.length - 1) >= 0) {
1892              for (int j = 1, origin = j;;) {
# Line 1865 | Line 1894 | public class ForkJoinPool extends Abstra
1894                      return s;
1895                  if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
1896                      origin = j;
1897 <                    if (mode == SHARED_QUEUE && (int)(ctl >> AC_SHIFT) >= 0)
1897 >                    if (mode == SHARED_QUEUE &&
1898 >                        ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
1899                          break;
1900                  }
1901                  else if ((j = (j + 2) & m) == origin)
# Line 1883 | Line 1913 | public class ForkJoinPool extends Abstra
1913       * may become starved.
1914       */
1915      final boolean tryCompensate() {
1916 <        int pc = parallelism, e, u, i, tc; long c;
1916 >        int pc = config & SMASK, e, i, tc; long c;
1917          WorkQueue[] ws; WorkQueue w; Thread p;
1918 <        if ((e = (int)(c = ctl)) >= 0 && (ws = workQueues) != null) {
1918 >        if ((ws = workQueues) != null && (e = (int)(c = ctl)) >= 0) {
1919              if (e != 0 && (i = e & SMASK) < ws.length &&
1920                  (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
1921                  long nc = ((long)(w.nextWait & E_MASK) |
# Line 1897 | Line 1927 | public class ForkJoinPool extends Abstra
1927                      return true;   // replace with idle worker
1928                  }
1929              }
1930 <            else if ((short)((u = (int)(c >>> 32)) >>> UTC_SHIFT) >= 0 &&
1931 <                     (u >> UAC_SHIFT) + pc > 1) {
1930 >            else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
1931 >                     (int)(c >> AC_SHIFT) + pc > 1) {
1932                  long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
1933                  if (U.compareAndSwapLong(this, CTL, c, nc))
1934 <                    return true;    // no compensation
1934 >                    return true;   // no compensation
1935              }
1936 <            else if ((tc = u + pc) < MAX_CAP) {
1936 >            else if (tc + pc < MAX_CAP) {
1937                  long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1938                  if (U.compareAndSwapLong(this, CTL, c, nc)) {
1939 <                    Throwable ex = null;
1940 <                    ForkJoinWorkerThread wt = null;
1911 <                    try {
1912 <                        ForkJoinWorkerThreadFactory fac;
1913 <                        if ((fac = factory) != null &&
1914 <                            (wt = fac.newThread(this)) != null) {
1915 <                            wt.start();
1916 <                            return true;
1917 <                        }
1918 <                    } catch (Throwable rex) {
1919 <                        ex = rex;
1920 <                    }
1921 <                    deregisterWorker(wt, ex); // adjust counts etc
1939 >                    addWorker();
1940 >                    return true;
1941                  }
1942              }
1943          }
# Line 1944 | Line 1963 | public class ForkJoinPool extends Abstra
1963                  (s = helpSignal(task, joiner.poolIndex)) >= 0 &&
1964                  (task instanceof CountedCompleter))
1965                  s = helpComplete(task, LIFO_QUEUE);
1966 +            int k = 0; // to perform pre-block yield for politeness
1967              while (s >= 0 && (s = task.status) >= 0) {
1968                  if ((joiner.queueSize() > 0 ||           // try helping
1969                       (s = tryHelpStealer(joiner, task)) == 0) &&
1970 <                    (s = task.status) >= 0 && tryCompensate()) {
1971 <                    if (task.trySetSignal() && (s = task.status) >= 0) {
1972 <                        synchronized (task) {
1973 <                            if (task.status >= 0) {
1974 <                                try {                // see ForkJoinTask
1975 <                                    task.wait();     //  for explanation
1976 <                                } catch (InterruptedException ie) {
1970 >                    (s = task.status) >= 0) {
1971 >                    if (k < 3) {
1972 >                        if (++k < 3)
1973 >                            s = helpSignal(task, joiner.poolIndex);
1974 >                        else
1975 >                            Thread.yield();
1976 >                    }
1977 >                    else if (!tryCompensate())
1978 >                        k = 0;
1979 >                    else {
1980 >                        if (task.trySetSignal() && (s = task.status) >= 0) {
1981 >                            synchronized (task) {
1982 >                                if (task.status >= 0) {
1983 >                                    try {                // see ForkJoinTask
1984 >                                        task.wait();     //  for explanation
1985 >                                    } catch (InterruptedException ie) {
1986 >                                    }
1987                                  }
1988 +                                else
1989 +                                    task.notifyAll();
1990                              }
1959 -                             else
1960 -                                 task.notifyAll();
1991                          }
1992 +                        long c;                          // re-activate
1993 +                        do {} while (!U.compareAndSwapLong
1994 +                                     (this, CTL, c = ctl, c + AC_UNIT));
1995                      }
1963 -                     long c;                          // re-activate
1964 -                     do {} while (!U.compareAndSwapLong
1965 -                                  (this, CTL, c = ctl, c + AC_UNIT));
1996                  }
1997              }
1998              joiner.currentJoin = prevJoin;
# Line 2006 | Line 2036 | public class ForkJoinPool extends Abstra
2036       * @param r a (random) seed for scanning
2037       */
2038      private WorkQueue findNonEmptyStealQueue(int r) {
2009 -         int step = (r >>> 16) | 1;
2039          for (WorkQueue[] ws;;) {
2040 <            int ps = plock, m;
2040 >            int ps = plock, m, n;
2041              if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
2042                  return null;
2043 <            for (int j = (m + 1) << 2; ; r += step) {
2044 <                WorkQueue q = ws[((r << 1) | 1) & m];
2045 <                if (q != null && q.queueSize() > 0)
2043 >            for (int j = (m + 1) << 2; ;) {
2044 >                WorkQueue q = ws[(((r + j) << 1) | 1) & m];
2045 >                if (q != null && (n = q.queueSize()) > 0) {
2046 >                    if (n > 1)
2047 >                        signalWork(q, 0);
2048                      return q;
2049 +                }
2050                  else if (--j < 0) {
2051                      if (plock == ps)
2052                          return null;
# Line 2057 | Line 2089 | public class ForkJoinPool extends Abstra
2089                  }
2090                  else
2091                      c = ctl;        // re-increment on exit
2092 <                if ((int)(c >> AC_SHIFT) + parallelism == 0) {
2092 >                if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) {
2093                      do {} while (!U.compareAndSwapLong
2094                                   (this, CTL, c = ctl, c + AC_UNIT));
2095                      break;
# Line 2132 | Line 2164 | public class ForkJoinPool extends Abstra
2164      static int getSurplusQueuedTaskCount() {
2165          Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2166          if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
2167 <            int b = (q = (wt = (ForkJoinWorkerThread)t).workQueue).base;
2168 <            int p = (pool = wt.pool).parallelism;
2167 >            int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
2168 >            int n = (q = wt.workQueue).top - q.base;
2169              int a = (int)(pool.ctl >> AC_SHIFT) + p;
2170 <            return q.top - b - (a > (p >>>= 1) ? 0 :
2171 <                                a > (p >>>= 1) ? 1 :
2172 <                                a > (p >>>= 1) ? 2 :
2173 <                                a > (p >>>= 1) ? 4 :
2174 <                                8);
2170 >            return n - (a > (p >>>= 1) ? 0 :
2171 >                        a > (p >>>= 1) ? 1 :
2172 >                        a > (p >>>= 1) ? 2 :
2173 >                        a > (p >>>= 1) ? 4 :
2174 >                        8);
2175          }
2176          return 0;
2177      }
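The chained conditional in getSurplusQueuedTaskCount halves `p` at each test, so the amount subtracted from the local queue size grows as fewer workers are active. The sketch below pulls that expression into a pure function so the thresholds can be checked by hand. `SurplusThreshold` and its method names are hypothetical; `active` stands in for `(int)(pool.ctl >> AC_SHIFT) + p`.

```java
// Hypothetical illustration only; not the ForkJoinPool implementation.
final class SurplusThreshold {
    /** Same chained test as the diff: each condition halves p before comparing. */
    static int threshold(int active, int parallelism) {
        int a = active, p = parallelism;
        return (a > (p >>>= 1) ? 0 :
                a > (p >>>= 1) ? 1 :
                a > (p >>>= 1) ? 2 :
                a > (p >>>= 1) ? 4 :
                8);
    }

    /** Queued tasks beyond the threshold; may be negative. */
    static int surplus(int queued, int active, int parallelism) {
        return queued - threshold(active, parallelism);
    }

    public static void main(String[] args) {
        for (int a = 0; a <= 8; a++)
            System.out.println("active=" + a + " threshold=" + threshold(a, 8));
    }
}
```

With a parallelism of 8, more than 4 active workers gives a threshold of 0, while a fully idle pool gives 8.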
# Line 2165 | Line 2197 | public class ForkJoinPool extends Abstra
2197              return false;
2198          for (long c;;) {
2199              if (((c = ctl) & STOP_BIT) != 0) {      // already terminating
2200 <                if ((short)(c >>> TC_SHIFT) == -parallelism) {
2200 >                if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
2201                      synchronized (this) {
2202                          notifyAll();                // signal when 0 workers
2203                      }
# Line 2184 | Line 2216 | public class ForkJoinPool extends Abstra
2216                      releasePlock(nps);
2217              }
2218              if (!now) {                             // check if idle & no tasks
2219 <                if ((int)(c >> AC_SHIFT) != -parallelism ||
2219 >                if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
2220                      hasQueuedSubmissions())
2221                      return false;
2222                  // Check for unqueued inactive workers. One pass suffices.
# Line 2286 | Line 2318 | public class ForkJoinPool extends Abstra
2318          if (q != null && (a = q.array) != null && (m = (a.length - 1)) >= 0 &&
2319              root != null && root.status >= 0) {
2320              for (;;) {
2321 <                int s; Object o; CountedCompleter<?> task = null;
2321 >                int s, u; Object o; CountedCompleter<?> task = null;
2322                  if ((s = q.top) - q.base > 0) {
2323                      long j = ((m & (s - 1)) << ASHIFT) + ABASE;
2324                      if ((o = U.getObject(a, j)) != null &&
# Line 2309 | Line 2341 | public class ForkJoinPool extends Abstra
2341                  }
2342                  if (task != null)
2343                      task.doExec();
2344 <                if (root.status < 0 || (int)(ctl >> AC_SHIFT) >= 0)
2344 >                if (root.status < 0 ||
2345 >                    (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
2346                      break;
2347                  if (task == null) {
2348                      if (helpSignal(root, q.poolIndex) >= 0)
# Line 2328 | Line 2361 | public class ForkJoinPool extends Abstra
2361          // Some hard-to-avoid overlap with tryExternalUnpush
2362          ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z;
2363          ForkJoinTask<?>[] a;  int m, s, n; long j;
2364 <        if (t != null && t.status >= 0 &&
2364 >        if (t != null &&
2365              (z = submitters.get()) != null &&
2366              (p = commonPool) != null &&
2367              (ws = p.workQueues) != null &&
2368              (m = ws.length - 1) >= 0 &&
2369              (q = ws[m & z.seed & SQMASK]) != null &&
2370 <            (a = q.array) != null) {
2370 >            (a = q.array) != null &&
2371 >            t.status >= 0) {
2372              if ((s = q.top) != q.base &&
2373                  U.getObjectVolatile
2374                  (a, j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE) == t &&
# Line 2362 | Line 2396 | public class ForkJoinPool extends Abstra
2396       */
2397      static void externalHelpQuiescePool() {
2398          ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
2365        int r = ThreadLocalRandom.current().nextInt();
2399          if ((p = commonPool) != null &&
2400 <            (q = p.findNonEmptyStealQueue(r)) != null &&
2400 >            (q = p.findNonEmptyStealQueue(1)) != null &&
2401              (b = q.base) - q.top < 0 &&
2402              (t = q.pollAt(b)) != null)
2403              t.doExec();
# Line 2441 | Line 2474 | public class ForkJoinPool extends Abstra
2474              throw new NullPointerException();
2475          if (parallelism <= 0 || parallelism > MAX_CAP)
2476              throw new IllegalArgumentException();
2444        this.parallelism = parallelism;
2477          this.factory = factory;
2478          this.ueh = handler;
2479 <        this.localMode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE;
2479 >        this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
2480          long np = (long)(-parallelism); // offset ctl counts
2481          this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2482          int pn = nextPoolId();
# Line 2461 | Line 2493 | public class ForkJoinPool extends Abstra
2493      ForkJoinPool(int parallelism, long ctl,
2494                   ForkJoinWorkerThreadFactory factory,
2495                   Thread.UncaughtExceptionHandler handler) {
2496 <        this.parallelism = parallelism;
2496 >        this.config = parallelism;
2497          this.ctl = ctl;
2498          this.factory = factory;
2499          this.ueh = handler;
2468        this.localMode = LIFO_QUEUE;
2500          this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2501      }
2502  
# Line 2475 | Line 2506 | public class ForkJoinPool extends Abstra
2506       * @return the common pool instance
2507       */
2508      public static ForkJoinPool commonPool() {
2509 <        return commonPool; // cannot be null (if so, a static init error)
2509 >        // assert commonPool != null : "static init error";
2510 >        return commonPool;
2511      }
2512  
2513      // Execution methods
# Line 2647 | Line 2679 | public class ForkJoinPool extends Abstra
2679       * @return the targeted parallelism level of this pool
2680       */
2681      public int getParallelism() {
2682 <        return parallelism;
2682 >        return config & SMASK;
2683      }
2684  
2685      /**
# Line 2668 | Line 2700 | public class ForkJoinPool extends Abstra
2700       * @return the number of worker threads
2701       */
2702      public int getPoolSize() {
2703 <        return parallelism + (short)(ctl >>> TC_SHIFT);
2703 >        return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2704      }
2705  
2706      /**
# Line 2678 | Line 2710 | public class ForkJoinPool extends Abstra
2710       * @return {@code true} if this pool uses async mode
2711       */
2712      public boolean getAsyncMode() {
2713 <        return localMode != 0;
2713 >        return (config >>> 16) == FIFO_QUEUE;
2714      }
2715  
2716      /**
# Line 2709 | Line 2741 | public class ForkJoinPool extends Abstra
2741       * @return the number of active threads
2742       */
2743      public int getActiveThreadCount() {
2744 <        int r = parallelism + (int)(ctl >> AC_SHIFT);
2744 >        int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2745          return (r <= 0) ? 0 : r; // suppress momentarily negative values
2746      }
2747  
# Line 2725 | Line 2757 | public class ForkJoinPool extends Abstra
2757       * @return {@code true} if all threads are currently idle
2758       */
2759      public boolean isQuiescent() {
2760 <        return (int)(ctl >> AC_SHIFT) + parallelism == 0;
2760 >        return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0;
2761      }
2762  
2763      /**
# Line 2888 | Line 2920 | public class ForkJoinPool extends Abstra
2920                  }
2921              }
2922          }
2923 <        int pc = parallelism;
2923 >        int pc = (config & SMASK);
2924          int tc = pc + (short)(c >>> TC_SHIFT);
2925          int ac = pc + (int)(c >> AC_SHIFT);
2926          if (ac < 0) // ignore transient negative
# Line 2961 | Line 2993 | public class ForkJoinPool extends Abstra
2993      public boolean isTerminated() {
2994          long c = ctl;
2995          return ((c & STOP_BIT) != 0L &&
2996 <                (short)(c >>> TC_SHIFT) == -parallelism);
2996 >                (short)(c >>> TC_SHIFT) == -(config & SMASK));
2997      }
2998  
2999      /**
# Line 2980 | Line 3012 | public class ForkJoinPool extends Abstra
3012      public boolean isTerminating() {
3013          long c = ctl;
3014          return ((c & STOP_BIT) != 0L &&
3015 <                (short)(c >>> TC_SHIFT) != -parallelism);
3015 >                (short)(c >>> TC_SHIFT) != -(config & SMASK));
3016      }
3017  
3018      /**
# Line 3124 | Line 3156 | public class ForkJoinPool extends Abstra
3156          if (t instanceof ForkJoinWorkerThread) {
3157              ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
3158              while (!blocker.isReleasable()) { // variant of helpSignal
3159 <                WorkQueue[] ws; WorkQueue q; int m, n;
3159 >                WorkQueue[] ws; WorkQueue q; int m, n, u;
3160                  if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) {
3161                      for (int i = 0; i <= m; ++i) {
3162                          if (blocker.isReleasable())
3163                              return;
3164                          if ((q = ws[i]) != null && (n = q.queueSize()) > 0) {
3165                              p.signalWork(q, n);
3166 <                            if ((int)(p.ctl >> AC_SHIFT) >= 0)
3166 >                            if ((u = (int)(p.ctl >>> 32)) >= 0 ||
3167 >                                (u >> UAC_SHIFT) >= 0)
3168                                  break;
3169                          }
3170                      }
# Line 3177 | Line 3210 | public class ForkJoinPool extends Abstra
3210      private static final long QLOCK;
3211  
3212      static {
3180        // Establish common pool parameters
3181        // TBD: limit or report ignored exceptions?
3182
3183        int par = 0;
3184        ForkJoinWorkerThreadFactory fac = null;
3185        Thread.UncaughtExceptionHandler handler = null;
3186        try {
3187            String pp = System.getProperty(propPrefix + "parallelism");
3188            String hp = System.getProperty(propPrefix + "exceptionHandler");
3189            String fp = System.getProperty(propPrefix + "threadFactory");
3190            if (fp != null)
3191                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
3192                       getSystemClassLoader().loadClass(fp).newInstance());
3193            if (hp != null)
3194                handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
3195                           getSystemClassLoader().loadClass(hp).newInstance());
3196            if (pp != null)
3197                par = Integer.parseInt(pp);
3198        } catch (Exception ignore) {
3199        }
3200
3213          int s; // initialize field offsets for CAS etc
3214          try {
3215              U = getUnsafe();
# Line 3226 | Line 3238 | public class ForkJoinPool extends Abstra
3238          if ((s & (s-1)) != 0)
3239              throw new Error("data type scale not a power of two");
3240  
3241 +        submitters = new ThreadLocal<Submitter>();
3242 +        ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory =
3243 +            new DefaultForkJoinWorkerThreadFactory();
3244          /*
3245 <         * For extra caution, computations to set up pool state are
3246 <         * here; the constructor just assigns these values to fields.
3245 >         * Establish common pool parameters.  For extra caution,
3246 >         * computations to set up common pool state are here; the
3247 >         * constructor just assigns these values to fields.
3248           */
3249 <        ForkJoinWorkerThreadFactory defaultFac =
3250 <            defaultForkJoinWorkerThreadFactory =
3251 <            new DefaultForkJoinWorkerThreadFactory();
3252 <        if (fac == null)
3253 <            fac = defaultFac;
3249 >
3250 >        int par = 0;
3251 >        Thread.UncaughtExceptionHandler handler = null;
3252 >        try {  // TBD: limit or report ignored exceptions?
3253 >            String pp = System.getProperty
3254 >                ("java.util.concurrent.ForkJoinPool.common.parallelism");
3255 >            String hp = System.getProperty
3256 >                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3257 >            String fp = System.getProperty
3258 >                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3259 >            if (fp != null)
3260 >                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
3261 >                       getSystemClassLoader().loadClass(fp).newInstance());
3262 >            if (hp != null)
3263 >                handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
3264 >                           getSystemClassLoader().loadClass(hp).newInstance());
3265 >            if (pp != null)
3266 >                par = Integer.parseInt(pp);
3267 >        } catch (Exception ignore) {
3268 >        }
3269 >
3270          if (par <= 0)
3271              par = Runtime.getRuntime().availableProcessors();
3272          if (par > MAX_CAP)
3273              par = MAX_CAP;
3274 +        commonPoolParallelism = par;
3275          long np = (long)(-par); // precompute initial ctl value
3276          long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
3277  
3245        commonPoolParallelism = par;
3278          commonPool = new ForkJoinPool(par, ct, fac, handler);
3279          modifyThreadPermission = new RuntimePermission("modifyThread");
3248        submitters = new ThreadLocal<Submitter>();
3280      }
3281  
3282      /**

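Note on the packed fields: throughout this diff the separate `parallelism` and `localMode` fields are replaced by a single `config` int, read back as `config & SMASK` and `config >>> 16`, while `ctl` keeps its active and total counts offset by `-parallelism`. The following is a minimal standalone sketch of that arithmetic, not the jsr166 code itself. The class name `PackedFieldsSketch`, its helper method names, and the constant values (`SMASK = 0xffff`, `FIFO_QUEUE = 1`, `AC_SHIFT = 48`, `TC_SHIFT = 32`) are assumptions made for illustration; the diff above shows these constants only by name.

```java
// Illustrative sketch only. Constant values are ASSUMED for this example;
// the diff refers to them by name without showing their definitions.
final class PackedFieldsSketch {
    static final int  SMASK      = 0xffff;            // assumed: low 16 bits carry parallelism
    static final int  FIFO_QUEUE = 1;                 // assumed: mode value used for async pools
    static final int  AC_SHIFT   = 48;                // assumed: bit position of the active count
    static final int  TC_SHIFT   = 32;                // assumed: bit position of the total count
    static final long AC_MASK    = 0xffffL << AC_SHIFT;
    static final long TC_MASK    = 0xffffL << TC_SHIFT;
    static final long AC_UNIT    = 1L << AC_SHIFT;
    static final long TC_UNIT    = 1L << TC_SHIFT;

    // Mirrors: this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
    static int packConfig(int parallelism, boolean asyncMode) {
        return parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
    }

    // Mirrors getParallelism(): return config & SMASK;
    static int parallelismOf(int config) {
        return config & SMASK;
    }

    // Mirrors getAsyncMode(): return (config >>> 16) == FIFO_QUEUE;
    static boolean isAsync(int config) {
        return (config >>> 16) == FIFO_QUEUE;
    }

    // Mirrors the constructor: both counts start at -parallelism.
    static long initialCtl(int parallelism) {
        long np = (long)(-parallelism);
        return ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    // Mirrors getActiveThreadCount(), including suppression of negative values.
    static int activeCount(int config, long ctl) {
        int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
        return (r <= 0) ? 0 : r;
    }

    // Mirrors getPoolSize().
    static int poolSize(int config, long ctl) {
        return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
    }

    // Mirrors isQuiescent(): the offset active count sums to zero with parallelism.
    static boolean isQuiescent(int config, long ctl) {
        return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0;
    }

    public static void main(String[] args) {
        int config = packConfig(4, true);
        long ctl = initialCtl(4);
        System.out.println("parallelism = " + parallelismOf(config));
        System.out.println("async       = " + isAsync(config));
        System.out.println("quiescent   = " + isQuiescent(config, ctl));

        // Simulate one worker being created and becoming active.
        ctl += AC_UNIT + TC_UNIT;
        System.out.println("active      = " + activeCount(config, ctl));
        System.out.println("pool size   = " + poolSize(config, ctl));
    }
}
```

Because the counts are stored as offsets from `-parallelism`, a freshly constructed pool reads as quiescent with zero workers, and each accessor recovers the real count by adding `config & SMASK` back in. That is why every former use of the `parallelism` field in the hunks above becomes `(config & SMASK)`.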