root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.118 by jsr166, Sat Jan 28 04:34:54 2012 UTC vs.
Revision 1.119 by dl, Tue Jan 31 00:44:13 2012 UTC

# Line 21 | Line 21 | import java.util.concurrent.RunnableFutu
21   import java.util.concurrent.TimeUnit;
22   import java.util.concurrent.atomic.AtomicInteger;
23   import java.util.concurrent.atomic.AtomicLong;
24 < import java.util.concurrent.locks.ReentrantLock;
24 > import java.util.concurrent.locks.AbstractQueuedSynchronizer;
25   import java.util.concurrent.locks.Condition;
26  
27   /**
# Line 203 | Line 203 | public class ForkJoinPool extends Abstra
203       * take tasks, and they are multiplexed on to a finite number of
204       * shared work queues. However, classes are set up so that future
205       * extensions could allow submitters to optionally help perform
206 <     * tasks as well. Pool submissions from internal workers are also
207 <     * allowed, but use randomized rather than thread-hashed queue
208 <     * indices to avoid imbalance.  Insertion of tasks in shared mode
209 <     * requires a lock (mainly to protect in the case of resizing) but
210 <     * we use only a simple spinlock (using bits in field runState),
211 <     * because submitters encountering a busy queue try or create
212 <     * others so never block.
206 >     * tasks as well. Insertion of tasks in shared mode requires a
207 >     * lock (mainly to protect in the case of resizing) but we use
208 >     * only a simple spinlock (using bits in field runState), because
209 >     * submitters encountering a busy queue move on to try or create
210 >     * other queues, so never block.
211       *
212       * Management
213       * ==========
# Line 235 | Line 233 | public class ForkJoinPool extends Abstra
233       * deregister WorkQueues, as well as to enable shutdown. It is
234       * only modified under a lock (normally briefly held, but
235       * occasionally protecting allocations and resizings) but even
236 <     * when locked remains available to check consistency.
236 >     * when locked remains available to check consistency. An
237 >     * auxiliary field "growHints", also only modified under lock,
238 >     * contains a candidate index for the next WorkQueue and
239 >     * a mask for submission queue indices.
240       *
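    For illustration, a minimal sketch of the growHints decomposition just described (a sketch only, using the SMASK constant defined later in this file):

        int candidateIndex = growHints & SMASK;   // low 16 bits: candidate index for the next WorkQueue
        int submitMask     = growHints >>> 16;    // high 16 bits: mask bounding submission queue indices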
241       * Recording WorkQueues.  WorkQueues are recorded in the
242       * "workQueues" array that is created upon pool construction and
# Line 251 | Line 252 | public class ForkJoinPool extends Abstra
252       * presized to hold twice #parallelism workers (which is unlikely
253       * to need further resizing during execution). But to avoid
254       * dealing with so many null slots, variable runState includes a
255 <     * mask for the nearest power of two that contains all current
256 <     * workers.  All worker thread creation is on-demand, triggered by
257 <     * task submissions, replacement of terminated workers, and/or
255 >     * mask for the nearest power of two that contains all currently
256 >     * used indices.
257 >     *
258 >     * All worker thread creation is on-demand, triggered by task
259 >     * submissions, replacement of terminated workers, and/or
260       * compensation for blocked workers. However, all other support
261       * code is set up to work with other policies.  To ensure that we
262       * do not hold on to worker references that would prevent GC, ALL
# Line 266 | Line 269 | public class ForkJoinPool extends Abstra
269       * both index-check and null-check the IDs. All such accesses
270       * ignore bad IDs by returning out early from what they are doing,
271       * since this can only be associated with termination, in which
272 <     * case it is OK to give up.
273 <     *
274 <     * All uses of the workQueues array check that it is non-null
275 <     * (even if previously non-null). This allows nulling during
276 <     * termination, which is currently not necessary, but remains an
277 <     * option for resource-revocation-based shutdown schemes. It also
275 <     * helps reduce JIT issuance of uncommon-trap code, which tends to
272 >     * case it is OK to give up.  All uses of the workQueues array
273 >     * also check that it is non-null (even if previously
274 >     * non-null). This allows nulling during termination, which is
275 >     * currently not necessary, but remains an option for
276 >     * resource-revocation-based shutdown schemes. It also helps
277 >     * reduce JIT issuance of uncommon-trap code, which tends to
278       * unnecessarily complicate control flow in some methods.
279       *
280       * Event Queuing. Unlike HPC work-stealing frameworks, we cannot
# Line 383 | Line 385 | public class ForkJoinPool extends Abstra
385       * (http://portal.acm.org/citation.cfm?id=155354). It differs in
386       * that: (1) We only maintain dependency links across workers upon
387       * steals, rather than use per-task bookkeeping.  This sometimes
388 <     * requires a linear scan of workers array to locate stealers, but
388 >     * requires a linear scan of workQueues array to locate stealers, but
389       * often doesn't because stealers leave hints (that may become
390       * stale/wrong) of where to locate them.  A stealHint is only a
391       * hint because a worker might have had multiple steals and the
# Line 418 | Line 420 | public class ForkJoinPool extends Abstra
420       * managed by ForkJoinPool, so are directly accessed.  There is
421       * little point trying to reduce this, since any associated future
422       * changes in representations will need to be accompanied by
423 <     * algorithmic changes anyway. All together, these low-level
424 <     * implementation choices produce as much as a factor of 4
425 <     * performance improvement compared to naive implementations, and
426 <     * enable the processing of billions of tasks per second, at the
427 <     * expense of some ugliness.
428 <     *
429 <     * Methods signalWork() and scan() are the main bottlenecks, so are
430 <     * especially heavily micro-optimized/mangled.  There are lots of
431 <     * inline assignments (of form "while ((local = field) != 0)")
432 <     * which are usually the simplest way to ensure the required read
433 <     * orderings (which are sometimes critical). This leads to a
434 <     * "C"-like style of listing declarations of these locals at the
435 <     * heads of methods or blocks.  There are several occurrences of
436 <     * the unusual "do {} while (!cas...)"  which is the simplest way
435 <     * to force an update of a CAS'ed variable. There are also other
436 <     * coding oddities that help some methods perform reasonably even
437 <     * when interpreted (not compiled).
423 >     * algorithmic changes anyway. Several methods intrinsically
424 >     * sprawl because they must accumulate sets of consistent reads of
425 >     * volatiles held in local variables.  Methods signalWork() and
426 >     * scan() are the main bottlenecks, so are especially heavily
427 >     * micro-optimized/mangled.  There are lots of inline assignments
428 >     * (of form "while ((local = field) != 0)") which are usually the
429 >     * simplest way to ensure the required read orderings (which are
430 >     * sometimes critical). This leads to a "C"-like style of listing
431 >     * declarations of these locals at the heads of methods or blocks.
432 >     * There are several occurrences of the unusual "do {} while
433 >     * (!cas...)"  which is the simplest way to force an update of a
434 >     * CAS'ed variable. There are also other coding oddities that help
435 >     * some methods perform reasonably even when interpreted (not
436 >     * compiled).
437       *
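    A small illustration of the "do {} while (!cas...)" idiom described above, applied to a hypothetical volatile long field (the field and its offset constant are placeholders, not members of this class):

        long v;
        do {} while (!U.compareAndSwapLong(this, SOMEFIELD, v = someField, v + 1));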
438       * The order of declarations in this file is:
439 <     * (1) statics
440 <     * (2) fields (along with constants used when unpacking some of
441 <     *     them), listed in an order that tends to reduce contention
442 <     *     among them a bit under most JVMs;
443 <     * (3) nested classes
444 <     * (4) internal control methods
445 <     * (5) callbacks and other support for ForkJoinTask methods
446 <     * (6) exported methods (plus a few little helpers)
447 <     * (7) static block initializing all statics in a minimally
448 <     *     dependent order.
439 >     * (1) Static utility functions
440 >     * (2) Nested (static) classes
441 >     * (3) Static fields
442 >     * (4) Fields, along with constants used when unpacking some of them
443 >     * (5) Internal control methods
444 >     * (6) Callbacks and other support for ForkJoinTask methods
445 >     * (7) Exported methods
446 >     * (8) Static block initializing statics in minimally dependent order
447 >     */
448 >
449 >    // Static utilities
450 >
451 >    /**
452 >     * Computes an initial hash code (also serving as a non-zero
453 >     * random seed) for a thread id. This method is expected to
454 >     * provide higher-quality hash codes than using method hashCode().
455 >     */
456 >    static final int hashId(long id) {
457 >        int h = (int)id ^ (int)(id >>> 32); // Use MurmurHash of thread id
458 >        h ^= h >>> 16; h *= 0x85ebca6b;
459 >        h ^= h >>> 13; h *= 0xc2b2ae35;
460 >        h ^= h >>> 16;
461 >        return (h == 0)? 1 : h; // ensure nonzero
462 >    }
463 >
464 >    /**
465 >     * If there is a security manager, makes sure caller has
466 >     * permission to modify threads.
467       */
468 +    private static void checkPermission() {
469 +        SecurityManager security = System.getSecurityManager();
470 +        if (security != null)
471 +            security.checkPermission(modifyThreadPermission);
472 +    }
473 +
474 +    // Nested classes
475  
476      /**
477       * Factory for creating new {@link ForkJoinWorkerThread}s.
# Line 477 | Line 501 | public class ForkJoinPool extends Abstra
501      }
502  
503      /**
504 <     * Creates a new ForkJoinWorkerThread. This factory is used unless
505 <     * overridden in ForkJoinPool constructors.
506 <     */
507 <    public static final ForkJoinWorkerThreadFactory
508 <        defaultForkJoinWorkerThreadFactory;
509 <
510 <    /**
511 <     * Permission required for callers of methods that may start or
512 <     * kill threads.
513 <     */
514 <    private static final RuntimePermission modifyThreadPermission;
515 <
516 <    /**
517 <     * If there is a security manager, makes sure caller has
518 <     * permission to modify threads.
519 <     */
520 <    private static void checkPermission() {
521 <        SecurityManager security = System.getSecurityManager();
522 <        if (security != null)
523 <            security.checkPermission(modifyThreadPermission);
504 >     * A simple non-reentrant lock used for exclusion when managing
505 >     * queues and workers. We use a custom lock so that we can readily
506 >     * probe lock state in constructions that check among alternative
507 >     * actions. The lock is normally only very briefly held, and
508 >     * sometimes treated as a spinlock, but other usages block to
509 >     * reduce overall contention in those cases where locked code
510 >     * bodies perform allocation/resizing.
511 >     */
512 >    static final class Mutex extends AbstractQueuedSynchronizer {
513 >        public final boolean tryAcquire(int ignore) {
514 >            return compareAndSetState(0, 1);
515 >        }
516 >        public final boolean tryRelease(int ignore) {
517 >            setState(0);
518 >            return true;
519 >        }
520 >        public final void lock() { acquire(0); }
521 >        public final void unlock() { release(0); }
522 >        public final boolean isHeldExclusively() { return getState() == 1; }
523 >        public final Condition newCondition() { return new ConditionObject(); }
524      }
525  
526      /**
527 <     * Generator for assigning sequence numbers as pool names.
528 <     */
529 <    private static final AtomicInteger poolNumberGenerator;
530 <
507 <    /**
508 <     * Bits and masks for control variables
509 <     *
510 <     * Field ctl is a long packed with:
511 <     * AC: Number of active running workers minus target parallelism (16 bits)
512 <     * TC: Number of total workers minus target parallelism (16 bits)
513 <     * ST: true if pool is terminating (1 bit)
514 <     * EC: the wait count of top waiting thread (15 bits)
515 <     * ID: ~(poolIndex >>> 1) of top of Treiber stack of waiters (16 bits)
516 <     *
517 <     * When convenient, we can extract the upper 32 bits of counts and
518 <     * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
519 <     * (int)ctl.  The ec field is never accessed alone, but always
520 <     * together with id and st. The offsets of counts by the target
521 <     * parallelism and the positionings of fields makes it possible to
522 <     * perform the most common checks via sign tests of fields: When
523 <     * ac is negative, there are not enough active workers, when tc is
524 <     * negative, there are not enough total workers, when id is
525 <     * negative, there is at least one waiting worker, and when e is
526 <     * negative, the pool is terminating.  To deal with these possibly
527 <     * negative fields, we use casts in and out of "short" and/or
528 <     * signed shifts to maintain signedness.
529 <     *
530 <     * When a thread is queued (inactivated), its eventCount field is
531 <     * negative, which is the only way to tell if a worker is
532 <     * prevented from executing tasks, even though it must continue to
533 <     * scan for them to avoid queuing races.
534 <     *
535 <     * Field runState is an int packed with:
536 <     * SHUTDOWN: true if shutdown is enabled (1 bit)
537 <     * SEQ:  a sequence number updated upon (de)registering workers (15 bits)
538 <     * MASK: mask (power of 2 - 1) covering all registered poolIndexes (16 bits)
539 <     *
540 <     * The combination of mask and sequence number enables simple
541 <     * consistency checks: Staleness of read-only operations on the
542 <     * workers and queues arrays can be checked by comparing runState
543 <     * before vs after the reads. The low 16 bits (i.e, anding with
544 <     * SMASK) hold the smallest power of two covering all worker
545 <     * indices, minus one.  The mask for queues (vs workers) is twice
546 <     * this value plus 1.
547 <     */
548 <
549 <    // bit positions/shifts for fields
550 <    private static final int  AC_SHIFT   = 48;
551 <    private static final int  TC_SHIFT   = 32;
552 <    private static final int  ST_SHIFT   = 31;
553 <    private static final int  EC_SHIFT   = 16;
554 <
555 <    // bounds
556 <    private static final int  MAX_ID     = 0x7fff;  // max poolIndex
557 <    private static final int  SMASK      = 0xffff;  // mask short bits
558 <    private static final int  SHORT_SIGN = 1 << 15;
559 <    private static final int  INT_SIGN   = 1 << 31;
560 <
561 <    // masks
562 <    private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
563 <    private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
564 <    private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;
565 <
566 <    // units for incrementing and decrementing
567 <    private static final long TC_UNIT    = 1L << TC_SHIFT;
568 <    private static final long AC_UNIT    = 1L << AC_SHIFT;
569 <
570 <    // masks and units for dealing with u = (int)(ctl >>> 32)
571 <    private static final int  UAC_SHIFT  = AC_SHIFT - 32;
572 <    private static final int  UTC_SHIFT  = TC_SHIFT - 32;
573 <    private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
574 <    private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
575 <    private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
576 <    private static final int  UTC_UNIT   = 1 << UTC_SHIFT;
577 <
578 <    // masks and units for dealing with e = (int)ctl
579 <    private static final int E_MASK      = 0x7fffffff; // no STOP_BIT
580 <    private static final int E_SEQ       = 1 << EC_SHIFT;
581 <
582 <    // runState bits
583 <    private static final int SHUTDOWN    = 1 << 31;
584 <    private static final int RS_SEQ      = 1 << 16;
585 <    private static final int RS_SEQ_MASK = 0x7fff0000;
586 <
587 <    // access mode for WorkQueue
588 <    static final int LIFO_QUEUE          =  0;
589 <    static final int FIFO_QUEUE          =  1;
590 <    static final int SHARED_QUEUE        = -1;
591 <
592 <    /**
593 <     * The wakeup interval (in nanoseconds) for a worker waiting for a
594 <     * task when the pool is quiescent to instead try to shrink the
595 <     * number of workers.  The exact value does not matter too
596 <     * much. It must be short enough to release resources during
597 <     * sustained periods of idleness, but not so short that threads
598 <     * are continually re-created.
599 <     */
600 <    private static final long SHRINK_RATE =
601 <        4L * 1000L * 1000L * 1000L; // 4 seconds
602 <
603 <    /**
604 <     * The timeout value for attempted shrinkage, includes
605 <     * some slop to cope with system timer imprecision.
606 <     */
607 <    private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10);
608 <
609 <    /**
610 <     * The maximum stolen->joining link depth allowed in tryHelpStealer.
611 <     * Depths for legitimate chains are unbounded, but we use a fixed
612 <     * constant to avoid (otherwise unchecked) cycles and to bound
613 <     * staleness of traversal parameters at the expense of sometimes
614 <     * blocking when we could be helping.
615 <     */
616 <    private static final int MAX_HELP_DEPTH = 16;
617 <
618 <    /*
619 <     * Field layout order in this class tends to matter more than one
620 <     * would like. Runtime layout order is only loosely related to
621 <     * declaration order and may differ across JVMs, but the following
622 <     * empirically works OK on current JVMs.
527 >     * Class for artificial tasks that are used to replace the target
528 >     * of local joins if they are removed from an interior queue slot
529 >     * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
530 >     * actually do anything beyond having a unique identity.
531       */
532 <
533 <    volatile long ctl;                       // main pool control
534 <    final int parallelism;                   // parallelism level
535 <    final int localMode;                     // per-worker scheduling mode
536 <    int nextPoolIndex;                       // hint used in registerWorker
537 <    volatile int runState;                   // shutdown status, seq, and mask
630 <    WorkQueue[] workQueues;                  // main registry
631 <    final ReentrantLock lock;                // for registration
632 <    final Condition termination;             // for awaitTermination
633 <    final ForkJoinWorkerThreadFactory factory; // factory for new workers
634 <    final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
635 <    final AtomicLong stealCount;             // collect counts when terminated
636 <    final AtomicInteger nextWorkerNumber;    // to create worker name string
637 <    final String workerNamePrefix;           // Prefix for assigning worker names
532 >    static final class EmptyTask extends ForkJoinTask<Void> {
533 >        EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
534 >        public final Void getRawResult() { return null; }
535 >        public final void setRawResult(Void x) {}
536 >        public final boolean exec() { return true; }
537 >    }
538  
539      /**
540       * Queues supporting work-stealing as well as external task
# Line 685 | Line 585 | public class ForkJoinPool extends Abstra
585       * avoiding really bad worst-case access. (Until better JVM
586       * support is in place, this padding is dependent on transient
587       * properties of JVM field layout rules.)  We also take care in
588 <     * allocating and sizing and resizing the array. Non-shared queue
588 >     * allocating, sizing and resizing the array. Non-shared queue
589       * arrays are initialized (via method growArray) by workers before
590       * use. Others are allocated on first use.
591       */
# Line 774 | Line 674 | public class ForkJoinPool extends Abstra
674              boolean submitted = false;
675              if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
676                  ForkJoinTask<?>[] a = array;
677 <                int s = top, n = s - base;
677 >                int s = top;
678                  try {
679 <                    if ((a != null && n < a.length - 1) ||
679 >                    if ((a != null && a.length > s + 1 - base) ||
680                          (a = growArray(false)) != null) { // must presize
681                          int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
682                          U.putObject(a, (long)j, task);    // don't need "ordered"
# Line 794 | Line 694 | public class ForkJoinPool extends Abstra
694           * Takes next task, if one exists, in FIFO order.
695           */
696          final ForkJoinTask<?> poll() {
697 <            ForkJoinTask<?>[] a; int b, i;
698 <            while ((b = base) - top < 0 && (a = array) != null &&
699 <                   (i = (a.length - 1) & b) >= 0) {
700 <                int j = (i << ASHIFT) + ABASE;
701 <                ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
802 <                if (t != null && base == b &&
697 >            ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
698 >            while ((b = base) - top < 0 && (a = array) != null) {
699 >                int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
700 >                if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
701 >                    base == b &&
702                      U.compareAndSwapObject(a, j, t, null)) {
703                      base = b + 1;
704                      return t;
# Line 809 | Line 708 | public class ForkJoinPool extends Abstra
708          }
709  
710          /**
711 <         * Takes next task, if one exists, in LIFO order.
712 <         * Call only by owner in unshared queues.
711 >         * Takes next task, if one exists, in LIFO order.  Call only
712 >         * by owner in unshared queues. (We do not have a shared
713 >         * version of this method because it is never needed.)
714           */
715          final ForkJoinTask<?> pop() {
716              ForkJoinTask<?> t; int m;
# Line 852 | Line 752 | public class ForkJoinPool extends Abstra
752           * Returns task at index b if b is current base of queue.
753           */
754          final ForkJoinTask<?> pollAt(int b) {
755 <            ForkJoinTask<?>[] a; int i;
756 <            ForkJoinTask<?> task = null;
757 <            if ((a = array) != null && (i = ((a.length - 1) & b)) >= 0) {
758 <                int j = (i << ASHIFT) + ABASE;
759 <                ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
860 <                if (t != null && base == b &&
755 >            ForkJoinTask<?> t; ForkJoinTask<?>[] a;
756 >            if ((a = array) != null) {
757 >                int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
758 >                if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
759 >                    base == b &&
760                      U.compareAndSwapObject(a, j, t, null)) {
761                      base = b + 1;
762 <                    task = t;
762 >                    return t;
763                  }
764              }
765 <            return task;
765 >            return null;
766          }
767  
768          /**
# Line 884 | Line 783 | public class ForkJoinPool extends Abstra
783           * Polls the given task only if it is at the current base.
784           */
785          final boolean pollFor(ForkJoinTask<?> task) {
786 <            ForkJoinTask<?>[] a; int b, i;
787 <            if ((b = base) - top < 0 && (a = array) != null &&
788 <                (i = (a.length - 1) & b) >= 0) {
890 <                int j = (i << ASHIFT) + ABASE;
786 >            ForkJoinTask<?>[] a; int b;
787 >            if ((b = base) - top < 0 && (a = array) != null) {
788 >                int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
789                  if (U.getObjectVolatile(a, j) == task && base == b &&
790                      U.compareAndSwapObject(a, j, task, null)) {
791                      base = b + 1;
# Line 990 | Line 888 | public class ForkJoinPool extends Abstra
888                  ForkJoinTask.cancelIgnoringExceptions(t);
889          }
890  
891 +        /**
892 +         * Computes next value for random probes.  Scans don't require
893 +         * a very high quality generator, but also not a crummy one.
894 +         * Marsaglia xor-shift is cheap and works well enough.  Note:
895 +         * This is manually inlined in several usages in ForkJoinPool
896 +         * to avoid writes inside busy scan loops.
897 +         */
898 +        final int nextSeed() {
899 +            int r = seed;
900 +            r ^= r << 13;
901 +            r ^= r >>> 17;
902 +            return seed = r ^= r << 5;
903 +        }
904 +
905          // Execution methods
906  
907          /**
# Line 1036 | Line 948 | public class ForkJoinPool extends Abstra
948          }
949  
950          /**
951 <         * Computes next value for random probes.  Scans don't require
1040 <         * a very high quality generator, but also not a crummy one.
1041 <         * Marsaglia xor-shift is cheap and works well enough.  Note:
1042 <         * This is manually inlined in several usages in ForkJoinPool
1043 <         * to avoid writes inside busy scan loops.
951 >         * Returns true if owned and not known to be blocked.
952           */
953 <        final int nextSeed() {
954 <            int r = seed;
955 <            r ^= r << 13;
956 <            r ^= r >>> 17;
957 <            r ^= r << 5;
958 <            return seed = r;
953 >        final boolean isApparentlyUnblocked() {
954 >            Thread wt; Thread.State s;
955 >            return (eventCount >= 0 &&
956 >                    (wt = owner) != null &&
957 >                    (s = wt.getState()) != Thread.State.BLOCKED &&
958 >                    s != Thread.State.WAITING &&
959 >                    s != Thread.State.TIMED_WAITING);
960 >        }
961 >
962 >        /**
 963 >         * If this is owned and not already interrupted, try to
964 >         * interrupt and/or unpark, ignoring exceptions.
965 >         */
966 >        final void interruptOwner() {
967 >            Thread wt, p;
968 >            if ((wt = owner) != null && !wt.isInterrupted()) {
969 >                try {
970 >                    wt.interrupt();
971 >                } catch (SecurityException ignore) {
972 >                }
973 >            }
974 >            if ((p = parker) != null)
975 >                U.unpark(p);
976          }
977  
978          // Unsafe mechanics
# Line 1075 | Line 1000 | public class ForkJoinPool extends Abstra
1000      }
1001  
1002      /**
1003 <     * Class for artificial tasks that are used to replace the target
1004 <     * of local joins if they are removed from an interior queue slot
1005 <     * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
1006 <     * actually do anything beyond having a unique identity.
1007 <     */
1083 <    static final class EmptyTask extends ForkJoinTask<Void> {
1084 <        EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
1085 <        public Void getRawResult() { return null; }
1086 <        public void setRawResult(Void x) {}
1087 <        public boolean exec() { return true; }
1088 <    }
1089 <
1090 <    /**
1091 <     * Per-thread records for (typically non-FJ) threads that submit
1092 <     * to pools. Cureently holds only psuedo-random seed / index that
1093 <     * is used to choose submission queues in method doSubmit. In the
1094 <     * future, this may incorporate a means to implement different
1095 <     * task rejection and resubmission policies.
1003 >     * Per-thread records for threads that submit to pools. Currently
 1004 >     * holds only a pseudo-random seed / index that is used to choose
1005 >     * submission queues in method doSubmit. In the future, this may
1006 >     * also incorporate a means to implement different task rejection
1007 >     * and resubmission policies.
1008       */
1009      static final class Submitter {
1010 <        int seed; // seed for random submission queue selection
1011 <
1100 <        // Heuristic padding to ameliorate unfortunate memory placements
1101 <        int p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
1102 <
1103 <        Submitter() {
1104 <            // Use identityHashCode, forced negative, for seed
1105 <            seed = System.identityHashCode(Thread.currentThread()) | (1 << 31);
1106 <        }
1107 <
1108 <        /**
1109 <         * Computes next value for random probes.  Like method
1110 <         * WorkQueue.nextSeed, this is manually inlined in several
1111 <         * usages to avoid writes inside busy loops.
1112 <         */
1113 <        final int nextSeed() {
1114 <            int r = seed;
1115 <            r ^= r << 13;
1116 <            r ^= r >>> 17;
1117 <            return seed = r ^= r << 5;
1118 <        }
1010 >        int seed;
1011 >        Submitter() { seed = hashId(Thread.currentThread().getId()); }
1012      }
1013  
1014      /** ThreadLocal class for Submitters */
# Line 1123 | Line 1016 | public class ForkJoinPool extends Abstra
1016          public Submitter initialValue() { return new Submitter(); }
1017      }
1018  
1019 +    // static fields (initialized in static initializer below)
1020 +
1021 +    /**
1022 +     * Creates a new ForkJoinWorkerThread. This factory is used unless
1023 +     * overridden in ForkJoinPool constructors.
1024 +     */
1025 +    public static final ForkJoinWorkerThreadFactory
1026 +        defaultForkJoinWorkerThreadFactory;
1027 +
1028 +    /**
1029 +     * Generator for assigning sequence numbers as pool names.
1030 +     */
1031 +    private static final AtomicInteger poolNumberGenerator;
1032 +
1033 +    /**
1034 +     * Permission required for callers of methods that may start or
1035 +     * kill threads.
1036 +     */
1037 +    private static final RuntimePermission modifyThreadPermission;
1038 +
1039      /**
 1040       * Per-thread submission bookkeeping. Shared across all pools
1041       * to reduce ThreadLocal pollution and because random motion
1042       * to avoid contention in one pool is likely to hold for others.
1043       */
1044 <    static final ThreadSubmitter submitters = new ThreadSubmitter();
1044 >    private static final ThreadSubmitter submitters;
1045 >
1046 >    // static constants
1047  
1048      /**
1049 <     * Top-level runloop for workers
1049 >     * The wakeup interval (in nanoseconds) for a worker waiting for a
1050 >     * task when the pool is quiescent to instead try to shrink the
1051 >     * number of workers.  The exact value does not matter too
1052 >     * much. It must be short enough to release resources during
1053 >     * sustained periods of idleness, but not so short that threads
1054 >     * are continually re-created.
1055       */
1056 <    final void runWorker(ForkJoinWorkerThread wt) {
1057 <        // Initialize queue array and seed in this thread
1138 <        WorkQueue w = wt.workQueue;
1139 <        w.growArray(false);
1140 <        // Same initial hash as Submitters
1141 <        w.seed = System.identityHashCode(Thread.currentThread()) | (1 << 31);
1056 >    private static final long SHRINK_RATE =
1057 >        4L * 1000L * 1000L * 1000L; // 4 seconds
1058  
1059 <        do {} while (w.runTask(scan(w)));
1060 <    }
1059 >    /**
1060 >     * The timeout value for attempted shrinkage, includes
1061 >     * some slop to cope with system timer imprecision.
1062 >     */
1063 >    private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10);
1064  
1065 <    // Creating, registering and deregistering workers
1065 >    /**
1066 >     * The maximum stolen->joining link depth allowed in tryHelpStealer.
1067 >     * Depths for legitimate chains are unbounded, but we use a fixed
1068 >     * constant to avoid (otherwise unchecked) cycles and to bound
1069 >     * staleness of traversal parameters at the expense of sometimes
1070 >     * blocking when we could be helping.
1071 >     */
1072 >    private static final int MAX_HELP_DEPTH = 16;
1073 >
1074 >    /**
1075 >     * Bits and masks for control variables
1076 >     *
1077 >     * Field ctl is a long packed with:
1078 >     * AC: Number of active running workers minus target parallelism (16 bits)
1079 >     * TC: Number of total workers minus target parallelism (16 bits)
1080 >     * ST: true if pool is terminating (1 bit)
1081 >     * EC: the wait count of top waiting thread (15 bits)
1082 >     * ID: poolIndex of top of Treiber stack of waiters (16 bits)
1083 >     *
1084 >     * When convenient, we can extract the upper 32 bits of counts and
1085 >     * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
1086 >     * (int)ctl.  The ec field is never accessed alone, but always
1087 >     * together with id and st. The offsets of counts by the target
1088 >     * parallelism and the positionings of fields makes it possible to
1089 >     * perform the most common checks via sign tests of fields: When
1090 >     * ac is negative, there are not enough active workers, when tc is
1091 >     * negative, there are not enough total workers, and when e is
1092 >     * negative, the pool is terminating.  To deal with these possibly
1093 >     * negative fields, we use casts in and out of "short" and/or
1094 >     * signed shifts to maintain signedness.
1095 >     *
1096 >     * When a thread is queued (inactivated), its eventCount field is
1097 >     * set negative, which is the only way to tell if a worker is
1098 >     * prevented from executing tasks, even though it must continue to
1099 >     * scan for them to avoid queuing races. Note however that
1100 >     * eventCount updates lag releases so usage requires care.
1101 >     *
1102 >     * Field runState is an int packed with:
1103 >     * SHUTDOWN: true if shutdown is enabled (1 bit)
1104 >     * SEQ:  a sequence number updated upon (de)registering workers (15 bits)
1105 >     * MASK: mask (power of 2 - 1) covering all registered poolIndexes (16 bits)
1106 >     *
1107 >     * The combination of mask and sequence number enables simple
1108 >     * consistency checks: Staleness of read-only operations on the
1109 >     * workQueues array can be checked by comparing runState before vs
1110 >     * after the reads. The low 16 bits (i.e, anding with SMASK) hold
 1111 >     * after the reads. The low 16 bits (i.e., anding with SMASK) hold
1112 >     * one.
1113 >     */
1114 >
1115 >    // bit positions/shifts for fields
1116 >    private static final int  AC_SHIFT   = 48;
1117 >    private static final int  TC_SHIFT   = 32;
1118 >    private static final int  ST_SHIFT   = 31;
1119 >    private static final int  EC_SHIFT   = 16;
1120 >
1121 >    // bounds
1122 >    private static final int  POOL_MAX   = 0x7fff;  // max #workers - 1
1123 >    private static final int  SMASK      = 0xffff;  // short bits
1124 >    private static final int  SQMASK     = 0xfffe;  // even short bits
1125 >    private static final int  SHORT_SIGN = 1 << 15;
1126 >    private static final int  INT_SIGN   = 1 << 31;
1127 >
1128 >    // masks
1129 >    private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
1130 >    private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
1131 >    private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;
1132 >
1133 >    // units for incrementing and decrementing
1134 >    private static final long TC_UNIT    = 1L << TC_SHIFT;
1135 >    private static final long AC_UNIT    = 1L << AC_SHIFT;
1136 >
1137 >    // masks and units for dealing with u = (int)(ctl >>> 32)
1138 >    private static final int  UAC_SHIFT  = AC_SHIFT - 32;
1139 >    private static final int  UTC_SHIFT  = TC_SHIFT - 32;
1140 >    private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
1141 >    private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
1142 >    private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
1143 >    private static final int  UTC_UNIT   = 1 << UTC_SHIFT;
1144 >
1145 >    // masks and units for dealing with e = (int)ctl
1146 >    private static final int E_MASK      = 0x7fffffff; // no STOP_BIT
1147 >    private static final int E_SEQ       = 1 << EC_SHIFT;
1148 >
1149 >    // runState bits
1150 >    private static final int SHUTDOWN    = 1 << 31;
1151 >    private static final int RS_SEQ      = 1 << 16;
1152 >    private static final int RS_SEQ_MASK = 0x7fff0000;
1153 >
1154 >    // access mode for WorkQueue
1155 >    static final int LIFO_QUEUE          =  0;
1156 >    static final int FIFO_QUEUE          =  1;
1157 >    static final int SHARED_QUEUE        = -1;
1158 >
1159 >    // Instance fields
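    A worked example (written for illustration against the constants above) of unpacking ctl and runState as described in the preceding comment:

        long c  = ctl;
        int  u  = (int)(c >>> 32);            // upper half: packed AC and TC
        int  e  = (int)c;                     // lower half: packed ST, EC and ID
        int  ac = u >> UAC_SHIFT;             // active workers minus parallelism; negative means too few active
        int  tc = (short)(u >>> UTC_SHIFT);   // total workers minus parallelism; negative means too few total
        boolean stopping = e < 0;             // STOP_BIT set: pool is terminating
        int  topWaiter   = e & SMASK;         // poolIndex of top of Treiber stack of waiters (when e > 0)
        int  indexMask   = runState & SMASK;  // smallest power of two covering all indices, minus one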
1160 >
1161 >    /*
1162 >     * Field layout order in this class tends to matter more than one
1163 >     * would like. Runtime layout order is only loosely related to
1164 >     * declaration order and may differ across JVMs, but the following
1165 >     * empirically works OK on current JVMs.
1166 >     */
1167 >
1168 >    volatile long ctl;                         // main pool control
1169 >    final int parallelism;                     // parallelism level
1170 >    final int localMode;                       // per-worker scheduling mode
1171 >    int growHints;                             // for expanding indices/ranges
1172 >    volatile int runState;                     // shutdown status, seq, and mask
1173 >    WorkQueue[] workQueues;                    // main registry
1174 >    final Mutex lock;                          // for registration
1175 >    final Condition termination;               // for awaitTermination
1176 >    final ForkJoinWorkerThreadFactory factory; // factory for new workers
1177 >    final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
1178 >    final AtomicLong stealCount;               // collect counts when terminated
1179 >    final AtomicInteger nextWorkerNumber;      // to create worker name string
1180 >    final String workerNamePrefix;             // to create worker name string
1181 >
1182 >    //  Creating, registering, deregistering and running workers
1183  
1184      /**
1185       * Tries to create and start a worker
1186       */
1187      private void addWorker() {
1188          Throwable ex = null;
1189 <        ForkJoinWorkerThread w = null;
1189 >        ForkJoinWorkerThread wt = null;
1190          try {
1191 <            if ((w = factory.newThread(this)) != null) {
1192 <                w.start();
1191 >            if ((wt = factory.newThread(this)) != null) {
1192 >                wt.start();
1193                  return;
1194              }
1195          } catch (Throwable e) {
1196              ex = e;
1197          }
1198 <        deregisterWorker(w, ex);
1198 >        deregisterWorker(wt, ex); // adjust counts etc on failure
1199      }
1200  
1201      /**
# Line 1181 | Line 1217 | public class ForkJoinPool extends Abstra
1217       */
1218      final void registerWorker(ForkJoinWorkerThread wt) {
1219          WorkQueue w = wt.workQueue;
1220 <        ReentrantLock lock = this.lock;
1220 >        Mutex lock = this.lock;
1221          lock.lock();
1222          try {
1223 <            int k = nextPoolIndex;
1223 >            int g = growHints, k = g & SMASK;
1224              WorkQueue[] ws = workQueues;
1225              if (ws != null) {                       // ignore on shutdown
1226                  int n = ws.length;
1227 <                if (k < 0 || (k & 1) == 0 || k >= n || ws[k] != null) {
1227 >                if ((k & 1) == 0 || k >= n || ws[k] != null) {
1228                      for (k = 1; k < n && ws[k] != null; k += 2)
1229                          ;                           // workers are at odd indices
1230                      if (k >= n)                     // resize
1231                          workQueues = ws = Arrays.copyOf(ws, n << 1);
1232                  }
1233 <                w.poolIndex = k;
1234 <                w.eventCount = ~(k >>> 1) & SMASK;  // Set up wait count
1235 <                ws[k] = w;                          // record worker
1200 <                nextPoolIndex = k + 2;
1233 >                w.eventCount = w.poolIndex = k;     // establish before recording
1234 >                ws[k] = w;
1235 >                growHints = (g & ~SMASK) | ((k + 2) & SMASK);
1236                  int rs = runState;
1237                  int m = rs & SMASK;                 // recalculate runState mask
1238                  if (k > m)
# Line 1210 | Line 1245 | public class ForkJoinPool extends Abstra
1245      }
1246  
1247      /**
1248 <     * Final callback from terminating worker, as well as failure to
1249 <     * construct or start a worker in addWorker.  Removes record of
1248 >     * Final callback from terminating worker, as well as upon failure
1249 >     * to construct or start a worker in addWorker.  Removes record of
1250       * worker from array, and adjusts counts. If pool is shutting
1251       * down, tries to complete termination.
1252       *
# Line 1224 | Line 1259 | public class ForkJoinPool extends Abstra
1259              w.runState = -1;                // ensure runState is set
1260              stealCount.getAndAdd(w.totalSteals + w.nsteals);
1261              int idx = w.poolIndex;
1262 <            ReentrantLock lock = this.lock;
1262 >            Mutex lock = this.lock;
1263              lock.lock();
1264              try {                           // remove record from array
1265                  WorkQueue[] ws = workQueues;
1266 <                if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
1267 <                    ws[nextPoolIndex = idx] = null;
1266 >                if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) {
1267 >                    ws[idx] = null;
1268 >                    growHints = (growHints & ~SMASK) | idx;
1269 >                }
1270              } finally {
1271                  lock.unlock();
1272              }
# Line 1241 | Line 1278 | public class ForkJoinPool extends Abstra
1278                                             ((c - TC_UNIT) & TC_MASK) |
1279                                             (c & ~(AC_MASK|TC_MASK)))));
1280  
1281 <        if (!tryTerminate(false) && w != null) {
1281 >        if (!tryTerminate(false, false) && w != null) {
1282              w.cancelAll();                  // cancel remaining tasks
1283              if (w.array != null)            // suppress signal if never ran
1284                  signalWork();               // wake up or create replacement
1285 +            if (ex == null)                 // help clean refs on way out
1286 +                ForkJoinTask.helpExpungeStaleExceptions();
1287          }
1288  
1289          if (ex != null)                     // rethrow
# Line 1252 | Line 1291 | public class ForkJoinPool extends Abstra
1291      }
1292  
1293      /**
1294 <     * Tries to add and register a new queue at the given index.
1295 <     *
1296 <     * @param idx the workQueues array index to register the queue
1297 <     * @return the queue, or null if could not add because could
1298 <     * not acquire lock or idx is unusable
1299 <     */
1300 <    private WorkQueue tryAddSharedQueue(int idx) {
1301 <        WorkQueue q = null;
1302 <        ReentrantLock lock = this.lock;
1303 <        if (idx >= 0 && (idx & 1) == 0 && !lock.isLocked()) {
1304 <            // create queue outside of lock but only if apparently free
1305 <            WorkQueue nq = new WorkQueue(null, SHARED_QUEUE);
1306 <            if (lock.tryLock()) {
1307 <                try {
1308 <                    WorkQueue[] ws = workQueues;
1309 <                    if (ws != null && idx < ws.length) {
1310 <                        if ((q = ws[idx]) == null) {
1311 <                            int rs;         // update runState seq
1312 <                            ws[idx] = q = nq;
1313 <                            runState = (((rs = runState) & SHUTDOWN) |
1314 <                                        ((rs + RS_SEQ) & ~SHUTDOWN));
1315 <                        }
1294 >     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1295 >     */
1296 >    final void runWorker(ForkJoinWorkerThread wt) {
1297 >        // Initialize queue array and seed in this thread
1298 >        WorkQueue w = wt.workQueue;
1299 >        w.growArray(false);
1300 >        w.seed = hashId(Thread.currentThread().getId());
1301 >
1302 >        do {} while (w.runTask(scan(w)));
1303 >    }
1304 >
1305 >    // Submissions
1306 >
1307 >    /**
1308 >     * Unless shutting down, adds the given task to a submission queue
1309 >     * at submitter's current queue index (modulo submission
1310 >     * range). If no queue exists at the index, one is created unless
1311 >     * pool lock is busy.  If the queue and/or lock are busy, another
1312 >     * index is randomly chosen. The mask in growHints controls the
1313 >     * effective index range of queues considered. The mask is
1314 >     * expanded, up to the current workerQueue mask, upon any detected
1315 >     * contention but otherwise remains small to avoid needlessly
1316 >     * creating queues when there is no contention.
1317 >     */
1318 >    private void doSubmit(ForkJoinTask<?> task) {
1319 >        if (task == null)
1320 >            throw new NullPointerException();
1321 >        Submitter s = submitters.get();
1322 >        for (int r = s.seed, m = growHints >>> 16;;) {
1323 >            WorkQueue[] ws; WorkQueue q; Mutex lk;
1324 >            int k = r & m & SQMASK;          // use only even indices
1325 >            if (runState < 0 || (ws = workQueues) == null || ws.length <= k)
1326 >                throw new RejectedExecutionException(); // shutting down
1327 >            if ((q = ws[k]) == null && (lk = lock).tryAcquire(0)) {
1328 >                try {                        // try to create new queue
1329 >                    if (ws == workQueues && (q = ws[k]) == null) {
1330 >                        int rs;              // update runState seq
1331 >                        ws[k] = q = new WorkQueue(null, SHARED_QUEUE);
1332 >                        runState = (((rs = runState) & SHUTDOWN) |
1333 >                                    ((rs + RS_SEQ) & ~SHUTDOWN));
1334                      }
1335                  } finally {
1336 <                    lock.unlock();
1336 >                    lk.unlock();
1337                  }
1338              }
1339 +            if (q != null) {
1340 +                if (q.trySharedPush(task)) {
1341 +                    signalWork();
1342 +                    return;
1343 +                }
1344 +                else if (m < parallelism - 1 && m < (runState & SMASK)) {
1345 +                    Mutex lock = this.lock;
1346 +                    lock.lock();             // block until lock free
1347 +                    int g = growHints;
1348 +                    if (g >>> 16 == m)       // expand range
1349 +                        growHints = (((m << 1) + 1) << 16) | (g & SMASK);
1350 +                    lock.unlock();           // no need for try/finally
1351 +                }
1352 +                else if ((r & m) == 0)
1353 +                    Thread.yield();          // occasionally yield if busy
1354 +            }
1355 +            if (m == (m = growHints >>> 16)) {
1356 +                r ^= r << 13;                // update seed unless new range
1357 +                r ^= r >>> 17;               // same xorshift as WorkQueues
1358 +                s.seed = r ^= r << 5;
1359 +            }
1360          }
1283        return q;
1361      }
1362  
1363      // Maintaining ctl counts
# Line 1294 | Line 1371 | public class ForkJoinPool extends Abstra
1371      }
1372  
1373      /**
1374 <     * Activates or creates a worker.
1374 >     * Tries to activate or create a worker if too few are active.
1375       */
1376      final void signalWork() {
1377 <        /*
1378 <         * The while condition is true if: (there is are too few total
1379 <         * workers OR there is at least one waiter) AND (there are too
1380 <         * few active workers OR the pool is terminating).  The value
1381 <         * of e distinguishes the remaining cases: zero (no waiters)
1382 <         * for create, negative if terminating (in which case do
1383 <         * nothing), else release a waiter. The secondary checks for
1384 <         * release (non-null array etc) can fail if the pool begins
1385 <         * terminating after the test, and don't impose any added cost
1386 <         * because JVMs must perform null and bounds checks anyway.
1387 <         */
1388 <        long c; int e, u;
1389 <        while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
1390 <                (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN)) {
1314 <            WorkQueue[] ws = workQueues; int i; WorkQueue w; Thread p;
1315 <            if (e == 0) {                    // add a new worker
1316 <                if (U.compareAndSwapLong
1317 <                    (this, CTL, c, (long)(((u + UTC_UNIT) & UTC_MASK) |
1318 <                                          ((u + UAC_UNIT) & UAC_MASK)) << 32)) {
1319 <                    addWorker();
1320 <                    break;
1377 >        long c; int u;
1378 >        while ((u = (int)((c = ctl) >>> 32)) < 0) {     // too few active
1379 >            WorkQueue[] ws = workQueues; int e, i; WorkQueue w; Thread p;
1380 >            if ((e = (int)c) > 0) {                     // at least one waiting
1381 >                if (ws != null && (i = e & SMASK) < ws.length &&
1382 >                    (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
1383 >                    long nc = (((long)(w.nextWait & E_MASK)) |
1384 >                               ((long)(u + UAC_UNIT) << 32));
1385 >                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1386 >                        w.eventCount = (e + E_SEQ) & E_MASK;
1387 >                        if ((p = w.parker) != null)
1388 >                            U.unpark(p);                // activate and release
1389 >                        break;
1390 >                    }
1391                  }
1392 +                else
1393 +                    break;
1394              }
1395 <            else if (e > 0 && ws != null &&
1396 <                     (i = ((~e << 1) | 1) & SMASK) < ws.length &&
1397 <                     (w = ws[i]) != null &&
1398 <                     w.eventCount == (e | INT_SIGN)) {
1399 <                if (U.compareAndSwapLong
1328 <                    (this, CTL, c, (((long)(w.nextWait & E_MASK)) |
1329 <                                    ((long)(u + UAC_UNIT) << 32)))) {
1330 <                    w.eventCount = (e + E_SEQ) & E_MASK;
1331 <                    if ((p = w.parker) != null)
1332 <                        U.unpark(p);         // release a waiting worker
1395 >            else if (e == 0 && (u & SHORT_SIGN) != 0) { // too few total
1396 >                long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
1397 >                                 ((u + UAC_UNIT) & UAC_MASK)) << 32;
1398 >                if (U.compareAndSwapLong(this, CTL, c, nc)) {
1399 >                    addWorker();
1400                      break;
1401                  }
1402              }
# Line 1346 | Line 1413 | public class ForkJoinPool extends Abstra
1413       * @return true if the caller can block, else should recheck and retry
1414       */
1415      final boolean tryCompensate() {
1416 <        WorkQueue[] ws; WorkQueue w; Thread p;
1416 >        WorkQueue w; Thread p;
1417          int pc = parallelism, e, u, ac, tc, i;
1418          long c = ctl;
1419 <
1419 >        WorkQueue[] ws = workQueues;
1420          if ((e = (int)c) >= 0) {
1421              if ((ac = ((u = (int)(c >>> 32)) >> UAC_SHIFT)) <= 0 &&
1422 <                e != 0 && (ws = workQueues) != null &&
1356 <                (i = ((~e << 1) | 1) & SMASK) < ws.length &&
1422 >                e != 0 && ws != null && (i = e & SMASK) < ws.length &&
1423                  (w = ws[i]) != null) {
1424 +                long nc = (long)(w.nextWait & E_MASK) | (c & (AC_MASK|TC_MASK));
1425                  if (w.eventCount == (e | INT_SIGN) &&
1426 <                    U.compareAndSwapLong
1360 <                    (this, CTL, c, ((long)(w.nextWait & E_MASK) |
1361 <                                    (c & (AC_MASK|TC_MASK))))) {
1426 >                    U.compareAndSwapLong(this, CTL, c, nc)) {
1427                      w.eventCount = (e + E_SEQ) & E_MASK;
1428                      if ((p = w.parker) != null)
1429                          U.unpark(p);
# Line 1370 | Line 1435 | public class ForkJoinPool extends Abstra
1435                  if (U.compareAndSwapLong(this, CTL, c, nc))
1436                      return true;             // no compensation needed
1437              }
1438 <            else if (tc + pc < MAX_ID) {
1438 >            else if (tc + pc < POOL_MAX) {
1439                  long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1440                  if (U.compareAndSwapLong(this, CTL, c, nc)) {
1441                      addWorker();
# Line 1381 | Line 1446 | public class ForkJoinPool extends Abstra
1446          return false;
1447      }
1448  
1384    // Submissions
1385
1386    /**
1387     * Unless shutting down, adds the given task to a submission queue
1388     * at submitter's current queue index. If no queue exists at the
1389     * index, one is created unless pool lock is busy.  If the queue
1390     * and/or lock are busy, another index is randomly chosen.
1391     */
1392    private void doSubmit(ForkJoinTask<?> task) {
1393        if (task == null)
1394            throw new NullPointerException();
1395        Submitter s = submitters.get();
1396        for (int r = s.seed;;) {
1397            WorkQueue q; int k;
1398            int rs = runState, m = rs & SMASK;
1399            WorkQueue[] ws = workQueues;
1400            if (rs < 0 || ws == null)   // shutting down
1401                throw new RejectedExecutionException();
1402            if (ws.length > m &&        // k must be at index
1403                ((q = ws[k = (r << 1) & m]) != null ||
1404                 (q = tryAddSharedQueue(k)) != null) &&
1405                q.trySharedPush(task)) {
1406                signalWork();
1407                return;
1408            }
1409            r ^= r << 13;               // xorshift seed to new position
1410            r ^= r >>> 17;
1411            if (((s.seed = r ^= r << 5) & m) == 0)
1412                Thread.yield();         // occasionally yield if busy
1413        }
1414    }
1415
1416
1449      // Scanning for tasks
1450  
1451      /**
# Line 1425 | Line 1457 | public class ForkJoinPool extends Abstra
1457       * re-invocation.
1458       *
1459       * The scan searches for tasks across queues, randomly selecting
1460 <     * the first #queues probes, favoring steals 2:1 over submissions
1460 >     * the first #queues probes, favoring steals over submissions
1461       * (by exploiting even/odd indexing), and then performing a
1462       * circular sweep of all queues.  The scan terminates upon either
1463       * finding a non-empty queue, or completing a full sweep. If the
# Line 1434 | Line 1466 | public class ForkJoinPool extends Abstra
1466       * following actions, after which the caller will retry calling
1467       * this method unless terminated.
1468       *
1469 +     * * If pool is terminating, terminate the worker.
1470 +     *
1471       * * If not a complete sweep, try to release a waiting worker.  If
1472       * the scan terminated because the worker is inactivated, then the
1473       * released worker will often be the calling worker, and it can
# Line 1444 | Line 1478 | public class ForkJoinPool extends Abstra
1478       * * If the caller has run a task since the last empty scan,
1479       * return (to allow rescan) if other workers are not also yet
1480       * enqueued.  Field WorkQueue.rescans counts down on each scan to
1481 <     * ensure eventual inactivation, and occasional calls to
1448 <     * Thread.yield to help avoid interference with more useful
1449 <     * activities on the system.
1450 <     *
1451 <     * * If pool is terminating, terminate the worker.
1481 >     * ensure eventual inactivation and blocking.
1482       *
1483       * * If not already enqueued, try to inactivate and enqueue the
1484       * worker on wait queue.
# Line 1462 | Line 1492 | public class ForkJoinPool extends Abstra
1492       * @return a task or null if none found
1493       */
1494      private final ForkJoinTask<?> scan(WorkQueue w) {
1495 <        boolean swept = false;                 // true after full empty scan
1496 <        WorkQueue[] ws;                        // volatile read order matters
1497 <        int r = w.seed, ec = w.eventCount;     // ec is negative if inactive
1495 >        boolean swept = false;               // true after full empty scan
1496 >        WorkQueue[] ws;                      // volatile read order matters
1497 >        int r = w.seed, ec = w.eventCount;   // ec is negative if inactive
1498          int rs = runState, m = rs & SMASK;
1499 <        if ((ws = workQueues) != null && ws.length > m) {
1500 <            ForkJoinTask<?> task = null;
1471 <            for (int k = 0, j = -2 - m; ; ++j) {
1499 >        if ((ws = workQueues) != null && ws.length > m) { // consistency check
1500 >            for (int k = 0, j = -1 - m; ; ++j) {
1501                  WorkQueue q; int b;
1502 <                if (j < 0) {                   // random probes while j negative
1502 >                if (j < 0) {                 // random probes while j negative
1503                      r ^= r << 13; r ^= r >>> 17; k = (r ^= r << 5) | (j & 1);
1504 <                }                              // worker (not submit) for odd j
1505 <                else                           // cyclic scan when j >= 0
1506 <                    k += (m >>> 1) | 1;        // step by half to reduce bias
1478 <
1504 >                }                            // worker (not submit) for odd j
1505 >                else                         // cyclic scan when j >= 0
1506 >                    k += 7;                  // step 7 reduces array packing bias
1507                  if ((q = ws[k & m]) != null && (b = q.base) - q.top < 0) {
1508 <                    if (ec >= 0)
1509 <                        task = q.pollAt(b);    // steal
1508 >                    ForkJoinTask<?> t = (ec >= 0) ? q.pollAt(b) : null;
1509 >                    w.seed = r;              // save seed for next scan
1510 >                    if (t != null)
1511 >                        return t;
1512                      break;
1513                  }
1514 <                else if (j > m) {
1515 <                    if (rs == runState)        // staleness check
1514 >                else if (j - m > m) {
1515 >                    if (rs == runState)      // staleness check
1516                          swept = true;
1517                      break;
1518                  }
1519              }
1520 <            w.seed = r;                        // save seed for next scan
1521 <            if (task != null)
1522 <                return task;
1523 <        }
1524 <
1525 <        // Decode ctl on empty scan
1526 <        long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns;
1527 <        if (!swept) {                          // try to release a waiter
1528 <            WorkQueue v; Thread p;
1529 <            if (e > 0 && a < 0 && ws != null &&
1530 <                (v = ws[((~e << 1) | 1) & m]) != null &&
1531 <                v.eventCount == (e | INT_SIGN) && U.compareAndSwapLong
1532 <                (this, CTL, c, ((long)(v.nextWait & E_MASK) |
1533 <                                ((c + AC_UNIT) & (AC_MASK|TC_MASK))))) {
1534 <                v.eventCount = (e + E_SEQ) & E_MASK;
1535 <                if ((p = v.parker) != null)
1536 <                    U.unpark(p);
1537 <            }
1538 <        }
1539 <        else if ((nr = w.rescans) > 0) {       // continue rescanning
1540 <            int ac = a + parallelism;
1541 <            if ((w.rescans = (ac < nr) ? ac : nr - 1) > 0 && w.seed < 0 &&
1542 <                w.eventCount == ec)
1543 <                Thread.yield();                // 1 bit randomness for yield call
1544 <        }
1545 <        else if (e < 0)                        // pool is terminating
1546 <            w.runState = -1;
1547 <        else if (ec >= 0) {                    // try to enqueue
1548 <            long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
1549 <            w.nextWait = e;
1550 <            w.eventCount = ec | INT_SIGN;      // mark as inactive
1551 <            if (!U.compareAndSwapLong(this, CTL, c, nc))
1522 <                w.eventCount = ec;             // back out on CAS failure
1523 <            else if ((ns = w.nsteals) != 0) {  // set rescans if ran task
1524 <                if (a <= 0)                    // ... unless too many active
1520 >
1521 >            // Decode ctl on empty scan
1522 >            long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns;
1523 >            if (e < 0)                       // pool is terminating
1524 >                w.runState = -1;
1525 >            else if (!swept) {               // try to release a waiter
1526 >                WorkQueue v; Thread p;
1527 >                if (e > 0 && a < 0 && (v = ws[e & m]) != null &&
1528 >                    v.eventCount == (e | INT_SIGN)) {
1529 >                    long nc = ((long)(v.nextWait & E_MASK) |
1530 >                               ((c + AC_UNIT) & (AC_MASK|TC_MASK)));
1531 >                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1532 >                        v.eventCount = (e + E_SEQ) & E_MASK;
1533 >                        if ((p = v.parker) != null)
1534 >                            U.unpark(p);
1535 >                    }
1536 >                }
1537 >            }
1538 >            else if ((nr = w.rescans) > 0) { // continue rescanning
1539 >                int ac = a + parallelism;
1540 >                if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0 &&
1541 >                    w.eventCount == ec)
1542 >                    Thread.yield();          // occasionally yield
1543 >            }
1544 >            else if (ec >= 0) {              // try to enqueue
1545 >                long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
1546 >                w.nextWait = e;
1547 >                w.eventCount = ec | INT_SIGN;// mark as inactive
1548 >                if (!U.compareAndSwapLong(this, CTL, c, nc))
1549 >                    w.eventCount = ec;       // unmark on CAS failure
1550 >                else if ((ns = w.nsteals) != 0) {
1551 >                    w.nsteals = 0;           // set rescans if ran task
1552                      w.rescans = a + parallelism;
1553 <                w.nsteals = 0;
1554 <                w.totalSteals += ns;
1553 >                    w.totalSteals += ns;
1554 >                }
1555              }
1556 <        }
1557 <        else{                                  // already queued
1558 <            if (parallelism == -a)
1559 <                idleAwaitWork(w);              // quiescent
1560 <            if (w.eventCount == ec) {
1561 <                Thread.interrupted();          // clear status
1562 <                ForkJoinWorkerThread wt = w.owner;
1563 <                U.putObject(wt, PARKBLOCKER, this);
1564 <                w.parker = wt;                 // emulate LockSupport.park
1565 <                if (w.eventCount == ec)        // recheck
1566 <                    U.park(false, 0L);         // block
1567 <                w.parker = null;
1568 <                U.putObject(wt, PARKBLOCKER, null);
1556 >            else{                            // already queued
1557 >                if (parallelism == -a)
1558 >                    idleAwaitWork(w);        // quiescent
1559 >                if (w.eventCount == ec) {
1560 >                    Thread.interrupted();    // clear status
1561 >                    ForkJoinWorkerThread wt = w.owner;
1562 >                    U.putObject(wt, PARKBLOCKER, this);
1563 >                    w.parker = wt;           // emulate LockSupport.park
1564 >                    if (w.eventCount == ec)  // recheck
1565 >                        U.park(false, 0L);   // block
1566 >                    w.parker = null;
1567 >                    U.putObject(wt, PARKBLOCKER, null);
1568 >                }
1569              }
1570          }
1571          return null;
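/*
 * Illustrative sketch of the probe pattern used by scan above: a few
 * random probes while j is negative, then a cyclic sweep stepping by
 * 7.  Because 7 is odd and the table length is a power of two, the
 * sweep touches every slot before the j - m > m exit test fires.  The
 * table size and seed are assumed example values.
 */
class ScanSweepSketch {
    public static void main(String[] args) {
        int m = 15;                            // assumed: 16 slots, mask 15
        boolean[] visited = new boolean[m + 1];
        int r = 7654321, k = 0;
        for (int j = -1 - m; ; ++j) {
            if (j < 0) {                       // random probes while j negative
                r ^= r << 13; r ^= r >>> 17; k = (r ^= r << 5) | (j & 1);
            }
            else
                k += 7;                        // cyclic sweep when j >= 0
            visited[k & m] = true;
            if (j - m > m)                     // full sweep completed
                break;
        }
        int covered = 0;
        for (boolean v : visited) if (v) ++covered;
        System.out.println(covered + " of " + (m + 1) + " slots visited");
    }
}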
# Line 1556 | Line 1583 | public class ForkJoinPool extends Abstra
1583       */
1584      private void idleAwaitWork(WorkQueue w) {
1585          long c; int nw, ec;
1586 <        if (!tryTerminate(false) &&
1586 >        if (!tryTerminate(false, false) &&
1587              (int)((c = ctl) >> AC_SHIFT) + parallelism == 0 &&
1588              (ec = w.eventCount) == ((int)c | INT_SIGN) &&
1589              (nw = w.nextWait) != 0) {
1590              long nc = ((long)(nw & E_MASK) | // ctl to restore on timeout
1591                         ((c + AC_UNIT) & AC_MASK) | (c & TC_MASK));
1565            ForkJoinTask.helpExpungeStaleExceptions(); // help clean
1592              ForkJoinWorkerThread wt = w.owner;
1593              while (ctl == c) {
1594                  long startTime = System.nanoTime();
# Line 1577 | Line 1603 | public class ForkJoinPool extends Abstra
1603                      break;
1604                  if (System.nanoTime() - startTime >= SHRINK_TIMEOUT &&
1605                      U.compareAndSwapLong(this, CTL, c, nc)) {
1580                    w.runState = -1;          // shrink
1606                      w.eventCount = (ec + E_SEQ) | E_MASK;
1607 +                    w.runState = -1;          // shrink
1608                      break;
1609                  }
1610              }
# Line 1690 | Line 1716 | public class ForkJoinPool extends Abstra
1716                  return null;
1717              if (ws.length > m) {
1718                  WorkQueue q;
1719 <                for (int n = m << 2, k = r, j = -n;;) {
1720 <                    r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
1719 >                for (int k = 0, j = -1 - m;; ++j) {
1720 >                    if (j < 0) {
1721 >                        r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
1722 >                    }
1723 >                    else
1724 >                        k += 7;
1725                      if ((q = ws[(k | 1) & m]) != null && q.base - q.top < 0) {
1726                          w.seed = r;
1727                          return q;
1728                      }
1729 <                    else if (j > n)
1729 >                    else if (j - m > m)
1730                          return null;
1701                    else
1702                        k = (j++ < 0) ? r : k + ((m >>> 1) | 1);
1703
1731                  }
1732              }
1733          }
# Line 1778 | Line 1805 | public class ForkJoinPool extends Abstra
1805                  8);
1806      }
1807  
1808 <    // Termination
1808 >    //  Termination
1809  
1810      /**
1811 <     * Sets SHUTDOWN bit of runState under lock
1812 <     */
1813 <    private void enableShutdown() {
1814 <        ReentrantLock lock = this.lock;
1815 <        if (runState >= 0) {
1816 <            lock.lock();                       // don't need try/finally
1817 <            runState |= SHUTDOWN;
1791 <            lock.unlock();
1792 <        }
1793 <    }
1794 <
1795 <    /**
1796 <     * Possibly initiates and/or completes termination.  Upon
1797 <     * termination, cancels all queued tasks and then
1811 >     * Possibly initiates and/or completes termination.  The caller
1812 >     * triggering termination runs three passes through workQueues:
1813 >     * (0) Setting termination status, followed by wakeups of queued
1814 >     * workers; (1) cancelling all tasks; (2) interrupting lagging
1815 >     * threads (likely in external tasks, but possibly also blocked in
1816 >     * joins).  Each pass repeats previous steps because of potential
1817 >     * lagging thread creation.
1818       *
1819       * @param now if true, unconditionally terminate, else only
1820       * if no work and no active workers
1821 +     * @param enable if true, enable shutdown when next possible
1822       * @return true if now terminating or terminated
1823       */
1824 <    private boolean tryTerminate(boolean now) {
1824 >    private boolean tryTerminate(boolean now, boolean enable) {
1825 >        Mutex lock = this.lock;
1826          for (long c;;) {
1827              if (((c = ctl) & STOP_BIT) != 0) {      // already terminating
1828                  if ((short)(c >>> TC_SHIFT) == -parallelism) {
1807                    ReentrantLock lock = this.lock; // signal when no workers
1829                      lock.lock();                    // don't need try/finally
1830                      termination.signalAll();        // signal when 0 workers
1831                      lock.unlock();
1832                  }
1833                  return true;
1834              }
1835 <            if (!now) {
1836 <                if ((int)(c >> AC_SHIFT) != -parallelism || runState >= 0 ||
1835 >            if (runState >= 0) {                    // not yet enabled
1836 >                if (!enable)
1837 >                    return false;
1838 >                lock.lock();
1839 >                runState |= SHUTDOWN;
1840 >                lock.unlock();
1841 >            }
1842 >            if (!now) {                             // check if idle & no tasks
1843 >                if ((int)(c >> AC_SHIFT) != -parallelism ||
1844                      hasQueuedSubmissions())
1845                      return false;
1846                  // Check for unqueued inactive workers. One pass suffices.
1847                  WorkQueue[] ws = workQueues; WorkQueue w;
1848                  if (ws != null) {
1849 <                    int n = ws.length;
1822 <                    for (int i = 1; i < n; i += 2) {
1849 >                    for (int i = 1; i < ws.length; i += 2) {
1850                          if ((w = ws[i]) != null && w.eventCount >= 0)
1851                              return false;
1852                      }
1853                  }
1854              }
1855 <            if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT))
1856 <                startTerminating();
1857 <        }
1858 <    }
1859 <
1860 <    /**
1861 <     * Initiates termination: Runs three passes through workQueues:
1862 <     * (0) Setting termination status, followed by wakeups of queued
1863 <     * workers; (1) cancelling all tasks; (2) interrupting lagging
1864 <     * threads (likely in external tasks, but possibly also blocked in
1865 <     * joins).  Each pass repeats previous steps because of potential
1866 <     * lagging thread creation.
1867 <     */
1841 <    private void startTerminating() {
1842 <        for (int pass = 0; pass < 3; ++pass) {
1843 <            WorkQueue[] ws = workQueues;
1844 <            if (ws != null) {
1845 <                WorkQueue w; Thread wt;
1846 <                int n = ws.length;
1847 <                for (int j = 0; j < n; ++j) {
1848 <                    if ((w = ws[j]) != null) {
1849 <                        w.runState = -1;
1850 <                        if (pass > 0) {
1851 <                            w.cancelAll();
1852 <                            if (pass > 1 && (wt = w.owner) != null &&
1853 <                                !wt.isInterrupted()) {
1854 <                                try {
1855 <                                    wt.interrupt();
1856 <                                } catch (SecurityException ignore) {
1855 >            if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
1856 >                for (int pass = 0; pass < 3; ++pass) {
1857 >                    WorkQueue[] ws = workQueues;
1858 >                    if (ws != null) {
1859 >                        WorkQueue w;
1860 >                        int n = ws.length;
1861 >                        for (int i = 0; i < n; ++i) {
1862 >                            if ((w = ws[i]) != null) {
1863 >                                w.runState = -1;
1864 >                                if (pass > 0) {
1865 >                                    w.cancelAll();
1866 >                                    if (pass > 1)
1867 >                                        w.interruptOwner();
1868                                  }
1869                              }
1870                          }
1871 <                    }
1872 <                }
1873 <                // Wake up workers parked on event queue
1874 <                int i, e; long c; Thread p;
1875 <                while ((i = ((~(e = (int)(c = ctl)) << 1) | 1) & SMASK) < n &&
1876 <                       (w = ws[i]) != null &&
1877 <                       w.eventCount == (e | INT_SIGN)) {
1878 <                    long nc = ((long)(w.nextWait & E_MASK) |
1879 <                               ((c + AC_UNIT) & AC_MASK) |
1880 <                               (c & (TC_MASK|STOP_BIT)));
1881 <                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1882 <                        w.eventCount = (e + E_SEQ) & E_MASK;
1883 <                        if ((p = w.parker) != null)
1884 <                            U.unpark(p);
1871 >                        // Wake up workers parked on event queue
1872 >                        int i, e; long cc; Thread p;
1873 >                        while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
1874 >                               (i = e & SMASK) < n &&
1875 >                               (w = ws[i]) != null) {
1876 >                            long nc = ((long)(w.nextWait & E_MASK) |
1877 >                                       ((cc + AC_UNIT) & AC_MASK) |
1878 >                                       (cc & (TC_MASK|STOP_BIT)));
1879 >                            if (w.eventCount == (e | INT_SIGN) &&
1880 >                                U.compareAndSwapLong(this, CTL, cc, nc)) {
1881 >                                w.eventCount = (e + E_SEQ) & E_MASK;
1882 >                                w.runState = -1;
1883 >                                if ((p = w.parker) != null)
1884 >                                    U.unpark(p);
1885 >                            }
1886 >                        }
1887                      }
1888                  }
1889              }
# Line 1946 | Line 1959 | public class ForkJoinPool extends Abstra
1959          checkPermission();
1960          if (factory == null)
1961              throw new NullPointerException();
1962 <        if (parallelism <= 0 || parallelism > MAX_ID)
1962 >        if (parallelism <= 0 || parallelism > POOL_MAX)
1963              throw new IllegalArgumentException();
1964          this.parallelism = parallelism;
1965          this.factory = factory;
1966          this.ueh = handler;
1967          this.localMode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE;
1968 <        this.nextPoolIndex = 1;
1968 >        this.growHints = 1;
1969          long np = (long)(-parallelism); // offset ctl counts
1970          this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
1971          // initialize workQueues array with room for 2*parallelism if possible
1972          int n = parallelism << 1;
1973 <        if (n >= MAX_ID)
1974 <            n = MAX_ID;
1973 >        if (n >= POOL_MAX)
1974 >            n = POOL_MAX;
1975          else { // See Hacker's Delight, sec 3.2, where n < (1 << 16)
1976              n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
1977          }
1978 <        this.workQueues = new WorkQueue[(n + 1) << 1];
1979 <        ReentrantLock lck = this.lock = new ReentrantLock();
1967 <        this.termination = lck.newCondition();
1978 >        this.workQueues = new WorkQueue[(n + 1) << 1]; // #slots = 2 * #workers
1979 >        this.termination = (this.lock = new Mutex()).newCondition();
1980          this.stealCount = new AtomicLong();
1981          this.nextWorkerNumber = new AtomicInteger();
1982          StringBuilder sb = new StringBuilder("ForkJoinPool-");
1983          sb.append(poolNumberGenerator.incrementAndGet());
1984          sb.append("-worker-");
1985          this.workerNamePrefix = sb.toString();
1974        // Create initial submission queue
1975        WorkQueue sq = tryAddSharedQueue(0);
1976        if (sq != null)
1977            sq.growArray(false);
1986      }
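/*
 * Illustrative sketch of the Hacker's Delight bit-smearing trick the
 * constructor above uses to size the workQueues array: OR-ing in
 * shifted copies fills every bit below the highest set bit, so n + 1
 * becomes a power of two.  The parallelism values are arbitrary, and
 * the POOL_MAX cap branch is omitted for brevity.
 */
class TableSizingSketch {
    static int smear(int n) {
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
        return n;                      // valid for n < (1 << 16)
    }
    public static void main(String[] args) {
        for (int p : new int[] { 1, 3, 7, 12 }) {
            int n = smear(p << 1);     // constructor starts from parallelism << 1
            System.out.println("parallelism " + p
                               + " -> workQueues.length " + ((n + 1) << 1));
        }
    }
}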
1987  
1988      // Execution methods
# Line 2092 | Line 2100 | public class ForkJoinPool extends Abstra
2100       * @throws RejectedExecutionException {@inheritDoc}
2101       */
2102      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2103 <        ArrayList<ForkJoinTask<T>> forkJoinTasks =
2104 <            new ArrayList<ForkJoinTask<T>>(tasks.size());
2105 <        for (Callable<T> task : tasks)
2106 <            forkJoinTasks.add(ForkJoinTask.adapt(task));
2107 <        invoke(new InvokeAll<T>(forkJoinTasks));
2108 <
2103 >        // In previous versions of this class, this method constructed
2104 >        // a task to run ForkJoinTask.invokeAll, but now external
2105 >        // invocation of multiple tasks is at least as efficient.
2106 >        List<ForkJoinTask<T>> fs = new ArrayList<ForkJoinTask<T>>(tasks.size());
2107 >        // Workaround needed because method wasn't declared with
2108 >        // wildcards in return type but should have been.
2109          @SuppressWarnings({"unchecked", "rawtypes"})
2110 <            List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
2103 <        return futures;
2104 <    }
2110 >            List<Future<T>> futures = (List<Future<T>>) (List) fs;
2111  
2112 <    static final class InvokeAll<T> extends RecursiveAction {
2113 <        final ArrayList<ForkJoinTask<T>> tasks;
2114 <        InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
2115 <        public void compute() {
2116 <            try { invokeAll(tasks); }
2117 <            catch (Exception ignore) {}
2112 >        boolean done = false;
2113 >        try {
2114 >            for (Callable<T> t : tasks) {
2115 >                ForkJoinTask<T> f = ForkJoinTask.adapt(t);
2116 >                doSubmit(f);
2117 >                fs.add(f);
2118 >            }
2119 >            for (ForkJoinTask<T> f : fs)
2120 >                f.quietlyJoin();
2121 >            done = true;
2122 >            return futures;
2123 >        } finally {
2124 >            if (!done)
2125 >                for (ForkJoinTask<T> f : fs)
2126 >                    f.cancel(false);
2127          }
2113        private static final long serialVersionUID = -7914297376763021607L;
2128      }
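/*
 * Illustrative usage sketch for the invokeAll implementation above:
 * submit a small batch of Callables and read the already-joined
 * results.  The class name, task bodies, and pool size are arbitrary
 * choices for the example.
 */
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import jsr166y.ForkJoinPool;           // the package of this file

class InvokeAllSketch {
    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();        // default parallelism
        List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
        for (int i = 1; i <= 4; ++i) {
            final int n = i;
            tasks.add(new Callable<Integer>() {
                public Integer call() { return n * n; }
            });
        }
        int sum = 0;
        for (Future<Integer> f : pool.invokeAll(tasks))
            sum += f.get();                            // each future already joined
        System.out.println("sum of squares = " + sum); // 30
        pool.shutdown();
    }
}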
2129  
2130      /**
# Line 2175 | Line 2189 | public class ForkJoinPool extends Abstra
2189          int rc = 0;
2190          WorkQueue[] ws; WorkQueue w;
2191          if ((ws = workQueues) != null) {
2192 <            int n = ws.length;
2193 <            for (int i = 1; i < n; i += 2) {
2180 <                Thread.State s; ForkJoinWorkerThread wt;
2181 <                if ((w = ws[i]) != null && (wt = w.owner) != null &&
2182 <                    w.eventCount >= 0 &&
2183 <                    (s = wt.getState()) != Thread.State.BLOCKED &&
2184 <                    s != Thread.State.WAITING &&
2185 <                    s != Thread.State.TIMED_WAITING)
2192 >            for (int i = 1; i < ws.length; i += 2) {
2193 >                if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2194                      ++rc;
2195              }
2196          }
# Line 2231 | Line 2239 | public class ForkJoinPool extends Abstra
2239          long count = stealCount.get();
2240          WorkQueue[] ws; WorkQueue w;
2241          if ((ws = workQueues) != null) {
2242 <            int n = ws.length;
2235 <            for (int i = 1; i < n; i += 2) {
2242 >            for (int i = 1; i < ws.length; i += 2) {
2243                  if ((w = ws[i]) != null)
2244                      count += w.totalSteals;
2245              }
# Line 2254 | Line 2261 | public class ForkJoinPool extends Abstra
2261          long count = 0;
2262          WorkQueue[] ws; WorkQueue w;
2263          if ((ws = workQueues) != null) {
2264 <            int n = ws.length;
2258 <            for (int i = 1; i < n; i += 2) {
2264 >            for (int i = 1; i < ws.length; i += 2) {
2265                  if ((w = ws[i]) != null)
2266                      count += w.queueSize();
2267              }
# Line 2274 | Line 2280 | public class ForkJoinPool extends Abstra
2280          int count = 0;
2281          WorkQueue[] ws; WorkQueue w;
2282          if ((ws = workQueues) != null) {
2283 <            int n = ws.length;
2278 <            for (int i = 0; i < n; i += 2) {
2283 >            for (int i = 0; i < ws.length; i += 2) {
2284                  if ((w = ws[i]) != null)
2285                      count += w.queueSize();
2286              }
# Line 2292 | Line 2297 | public class ForkJoinPool extends Abstra
2297      public boolean hasQueuedSubmissions() {
2298          WorkQueue[] ws; WorkQueue w;
2299          if ((ws = workQueues) != null) {
2300 <            int n = ws.length;
2296 <            for (int i = 0; i < n; i += 2) {
2300 >            for (int i = 0; i < ws.length; i += 2) {
2301                  if ((w = ws[i]) != null && w.queueSize() != 0)
2302                      return true;
2303              }
# Line 2311 | Line 2315 | public class ForkJoinPool extends Abstra
2315      protected ForkJoinTask<?> pollSubmission() {
2316          WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2317          if ((ws = workQueues) != null) {
2318 <            int n = ws.length;
2315 <            for (int i = 0; i < n; i += 2) {
2318 >            for (int i = 0; i < ws.length; i += 2) {
2319                  if ((w = ws[i]) != null && (t = w.poll()) != null)
2320                      return t;
2321              }
# Line 2341 | Line 2344 | public class ForkJoinPool extends Abstra
2344          int count = 0;
2345          WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2346          if ((ws = workQueues) != null) {
2347 <            int n = ws.length;
2345 <            for (int i = 0; i < n; ++i) {
2347 >            for (int i = 0; i < ws.length; ++i) {
2348                  if ((w = ws[i]) != null) {
2349                      while ((t = w.poll()) != null) {
2350                          c.add(t);
# Line 2362 | Line 2364 | public class ForkJoinPool extends Abstra
2364       * @return a string identifying this pool, as well as its state
2365       */
2366      public String toString() {
2367 <        long st = getStealCount();
2368 <        long qt = getQueuedTaskCount();
2369 <        long qs = getQueuedSubmissionCount();
2368 <        int rc = getRunningThreadCount();
2369 <        int pc = parallelism;
2367 >        // Use a single pass through workQueues to collect counts
2368 >        long qt = 0L, qs = 0L; int rc = 0;
2369 >        long st = stealCount.get();
2370          long c = ctl;
2371 +        WorkQueue[] ws; WorkQueue w;
2372 +        if ((ws = workQueues) != null) {
2373 +            for (int i = 0; i < ws.length; ++i) {
2374 +                if ((w = ws[i]) != null) {
2375 +                    int size = w.queueSize();
2376 +                    if ((i & 1) == 0)
2377 +                        qs += size;
2378 +                    else {
2379 +                        qt += size;
2380 +                        st += w.totalSteals;
2381 +                        if (w.isApparentlyUnblocked())
2382 +                            ++rc;
2383 +                    }
2384 +                }
2385 +            }
2386 +        }
2387 +        int pc = parallelism;
2388          int tc = pc + (short)(c >>> TC_SHIFT);
2389          int ac = pc + (int)(c >> AC_SHIFT);
2390          if (ac < 0) // ignore transient negative
# Line 2403 | Line 2420 | public class ForkJoinPool extends Abstra
2420       */
2421      public void shutdown() {
2422          checkPermission();
2423 <        enableShutdown();
2407 <        tryTerminate(false);
2423 >        tryTerminate(false, true);
2424      }
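/*
 * Illustrative sketch of the orderly-shutdown path that now funnels
 * through tryTerminate(false, true): enable shutdown, wait a bounded
 * time, then force termination if work is still running.  The timeout
 * value is an arbitrary choice for the example.
 */
import java.util.concurrent.TimeUnit;
import jsr166y.ForkJoinPool;           // the package of this file

class ShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        // ... submit work to the pool here ...
        pool.shutdown();                         // tryTerminate(false, true)
        if (!pool.awaitTermination(10, TimeUnit.SECONDS))
            pool.shutdownNow();                  // tryTerminate(true, true)
    }
}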
2425  
2426      /**
# Line 2425 | Line 2441 | public class ForkJoinPool extends Abstra
2441       */
2442      public List<Runnable> shutdownNow() {
2443          checkPermission();
2444 <        enableShutdown();
2429 <        tryTerminate(true);
2444 >        tryTerminate(true, true);
2445          return Collections.emptyList();
2446      }
2447  
# Line 2483 | Line 2498 | public class ForkJoinPool extends Abstra
2498      public boolean awaitTermination(long timeout, TimeUnit unit)
2499          throws InterruptedException {
2500          long nanos = unit.toNanos(timeout);
2501 <        final ReentrantLock lock = this.lock;
2501 >        final Mutex lock = this.lock;
2502          lock.lock();
2503          try {
2504              for (;;) {
# Line 2624 | Line 2639 | public class ForkJoinPool extends Abstra
2639      // Unsafe mechanics
2640      private static final sun.misc.Unsafe U;
2641      private static final long CTL;
2627    private static final long RUNSTATE;
2642      private static final long PARKBLOCKER;
2643  
2644      static {
# Line 2632 | Line 2646 | public class ForkJoinPool extends Abstra
2646          modifyThreadPermission = new RuntimePermission("modifyThread");
2647          defaultForkJoinWorkerThreadFactory =
2648              new DefaultForkJoinWorkerThreadFactory();
2649 <        int s;
2649 >        submitters = new ThreadSubmitter();
2650          try {
2651              U = getUnsafe();
2652              Class<?> k = ForkJoinPool.class;
2639            Class<?> tk = Thread.class;
2653              CTL = U.objectFieldOffset
2654                  (k.getDeclaredField("ctl"));
2655 <            RUNSTATE = U.objectFieldOffset
2643 <                (k.getDeclaredField("runState"));
2655 >            Class<?> tk = Thread.class;
2656              PARKBLOCKER = U.objectFieldOffset
2657                  (tk.getDeclaredField("parkBlocker"));
2658          } catch (Exception e) {
