
Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.63 by dl, Fri Aug 13 16:21:23 2010 UTC vs.
Revision 1.64 by dl, Tue Aug 17 18:30:32 2010 UTC

# Line 259 | Line 259 | public class ForkJoinPool extends Abstra
259       * workers that previously could not find a task to now find one:
260       * Submission of a new task to the pool, or another worker pushing
261       * a task onto a previously empty queue.  (We also use this
262 <     * mechanism for termination actions that require wakeups of idle
263 <     * workers).  Each worker maintains its last known event count,
264 <     * and blocks when a scan for work did not find a task AND its
265 <     * lastEventCount matches the current eventCount. Waiting idle
266 <     * workers are recorded in a variant of Treiber stack headed by
267 <     * field eventWaiters which, when nonzero, encodes the thread
268 <     * index and count awaited for by the worker thread most recently
269 <     * calling eventSync. This thread in turn has a record (field
270 <     * nextEventWaiter) for the next waiting worker.  In addition to
271 <     * allowing simpler decisions about need for wakeup, the event
272 <     * count bits in eventWaiters serve the role of tags to avoid ABA
273 <     * errors in Treiber stacks.  To reduce delays in task diffusion,
274 <     * workers not otherwise occupied may invoke method
275 <     * releaseEventWaiters, that removes and signals (unparks) workers
276 <     * not waiting on current count. To reduce stalls, To minimize
277 <     * task production stalls associate with signalling, any worker
278 <     * pushing a task on an empty queue invokes the weaker method
279 <     * signalWork, that only releases idle workers until it detects
280 <     * interference by other threads trying to release, and lets them
281 <     * take over.  The net effect is a tree-like diffusion of signals,
282 <     * where released threads (and possibly others) help with unparks.
283 <     * To further reduce contention effects a bit, failed CASes to
284 <     * increment field eventCount are tolerated without retries.
262 >     * mechanism for configuration and termination actions that
263 >     * require wakeups of idle workers).  Each worker maintains its
264 >     * last known event count, and blocks when a scan for work did not
265 >     * find a task AND its lastEventCount matches the current
266 >     * eventCount. Waiting idle workers are recorded in a variant of
267 >     * Treiber stack headed by field eventWaiters which, when nonzero,
268 >     * encodes the thread index and count awaited for by the worker
269 >     * thread most recently calling eventSync. This thread in turn has
270 >     * a record (field nextEventWaiter) for the next waiting worker.
271 >     * In addition to allowing simpler decisions about need for
272 >     * wakeup, the event count bits in eventWaiters serve the role of
273 >     * tags to avoid ABA errors in Treiber stacks. Upon any wakeup,
274 >     * released threads also try to release others (but give up upon
275 >     * contention to reduce useless flailing).  The net effect is a
276 >     * tree-like diffusion of signals, where released threads (and
277 >     * possibly others) help with unparks.  To further reduce
278 >     * contention effects a bit, failed CASes to increment field
279 >     * eventCount are tolerated without retries in signalWork.
280       * Conceptually they are merged into the same event, which is OK
281       * when their only purpose is to enable workers to scan for work.
282       *
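The Treiber-stack scheme described in the paragraph above can be restated in isolation. The stand-alone class below is illustrative only: it uses AtomicLong instead of the pool's Unsafe CASes, and the constant values are the ones implied by the eventWaiters field comment later in this diff (awaited count in the high 32 bits, one plus the waiter's pool index in the low 16 bits). The count bits double as the ABA tag: two pushes of the same worker under different event counts produce different head words.

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative sketch of an eventWaiters-style Treiber stack; not pool code.
    final class WaiterStackSketch {
        static final int  EVENT_COUNT_SHIFT = 32;             // awaited count in high 32 bits
        static final long WAITER_ID_MASK    = (1L << 16) - 1; // 1 + pool index in low 16 bits

        final AtomicLong head = new AtomicLong();  // plays the role of eventWaiters
        final long[] nextWaiter;                   // per-worker "next" links

        WaiterStackSketch(int nWorkers) { nextWaiter = new long[nWorkers]; }

        /** Pushes worker id, recording the event count it is waiting for. */
        void push(int id, int awaitedCount) {
            long nh = (((long) awaitedCount) << EVENT_COUNT_SHIFT) | (id + 1);
            long h;
            do {
                h = head.get();
                nextWaiter[id] = h;                // link recorded before the CAS
            } while (!head.compareAndSet(h, nh));
        }

        /** Pops one waiter index, or -1 if the stack is empty. */
        int pop() {
            for (;;) {
                long h = head.get();
                int id = ((int) (h & WAITER_ID_MASK)) - 1;
                if (id < 0)
                    return -1;
                if (head.compareAndSet(h, nextWaiter[id]))
                    return id;
            }
        }
    }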
# Line 304 | Line 299 | public class ForkJoinPool extends Abstra
299       * may become a spare at about the same time as another is
300       * needlessly being created. We counteract this and related slop
301       * in part by requiring resumed spares to immediately recheck (in
302 <     * preStep) to see whether they they should re-suspend.  To avoid
308 <     * long-term build-up of spares, the oldest spare (see
309 <     * ForkJoinWorkerThread.suspendAsSpare) occasionally wakes up if
310 <     * not signalled and calls tryTrimSpare, which uses two different
311 <     * thresholds: Always killing if the number of spares is greater
312 <     * that 25% of total, and killing others only at a slower rate
313 <     * (UNUSED_SPARE_TRIM_RATE_NANOS).
302 >     * preStep) to see whether they should re-suspend.
303       *
304 <     * 6. Deciding when to create new workers. The main dynamic
304 >     * 6. Killing off unneeded workers. The Spare and Event queues use
305 >     * similar mechanisms to shed unused workers: The oldest (first)
306 >     * waiter uses a timed rather than hard wait. When this wait times
307 >     * out without a normal wakeup, it tries to shutdown any one (for
308 >     * convenience the newest) other waiter via tryShutdownSpare or
309 >     * tryShutdownWaiter, respectively. The wakeup rates for spares
310 >     * are much shorter than for waiters. Together, they will
311 >     * eventually reduce the number of worker threads to a minimum of
312 >     * one after a long enough period without use.
313 >     *
314 >     * 7. Deciding when to create new workers. The main dynamic
315       * control in this class is deciding when to create extra threads
316       * in method helpMaintainParallelism. We would like to keep
317       * exactly #parallelism threads running, which is an impossible
# Line 326 | Line 325 | public class ForkJoinPool extends Abstra
325       * impedes accuracy. Our main defense is to allow some slack in
326       * creation thresholds, using rules that reflect the fact that the
327       * more threads we have running, the more likely that we are
328 <     * underestimating the number running threads. The rules also
329 <     * better cope with the fact that some of the methods in this
328 >     * underestimating the number of running threads. (We also include
329 >     * some heuristic use of Thread.yield when all workers appear to
330 >     * be busy, to improve likelihood of counts settling.) The rules
331 >     * also better cope with the fact that some of the methods in this
332       * class tend to never become compiled (but are interpreted), so
333       * some components of the entire set of controls might execute 100
334       * times faster than others. And similarly for cases where the
# Line 419 | Line 420 | public class ForkJoinPool extends Abstra
420          new AtomicInteger();
421  
422      /**
423 +     * The wakeup interval (in nanoseconds) for the oldest worker
424 +     * waiting for an event before it invokes tryShutdownWaiter to shrink
425 +     * the number of workers.  The exact value does not matter too
426 +     * much, but should be long enough to slowly release resources
427 +     * during long periods of disuse, without disrupting normal use.
428 +     */
429 +    private static final long SHRINK_RATE_NANOS =
430 +        60L * 1000L * 1000L * 1000L; // one minute
431 +
432 +    /**
433       * Absolute bound for parallelism level. Twice this number plus
434       * one (i.e., 0xfff) must fit into a 16bit field to enable
435       * word-packing for some counts and indices.
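The shrink mechanism from point 6 above, driven by the SHRINK_RATE_NANOS constant just introduced, comes down to a single parking decision: the oldest waiter takes a timed park, everyone else parks indefinitely, and a timed park that elapses in full means the caller should try to shut down one other idle worker. A minimal sketch of that decision, assuming only what the surrounding comments state (the helper below is a stand-in, not a pool method):

    import java.util.concurrent.locks.LockSupport;

    // Illustrative stand-in for the "oldest waiter waits with a timeout" idea.
    final class ShrinkWaitSketch {
        static final long SHRINK_RATE_NANOS = 60L * 1000L * 1000L * 1000L; // one minute

        /**
         * Parks like an event waiter: untimed unless this thread is the oldest
         * waiter. Returns true when the timed wait elapsed without a wakeup,
         * i.e. the caller should try to shut down one other idle worker.
         */
        static boolean parkAsWaiter(boolean oldestWaiter, Object blocker) {
            if (!oldestWaiter) {
                LockSupport.park(blocker);   // woken only by an explicit unpark
                return false;
            }
            long start = System.nanoTime();
            LockSupport.parkNanos(blocker, SHRINK_RATE_NANOS);
            return System.nanoTime() - start >= SHRINK_RATE_NANOS;
        }
    }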
# Line 463 | Line 474 | public class ForkJoinPool extends Abstra
474      private volatile long stealCount;
475  
476      /**
466     * The last nanoTime that a spare thread was trimmed
467     */
468    private volatile long trimTime;
469
470    /**
471     * The rate at which to trim unused spares
472     */
473    static final long UNUSED_SPARE_TRIM_RATE_NANOS =
474        1000L * 1000L * 1000L; // 1 sec
475
476    /**
477       * Encoded record of top of treiber stack of threads waiting for
478       * events. The top 32 bits contain the count being waited for. The
479       * bottom 16 bits contain one plus the pool index of waiting
# Line 514 | Line 514 | public class ForkJoinPool extends Abstra
514       * These are bundled together to ensure consistent read for
515       * termination checks (i.e., that runLevel is at least SHUTDOWN
516       * and active threads is zero).
517 +     *
518 +     * Notes: Most direct CASes are dependent on these bitfield
519 +     * positions.  Also, this field is non-private to enable direct
520 +     * performance-sensitive CASes in ForkJoinWorkerThread.
521       */
522 <    private volatile int runState;
522 >    volatile int runState;
523  
524      // Note: The order among run level values matters.
525      private static final int RUNLEVEL_SHIFT     = 16;
# Line 523 | Line 527 | public class ForkJoinPool extends Abstra
527      private static final int TERMINATING        = 1 << (RUNLEVEL_SHIFT + 1);
528      private static final int TERMINATED         = 1 << (RUNLEVEL_SHIFT + 2);
529      private static final int ACTIVE_COUNT_MASK  = (1 << RUNLEVEL_SHIFT) - 1;
526    private static final int ONE_ACTIVE         = 1; // active update delta
530  
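The run-level constants just shown determine how the runState word is packed: run-level bits sit above RUNLEVEL_SHIFT, the active-thread count occupies the low 16 bits, which is why the tryIncrementActiveCount/tryDecrementActiveCount changes later in this diff can use a plain +1/-1 instead of ONE_ACTIVE. A stand-alone restatement, with constant values copied from the declarations above (the helper class is illustrative, not pool code):

    // Illustrative restatement of the runState packing; values copied from above.
    final class RunStateSketch {
        static final int RUNLEVEL_SHIFT    = 16;
        static final int SHUTDOWN          = 1 << RUNLEVEL_SHIFT;
        static final int TERMINATING       = 1 << (RUNLEVEL_SHIFT + 1);
        static final int TERMINATED        = 1 << (RUNLEVEL_SHIFT + 2);
        static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;

        static int activeCount(int rs)            { return rs & ACTIVE_COUNT_MASK; }
        static boolean atLeastShutdown(int rs)    { return rs >= SHUTDOWN; }
        static boolean atLeastTerminating(int rs) { return rs >= TERMINATING; }

        public static void main(String[] args) {
            int rs = SHUTDOWN | 3;                        // shut down, 3 active workers
            System.out.println(activeCount(rs));          // 3
            System.out.println(atLeastShutdown(rs));      // true
            System.out.println(atLeastTerminating(rs));   // false
            System.out.println(activeCount(rs + 1));      // 4: active count bumps by plain +1
        }
    }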
531      /**
532       * Holds number of total (i.e., created and not yet terminated)
# Line 565 | Line 568 | public class ForkJoinPool extends Abstra
568      private final int poolNumber;
569  
570  
571 <    // Utilities for CASing fields. Note that several of these
572 <    // are manually inlined by callers
571 >    // Utilities for CASing fields. Note that most of these
572 >    // are usually manually inlined by callers
573  
574      /**
575       * Increments running count part of workerCounts
# Line 599 | Line 602 | public class ForkJoinPool extends Abstra
602      private void decrementWorkerCounts(int dr, int dt) {
603          for (;;) {
604              int wc = workerCounts;
602            if (wc == 0 && (runState & TERMINATED) != 0)
603                return; // lagging termination on a backout
605              if ((wc & RUNNING_COUNT_MASK)  - dr < 0 ||
606 <                (wc >>> TOTAL_COUNT_SHIFT) - dt < 0)
606 >                (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) {
607 >                if ((runState & TERMINATED) != 0)
608 >                    return; // lagging termination on a backout
609                  Thread.yield();
610 +            }
611              if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
612                                           wc, wc - (dr + dt)))
613                  return;
# Line 628 | Line 632 | public class ForkJoinPool extends Abstra
632      final boolean tryIncrementActiveCount() {
633          int c;
634          return UNSAFE.compareAndSwapInt(this, runStateOffset,
635 <                                        c = runState, c + ONE_ACTIVE);
635 >                                        c = runState, c + 1);
636      }
637  
638      /**
# Line 638 | Line 642 | public class ForkJoinPool extends Abstra
642      final boolean tryDecrementActiveCount() {
643          int c;
644          return UNSAFE.compareAndSwapInt(this, runStateOffset,
645 <                                        c = runState, c - ONE_ACTIVE);
645 >                                        c = runState, c - 1);
646      }
647  
648      /**
# Line 705 | Line 709 | public class ForkJoinPool extends Abstra
709       * Tries to create and add new worker. Assumes that worker counts
710       * are already updated to accommodate the worker, so adjusts on
711       * failure.
712 +     *
713 +     * @return the worker, or null on failure
714       */
715 <    private void addWorker() {
715 >    private ForkJoinWorkerThread addWorker() {
716          ForkJoinWorkerThread w = null;
717          try {
718              w = factory.newThread(this);
# Line 716 | Line 722 | public class ForkJoinPool extends Abstra
722                  tryTerminate(false); // in case of failure during shutdown
723              }
724          }
725 <        if (w != null)
725 >        if (w != null) {
726              w.start(recordWorker(w), ueh);
727 +            advanceEventCount();
728 +        }
729 +        return w;
730      }
731  
732      /**
# Line 740 | Line 749 | public class ForkJoinPool extends Abstra
749      /**
750       * Releases workers blocked on a count not equal to current count.
751       * Normally called after precheck that eventWaiters isn't zero to
752 <     * avoid wasted array checks.
753 <     *
745 <     * @param signalling true if caller is a signalling worker so can
746 <     * exit upon (conservatively) detected contention by other threads
747 <     * who will continue to release
752 >     * avoid wasted array checks. Gives up upon a change in count or
753 >     * contention, letting other workers take over.
754       */
755 <    private void releaseEventWaiters(boolean signalling) {
755 >    private void releaseEventWaiters() {
756          ForkJoinWorkerThread[] ws = workers;
757          int n = ws.length;
758 <        long h; // head of stack
759 <        ForkJoinWorkerThread w; int id, ec;
760 <        while ((id = ((int)((h = eventWaiters) & WAITER_ID_MASK)) - 1) >= 0 &&
761 <               (int)(h >>> EVENT_COUNT_SHIFT) != (ec = eventCount) &&
762 <               id < n && (w = ws[id]) != null) {
763 <            if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
764 <                                          h, h = w.nextWaiter))
765 <                LockSupport.unpark(w);
766 <            if (signalling && (eventCount != ec || eventWaiters != h))
758 >        long h = eventWaiters;
759 >        int ec = eventCount;
760 >        ForkJoinWorkerThread w; int id;
761 >        while ((int)(h >>> EVENT_COUNT_SHIFT) != ec &&
762 >               (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
763 >               id < n && (w = ws[id]) != null &&
764 >               UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
765 >                                         h,  h = w.nextWaiter)) {
766 >            LockSupport.unpark(w);
767 >            if (eventWaiters != h || eventCount != ec)
768                  break;
769          }
770      }
# Line 770 | Line 777 | public class ForkJoinPool extends Abstra
777          int c; // try to increment event count -- CAS failure OK
778          UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
779          if (eventWaiters != 0L)
780 <            releaseEventWaiters(true);
780 >            releaseEventWaiters();
781      }
782  
783      /**
784 <     * Blocks worker until terminating or event count
785 <     * advances from last value held by worker
784 >     * Adds the given worker to the event queue and blocks until
785 >     * terminating or the event count advances from the worker's
786 >     * lastEventCount value
787       *
788       * @param w the calling worker thread
789       */
790      private void eventSync(ForkJoinWorkerThread w) {
791 <        int wec = w.lastEventCount;
792 <        long nh = (((long)wec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
791 >        int ec = w.lastEventCount;
792 >        long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
793          long h;
794          while ((runState < SHUTDOWN || !tryTerminate(false)) &&
795 <               ((h = eventWaiters) == 0L ||
796 <                (int)(h >>> EVENT_COUNT_SHIFT) == wec) &&
797 <               eventCount == wec) {
795 >               (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
796 >                (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
797 >               eventCount == ec) {
798              if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
799                                            w.nextWaiter = h, nh)) {
800 <                while (runState < TERMINATING && eventCount == wec) {
801 <                    if (!tryAccumulateStealCount(w))  // transfer while idle
802 <                        continue;
803 <                    Thread.interrupted();             // clear/ignore interrupt
804 <                    if (eventCount != wec)
805 <                        break;
800 >                awaitEvent(w, ec);
801 >                break;
802 >            }
803 >        }
804 >    }
805 >
806 >    /**
807 >     * Blocks the given worker (that has already been entered as an
808 >     * event waiter) until terminating or event count advances from
809 >     * the given value. The oldest (first) waiter uses a timed wait to
810 >     * occasionally one-by-one shrink the number of workers (to a
811 >     * minimum of one) if the pool has not been used for extended
812 >     * periods.
813 >     *
814 >     * @param w the calling worker thread
815 >     * @param ec the count
816 >     */
817 >    private void awaitEvent(ForkJoinWorkerThread w, int ec) {
818 >        while (eventCount == ec) {
819 >            if (tryAccumulateStealCount(w)) { // transfer while idle
820 >                boolean untimed = (w.nextWaiter != 0L ||
821 >                                   (workerCounts & RUNNING_COUNT_MASK) <= 1);
822 >                long startTime = untimed? 0 : System.nanoTime();
823 >                Thread.interrupted();         // clear/ignore interrupt
824 >                if (eventCount != ec || !w.isRunning() ||
825 >                    runState >= TERMINATING)  // recheck after clear
826 >                    break;
827 >                if (untimed)
828                      LockSupport.park(w);
829 +                else {
830 +                    LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
831 +                    if (eventCount != ec || !w.isRunning() ||
832 +                        runState >= TERMINATING)
833 +                        break;
834 +                    if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
835 +                        tryShutdownWaiter(ec);
836                  }
800                break;
837              }
838          }
839 <        w.lastEventCount = eventCount;
839 >    }
840 >
841 >    /**
842 >     * Callback from the oldest waiter in awaitEvent waking up after a
843 >     * period of non-use. Tries (once) to shutdown an event waiter (or
844 >     * a spare, if one exists). Note that we don't need CAS or locks
845 >     * here because the method is called only from one thread
846 >     * occasionally waking (and even misfires are OK). Note that
847 >     * until the shutdown worker fully terminates, workerCounts
848 >     * will overestimate total count, which is tolerable.
849 >     *
850 >     * @param ec the event count waited on by caller (to abort
851 >     * attempt if count has since changed).
852 >     */
853 >    private void tryShutdownWaiter(int ec) {
854 >        if (spareWaiters != 0) { // prefer killing spares
855 >            tryShutdownSpare();
856 >            return;
857 >        }
858 >        ForkJoinWorkerThread[] ws = workers;
859 >        int n = ws.length;
860 >        long h = eventWaiters;
861 >        ForkJoinWorkerThread w; int id; long nh;
862 >        if (runState == 0 &&
863 >            submissionQueue.isEmpty() &&
864 >            eventCount == ec &&
865 >            (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
866 >            id < n && (w = ws[id]) != null &&
867 >            (nh = w.nextWaiter) != 0L && // keep at least one worker
868 >            UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) {
869 >            w.shutdown();
870 >            LockSupport.unpark(w);
871 >        }
872 >        releaseEventWaiters();
873      }
874  
875      // Maintaining spares
# Line 809 | Line 878 | public class ForkJoinPool extends Abstra
878       * Pushes worker onto the spare stack
879       */
880      final void pushSpare(ForkJoinWorkerThread w) {
881 <        int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex+1);
881 >        int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
882          do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
883                                                 w.nextSpare = spareWaiters,ns));
884      }
885  
886      /**
887 <     * Tries (once) to resume a spare if running count is less than
888 <     * target parallelism. Fails on contention or stale workers.
887 >     * Callback from oldest spare occasionally waking up.  Tries
888 >     * (once) to shutdown a spare. Same idea as tryShutdownWaiter.
889       */
890 <    private void tryResumeSpare() {
890 >    final void tryShutdownSpare() {
891          int sw, id;
892          ForkJoinWorkerThread w;
893          ForkJoinWorkerThread[] ws;
894          if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
895              id < (ws = workers).length && (w = ws[id]) != null &&
896 <            (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
828 <            eventWaiters == 0L &&
829 <            spareWaiters == sw &&
896 >            (workerCounts & RUNNING_COUNT_MASK) >= parallelism &&
897              UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
898 <                                     sw, w.nextSpare) &&
899 <            w.tryUnsuspend()) {
833 <            int c; // try increment; if contended, finish after unpark
834 <            boolean inc = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
835 <                                                   c = workerCounts,
836 <                                                   c + ONE_RUNNING);
898 >                                     sw, w.nextSpare)) {
899 >            w.shutdown();
900              LockSupport.unpark(w);
901 <            if (!inc) {
902 <                do {} while(!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
903 <                                                      c = workerCounts,
904 <                                                      c + ONE_RUNNING));
901 >            advanceEventCount();
902 >        }
903 >    }
904 >
905 >    /**
906 >     * Tries (once) to resume a spare if worker counts match
907 >     * the given count.
908 >     *
909 >     * @param wc workerCounts value on invocation of this method
910 >     */
911 >    private void tryResumeSpare(int wc) {
912 >        ForkJoinWorkerThread[] ws = workers;
913 >        int n = ws.length;
914 >        int sw, id, rs;  ForkJoinWorkerThread w;
915 >        if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
916 >            id < n && (w = ws[id]) != null &&
917 >            (rs = runState) < TERMINATING &&
918 >            eventWaiters == 0L && workerCounts == wc) {
919 >            // In case all workers busy, heuristically back off to let settle
920 >            Thread.yield();
921 >            if (eventWaiters == 0L && runState == rs && // recheck
922 >                workerCounts == wc && spareWaiters == sw &&
923 >                UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
924 >                                         sw, w.nextSpare)) {
925 >                int c;              // increment running count before resume
926 >                do {} while(!UNSAFE.compareAndSwapInt
927 >                            (this, workerCountsOffset,
928 >                             c = workerCounts, c + ONE_RUNNING));
929 >                if (w.tryUnsuspend())
930 >                    LockSupport.unpark(w);
931 >                else               // back out if w was shutdown
932 >                    decrementWorkerCounts(ONE_RUNNING, 0);
933              }
934          }
935      }
936  
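The workerCounts arithmetic used above and below (ONE_RUNNING, ONE_TOTAL, RUNNING_COUNT_MASK, TOTAL_COUNT_SHIFT) is easier to follow with the packing written out. Those constants are declared outside the hunks shown here; the values below are the 16-bit split implied by the masks, shifts, and the MAX_WORKERS comment earlier, so treat this as an assumed sketch rather than the file's actual declarations:

    // Assumed sketch of the workerCounts packing: running count in the low
    // 16 bits, total count in the high 16 bits. Values are inferred from the
    // surrounding code, not copied from declarations in this diff.
    final class WorkerCountsSketch {
        static final int TOTAL_COUNT_SHIFT  = 16;
        static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
        static final int ONE_RUNNING        = 1;
        static final int ONE_TOTAL          = 1 << TOTAL_COUNT_SHIFT;

        public static void main(String[] args) {
            int wc = (4 << TOTAL_COUNT_SHIFT) | 3;        // 4 total workers, 3 running
            System.out.println(wc & RUNNING_COUNT_MASK);  // 3
            System.out.println(wc >>> TOTAL_COUNT_SHIFT); // 4
            wc += ONE_RUNNING | ONE_TOTAL;                // add a worker that starts running
            System.out.println(wc & RUNNING_COUNT_MASK);  // 4
            System.out.println(wc >>> TOTAL_COUNT_SHIFT); // 5
        }
    }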
937 +    // adding workers on demand
938 +
939      /**
940 <     * Callback from oldest spare occasionally waking up.  Tries
941 <     * (once) to shutdown a spare if more than 25% spare overage, or
942 <     * if UNUSED_SPARE_TRIM_RATE_NANOS have elapsed and there are at
943 <     * least #parallelism running threads. Note that we don't need CAS
851 <     * or locks here because the method is called only from the oldest
852 <     * suspended spare occasionally waking (and even misfires are OK).
853 <     *
854 <     * @param now the wake up nanoTime of caller
855 <     */
856 <    final void tryTrimSpare(long now) {
857 <        long lastTrim = trimTime;
858 <        trimTime = now;
859 <        helpMaintainParallelism(); // first, help wake up any needed spares
860 <        int sw, id;
861 <        ForkJoinWorkerThread w;
862 <        ForkJoinWorkerThread[] ws;
940 >     * Adds one or more workers if needed to establish target parallelism.
941 >     * Retries upon contention.
942 >     */
943 >    private void addWorkerIfBelowTarget() {
944          int pc = parallelism;
945 <        int wc = workerCounts;
946 <        if ((wc & RUNNING_COUNT_MASK) >= pc &&
947 <            (((wc >>> TOTAL_COUNT_SHIFT) - pc) > (pc >>> 2) + 1 ||// approx 25%
948 <             now - lastTrim >= UNUSED_SPARE_TRIM_RATE_NANOS) &&
949 <            (id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
950 <            id < (ws = workers).length && (w = ws[id]) != null &&
951 <            UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
952 <                                     sw, w.nextSpare))
953 <            w.shutdown(false);
945 >        int wc;
946 >        while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < pc &&
947 >               runState < TERMINATING) {
948 >            if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
949 >                                         wc + (ONE_RUNNING|ONE_TOTAL))) {
950 >                if (addWorker() == null)
951 >                    break;
952 >            }
953 >        }
954 >    }
955 >
956 >    /**
957 >     * Tries (once) to add a new worker if all existing workers are
958 >     * busy, and there are either no running workers or the deficit is
959 >     * at least twice the surplus.
960 >     *
961 >     * @param wc workerCounts value on invocation of this method
962 >     */
963 >    private void tryAddWorkerIfBusy(int wc) {
964 >        int tc, rc, rs;
965 >        int pc = parallelism;
966 >        if ((tc = wc >>> TOTAL_COUNT_SHIFT) < MAX_WORKERS &&
967 >            ((rc = wc & RUNNING_COUNT_MASK) == 0 ||
968 >             rc < pc - ((tc - pc) << 1)) &&
969 >            (rs = runState) < TERMINATING &&
970 >            (rs & ACTIVE_COUNT_MASK) == tc) {
971 >            // Since all workers busy, heuristically back off to let settle
972 >            Thread.yield();
973 >            if (eventWaiters == 0L && spareWaiters == 0 && // recheck
974 >                runState == rs && workerCounts == wc &&
975 >                UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
976 >                                         wc + (ONE_RUNNING|ONE_TOTAL)))
977 >                addWorker();
978 >        }
979      }
980  
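To make the admission test in tryAddWorkerIfBusy concrete, here is the same inequality as a worked example (pc, tc, rc are just local names for parallelism, total count, and running count; this restates the arithmetic and is not pool code):

    // Restatement of: rc == 0 || rc < pc - ((tc - pc) << 1)
    // i.e. add a worker only if nothing runs, or the running deficit (pc - rc)
    // exceeds twice the total surplus (tc - pc).
    final class BusyAddRuleSketch {
        static boolean shouldAdd(int pc, int tc, int rc) {
            return rc == 0 || rc < pc - ((tc - pc) << 1);
        }

        public static void main(String[] args) {
            // pc = 8 target, tc = 10 total (surplus 2): threshold is 8 - 4 = 4
            System.out.println(shouldAdd(8, 10, 3)); // true  (deficit 5 > 2 * surplus 2)
            System.out.println(shouldAdd(8, 10, 4)); // false (deficit 4 is not > 4)
            System.out.println(shouldAdd(8, 12, 0)); // true  (no running workers at all)
        }
    }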
981      /**
982       * Does at most one of:
983       *
984       * 1. Help wake up existing workers waiting for work via
985 <     *    releaseEventWaiters. (If any exist, then it probably doesn't
985 >     *    releaseEventWaiters. (If any exist, then it doesn't
986       *    matter right now if under target parallelism level.)
987       *
988 <     * 2. If below parallelism level and a spare exists, try (once)
883 <     *    to resume it via tryResumeSpare.
988 >     * 2. If a spare exists, try (once) to resume it via tryResumeSpare.
989       *
990 <     * 3. If neither of the above, tries (once) to add a new
991 <     *    worker if either there are not enough total, or if all
992 <     *    existing workers are busy, there are either no running
993 <     *    workers or the deficit is at least twice the surplus.
990 >     * 3. If there are not enough total workers, add some
991 >     *    via addWorkerIfBelowTarget;
992 >     *
993 >     * 4. Try (once) to add a new worker if all existing workers
994 >     *     are busy, via tryAddWorkerIfBusy
995       */
996      private void helpMaintainParallelism() {
997 <        // uglified to work better when not compiled
998 <        int pc, wc, rc, tc, rs; long h;
893 <        if ((h = eventWaiters) != 0L) {
997 >        long h; int pc, wc;
998 >        if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0) {
999              if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1000 <                releaseEventWaiters(false); // avoid useless call
1000 >                releaseEventWaiters(); // avoid useless call
1001          }
1002          else if ((pc = parallelism) >
1003 <                 (rc = ((wc = workerCounts) & RUNNING_COUNT_MASK))) {
1003 >                 ((wc = workerCounts) & RUNNING_COUNT_MASK)) {
1004              if (spareWaiters != 0)
1005 <                tryResumeSpare();
1006 <            else if ((rs = runState) < TERMINATING &&
1007 <                     ((tc = wc >>> TOTAL_COUNT_SHIFT) < pc ||
1008 <                      (tc == (rs & ACTIVE_COUNT_MASK) && // all busy
1009 <                       (rc == 0 ||                       // must add
905 <                        rc < pc - ((tc - pc) << 1)) &&   // within slack
906 <                       tc < MAX_WORKERS && runState == rs)) && // recheck busy
907 <                     workerCounts == wc &&
908 <                     UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
909 <                                              wc + (ONE_RUNNING|ONE_TOTAL)))
910 <                addWorker();
1005 >                tryResumeSpare(wc);
1006 >            else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
1007 >                addWorkerIfBelowTarget();
1008 >            else
1009 >                tryAddWorkerIfBusy(wc);
1010          }
1011      }
1012  
1013      /**
1014       * Callback from workers invoked upon each top-level action (i.e.,
1015 <     * stealing a task or taking a submission and running
1016 <     * it). Performs one or more of the following:
1015 >     * stealing a task or taking a submission and running it).
1016 >     * Performs one or more of the following:
1017       *
1018 <     * 1. If the worker cannot find work (misses > 0), updates its
1019 <     *    active status to inactive and updates activeCount unless
1020 <     *    this is the first miss and there is contention, in which
922 <     *    case it may try again (either in this or a subsequent
923 <     *    call).
1018 >     * 1. If the worker is active, try to set its active status to
1019 >     *    inactive and update activeCount. On contention, we may try
1020 >     *    again on this or subsequent call.
1021       *
1022 <     * 2. If there are at least 2 misses, awaits the next task event
926 <     *    via eventSync
1022 >     * 2. Release any existing event waiters that are now releasable
1023       *
1024 <     * 3. If there are too many running threads, suspends this worker
1025 <     *    (first forcing inactivation if necessary).  If it is not
1024 >     * 3. If there are too many running threads, suspend this worker
1025 >     *    (first forcing inactivation if necessary).  If it is not
1026       *    needed, it may be killed while suspended via
1027 <     *    tryTrimSpare. Otherwise, upon resume it rechecks to make
1027 >     *    tryShutdownSpare. Otherwise, upon resume it rechecks to make
1028       *    sure that it is still needed.
1029       *
1030 <     * 4. Helps release and/or reactivate other workers via
1031 <     *    helpMaintainParallelism
1030 >     * 4. If more than 1 miss, await the next task event via
1031 >     *    eventSync (first forcing inactivation if necessary), upon
1032 >     *    which worker may also be killed, via tryShutdownWaiter.
1033 >     *
1034 >     * 5. Help reactivate other workers via helpMaintainParallelism
1035       *
1036       * @param w the worker
1037       * @param misses the number of scans by caller failing to find work
1038 <     * (saturating at 2 just to avoid wraparound)
1038 >     * (saturating at 2 to avoid wraparound)
1039       */
1040      final void preStep(ForkJoinWorkerThread w, int misses) {
1041          boolean active = w.active;
1042          int pc = parallelism;
1043          for (;;) {
1044 <            int wc = workerCounts;
1045 <            int rc = wc & RUNNING_COUNT_MASK;
1046 <            if (active && (misses > 0 || rc > pc)) {
1047 <                int rs;                      // try inactivate
1048 <                if (UNSAFE.compareAndSwapInt(this, runStateOffset,
1049 <                                             rs = runState, rs - ONE_ACTIVE))
1050 <                    active = w.active = false;
1051 <                else if (misses > 1 || rc > pc ||
1052 <                         (rs & ACTIVE_COUNT_MASK) >= pc)
954 <                    continue;                // force inactivate
1044 >            int rs, wc, rc, ec; long h;
1045 >            if (active && UNSAFE.compareAndSwapInt(this, runStateOffset,
1046 >                                                   rs = runState, rs - 1))
1047 >                active = w.active = false;
1048 >            if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0 &&
1049 >                (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) {
1050 >                releaseEventWaiters();
1051 >                if (misses > 1)
1052 >                    continue;                  // clear before sync below
1053              }
1054 <            if (misses > 1) {
1055 <                misses = 0;                  // don't re-sync
1056 <                eventSync(w);                // continue loop to recheck rc
959 <            }
960 <            else if (rc > pc) {
961 <                if (workerCounts == wc &&   // try to suspend as spare
1054 >            if ((rc = ((wc = workerCounts) & RUNNING_COUNT_MASK)) > pc) {
1055 >                if (!active &&                 // must inactivate to suspend
1056 >                    workerCounts == wc &&      // try to suspend as spare
1057                      UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1058 <                                             wc, wc - ONE_RUNNING) &&
1059 <                    !w.suspendAsSpare())    // false if killed
1060 <                    break;
1058 >                                             wc, wc - ONE_RUNNING)) {
1059 >                    w.suspendAsSpare();
1060 >                    if (!w.isRunning())
1061 >                        break;                 // was killed while spare
1062 >                }
1063 >                continue;
1064              }
1065 <            else {
1066 <                if (rc < pc || eventWaiters != 0L)
1067 <                    helpMaintainParallelism();
1068 <                break;
1065 >            if (misses > 0) {
1066 >                if ((ec = eventCount) == w.lastEventCount && misses > 1) {
1067 >                    if (!active) {             // must inactivate to sync
1068 >                        eventSync(w);
1069 >                        if (w.isRunning())
1070 >                            misses = 1;        // don't re-sync
1071 >                        else
1072 >                            break;             // was killed while waiting
1073 >                    }
1074 >                    continue;
1075 >                }
1076 >                w.lastEventCount = ec;
1077              }
1078 +            if (rc < pc)
1079 +                helpMaintainParallelism();
1080 +            break;
1081          }
1082      }
1083  
# Line 1073 | Line 1182 | public class ForkJoinPool extends Abstra
1182       * Actions on transition to TERMINATING
1183       *
1184       * Runs up to four passes through workers: (0) shutting down each
1185 <     * quietly (without waking up if parked) to quickly spread
1186 <     * notifications without unnecessary bouncing around event queues
1187 <     * etc (1) wake up and help cancel tasks (2) interrupt (3) mop up
1188 <     * races with interrupted workers
1185 >     * (without waking up if parked) to quickly spread notifications
1186 >     * without unnecessary bouncing around event queues etc (1) wake
1187 >     * up and help cancel tasks (2) interrupt (3) mop up races with
1188 >     * interrupted workers
1189       */
1190      private void startTerminating() {
1191          cancelSubmissions();
# Line 1089 | Line 1198 | public class ForkJoinPool extends Abstra
1198              for (int i = 0; i < n; ++i) {
1199                  ForkJoinWorkerThread w = ws[i];
1200                  if (w != null) {
1201 <                    w.shutdown(true);
1201 >                    w.shutdown();
1202                      if (passes > 0 && !w.isTerminated()) {
1203                          w.cancelTasks();
1204                          LockSupport.unpark(w);
# Line 1237 | Line 1346 | public class ForkJoinPool extends Abstra
1346          this.workerLock = new ReentrantLock();
1347          this.termination = new Phaser(1);
1348          this.poolNumber = poolNumberGenerator.incrementAndGet();
1240        this.trimTime = System.nanoTime();
1349      }
1350  
1351      /**
# Line 1266 | Line 1374 | public class ForkJoinPool extends Abstra
1374              throw new RejectedExecutionException();
1375          submissionQueue.offer(task);
1376          advanceEventCount();
1377 <        helpMaintainParallelism();         // start or wake up workers
1377 >        if (eventWaiters != 0L)
1378 >            releaseEventWaiters();
1379 >        if ((workerCounts >>> TOTAL_COUNT_SHIFT) < parallelism)
1380 >            addWorkerIfBelowTarget();
1381      }
1382  
1383      /**
# Line 1285 | Line 1396 | public class ForkJoinPool extends Abstra
1396  
1397      /**
1398       * Arranges for (asynchronous) execution of the given task.
1288     * If the caller is already engaged in a fork/join computation in
1289     * the current pool, this method is equivalent in effect to
1290     * {@link ForkJoinTask#fork}.
1399       *
1400       * @param task the task
1401       * @throws NullPointerException if the task is null
# Line 1316 | Line 1424 | public class ForkJoinPool extends Abstra
1424  
1425      /**
1426       * Submits a ForkJoinTask for execution.
1319     * If the caller is already engaged in a fork/join computation in
1320     * the current pool, this method is equivalent in effect to
1321     * {@link ForkJoinTask#fork}.
1427       *
1428       * @param task the task to submit
1429       * @return the task
