ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.145 by jsr166, Mon Nov 19 01:04:24 2012 UTC vs.
Revision 1.146 by dl, Mon Nov 19 18:12:42 2012 UTC

# Line 37 | Line 37 | import java.util.concurrent.TimeUnit;
37   * ForkJoinPool}s may also be appropriate for use with event-style
38   * tasks that are never joined.
39   *
40 < * <p>A static {@link #commonPool} is available and appropriate for
40 > * <p>A static {@link #commonPool()} is available and appropriate for
41   * most applications. The common pool is used by any ForkJoinTask that
42   * is not explicitly submitted to a specified pool. Using the common
43   * pool normally reduces resource usage (its threads are slowly
# Line 103 | Line 103 | import java.util.concurrent.TimeUnit;
103   * java.util.concurrent.ForkJoinPool.common}: {@code parallelism} --
104   * an integer greater than zero, {@code threadFactory} -- the class
105   * name of a {@link ForkJoinWorkerThreadFactory}, and {@code
106 + <<<<<<< ForkJoinPool.java
107 + * exceptionHandler} -- the class name of a {@code
108 + =======
109   * exceptionHandler} -- the class name of a {@link
110   * java.lang.Thread.UncaughtExceptionHandler
111 + >>>>>>> 1.111
112   * Thread.UncaughtExceptionHandler}. Upon any error in establishing
113   * these settings, default parameters are used.
114   *
# Line 239 | Line 243 | public class ForkJoinPool extends Abstra
243       * enable shutdown.  When used as a lock, it is normally only very
244       * briefly held, so is nearly always available after at most a
245       * brief spin, but we use a monitor-based backup strategy to
246 <     * blocking when needed.
246 >     * block when needed.
247       *
248       * Recording WorkQueues.  WorkQueues are recorded in the
249       * "workQueues" array that is created upon first use and expanded
# Line 248 | Line 252 | public class ForkJoinPool extends Abstra
252       * by a lock but the array is otherwise concurrently readable, and
253       * accessed directly.  To simplify index-based operations, the
254       * array size is always a power of two, and all readers must
255 <     * tolerate null slots. Worker queues are at odd indices Shared
255 >     * tolerate null slots. Worker queues are at odd indices. Shared
256       * (submission) queues are at even indices, up to a maximum of 64
257       * slots, to limit growth even if array needs to expand to add
258       * more workers. Grouping them together in this way simplifies and
# Line 318 | Line 322 | public class ForkJoinPool extends Abstra
322       * general, pools will be over-signalled.  When a submission is
323       * added or another worker adds a task to a queue that is
324       * apparently empty, they signal waiting workers (or trigger
325 <     * creation of new ones if fewer than the given parallelism level
326 <     * -- see signalWork).  These primary signals are buttressed by
327 <     * signals whenever other threads scan for work or do not have a
328 <     * task to process. On most platforms, signalling (unpark)
329 <     * overhead time is noticeably long, and the time between
330 <     * signalling a thread and it actually making progress can be very
331 <     * noticeably long, so it is worth offloading these delays from
332 <     * critical paths as much as possible.
325 >     * creation of new ones if fewer than the given parallelism
326 >     * level).  These primary signals are buttressed by signals
327 >     * whenever other threads scan for work or do not have a task to
328 >     * process (including the case of leaving a hint to unparked
329 >     * threads to help signal others upon wakeup).  On most platforms,
330 >     * signalling (unpark) overhead time is noticeably long, and the
331 >     * time between signalling a thread and it actually making
332 >     * progress can be very noticeably long, so it is worth offloading
333 >     * these delays from critical paths as much as possible.
334       *
335       * Trimming workers. To release resources after periods of lack of
336       * use, a worker starting to wait when the pool is quiescent will
# Line 393 | Line 398 | public class ForkJoinPool extends Abstra
398       * steals, rather than use per-task bookkeeping.  This sometimes
399       * requires a linear scan of workQueues array to locate stealers,
400       * but often doesn't because stealers leave hints (that may become
401 <     * stale/wrong) of where to locate them.  A stealHint is only a
402 <     * hint because a worker might have had multiple steals and the
403 <     * hint records only one of them (usually the most current).
404 <     * Hinting isolates cost to when it is needed, rather than adding
405 <     * to per-task overhead.  (2) It is "shallow", ignoring nesting
406 <     * and potentially cyclic mutual steals.  (3) It is intentionally
401 >     * stale/wrong) of where to locate them.  It is only a hint
402 >     * because a worker might have had multiple steals and the hint
403 >     * records only one of them (usually the most current).  Hinting
404 >     * isolates cost to when it is needed, rather than adding to
405 >     * per-task overhead.  (2) It is "shallow", ignoring nesting and
406 >     * potentially cyclic mutual steals.  (3) It is intentionally
407       * racy: field currentJoin is updated only while actively joining,
408       * which means that we miss links in the chain during long-lived
409       * tasks, GC stalls etc (which is OK since blocking in such cases
# Line 525 | Line 530 | public class ForkJoinPool extends Abstra
530       * Default ForkJoinWorkerThreadFactory implementation; creates a
531       * new ForkJoinWorkerThread.
532       */
533 <    static class DefaultForkJoinWorkerThreadFactory
533 >    static final class DefaultForkJoinWorkerThreadFactory
534          implements ForkJoinWorkerThreadFactory {
535 <        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
535 >        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
536              return new ForkJoinWorkerThread(pool);
537          }
538      }
# Line 598 | Line 603 | public class ForkJoinPool extends Abstra
603       * trades off slightly slower average field access for the sake of
604       * avoiding really bad worst-case access. (Until better JVM
605       * support is in place, this padding is dependent on transient
606 <     * properties of JVM field layout rules.)  We also take care in
602 <     * allocating, sizing and resizing the array. Non-shared queue
603 <     * arrays are initialized by workers before use. Others are
604 <     * allocated on first use.
606 >     * properties of JVM field layout rules.)
607       */
608      static final class WorkQueue {
609          /**
# Line 627 | Line 629 | public class ForkJoinPool extends Abstra
629          int seed;                  // for random scanning; initialize nonzero
630          volatile int eventCount;   // encoded inactivation count; < 0 if inactive
631          int nextWait;              // encoded record of next event waiter
632 <        final int mode;            // lifo, fifo, or shared
631 <        int nsteals;               // cumulative number of steals
632 >        int hint;                  // steal or signal hint (index)
633          int poolIndex;             // index of this queue in pool (or 0)
634 <        int stealHint;             // index of most recent known stealer
634 >        final int mode;            // 0: lifo, > 0: fifo, < 0: shared
635 >        int nsteals;               // number of steals
636          volatile int qlock;        // 1: locked, -1: terminate; else 0
637          volatile int base;         // index of next slot for poll
638          int top;                   // index of next slot for push
# Line 640 | Line 642 | public class ForkJoinPool extends Abstra
642          volatile Thread parker;    // == owner during call to park; else null
643          volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
644          ForkJoinTask<?> currentSteal; // current non-local task being executed
645 +
646          // Heuristic padding to ameliorate unfortunate memory placements
647          Object p00, p01, p02, p03, p04, p05, p06, p07;
648 <        Object p08, p09, p0a, p0b, p0c, p0d, p0e;
648 >        Object p08, p09, p0a, p0b, p0c;
649  
650 <        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode) {
651 <            this.mode = mode;
650 >        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
651 >                  int seed) {
652 >            this.array = new ForkJoinTask<?>[WorkQueue.INITIAL_QUEUE_CAPACITY];
653              this.pool = pool;
654              this.owner = owner;
655 <            // Place indices in the center of array (that is not yet allocated)
655 >            this.mode = mode;
656 >            this.seed = seed;
657 >            // Place indices in the center of array
658              base = top = INITIAL_QUEUE_CAPACITY >>> 1;
659          }
660  
# Line 661 | Line 667 | public class ForkJoinPool extends Abstra
667           * @throw RejectedExecutionException if array cannot be resized
668           */
669          final void push(ForkJoinTask<?> task) {
670 <            ForkJoinPool p; ForkJoinTask<?>[] a;
671 <            int s = top, n;
672 <            if ((a = array) != null && a.length > (n = s + 1 - base)) {
670 >            ForkJoinTask<?>[] a; ForkJoinPool p;
671 >            int s = top, m, n;
672 >            if ((a = array) != null) {    // ignore if queue removed
673                  U.putOrderedObject
674 <                    (a, (((a.length - 1) & s) << ASHIFT) + ABASE, task);
675 <                top = s + 1;
676 <                if (n <= 1 && (p = pool) != null)
677 <                    p.signalWork(this, 1);
674 >                    (a, (((m = a.length - 1) & s) << ASHIFT) + ABASE, task);
675 >                if ((n = (top = s + 1) - base) <= 1) {
676 >                    if ((p = pool) != null)
677 >                        p.signalWork(this, 0);
678 >                }
679 >                else if (n >= m)
680 >                    growArray();
681              }
673            else
674                fullPush(task, true);
682          }
683  
684          /**
685           * Pushes a task if lock is free and array is either big
686 <         * enough or can be resized to be big enough. Note: a
680 <         * specialization of a common fast path of this method is in
681 <         * ForkJoinPool.externalPush. When called from a FJWT queue,
682 <         * this can fail only if the pool has been shut down or
683 <         * an out of memory error.
686 >         * enough or can be resized to be big enough.
687           *
688           * @param task the task. Caller must ensure non-null.
689 <         * @param owned if true, throw RJE on failure
689 >         * @return true if submitted
690           */
691 <        final boolean fullPush(ForkJoinTask<?> task, boolean owned) {
692 <            ForkJoinPool p; ForkJoinTask<?>[] a;
693 <            if (owned) {
694 <                if (qlock < 0) // must be shutting down
695 <                    throw new RejectedExecutionException();
696 <            }
697 <            else if (!U.compareAndSwapInt(this, QLOCK, 0, 1))
698 <                return false;
699 <            try {
700 <                int s = top, oldLen, len;
701 <                if ((a = array) == null)
702 <                    a = array = new ForkJoinTask<?>[len=INITIAL_QUEUE_CAPACITY];
703 <                else if ((oldLen = a.length) > s + 1 - base)
704 <                    len = oldLen;
705 <                else if ((len = oldLen << 1) > MAXIMUM_QUEUE_CAPACITY)
706 <                    throw new RejectedExecutionException("Capacity exceeded");
707 <                else {
708 <                    int oldMask, b;
709 <                    ForkJoinTask<?>[] oldA = a;
710 <                    a = array = new ForkJoinTask<?>[len];
711 <                    if ((oldMask = oldLen - 1) >= 0 && s - (b = base) > 0) {
712 <                        int mask = len - 1;
713 <                        do {
714 <                            ForkJoinTask<?> x;
715 <                            int oldj = ((b & oldMask) << ASHIFT) + ABASE;
716 <                            int j    = ((b &    mask) << ASHIFT) + ABASE;
717 <                            x = (ForkJoinTask<?>)
718 <                                U.getObjectVolatile(oldA, oldj);
719 <                            if (x != null &&
720 <                                U.compareAndSwapObject(oldA, oldj, x, null))
721 <                                U.putObjectVolatile(a, j, x);
722 <                        } while (++b != s);
723 <                    }
724 <                }
725 <                U.putOrderedObject
726 <                    (a, (((len - 1) & s) << ASHIFT) + ABASE, task);
727 <                top = s + 1;
728 <            } finally {
729 <                if (!owned)
730 <                    qlock = 0;
691 >        final boolean trySharedPush(ForkJoinTask<?> task) {
692 >            boolean submitted = false;
693 >            if (qlock == 0 && U.compareAndSwapInt(this, QLOCK, 0, 1)) {
694 >                ForkJoinTask<?>[] a = array;  ForkJoinPool p;
695 >                int s = top;
696 >                try {
697 >                    if ((a != null && a.length > s + 1 - base) ||
698 >                        (a = growArray()) != null) {   // must presize
699 >                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
700 >                        U.putOrderedObject(a, j, task);
701 >                        top = s + 1;
702 >                        submitted = true;
703 >                    }
704 >                } finally {
705 >                    qlock = 0;                         // unlock
706 >                }
707 >                if (submitted && (p = pool) != null)
708 >                    p.signalWork(this, 0);
709 >            }
710 >            return submitted;
711 >        }
712 >
713 >       /**
714 >         * Initializes or doubles the capacity of array. Call either
715 >         * by owner or with lock held -- it is OK for base, but not
716 >         * top, to move while resizings are in progress.
717 >         */
718 >        final ForkJoinTask<?>[] growArray() {
719 >            ForkJoinTask<?>[] oldA = array;
720 >            int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
721 >            if (size > MAXIMUM_QUEUE_CAPACITY)
722 >                throw new RejectedExecutionException("Queue capacity exceeded");
723 >            int oldMask, t, b;
724 >            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
725 >            if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
726 >                (t = top) - (b = base) > 0) {
727 >                int mask = size - 1;
728 >                do {
729 >                    ForkJoinTask<?> x;
730 >                    int oldj = ((b & oldMask) << ASHIFT) + ABASE;
731 >                    int j    = ((b &    mask) << ASHIFT) + ABASE;
732 >                    x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
733 >                    if (x != null &&
734 >                        U.compareAndSwapObject(oldA, oldj, x, null))
735 >                        U.putObjectVolatile(a, j, x);
736 >                } while (++b != t);
737              }
738 <            if ((p = pool) != null)
730 <                p.signalWork(this, 1);
731 <            return true;
738 >            return a;
739          }
740  
741          /**
# Line 980 | Line 987 | public class ForkJoinPool extends Abstra
987              if (t != null) {
988                  (currentSteal = t).doExec();
989                  currentSteal = null;
990 <                if (++nsteals < 0) {     // spill on overflow
984 <                    ForkJoinPool p;
985 <                    if ((p = pool) != null)
986 <                        p.collectStealCount(this);
987 <                }
990 >                ++nsteals;
991                  if (top != base) {       // process remaining local tasks
992                      if (mode == 0)
993                          popAndExecAll();
# Line 1057 | Line 1060 | public class ForkJoinPool extends Abstra
1060          }
1061      }
1062  
1063 +    // static fields (initialized in static initializer below)
1064 +
1065 +    /**
1066 +     * Creates a new ForkJoinWorkerThread. This factory is used unless
1067 +     * overridden in ForkJoinPool constructors.
1068 +     */
1069 +    public static final ForkJoinWorkerThreadFactory
1070 +        defaultForkJoinWorkerThreadFactory;
1071 +
1072      /**
1073       * Per-thread records for threads that submit to pools. Currently
1074       * holds only pseudo-random seed / index that is used to choose
# Line 1077 | Line 1089 | public class ForkJoinPool extends Abstra
1089          Submitter(int s) { seed = s; }
1090      }
1091  
1080    /** Property prefix for constructing common pool */
1081    private static final String propPrefix =
1082        "java.util.concurrent.ForkJoinPool.common.";
1083
1084    // static fields (initialized in static initializer below)
1085
1092      /**
1093 <     * Creates a new ForkJoinWorkerThread. This factory is used unless
1094 <     * overridden in ForkJoinPool constructors.
1093 >     * Per-thread submission bookkeeping. Shared across all pools
1094 >     * to reduce ThreadLocal pollution and because random motion
1095 >     * to avoid contention in one pool is likely to hold for others.
1096 >     * Lazily initialized on first submission (but null-checked
1097 >     * in other contexts to avoid unnecessary initialization).
1098       */
1099 <    public static final ForkJoinWorkerThreadFactory
1091 <        defaultForkJoinWorkerThreadFactory;
1099 >    static final ThreadLocal<Submitter> submitters;
1100  
1101      /**
1102       * Common (static) pool. Non-null for public use unless a static
# Line 1105 | Line 1113 | public class ForkJoinPool extends Abstra
1113      private static final RuntimePermission modifyThreadPermission;
1114  
1115      /**
1108     * Per-thread submission bookkeeping. Shared across all pools
1109     * to reduce ThreadLocal pollution and because random motion
1110     * to avoid contention in one pool is likely to hold for others.
1111     * Lazily initialized on first submission (but null-checked
1112     * in other contexts to avoid unnecessary initialization).
1113     */
1114    static final ThreadLocal<Submitter> submitters;
1115
1116    /**
1116       * Common pool parallelism. Must equal commonPool.parallelism.
1117       */
1118      static final int commonPoolParallelism;
# Line 1248 | Line 1247 | public class ForkJoinPool extends Abstra
1247      static final int FIFO_QUEUE          =  1;
1248      static final int SHARED_QUEUE        = -1;
1249  
1250 +    // bounds for #steps in scan loop -- must be power 2 minus 1
1251 +    private static final int MIN_SCAN    = 0x1ff;   // cover estimation slop
1252 +    private static final int MAX_SCAN    = 0x1ffff; // 4 * max workers
1253 +
1254      // Instance fields
1255  
1256      /*
1257 <     * Field layout order in this class tends to matter more than one
1258 <     * would like. Runtime layout order is only loosely related to
1257 >     * Field layout of this class tends to matter more than one would
1258 >     * like. Runtime layout order is only loosely related to
1259       * declaration order and may differ across JVMs, but the following
1260       * empirically works OK on current JVMs.
1261       */
1262      volatile long stealCount;                  // collects worker counts
1263      volatile long ctl;                         // main pool control
1261    final int parallelism;                     // parallelism level
1262    final int localMode;                       // per-worker scheduling mode
1263    volatile int indexSeed;                    // worker/submitter index seed
1264      volatile int plock;                        // shutdown status and seqLock
1265 +    volatile int indexSeed;                    // worker/submitter index seed
1266 +    final int config;                          // mode and parallelism level
1267      WorkQueue[] workQueues;                    // main registry
1268 <    final ForkJoinWorkerThreadFactory factory; // factory for new workers
1268 >    final ForkJoinWorkerThreadFactory factory;
1269      final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
1270      final String workerNamePrefix;             // to create worker name string
1271  
# Line 1283 | Line 1285 | public class ForkJoinPool extends Abstra
1285              if (((ps = plock) & PL_LOCK) == 0 &&
1286                  U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
1287                  return nps;
1288 <            else if (r == 0)
1289 <                r = ThreadLocalRandom.current().nextInt(); // randomize spins
1288 >            else if (r == 0) { // randomize spins if possible
1289 >                Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
1290 >                if ((t instanceof ForkJoinWorkerThread) &&
1291 >                    (w = ((ForkJoinWorkerThread)t).workQueue) != null)
1292 >                    r = w.seed;
1293 >                else if ((z = submitters.get()) != null)
1294 >                    r = z.seed;
1295 >                else
1296 >                    r = 1;
1297 >            }
1298              else if (spins >= 0) {
1299                  r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
1300                  if (r >= 0)
# Line 1318 | Line 1328 | public class ForkJoinPool extends Abstra
1328          synchronized (this) { notifyAll(); }
1329      }
1330  
1331 <    //  Registering and deregistering workers
1331 >    /**
1332 >     * Tries to create and start a worker; adjusts counts etc on failure
1333 >     */
1334 >    private void addWorker() {
1335 >        ForkJoinWorkerThread wt = null;
1336 >        try {
1337 >            (wt = factory.newThread(this)).start();
1338 >        } catch (Throwable ex) {
1339 >            deregisterWorker(wt, ex); // adjust on failure
1340 >        }
1341 >    }
1342  
1343      /**
1344 <     * Callback from ForkJoinWorkerThread constructor to establish its
1345 <     * poolIndex and record its WorkQueue. To avoid scanning bias due
1346 <     * to packing entries in front of the workQueues array, we treat
1347 <     * the array as a simple power-of-two hash table using per-thread
1348 <     * seed as hash, expanding as needed.
1349 <     *
1350 <     * @param w the worker's queue
1351 <     */
1352 <    final void registerWorker(WorkQueue w) {
1353 <        int s, ps; // generate a rarely colliding candidate index seed
1354 <        do {} while (!U.compareAndSwapInt(this, INDEXSEED,
1355 <                                          s = indexSeed, s += SEED_INCREMENT) ||
1356 <                     s == 0); // skip 0
1344 >     * Performs secondary initialization, called when plock is zero.
1345 >     * Creates workQueue array and sets plock to a valid value.  The
1346 >     * lock body must be exception-free (so no try/finally) so we
1347 >     * optimistically allocate new array outside the lock and throw
1348 >     * away if (very rarely) not needed. (A similar tactic is used in
1349 >     * fullExternalPush.)  Because the plock seq value can eventually
1350 >     * wrap around zero, this method harmlessly fails to reinitialize
1351 >     * if workQueues exists, while still advancing plock.
1352 >     */
1353 >    private void initWorkQueuesArray() {
1354 >        WorkQueue[] ws; int ps;
1355 >        int p = config & SMASK;        // find power of two table size
1356 >        int n = (p > 1) ? p - 1 : 1;   // ensure at least 2 slots
1357 >        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
1358 >        WorkQueue[] nws = new WorkQueue[(n + 1) << 1];
1359          if (((ps = plock) & PL_LOCK) != 0 ||
1360              !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1361              ps = acquirePlock();
1362 +        if ((ws = workQueues) == null || ws.length == 0)
1363 +            workQueues = nws;
1364          int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1365 <        try {
1366 <            WorkQueue[] ws;
1367 <            if (w != null && (ws = workQueues) != null) {
1368 <                w.seed = s;
1369 <                int n = ws.length, m = n - 1;
1370 <                int r = (s << 1) | 1;               // use odd-numbered indices
1371 <                if (ws[r &= m] != null) {           // collision
1372 <                    int probes = 0;                 // step by approx half size
1373 <                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1374 <                    while (ws[r = (r + step) & m] != null) {
1375 <                        if (++probes >= n) {
1376 <                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1377 <                            m = n - 1;
1378 <                            probes = 0;
1365 >        if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1366 >            releasePlock(nps);
1367 >        long c; int u;
1368 >        if ((u = (int)((c = ctl) >>> 32)) < 0 && (int)c == 0) {
1369 >            long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1370 >                             ((u + UAC_UNIT) & UAC_MASK)) << 32;
1371 >            if (U.compareAndSwapLong(this, CTL, c, nc))
1372 >                addWorker();
1373 >        }
1374 >
1375 >    }
1376 >
1377 >    //  Registering and deregistering workers
1378 >
1379 >    /**
1380 >     * Callback from ForkJoinWorkerThread to establish and record its
1381 >     * WorkQueue. To avoid scanning bias due to packing entries in
1382 >     * front of the workQueues array, we treat the array as a simple
1383 >     * power-of-two hash table using per-thread seed as hash,
1384 >     * expanding as needed.
1385 >     *
1386 >     * @param wt the worker thread
1387 >     */
1388 >    final void registerWorker(ForkJoinWorkerThread wt) {
1389 >        if (wt != null && wt.workQueue == null) {
1390 >            int s, ps;    // generate a rarely colliding candidate index seed
1391 >            do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1392 >                                              s += SEED_INCREMENT) ||
1393 >                         s == 0); // skip 0
1394 >            WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
1395 >            if (((ps = plock) & PL_LOCK) != 0 ||
1396 >                !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1397 >                ps = acquirePlock();
1398 >            int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1399 >            try {
1400 >                WorkQueue[] ws;
1401 >                if ((ws = workQueues) != null && wt.workQueue == null) {
1402 >                    int n = ws.length, m = n - 1;
1403 >                    int r = (s << 1) | 1;           // use odd-numbered indices
1404 >                    if (ws[r &= m] != null) {       // collision
1405 >                        int probes = 0;             // step by approx half size
1406 >                        int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
1407 >                        while (ws[r = (r + step) & m] != null) {
1408 >                            if (++probes >= n) {
1409 >                                workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1410 >                                m = n - 1;
1411 >                                probes = 0;
1412 >                            }
1413                          }
1414                      }
1415 +                    w.eventCount = w.poolIndex = r; // volatile write orders
1416 +                    wt.workQueue = ws[r] = w;
1417                  }
1418 <                w.eventCount = w.poolIndex = r;     // establish before recording
1419 <                ws[r] = w;
1418 >            } finally {
1419 >                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1420 >                    releasePlock(nps);
1421              }
1361        } finally {
1362            if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1363                releasePlock(nps);
1422          }
1423      }
1424  
# Line 1377 | Line 1435 | public class ForkJoinPool extends Abstra
1435          WorkQueue w = null;
1436          if (wt != null && (w = wt.workQueue) != null) {
1437              int ps;
1380            collectStealCount(w);
1438              w.qlock = -1;                // ensure set
1439 +            long ns = w.nsteals, sc;     // collect steal count
1440 +            do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1441 +                                               sc = stealCount, sc + ns));
1442              if (((ps = plock) & PL_LOCK) != 0 ||
1443                  !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1444                  ps = acquirePlock();
# Line 1403 | Line 1463 | public class ForkJoinPool extends Abstra
1463          if (!tryTerminate(false, false) && w != null) {
1464              w.cancelAll();                  // cancel remaining tasks
1465              if (w.array != null)            // suppress signal if never ran
1466 <                signalWork(null, 1);        // wake up or create replacement
1466 >                helpSignal(null, 0);        // wake up or create replacement
1467              if (ex == null)                 // help clean refs on way out
1468                  ForkJoinTask.helpExpungeStaleExceptions();
1469          }
# Line 1412 | Line 1472 | public class ForkJoinPool extends Abstra
1472              ForkJoinTask.rethrow(ex);
1473      }
1474  
1415    /**
1416     * Collect worker steal count into total. Called on termination
1417     * and upon int overflow of local count. (There is a possible race
1418     * in the latter case vs any caller of getStealCount, which can
1419     * make its results less accurate than usual.)
1420     */
1421    final void collectStealCount(WorkQueue w) {
1422        if (w != null) {
1423            long sc;
1424            int ns = w.nsteals;
1425            w.nsteals = 0; // handle overflow
1426            long steals = (ns >= 0) ? ns : 1L + (long)(Integer.MAX_VALUE);
1427            do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
1428                                               sc = stealCount, sc + steals));
1429        }
1430    }
1431
1475      // Submissions
1476  
1477      /**
# Line 1445 | Line 1488 | public class ForkJoinPool extends Abstra
1488              (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
1489              (q = ws[m & z.seed & SQMASK]) != null &&
1490              U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
1491 <            int s = q.top, n;
1492 <            if ((a = q.array) != null && a.length > (n = s + 1 - q.base)) {
1493 <                U.putObject(a, (long)(((a.length - 1) & s) << ASHIFT) + ABASE,
1451 <                            task);
1491 >            int b = q.base, s = q.top, n, an;
1492 >            if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
1493 >                U.putObject(a, (long)(((an - 1) & s) << ASHIFT) + ABASE, task);
1494                  q.top = s + 1;                     // push on to deque
1495                  q.qlock = 0;
1496 <                if (n <= 1)
1497 <                    signalWork(q, 1);
1496 >                if (n <= 2)
1497 >                    signalWork(q, 0);
1498                  return;
1499              }
1500              q.qlock = 0;
# Line 1463 | Line 1505 | public class ForkJoinPool extends Abstra
1505      /**
1506       * Full version of externalPush. This method is called, among
1507       * other times, upon the first submission of the first task to the
1508 <     * pool, so must perform secondary initialization: creating
1509 <     * workQueue array and setting plock to a valid value. It also
1510 <     * detects first submission by an external thread by looking up
1511 <     * its ThreadLocal, and creates a new shared queue if the one at
1512 <     * index if empty or contended. The lock bodies must be
1513 <     * exception-free (so no try/finally) so we optimistically
1514 <     * allocate new queues/arrays outside the locks and throw them
1473 <     * away if (very rarely) not needed. Note that the plock seq value
1474 <     * can eventually wrap around zero, but if so harmlessly fails to
1475 <     * reinitialize.
1508 >     * pool, so must perform secondary initialization (via
1509 >     * initWorkQueuesArray). It also detects first submission by an
1510 >     * external thread by looking up its ThreadLocal, and creates a
1511 >     * new shared queue if the one at index if empty or contended. The
1512 >     * lock body must be exception-free (so no try/finally) so we
1513 >     * optimistically allocate new queues outside the lock and throw
1514 >     * them away if (very rarely) not needed.
1515       */
1516      private void fullExternalPush(ForkJoinTask<?> task) {
1517 <        for (Submitter z = null;;) {
1518 <            WorkQueue[] ws; WorkQueue q; int ps, m, r, s;
1519 <            if ((ps = plock) < 0)
1517 >        int r = 0;
1518 >        for (Submitter z = submitters.get();;) {
1519 >            WorkQueue[] ws; WorkQueue q; int ps, m, k;
1520 >            if (z == null) {
1521 >                if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
1522 >                                        r += SEED_INCREMENT) && r != 0)
1523 >                    submitters.set(z = new Submitter(r));
1524 >            }
1525 >            else if (r == 0) {               // move to a different index
1526 >                r = z.seed;
1527 >                r ^= r << 13;                // same xorshift as WorkQueues
1528 >                r ^= r >>> 17;
1529 >                z.seed = r ^ (r << 5);
1530 >            }
1531 >            else if ((ps = plock) < 0)
1532                  throw new RejectedExecutionException();
1533 <            else if ((ws = workQueues) == null || (m = ws.length - 1) < 0) {
1534 <                int n = parallelism - 1; n |= n >>> 1; n |= n >>> 2;
1535 <                n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
1536 <                WorkQueue[] nws = new WorkQueue[(n + 1) << 1]; // power of two
1537 <                if ((ps & PL_LOCK) != 0 ||
1533 >            else if (ps == 0 || (ws = workQueues) == null ||
1534 >                     (m = ws.length - 1) < 0)
1535 >                initWorkQueuesArray();
1536 >            else if ((q = ws[k = r & m & SQMASK]) != null) {
1537 >                if (q.trySharedPush(task))
1538 >                    return;
1539 >                else
1540 >                    r = 0; // move on contention
1541 >            }
1542 >            else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
1543 >                q = new WorkQueue(this, null, SHARED_QUEUE, r);
1544 >                if (((ps = plock) & PL_LOCK) != 0 ||
1545                      !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1546                      ps = acquirePlock();
1547 <                if ((ws = workQueues) == null)
1548 <                    workQueues = nws;
1547 >                if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
1548 >                    ws[k] = q;
1549                  int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1550                  if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1551                      releasePlock(nps);
1552              }
1553 <            else if (z == null && (z = submitters.get()) == null) {
1554 <                if (U.compareAndSwapInt(this, INDEXSEED,
1497 <                                        s = indexSeed, s += SEED_INCREMENT) &&
1498 <                    s != 0) // skip 0
1499 <                    submitters.set(z = new Submitter(s));
1500 <            }
1501 <            else {
1502 <                int k = (r = z.seed) & m & SQMASK;
1503 <                if ((q = ws[k]) == null && (ps & PL_LOCK) == 0) {
1504 <                    (q = new WorkQueue(this, null, SHARED_QUEUE)).poolIndex = k;
1505 <                    if (((ps = plock) & PL_LOCK) != 0 ||
1506 <                        !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1507 <                        ps = acquirePlock();
1508 <                    WorkQueue w = null;
1509 <                    if ((ws = workQueues) != null && k < ws.length &&
1510 <                        (w = ws[k]) == null)
1511 <                        ws[k] = q;
1512 <                    else
1513 <                        q = w;
1514 <                    int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
1515 <                    if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1516 <                        releasePlock(nps);
1517 <                }
1518 <                if (q != null && q.qlock == 0 && q.fullPush(task, false))
1519 <                    return;
1520 <                r ^= r << 13;                // same xorshift as WorkQueues
1521 <                r ^= r >>> 17;
1522 <                z.seed = r ^= r << 5;        // move to a different index
1523 <            }
1553 >            else
1554 >                r = 0; // try elsewhere while lock held
1555          }
1556      }
1557  
# Line 1541 | Line 1572 | public class ForkJoinPool extends Abstra
1572       * empty or all workers are active.
1573       *
1574       * @param q if non-null, the queue holding tasks to be signalled
1575 <     * @param signals the target number of signals.
1575 >     * @param signals the target number of signals (at least one --
1576 >     * if argument is zero also sets signallee hint if parked).
1577       */
1578      final void signalWork(WorkQueue q, int signals) {
1579 <        long c; int e, u, i; WorkQueue[] ws; WorkQueue w; Thread p;
1579 >        long c; int e, u, i, s; WorkQueue[] ws; WorkQueue w; Thread p;
1580          while ((u = (int)((c = ctl) >>> 32)) < 0) {
1581              if ((e = (int)c) > 0) {
1582                  if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
# Line 1553 | Line 1585 | public class ForkJoinPool extends Abstra
1585                                 ((long)(u + UAC_UNIT) << 32));
1586                      if (U.compareAndSwapLong(this, CTL, c, nc)) {
1587                          w.eventCount = (e + E_SEQ) & E_MASK;
1588 <                        if ((p = w.parker) != null)
1588 >                        if ((p = w.parker) != null) {
1589 >                            if (q != null && signals == 0)
1590 >                                w.hint = q.poolIndex;
1591                              U.unpark(p);
1592 +                        }
1593                          if (--signals <= 0)
1594                              break;
1595                      }
1596 <                    else
1597 <                        signals = 1;
1563 <                    if ((q != null && q.queueSize() == 0))
1596 >                    if (q != null && (s = q.queueSize()) <= signals &&
1597 >                         (signals = s) <= 0)
1598                          break;
1599                  }
1600                  else
# Line 1570 | Line 1604 | public class ForkJoinPool extends Abstra
1604                  long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1605                                   ((u + UAC_UNIT) & UAC_MASK)) << 32;
1606                  if (U.compareAndSwapLong(this, CTL, c, nc)) {
1607 <                    ForkJoinWorkerThread wt = null;
1574 <                    Throwable ex = null;
1575 <                    boolean started = false;
1576 <                    try {
1577 <                        ForkJoinWorkerThreadFactory fac;
1578 <                        if ((fac = factory) != null &&
1579 <                            (wt = fac.newThread(this)) != null) {
1580 <                            wt.start();
1581 <                            started = true;
1582 <                        }
1583 <                    } catch (Throwable rex) {
1584 <                        ex = rex;
1585 <                    }
1586 <                    if (!started)
1587 <                        deregisterWorker(wt, ex); // adjust counts on failure
1607 >                    addWorker();
1608                      break;
1609                  }
1610              }
# Line 1599 | Line 1619 | public class ForkJoinPool extends Abstra
1619       * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1620       */
1621      final void runWorker(WorkQueue w) {
1622 <        // initialize queue array in this thread
1623 <        w.array = new ForkJoinTask<?>[WorkQueue.INITIAL_QUEUE_CAPACITY];
1604 <        do { w.runTask(scan(w)); } while (w.qlock >= 0);
1622 >        if (w != null) // skip on initialization failure
1623 >            do { w.runTask(scan(w)); } while (w.qlock >= 0);
1624      }
1625  
1626      /**
# Line 1612 | Line 1631 | public class ForkJoinPool extends Abstra
1631       * contention, or state changes that indicate possible success on
1632       * re-invocation.
1633       *
1634 <     * The scan searches for tasks across a random permutation of
1635 <     * queues (starting at a random index and stepping by a random
1636 <     * relative prime, checking each at least once).  The scan
1637 <     * terminates upon either finding a non-empty queue, or completing
1638 <     * the sweep. If the worker is not inactivated, it takes and
1639 <     * returns a task from this queue. Otherwise, if not activated, it
1640 <     * signals workers (that may include itself) and returns so caller
1641 <     * can retry. Also returns for trtry if the worker array may have
1642 <     * changed during an empty scan.  On failure to find a task, we
1643 <     * take one of the following actions, after which the caller will
1644 <     * retry calling this method unless terminated.
1634 >     * The scan searches for tasks across queues (starting at a random
1635 >     * index, and relying on registerWorker to irregularly scatter
1636 >     * them within array to avoid bias), checking each at least twice.
1637 >     * The scan terminates upon either finding a non-empty queue, or
1638 >     * completing the sweep. If the worker is not inactivated, it
1639 >     * takes and returns a task from this queue. Otherwise, if not
1640 >     * activated, it signals workers (that may include itself) and
1641 >     * returns so caller can retry. Also returns for true if the
1642 >     * worker array may have changed during an empty scan.  On failure
1643 >     * to find a task, we take one of the following actions, after
1644 >     * which the caller will retry calling this method unless
1645 >     * terminated.
1646       *
1647       * * If pool is terminating, terminate the worker.
1648       *
# Line 1639 | Line 1659 | public class ForkJoinPool extends Abstra
1659       * @return a task or null if none found
1660       */
1661      private final ForkJoinTask<?> scan(WorkQueue w) {
1662 <        WorkQueue[] ws; WorkQueue q;           // first update random seed
1663 <        int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1664 <        int ps = plock, m;                     // volatile read order matters
1665 <        if ((ws = workQueues) != null && (m = ws.length - 1) > 0) {
1666 <            int ec = w.eventCount;             // ec is negative if inactive
1667 <            int step = (r >>> 16) | 1;         // relatively prime
1668 <            for (int j = (m + 1) << 2;  ; --j, r += step) {
1669 <                ForkJoinTask<?> t; ForkJoinTask<?>[] a; int b, n;
1670 <                if ((q = ws[r & m]) != null && (b = q.base) - q.top < 0 &&
1651 <                    (a = q.array) != null) {   // probably nonempty
1662 >        WorkQueue[] ws; int m, hint;
1663 >        int ps = plock;                          // read plock before ws
1664 >        if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1665 >            int ec = w.eventCount;               // ec is negative if inactive
1666 >            int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1667 >            for (int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN; ; --j) {
1668 >                WorkQueue q; ForkJoinTask<?>[] a; int b;
1669 >                if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
1670 >                    (a = q.array) != null) {     // probably nonempty
1671                      int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1672 <                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1672 >                    ForkJoinTask<?> t = (ForkJoinTask<?>)
1673 >                        U.getObjectVolatile(a, i);
1674                      if (q.base == b && ec >= 0 && t != null &&
1675                          U.compareAndSwapObject(a, i, t, null)) {
1676 <                        if ((n = q.top - (q.base = b + 1)) > 0)
1677 <                            signalWork(q, n);
1678 <                        return t;              // taken
1679 <                    }
1680 <                    if (j < m || (ec < 0 && (ec = w.eventCount) < 0)) {
1681 <                        if ((n = q.queueSize() - 1) > 0)
1682 <                            signalWork(q, n);
1683 <                        break;                 // let caller retry after signal
1676 >                        if ((q.base = b + 1) - q.top < 0)
1677 >                            signalWork(q, 0);
1678 >                        return t;                // taken
1679 >                    }
1680 >                    else if (ec < 0 || j < m) {  // cannot take or cannot rescan
1681 >                        w.hint = q.poolIndex;    // use hint below
1682 >                        break;                   // let caller retry after signal
1683 >                    }
1684 >                }
1685 >                else if (j < 0) { // end of scan; in loop to simplify code
1686 >                    long c, sc; int e, ns;
1687 >                    if ((ns = w.nsteals) != 0) {
1688 >                        if (U.compareAndSwapLong(this, STEALCOUNT,
1689 >                                                 sc = stealCount, sc + ns))
1690 >                            w.nsteals = 0;       // collect steals
1691                      }
1692 <                }
1666 <                else if (j < 0) {              // end of scan
1667 <                    long c = ctl; int e;
1668 <                    if (plock != ps)           // incomplete sweep
1692 >                    else if (plock != ps)        // ws may have changed
1693                          break;
1694 <                    if ((e = (int)c) < 0)      // pool is terminating
1695 <                        w.qlock = -1;
1696 <                    else if (ec >= 0) {        // try to enqueue/inactivate
1694 >                    else if ((e = (int)(c = ctl)) < 0)
1695 >                        w.qlock = -1;            // pool is terminating
1696 >                    else if (ec >= 0) {          // try to enqueue/inactivate
1697                          long nc = ((long)ec |
1698                                     ((c - AC_UNIT) & (AC_MASK|TC_MASK)));
1699 <                        w.nextWait = e;
1700 <                        w.eventCount = ec | INT_SIGN; // mark as inactive
1699 >                        w.nextWait = e;          // link and mark inactive
1700 >                        w.hint = -1;             // use hint if set while parked
1701 >                        w.eventCount = ec | INT_SIGN;
1702                          if (ctl != c ||
1703                              !U.compareAndSwapLong(this, CTL, c, nc))
1704 <                            w.eventCount = ec; // unmark on CAS failure
1705 <                        else if ((int)(c >> AC_SHIFT) == 1 - parallelism)
1706 <                            idleAwaitWork(w, nc, c);  // quiescent
1704 >                            w.eventCount = ec;  // unmark on CAS failure
1705 >                        else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1706 >                            idleAwaitWork(w, nc, c);
1707                      }
1708 <                    else if (w.seed >= 0 && w.eventCount < 0) {
1708 >                    else if (w.eventCount < 0) { // block
1709                          Thread wt = Thread.currentThread();
1710 <                        Thread.interrupted();  // clear status
1710 >                        Thread.interrupted();    // clear status
1711                          U.putObject(wt, PARKBLOCKER, this);
1712 <                        w.parker = wt;         // emulate LockSupport.park
1713 <                        if (w.eventCount < 0)  // recheck
1712 >                        w.parker = wt;           // emulate LockSupport.park
1713 >                        if (w.eventCount < 0)    // recheck
1714                              U.park(false, 0L);
1715                          w.parker = null;
1716                          U.putObject(wt, PARKBLOCKER, null);
# Line 1693 | Line 1718 | public class ForkJoinPool extends Abstra
1718                      break;
1719                  }
1720              }
1721 +            if ((hint = w.hint) >= 0) {          // help signal
1722 +                WorkQueue[] vs; WorkQueue v; int k;
1723 +                w.hint = -1;                     // suppress resignal
1724 +                if ((vs = workQueues) != null && hint < vs.length &&
1725 +                    (v = vs[hint]) != null && (k = v.base - v.top) < -1)
1726 +                    signalWork(v, 1 - k);
1727 +            }
1728          }
1729          return null;
1730      }
# Line 1710 | Line 1742 | public class ForkJoinPool extends Abstra
1742       * @param prevCtl the ctl value to restore if thread is terminated
1743       */
1744      private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1745 <        if (w.eventCount < 0 &&
1746 <            (this == commonPool || !tryTerminate(false, false)) &&
1715 <            (int)prevCtl != 0) {
1745 >        if (w != null && w.eventCount < 0 &&
1746 >            !tryTerminate(false, false) && (int)prevCtl != 0) {
1747              int dc = -(short)(currentCtl >>> TC_SHIFT);
1748              long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1749              long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop
# Line 1731 | Line 1762 | public class ForkJoinPool extends Abstra
1762                      U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1763                      w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1764                      w.qlock = -1;   // shrink
1765 +                    w.hint = -1;    // suppress helping
1766                      break;
1767                  }
1768              }
# Line 1738 | Line 1770 | public class ForkJoinPool extends Abstra
1770      }
1771  
1772      /**
1773 <     * Scans through queues looking for work while joining a task;
1774 <     * if any are present, signals.
1773 >     * Scans through queues looking for work (optionally, while
1774 >     * joining a task); if any are present, signals. May return early
1775 >     * if more signalling is detectably unneeded.
1776       *
1777 <     * @param task to return early if done
1777 >     * @param task if non-null, return early if done
1778       * @param origin an index to start scan
1779       */
1780      final int helpSignal(ForkJoinTask<?> task, int origin) {
1781 <        WorkQueue[] ws; WorkQueue q; int m, n, s;
1782 <        if (task != null && (ws = workQueues) != null &&
1750 <            (m = ws.length - 1) >= 0) {
1781 >        WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1782 >        if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1783              for (int i = 0; i <= m; ++i) {
1784 <                if ((s = task.status) < 0)
1784 >                if (task != null && (s = task.status) < 0)
1785                      return s;
1786                  if ((q = ws[(i + origin) & m]) != null &&
1787                      (n = q.queueSize()) > 0) {
1788                      signalWork(q, n);
1789 <                    if ((int)(ctl >> AC_SHIFT) >= 0)
1789 >                    if ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
1790                          break;
1791                  }
1792              }
# Line 1793 | Line 1825 | public class ForkJoinPool extends Abstra
1825                      }
1826                      if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
1827                          break restart;              // shutting down
1828 <                    if ((v = ws[h = (j.stealHint | 1) & m]) == null ||
1828 >                    if ((v = ws[h = (j.hint | 1) & m]) == null ||
1829                          v.currentSteal != subtask) {
1830                          for (int origin = h;;) {    // find stealer
1831                              if (((h = (h + 2) & m) & 15) == 1 &&
# Line 1801 | Line 1833 | public class ForkJoinPool extends Abstra
1833                                  continue restart;   // occasional staleness check
1834                              if ((v = ws[h]) != null &&
1835                                  v.currentSteal == subtask) {
1836 <                                j.stealHint = h;    // save hint
1836 >                                j.hint = h;        // save hint
1837                                  break;
1838                              }
1839                              if (h == origin)
# Line 1858 | Line 1890 | public class ForkJoinPool extends Abstra
1890       *
1891       */
1892      private int helpComplete(ForkJoinTask<?> task, int mode) {
1893 <        WorkQueue[] ws; WorkQueue q; int m, n, s;
1893 >        WorkQueue[] ws; WorkQueue q; int m, n, s, u;
1894          if (task != null && (ws = workQueues) != null &&
1895              (m = ws.length - 1) >= 0) {
1896              for (int j = 1, origin = j;;) {
# Line 1866 | Line 1898 | public class ForkJoinPool extends Abstra
1898                      return s;
1899                  if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
1900                      origin = j;
1901 <                    if (mode == SHARED_QUEUE && (int)(ctl >> AC_SHIFT) >= 0)
1901 >                    if (mode == SHARED_QUEUE &&
1902 >                        ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
1903                          break;
1904                  }
1905                  else if ((j = (j + 2) & m) == origin)
# Line 1884 | Line 1917 | public class ForkJoinPool extends Abstra
1917       * may become starved.
1918       */
1919      final boolean tryCompensate() {
1920 <        int pc = parallelism, e, u, i, tc; long c;
1920 >        int pc = config & SMASK, e, i, tc; long c;
1921          WorkQueue[] ws; WorkQueue w; Thread p;
1922 <        if ((e = (int)(c = ctl)) >= 0 && (ws = workQueues) != null) {
1922 >        if ((ws = workQueues) != null && (e = (int)(c = ctl)) >= 0) {
1923              if (e != 0 && (i = e & SMASK) < ws.length &&
1924                  (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
1925                  long nc = ((long)(w.nextWait & E_MASK) |
# Line 1898 | Line 1931 | public class ForkJoinPool extends Abstra
1931                      return true;   // replace with idle worker
1932                  }
1933              }
1934 <            else if ((short)((u = (int)(c >>> 32)) >>> UTC_SHIFT) >= 0 &&
1935 <                     (u >> UAC_SHIFT) + pc > 1) {
1934 >            else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
1935 >                     (int)(c >> AC_SHIFT) + pc > 1) {
1936                  long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
1937                  if (U.compareAndSwapLong(this, CTL, c, nc))
1938 <                    return true;    // no compensation
1938 >                    return true;   // no compensation
1939              }
1940 <            else if ((tc = u + pc) < MAX_CAP) {
1940 >            else if (tc + pc < MAX_CAP) {
1941                  long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1942                  if (U.compareAndSwapLong(this, CTL, c, nc)) {
1943 <                    Throwable ex = null;
1944 <                    ForkJoinWorkerThread wt = null;
1912 <                    try {
1913 <                        ForkJoinWorkerThreadFactory fac;
1914 <                        if ((fac = factory) != null &&
1915 <                            (wt = fac.newThread(this)) != null) {
1916 <                            wt.start();
1917 <                            return true;
1918 <                        }
1919 <                    } catch (Throwable rex) {
1920 <                        ex = rex;
1921 <                    }
1922 <                    deregisterWorker(wt, ex); // adjust counts etc
1943 >                    addWorker();
1944 >                    return true;
1945                  }
1946              }
1947          }
# Line 1945 | Line 1967 | public class ForkJoinPool extends Abstra
1967                  (s = helpSignal(task, joiner.poolIndex)) >= 0 &&
1968                  (task instanceof CountedCompleter))
1969                  s = helpComplete(task, LIFO_QUEUE);
1970 +            int k = 0; // to perform pre-block yield for politeness
1971              while (s >= 0 && (s = task.status) >= 0) {
1972                  if ((joiner.queueSize() > 0 ||           // try helping
1973                       (s = tryHelpStealer(joiner, task)) == 0) &&
1974 <                    (s = task.status) >= 0 && tryCompensate()) {
1975 <                    if (task.trySetSignal() && (s = task.status) >= 0) {
1976 <                        synchronized (task) {
1977 <                            if (task.status >= 0) {
1978 <                                try {                // see ForkJoinTask
1979 <                                    task.wait();     //  for explanation
1980 <                                } catch (InterruptedException ie) {
1974 >                    (s = task.status) >= 0) {
1975 >                    if (k < 3) {
1976 >                        if (++k < 3)
1977 >                            s = helpSignal(task, joiner.poolIndex);
1978 >                        else
1979 >                            Thread.yield();
1980 >                    }
1981 >                    else if (!tryCompensate())
1982 >                        k = 0;
1983 >                    else {
1984 >                        if (task.trySetSignal() && (s = task.status) >= 0) {
1985 >                            synchronized (task) {
1986 >                                if (task.status >= 0) {
1987 >                                    try {                // see ForkJoinTask
1988 >                                        task.wait();     //  for explanation
1989 >                                    } catch (InterruptedException ie) {
1990 >                                    }
1991                                  }
1992 +                                else
1993 +                                    task.notifyAll();
1994                              }
1960                            else
1961                                task.notifyAll();
1995                          }
1996 +                        long c;                          // re-activate
1997 +                        do {} while (!U.compareAndSwapLong
1998 +                                     (this, CTL, c = ctl, c + AC_UNIT));
1999                      }
1964                    long c;                          // re-activate
1965                    do {} while (!U.compareAndSwapLong
1966                                 (this, CTL, c = ctl, c + AC_UNIT));
2000                  }
2001              }
2002              joiner.currentJoin = prevJoin;
# Line 2007 | Line 2040 | public class ForkJoinPool extends Abstra
2040       * @param r a (random) seed for scanning
2041       */
2042      private WorkQueue findNonEmptyStealQueue(int r) {
2010        int step = (r >>> 16) | 1;
2043          for (WorkQueue[] ws;;) {
2044 <            int ps = plock, m;
2044 >            int ps = plock, m, n;
2045              if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
2046                  return null;
2047 <            for (int j = (m + 1) << 2; ; r += step) {
2048 <                WorkQueue q = ws[((r << 1) | 1) & m];
2049 <                if (q != null && q.queueSize() > 0)
2047 >            for (int j = (m + 1) << 2; ;) {
2048 >                WorkQueue q = ws[(((r + j) << 1) | 1) & m];
2049 >                if (q != null && (n = q.queueSize()) > 0) {
2050 >                    if (n > 1)
2051 >                        signalWork(q, 0);
2052                      return q;
2053 +                }
2054                  else if (--j < 0) {
2055                      if (plock == ps)
2056                          return null;
# Line 2058 | Line 2093 | public class ForkJoinPool extends Abstra
2093                  }
2094                  else
2095                      c = ctl;        // re-increment on exit
2096 <                if ((int)(c >> AC_SHIFT) + parallelism == 0) {
2096 >                if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) {
2097                      do {} while (!U.compareAndSwapLong
2098                                   (this, CTL, c = ctl, c + AC_UNIT));
2099                      break;
# Line 2133 | Line 2168 | public class ForkJoinPool extends Abstra
2168      static int getSurplusQueuedTaskCount() {
2169          Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2170          if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
2171 <            int b = (q = (wt = (ForkJoinWorkerThread)t).workQueue).base;
2172 <            int p = (pool = wt.pool).parallelism;
2171 >            int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
2172 >            int n = (q = wt.workQueue).top - q.base;
2173              int a = (int)(pool.ctl >> AC_SHIFT) + p;
2174 <            return q.top - b - (a > (p >>>= 1) ? 0 :
2175 <                                a > (p >>>= 1) ? 1 :
2176 <                                a > (p >>>= 1) ? 2 :
2177 <                                a > (p >>>= 1) ? 4 :
2178 <                                8);
2174 >            return n - (a > (p >>>= 1) ? 0 :
2175 >                        a > (p >>>= 1) ? 1 :
2176 >                        a > (p >>>= 1) ? 2 :
2177 >                        a > (p >>>= 1) ? 4 :
2178 >                        8);
2179          }
2180          return 0;
2181      }
# Line 2166 | Line 2201 | public class ForkJoinPool extends Abstra
2201              return false;
2202          for (long c;;) {
2203              if (((c = ctl) & STOP_BIT) != 0) {      // already terminating
2204 <                if ((short)(c >>> TC_SHIFT) == -parallelism) {
2204 >                if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
2205                      synchronized (this) {
2206                          notifyAll();                // signal when 0 workers
2207                      }
# Line 2185 | Line 2220 | public class ForkJoinPool extends Abstra
2220                      releasePlock(nps);
2221              }
2222              if (!now) {                             // check if idle & no tasks
2223 <                if ((int)(c >> AC_SHIFT) != -parallelism ||
2223 >                if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
2224                      hasQueuedSubmissions())
2225                      return false;
2226                  // Check for unqueued inactive workers. One pass suffices.
# Line 2287 | Line 2322 | public class ForkJoinPool extends Abstra
2322          if (q != null && (a = q.array) != null && (m = (a.length - 1)) >= 0 &&
2323              root != null && root.status >= 0) {
2324              for (;;) {
2325 <                int s; Object o; CountedCompleter<?> task = null;
2325 >                int s, u; Object o; CountedCompleter<?> task = null;
2326                  if ((s = q.top) - q.base > 0) {
2327                      long j = ((m & (s - 1)) << ASHIFT) + ABASE;
2328                      if ((o = U.getObject(a, j)) != null &&
# Line 2310 | Line 2345 | public class ForkJoinPool extends Abstra
2345                  }
2346                  if (task != null)
2347                      task.doExec();
2348 <                if (root.status < 0 || (int)(ctl >> AC_SHIFT) >= 0)
2348 >                if (root.status < 0 ||
2349 >                    (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
2350                      break;
2351                  if (task == null) {
2352                      if (helpSignal(root, q.poolIndex) >= 0)
# Line 2329 | Line 2365 | public class ForkJoinPool extends Abstra
2365          // Some hard-to-avoid overlap with tryExternalUnpush
2366          ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z;
2367          ForkJoinTask<?>[] a;  int m, s, n; long j;
2368 <        if (t != null && t.status >= 0 &&
2368 >        if (t != null &&
2369              (z = submitters.get()) != null &&
2370              (p = commonPool) != null &&
2371              (ws = p.workQueues) != null &&
2372              (m = ws.length - 1) >= 0 &&
2373              (q = ws[m & z.seed & SQMASK]) != null &&
2374 <            (a = q.array) != null) {
2374 >            (a = q.array) != null &&
2375 >            t.status >= 0) {
2376              if ((s = q.top) != q.base &&
2377                  U.getObjectVolatile
2378                  (a, j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE) == t &&
# Line 2363 | Line 2400 | public class ForkJoinPool extends Abstra
2400       */
2401      static void externalHelpQuiescePool() {
2402          ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
2366        int r = ThreadLocalRandom.current().nextInt();
2403          if ((p = commonPool) != null &&
2404 <            (q = p.findNonEmptyStealQueue(r)) != null &&
2404 >            (q = p.findNonEmptyStealQueue(1)) != null &&
2405              (b = q.base) - q.top < 0 &&
2406              (t = q.pollAt(b)) != null)
2407              t.doExec();
# Line 2442 | Line 2478 | public class ForkJoinPool extends Abstra
2478              throw new NullPointerException();
2479          if (parallelism <= 0 || parallelism > MAX_CAP)
2480              throw new IllegalArgumentException();
2445        this.parallelism = parallelism;
2481          this.factory = factory;
2482          this.ueh = handler;
2483 <        this.localMode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE;
2483 >        this.config = parallelism | (asyncMode? (FIFO_QUEUE << 16) : 0);
2484          long np = (long)(-parallelism); // offset ctl counts
2485          this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2486          int pn = nextPoolId();
# Line 2462 | Line 2497 | public class ForkJoinPool extends Abstra
2497      ForkJoinPool(int parallelism, long ctl,
2498                   ForkJoinWorkerThreadFactory factory,
2499                   Thread.UncaughtExceptionHandler handler) {
2500 <        this.parallelism = parallelism;
2500 >        this.config = parallelism;
2501          this.ctl = ctl;
2502          this.factory = factory;
2503          this.ueh = handler;
2469        this.localMode = LIFO_QUEUE;
2504          this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2505      }
2506  
# Line 2476 | Line 2510 | public class ForkJoinPool extends Abstra
2510       * @return the common pool instance
2511       */
2512      public static ForkJoinPool commonPool() {
2513 <        return commonPool; // cannot be null (if so, a static init error)
2513 >        // assert commonPool != null : "static init error";
2514 >        return commonPool;
2515      }
2516  
2517      // Execution methods
# Line 2648 | Line 2683 | public class ForkJoinPool extends Abstra
2683       * @return the targeted parallelism level of this pool
2684       */
2685      public int getParallelism() {
2686 <        return parallelism;
2686 >        return config & SMASK;
2687      }
2688  
2689      /**
# Line 2669 | Line 2704 | public class ForkJoinPool extends Abstra
2704       * @return the number of worker threads
2705       */
2706      public int getPoolSize() {
2707 <        return parallelism + (short)(ctl >>> TC_SHIFT);
2707 >        return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2708      }
2709  
2710      /**
# Line 2679 | Line 2714 | public class ForkJoinPool extends Abstra
2714       * @return {@code true} if this pool uses async mode
2715       */
2716      public boolean getAsyncMode() {
2717 <        return localMode != 0;
2717 >        return (config >>> 16) == FIFO_QUEUE;
2718      }
2719  
2720      /**
# Line 2710 | Line 2745 | public class ForkJoinPool extends Abstra
2745       * @return the number of active threads
2746       */
2747      public int getActiveThreadCount() {
2748 <        int r = parallelism + (int)(ctl >> AC_SHIFT);
2748 >        int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2749          return (r <= 0) ? 0 : r; // suppress momentarily negative values
2750      }
2751  
# Line 2726 | Line 2761 | public class ForkJoinPool extends Abstra
2761       * @return {@code true} if all threads are currently idle
2762       */
2763      public boolean isQuiescent() {
2764 <        return (int)(ctl >> AC_SHIFT) + parallelism == 0;
2764 >        return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0;
2765      }
2766  
2767      /**
# Line 2889 | Line 2924 | public class ForkJoinPool extends Abstra
2924                  }
2925              }
2926          }
2927 <        int pc = parallelism;
2927 >        int pc = (config & SMASK);
2928          int tc = pc + (short)(c >>> TC_SHIFT);
2929          int ac = pc + (int)(c >> AC_SHIFT);
2930          if (ac < 0) // ignore transient negative
# Line 2962 | Line 2997 | public class ForkJoinPool extends Abstra
2997      public boolean isTerminated() {
2998          long c = ctl;
2999          return ((c & STOP_BIT) != 0L &&
3000 <                (short)(c >>> TC_SHIFT) == -parallelism);
3000 >                (short)(c >>> TC_SHIFT) == -(config & SMASK));
3001      }
3002  
3003      /**
# Line 2981 | Line 3016 | public class ForkJoinPool extends Abstra
3016      public boolean isTerminating() {
3017          long c = ctl;
3018          return ((c & STOP_BIT) != 0L &&
3019 <                (short)(c >>> TC_SHIFT) != -parallelism);
3019 >                (short)(c >>> TC_SHIFT) != -(config & SMASK));
3020      }
3021  
3022      /**
# Line 3125 | Line 3160 | public class ForkJoinPool extends Abstra
3160          if (t instanceof ForkJoinWorkerThread) {
3161              ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
3162              while (!blocker.isReleasable()) { // variant of helpSignal
3163 <                WorkQueue[] ws; WorkQueue q; int m, n;
3163 >                WorkQueue[] ws; WorkQueue q; int m, n, u;
3164                  if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) {
3165                      for (int i = 0; i <= m; ++i) {
3166                          if (blocker.isReleasable())
3167                              return;
3168                          if ((q = ws[i]) != null && (n = q.queueSize()) > 0) {
3169                              p.signalWork(q, n);
3170 <                            if ((int)(p.ctl >> AC_SHIFT) >= 0)
3170 >                            if ((u = (int)(p.ctl >>> 32)) >= 0 ||
3171 >                                (u >> UAC_SHIFT) >= 0)
3172                                  break;
3173                          }
3174                      }
# Line 3178 | Line 3214 | public class ForkJoinPool extends Abstra
3214      private static final long QLOCK;
3215  
3216      static {
3181        // Establish common pool parameters
3182        // TBD: limit or report ignored exceptions?
3183
3184        int par = 0;
3185        ForkJoinWorkerThreadFactory fac = null;
3186        Thread.UncaughtExceptionHandler handler = null;
3187        try {
3188            String pp = System.getProperty(propPrefix + "parallelism");
3189            String hp = System.getProperty(propPrefix + "exceptionHandler");
3190            String fp = System.getProperty(propPrefix + "threadFactory");
3191            if (fp != null)
3192                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
3193                       getSystemClassLoader().loadClass(fp).newInstance());
3194            if (hp != null)
3195                handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
3196                           getSystemClassLoader().loadClass(hp).newInstance());
3197            if (pp != null)
3198                par = Integer.parseInt(pp);
3199        } catch (Exception ignore) {
3200        }
3201
3217          int s; // initialize field offsets for CAS etc
3218          try {
3219              U = getUnsafe();
# Line 3227 | Line 3242 | public class ForkJoinPool extends Abstra
3242          if ((s & (s-1)) != 0)
3243              throw new Error("data type scale not a power of two");
3244  
3245 +        submitters = new ThreadLocal<Submitter>();
3246 +        ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory =
3247 +            new DefaultForkJoinWorkerThreadFactory();
3248          /*
3249 <         * For extra caution, computations to set up pool state are
3250 <         * here; the constructor just assigns these values to fields.
3249 >         * Establish common pool parameters.  For extra caution,
3250 >         * computations to set up common pool state are here; the
3251 >         * constructor just assigns these values to fields.
3252           */
3253 <        ForkJoinWorkerThreadFactory defaultFac =
3254 <            defaultForkJoinWorkerThreadFactory =
3255 <            new DefaultForkJoinWorkerThreadFactory();
3256 <        if (fac == null)
3257 <            fac = defaultFac;
3253 >
3254 >        int par = 0;
3255 >        Thread.UncaughtExceptionHandler handler = null;
3256 >        try {  // TBD: limit or report ignored exceptions?
3257 >            String pp = System.getProperty
3258 >                ("java.util.concurrent.ForkJoinPool.common.parallelism");
3259 >            String hp = System.getProperty
3260 >                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
3261 >            String fp = System.getProperty
3262 >                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
3263 >            if (fp != null)
3264 >                fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
3265 >                       getSystemClassLoader().loadClass(fp).newInstance());
3266 >            if (hp != null)
3267 >                handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
3268 >                           getSystemClassLoader().loadClass(hp).newInstance());
3269 >            if (pp != null)
3270 >                par = Integer.parseInt(pp);
3271 >        } catch (Exception ignore) {
3272 >        }
3273 >
3274          if (par <= 0)
3275              par = Runtime.getRuntime().availableProcessors();
3276          if (par > MAX_CAP)
3277              par = MAX_CAP;
3278 +        commonPoolParallelism = par;
3279          long np = (long)(-par); // precompute initial ctl value
3280          long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
3281  
3246        commonPoolParallelism = par;
3282          commonPool = new ForkJoinPool(par, ct, fac, handler);
3283          modifyThreadPermission = new RuntimePermission("modifyThread");
3249        submitters = new ThreadLocal<Submitter>();
3284      }
3285  
3286      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines