root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.23 by dl, Wed Aug 18 14:05:51 2010 UTC vs.
Revision 1.24 by dl, Sun Aug 29 23:35:07 2010 UTC

# Line 159 | Line 159 | public class ForkJoinPool extends Abstra
159       *      re-activate a spare thread to compensate for blocked
160       *      joiners until they unblock.
161       *
162 <     * Because the determining existence of conservatively safe
163 <     * helping targets, the availability of already-created spares,
164 <     * and the apparent need to create new spares are all racy and
165 <     * require heuristic guidance, we rely on multiple retries of
166 <     * each. Further, because it is impossible to keep exactly the
167 <     * target (parallelism) number of threads running at any given
168 <     * time, we allow compensation during joins to fail, and enlist
169 <     * all other threads to help out whenever they are not otherwise
170 <     * occupied (i.e., mainly in method preStep).
162 >     * It is impossible to keep exactly the target (parallelism)
163 >     * number of threads running at any given time.  Determining
164 >     * existence of conservatively safe helping targets, the
165 >     * availability of already-created spares, and the apparent need
166 >     * to create new spares are all racy and require heuristic
167 >     * guidance, so we rely on multiple retries of each.  Compensation
168 >     * occurs in slow-motion. It is triggered only upon timeouts of
169 >     * Object.wait used for joins. This reduces poor decisions that
170 >     * would otherwise be made when threads are waiting for others
171 >     * that are stalled because of unrelated activities such as
172 >     * garbage collection.
173       *
174       * The ManagedBlocker extension API can't use helping so relies
175       * only on compensation in method awaitBlocker.
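[Editor's note: to make the compensation-only path concrete, here is a minimal sketch of a ManagedBlocker wrapping a lock acquisition, in the style of the ManagedLocker example in the ManagedBlocker javadoc. The LockBlocker name is a hypothetical stand-in.]

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.locks.ReentrantLock;

    class LockBlocker implements ForkJoinPool.ManagedBlocker {
        final ReentrantLock lock;
        boolean hasLock;
        LockBlocker(ReentrantLock lock) { this.lock = lock; }
        public boolean block() {
            if (!hasLock)
                lock.lock();   // may block; the pool may compensate meanwhile
            return hasLock = true;
        }
        public boolean isReleasable() {
            return hasLock || (hasLock = lock.tryLock());
        }
    }

    // From a task running in a ForkJoinPool:
    //   ForkJoinPool.managedBlock(new LockBlocker(lock));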
# Line 269 | Line 271 | public class ForkJoinPool extends Abstra
271       * In addition to allowing simpler decisions about need for
272       * wakeup, the event count bits in eventWaiters serve the role of
273       * tags to avoid ABA errors in Treiber stacks. Upon any wakeup,
274 <     * released threads also try to release others (but give up upon
275 <     * contention to reduce useless flailing).  The net effect is a
276 <     * tree-like diffusion of signals, where released threads (and
277 <     * possibly others) help with unparks.  To further reduce
278 <     * contention effects a bit, failed CASes to increment field
277 <     * eventCount are tolerated without retries in signalWork.
274 >     * released threads also try to release at most two others.  The
275 >     * net effect is a tree-like diffusion of signals, where released
276 >     * threads (and possibly others) help with unparks.  To further
277 >     * reduce contention effects a bit, failed CASes to increment
278 >     * field eventCount are tolerated without retries in signalWork.
279       * Conceptually they are merged into the same event, which is OK
280       * when their only purpose is to enable workers to scan for work.
281       *
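[Editor's note: the tolerated-failure idiom, as a standalone sketch using AtomicInteger in place of the Unsafe-based CAS on eventCount.]

    import java.util.concurrent.atomic.AtomicInteger;

    class EventCounter {
        private final AtomicInteger eventCount = new AtomicInteger();

        // A failed CAS is deliberately not retried: a lost race means a
        // concurrent signaller already advanced the count, so the two
        // signals merge into the same event.
        void signalWork() {
            int c = eventCount.get();
            eventCount.compareAndSet(c, c + 1);
        }
    }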
282 <     * 5. Managing suspension of extra workers. When a worker is about
283 <     * to block waiting for a join (or via ManagedBlockers), we may
284 <     * create a new thread to maintain parallelism level, or at least
285 <     * avoid starvation. Usually, extra threads are needed for only
286 <     * very short periods, yet join dependencies are such that we
287 <     * sometimes need them in bursts. Rather than create new threads
288 <     * each time this happens, we suspend no-longer-needed extra ones
289 <     * as "spares". For most purposes, we don't distinguish "extra"
290 <     * spare threads from normal "core" threads: On each call to
291 <     * preStep (the only point at which we can do this) a worker
292 <     * checks to see if there are now too many running workers, and if
293 <     * so, suspends itself.  Method helpMaintainParallelism looks for
294 <     * suspended threads to resume before considering creating a new
295 <     * replacement. The spares themselves are encoded on another
296 <     * variant of a Treiber Stack, headed at field "spareWaiters".
297 <     * Note that the use of spares is intrinsically racy.  One thread
298 <     * may become a spare at about the same time as another is
299 <     * needlessly being created. We counteract this and related slop
300 <     * in part by requiring resumed spares to immediately recheck (in
301 <     * preStep) to see whether they they should re-suspend.
302 <     *
303 <     * 6. Killing off unneeded workers. The Spare and Event queues use
304 <     * similar mechanisms to shed unused workers: The oldest (first)
305 <     * waiter uses a timed rather than hard wait. When this wait times
306 <     * out without a normal wakeup, it tries to shutdown any one (for
307 <     * convenience the newest) other waiter via tryShutdownSpare or
308 <     * tryShutdownWaiter, respectively. The wakeup rates for spares
309 <     * are much shorter than for waiters. Together, they will
310 <     * eventually reduce the number of worker threads to a minimum of
311 <     * one after a long enough period without use.
282 >     * 5. Managing suspension of extra workers. When a worker notices
283 >     * (usually upon timeout of a wait()) that there are too few
284 >     * running threads, we may create a new thread to maintain
285 >     * parallelism level, or at least avoid starvation. Usually, extra
286 >     * threads are needed for only very short periods, yet join
287 >     * dependencies are such that we sometimes need them in
288 >     * bursts. Rather than create new threads each time this happens,
289 >     * we suspend no-longer-needed extra ones as "spares". For most
290 >     * purposes, we don't distinguish "extra" spare threads from
291 >     * normal "core" threads: On each call to preStep (the only point
292 >     * at which we can do this) a worker checks to see if there are
293 >     * now too many running workers, and if so, suspends itself.
294 >     * Method helpMaintainParallelism looks for suspended threads to
295 >     * resume before considering creating a new replacement. The
296 >     * spares themselves are encoded on another variant of a Treiber
297 >     * Stack, headed at field "spareWaiters".  Note that the use of
298 >     * spares is intrinsically racy.  One thread may become a spare at
299 >     * about the same time as another is needlessly being created. We
300 >     * counteract this and related slop in part by requiring resumed
301 >     * spares to immediately recheck (in preStep) to see whether
302 >     * they should re-suspend.
303 >     *
304 >     * 6. Killing off unneeded workers. A timeout mechanism is used to
305 >     * shed unused workers: The oldest (first) event queue waiter uses
306 >     * a timed rather than hard wait. When this wait times out without
307 >     * a normal wakeup, it tries to shutdown any one (for convenience
308 >     * the newest) other spare or event waiter via
309 >     * tryShutdownUnusedWorker. This eventually reduces the number of
310 >     * worker threads to a minimum of one after a long enough period
311 >     * without use.
312       *
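[Editor's note: a sketch of the shedding mechanism, under the assumptions that only the oldest waiter uses a timed park and that shedOneIdleWorker stands in for tryShutdownUnusedWorker.]

    import java.util.concurrent.locks.LockSupport;

    class ShrinkSketch {
        static final long SHRINK_RATE_NANOS = 30L * 1000L * 1000L * 1000L;

        void awaitEvent(Thread self, boolean oldestWaiter) {
            if (!oldestWaiter)
                LockSupport.park(self);          // hard wait
            else {
                long start = System.nanoTime();
                LockSupport.parkNanos(self, SHRINK_RATE_NANOS);
                if (System.nanoTime() - start >= SHRINK_RATE_NANOS)
                    shedOneIdleWorker();         // timed out without wakeup
            }
        }
        void shedOneIdleWorker() { /* pool-specific */ }
    }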
313       * 7. Deciding when to create new workers. The main dynamic
314       * control in this class is deciding when to create extra threads
# Line 320 | Line 321 | public class ForkJoinPool extends Abstra
321       * compilation, and wake-up lags. These transients are extremely
322       * common -- we are normally trying to fully saturate the CPUs on
323       * a machine, so almost any activity other than running tasks
324 <     * impedes accuracy. Our main defense is to allow some slack in
325 <     * creation thresholds, using rules that reflect the fact that the
326 <     * more threads we have running, the more likely that we are
327 <     * underestimating the number running threads. (We also include
328 <     * some heuristic use of Thread.yield when all workers appear to
329 <     * be busy, to improve likelihood of counts settling.) The rules
330 <     * also better cope with the fact that some of the methods in this
331 <     * class tend to never become compiled (but are interpreted), so
332 <     * some components of the entire set of controls might execute 100
332 <     * times faster than others. And similarly for cases where the
333 <     * apparent lack of work is just due to GC stalls and other
334 <     * transient system activity.
324 >     * impedes accuracy. Our main defense is to allow parallelism to
325 >     * lapse for a while during joins, and use a timeout to see if,
326 >     * after the resulting settling, there is still a need for
327 >     * additional workers.  This also better copes with the fact that
328 >     * some of the methods in this class tend to never become compiled
329 >     * (but are interpreted), so some components of the entire set of
330 >     * controls might execute 100 times faster than others. And
331 >     * similarly for cases where the apparent lack of work is just due
332 >     * to GC stalls and other transient system activity.
333       *
334       * Beware that there is a lot of representation-level coupling
335       * among classes ForkJoinPool, ForkJoinWorkerThread, and
# Line 418 | Line 416 | public class ForkJoinPool extends Abstra
416          new AtomicInteger();
417  
418      /**
419 +     * The time to block in a join (see awaitJoin) before checking if
420 +     * a new worker should be (re)started to maintain parallelism
421 +     * level. The value should be short enough to maintain global
422 +     * responsiveness and progress but long enough to avoid
423 +     * counterproductive firings during GC stalls or unrelated system
424 +     * activity, and to not bog down systems with continual re-firings
425 +     * on GCs or legitimately long waits.
426 +     */
427 +    private static final long JOIN_TIMEOUT_MILLIS = 250L; // 4 per second
428 +
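[Editor's note: the timeout drives a wait-then-recheck loop of roughly this shape. This is a sketch only; joinDone and maybeCompensate are hypothetical stand-ins for the task status check and helpMaintainParallelism.]

    class TimedJoinSketch {
        static final long JOIN_TIMEOUT_MILLIS = 250L;

        void timedJoin(Object joinLock) throws InterruptedException {
            synchronized (joinLock) {
                while (!joinDone()) {
                    joinLock.wait(JOIN_TIMEOUT_MILLIS);
                    if (!joinDone())
                        maybeCompensate(); // act only after timeout settles
                }
            }
        }
        boolean joinDone()     { return true; } // stub
        void maybeCompensate() {}               // stub
    }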
429 +    /**
430       * The wakeup interval (in nanoseconds) after which the oldest
431 <     * worker waiting for an event invokes tryShutdownWaiter to shrink
431 >     * worker waiting for an event invokes tryShutdownUnusedWorker to shrink
432       * the number of workers.  The exact value does not matter too
433       * much, but should be long enough to slowly release resources
434       * during long periods without use without disrupting normal use.
435       */
436      private static final long SHRINK_RATE_NANOS =
437 <        60L * 1000L * 1000L * 1000L; // one minute
437 >        30L * 1000L * 1000L * 1000L; // 2 per minute
438  
439      /**
440       * Absolute bound for parallelism level. Twice this number plus
# Line 565 | Line 574 | public class ForkJoinPool extends Abstra
574       */
575      private final int poolNumber;
576  
568
577      // Utilities for CASing fields. Note that most of these
578      // are usually manually inlined by callers
579  
# Line 613 | Line 621 | public class ForkJoinPool extends Abstra
621      }
622  
623      /**
616     * Increments event count
617     */
618    private void advanceEventCount() {
619        int c;
620        do {} while(!UNSAFE.compareAndSwapInt(this, eventCountOffset,
621                                              c = eventCount, c+1));
622    }
623
624    /**
625     * Tries incrementing active count; fails on contention.
626     * Called by workers before executing tasks.
627     *
628     * @return true on success
629     */
630    final boolean tryIncrementActiveCount() {
631        int c;
632        return UNSAFE.compareAndSwapInt(this, runStateOffset,
633                                        c = runState, c + 1);
634    }
635
636    /**
624       * Tries decrementing active count; fails on contention.
625       * Called when workers cannot find tasks to run.
626       */
# Line 701 | Line 688 | public class ForkJoinPool extends Abstra
688          }
689      }
690  
704    // adding and removing workers
705
706    /**
707     * Tries to create and add new worker. Assumes that worker counts
708     * are already updated to accommodate the worker, so adjusts on
709     * failure.
710     *
711     * @return the worker, or null on failure
712     */
713    private ForkJoinWorkerThread addWorker() {
714        ForkJoinWorkerThread w = null;
715        try {
716            w = factory.newThread(this);
717        } finally { // Adjust on either null or exceptional factory return
718            if (w == null) {
719                decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
720                tryTerminate(false); // in case of failure during shutdown
721            }
722        }
723        if (w != null) {
724            w.start(recordWorker(w), ueh);
725            advanceEventCount();
726        }
727        return w;
728    }
729
691      /**
692       * Final callback from terminating worker.  Removes record of
693       * worker from array, and adjusts counts. If pool is shutting
# Line 748 | Line 709 | public class ForkJoinPool extends Abstra
709       * Releases workers blocked on a count not equal to current count.
710       * Normally called after precheck that eventWaiters isn't zero to
711       * avoid wasted array checks. Gives up upon a change in count or
712 <     * contention, letting other workers take over.
712 >     * upon releasing two workers, letting others take over.
713       */
714      private void releaseEventWaiters() {
715          ForkJoinWorkerThread[] ws = workers;
716          int n = ws.length;
717          long h = eventWaiters;
718          int ec = eventCount;
719 +        boolean releasedOne = false;
720          ForkJoinWorkerThread w; int id;
721 <        while ((int)(h >>> EVENT_COUNT_SHIFT) != ec &&
722 <               (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
723 <               id < n && (w = ws[id]) != null &&
724 <               UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
725 <                                         h,  h = w.nextWaiter)) {
726 <            LockSupport.unpark(w);
727 <            if (eventWaiters != h || eventCount != ec)
721 >        while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
722 >               (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
723 >               id < n && (w = ws[id]) != null) {
724 >            if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
725 >                                          h,  w.nextWaiter)) {
726 >                LockSupport.unpark(w);
727 >                if (releasedOne) // exit on second release
728 >                    break;
729 >                releasedOne = true;
730 >            }
731 >            if (eventCount != ec)
732                  break;
733 +            h = eventWaiters;
734          }
735      }
736  
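[Editor's note: the eventWaiters word packs a count tag with a worker index, matching the encoding built in eventSync below. The shift and mask values here are assumed for illustration.]

    class WaiterEncoding {
        static final int  EVENT_COUNT_SHIFT = 32;             // assumed
        static final long WAITER_ID_MASK    = (1L << 16) - 1; // assumed

        static long encode(int eventCount, int poolIndex) {
            return (((long) eventCount) << EVENT_COUNT_SHIFT)
                 | ((long) (poolIndex + 1)); // 1-based so 0 means "empty"
        }
        static int waiterId(long h) { return (int) (h & WAITER_ID_MASK); }
        static int countTag(long h) { return (int) (h >>> EVENT_COUNT_SHIFT); }
    }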
# Line 780 | Line 747 | public class ForkJoinPool extends Abstra
747  
748      /**
749       * Adds the given worker to event queue and blocks until
750 <     * terminating or event count advances from the workers
784 <     * lastEventCount value
750 >     * terminating or event count advances from the given value
751       *
752       * @param w the calling worker thread
753 +     * @param ec the count
754       */
755 <    private void eventSync(ForkJoinWorkerThread w) {
789 <        int ec = w.lastEventCount;
755 >    private void eventSync(ForkJoinWorkerThread w, int ec) {
756          long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
757          long h;
758          while ((runState < SHUTDOWN || !tryTerminate(false)) &&
# Line 806 | Line 772 | public class ForkJoinPool extends Abstra
772       * event waiter) until terminating or event count advances from
773       * the given value. The oldest (first) waiter uses a timed wait to
774       * occasionally one-by-one shrink the number of workers (to a
775 <     * minumum of one) if the pool has not been used for extended
775 >     * minimum of one) if the pool has not been used for extended
776       * periods.
777       *
778       * @param w the calling worker thread
# Line 819 | Line 785 | public class ForkJoinPool extends Abstra
785                                     (workerCounts & RUNNING_COUNT_MASK) <= 1);
786                  long startTime = untimed? 0 : System.nanoTime();
787                  Thread.interrupted();         // clear/ignore interrupt
788 <                if (eventCount != ec || !w.isRunning() ||
788 >                if (eventCount != ec || w.runState != 0 ||
789                      runState >= TERMINATING)  // recheck after clear
790                      break;
791                  if (untimed)
792                      LockSupport.park(w);
793                  else {
794                      LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
795 <                    if (eventCount != ec || !w.isRunning() ||
795 >                    if (eventCount != ec || w.runState != 0 ||
796                          runState >= TERMINATING)
797                          break;
798                      if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
799 <                        tryShutdownWaiter(ec);
799 >                        tryShutdownUnusedWorker(ec);
800                  }
801              }
802          }
803      }
804  
805 <    /**
840 <     * Callback from the oldest waiter in awaitEvent waking up after a
841 <     * period of non-use. Tries (once) to shutdown an event waiter (or
842 <     * a spare, if one exists). Note that we don't need CAS or locks
843 <     * here because the method is called only from one thread
844 <     * occasionally waking (and even misfires are OK). Note that
845 <     * until the shutdown worker fully terminates, workerCounts
846 <     * will overestimate total count, which is tolerable.
847 <     *
848 <     * @param ec the event count waited on by caller (to abort
849 <     * attempt if count has since changed).
850 <     */
851 <    private void tryShutdownWaiter(int ec) {
852 <        if (spareWaiters != 0) { // prefer killing spares
853 <            tryShutdownSpare();
854 <            return;
855 <        }
856 <        ForkJoinWorkerThread[] ws = workers;
857 <        int n = ws.length;
858 <        long h = eventWaiters;
859 <        ForkJoinWorkerThread w; int id; long nh;
860 <        if (runState == 0 &&
861 <            submissionQueue.isEmpty() &&
862 <            eventCount == ec &&
863 <            (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
864 <            id < n && (w = ws[id]) != null &&
865 <            (nh = w.nextWaiter) != 0L && // keep at least one worker
866 <            UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) {
867 <            w.shutdown();
868 <            LockSupport.unpark(w);
869 <        }
870 <        releaseEventWaiters();
871 <    }
872 <
873 <    // Maintaining spares
805 >    // Maintaining parallelism
806  
807      /**
808       * Pushes worker onto the spare stack
# Line 882 | Line 814 | public class ForkJoinPool extends Abstra
814      }
815  
816      /**
817 <     * Callback from oldest spare occasionally waking up.  Tries
818 <     * (once) to shutdown a spare. Same idea as tryShutdownWaiter.
817 >     * Tries (once) to resume a spare if the number of running
818 >     * threads is less than target.
819       */
820 <    final void tryShutdownSpare() {
820 >    private void tryResumeSpare() {
821          int sw, id;
890        ForkJoinWorkerThread w;
891        ForkJoinWorkerThread[] ws;
892        if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
893            id < (ws = workers).length && (w = ws[id]) != null &&
894            (workerCounts & RUNNING_COUNT_MASK) >= parallelism &&
895            UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
896                                     sw, w.nextSpare)) {
897            w.shutdown();
898            LockSupport.unpark(w);
899            advanceEventCount();
900        }
901    }
902
903    /**
904     * Tries (once) to resume a spare if worker counts match
905     * the given count.
906     *
907     * @param wc workerCounts value on invocation of this method
908     */
909    private void tryResumeSpare(int wc) {
822          ForkJoinWorkerThread[] ws = workers;
823          int n = ws.length;
824 <        int sw, id, rs;  ForkJoinWorkerThread w;
825 <        if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
824 >        ForkJoinWorkerThread w;
825 >        if ((sw = spareWaiters) != 0 &&
826 >            (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
827              id < n && (w = ws[id]) != null &&
828 <            (rs = runState) < TERMINATING &&
829 <            eventWaiters == 0L && workerCounts == wc) {
830 <            // In case all workers busy, heuristically back off to let settle
831 <            Thread.yield();
832 <            if (eventWaiters == 0L && runState == rs && // recheck
833 <                workerCounts == wc && spareWaiters == sw &&
834 <                UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
835 <                                         sw, w.nextSpare)) {
836 <                int c;              // increment running count before resume
837 <                do {} while(!UNSAFE.compareAndSwapInt
838 <                            (this, workerCountsOffset,
839 <                             c = workerCounts, c + ONE_RUNNING));
927 <                if (w.tryUnsuspend())
928 <                    LockSupport.unpark(w);
929 <                else               // back out if w was shutdown
930 <                    decrementWorkerCounts(ONE_RUNNING, 0);
931 <            }
828 >            (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
829 >            spareWaiters == sw &&
830 >            UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
831 >                                     sw, w.nextSpare)) {
832 >            int c; // increment running count before resume
833 >            do {} while(!UNSAFE.compareAndSwapInt
834 >                        (this, workerCountsOffset,
835 >                         c = workerCounts, c + ONE_RUNNING));
836 >            if (w.tryUnsuspend())
837 >                LockSupport.unpark(w);
838 >            else   // back out if w was shutdown
839 >                decrementWorkerCounts(ONE_RUNNING, 0);
840          }
841      }
842  
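[Editor's note: the count-before-unpark ordering matters: the running count is raised first so it never under-reports once the spare wakes. A minimal sketch, with AtomicInteger standing in for the workerCounts field.]

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.LockSupport;

    class ResumeOrdering {
        static final int ONE_RUNNING = 1;   // illustrative encoding
        final AtomicInteger workerCounts = new AtomicInteger();

        void resume(Thread spare) {
            int c;
            do {} while (!workerCounts.compareAndSet(
                             c = workerCounts.get(), c + ONE_RUNNING));
            LockSupport.unpark(spare);
        }
    }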
935    // adding workers on demand
936
843      /**
844 <     * Adds one or more workers if needed to establish target parallelism.
845 <     * Retries upon contention.
844 >     * Tries to increase the number of running workers if below target
845 >     * parallelism: If a spare exists, tries to resume it via
846 >     * tryResumeSpare.  Otherwise, if not enough total workers or all
847 >     * existing workers are busy, adds a new worker. In all cases also
848 >     * helps wake up releasable workers waiting for work.
849       */
850 <    private void addWorkerIfBelowTarget() {
850 >    private void helpMaintainParallelism() {
851          int pc = parallelism;
852 <        int wc;
853 <        while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < pc &&
854 <               runState < TERMINATING) {
855 <            if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
856 <                                         wc + (ONE_RUNNING|ONE_TOTAL))) {
857 <                if (addWorker() == null)
852 >        int wc, rs, tc;
853 >        while (((wc = workerCounts) & RUNNING_COUNT_MASK) < pc &&
854 >               (rs = runState) < TERMINATING) {
855 >            if (spareWaiters != 0)
856 >                tryResumeSpare();
857 >            else if ((tc = wc >>> TOTAL_COUNT_SHIFT) >= MAX_WORKERS ||
858 >                     (tc >= pc && (rs & ACTIVE_COUNT_MASK) != tc))
859 >                break;   // enough total
860 >            else if (runState == rs && workerCounts == wc &&
861 >                     UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
862 >                                              wc + (ONE_RUNNING|ONE_TOTAL))) {
863 >                ForkJoinWorkerThread w = null;
864 >                try {
865 >                    w = factory.newThread(this);
866 >                } finally { // adjust on null or exceptional factory return
867 >                    if (w == null) {
868 >                        decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
869 >                        tryTerminate(false); // handle failure during shutdown
870 >                    }
871 >                }
872 >                if (w == null)
873                      break;
874 +                w.start(recordWorker(w), ueh);
875 +                if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
876 +                    int c; // advance event count
877 +                    UNSAFE.compareAndSwapInt(this, eventCountOffset,
878 +                                             c = eventCount, c+1);
879 +                    break; // add at most one unless total below target
880 +                }
881              }
882          }
883 +        if (eventWaiters != 0L)
884 +            releaseEventWaiters();
885      }
886  
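[Editor's note: worker creation goes through the pool's ForkJoinWorkerThreadFactory, which is why a null or exceptional return can be backed out as above. A sketch of a custom factory delegating to the default one; the NamingFactory class is hypothetical.]

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinWorkerThread;

    class NamingFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
        private int created;
        public synchronized ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            ForkJoinWorkerThread w =
                ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            w.setName("fj-worker-" + ++created);
            return w;  // returning null makes the pool adjust its counts
        }
    }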
887      /**
888 <     * Tries (once) to add a new worker if all existing workers are
889 <     * busy, and there are either no running workers or the deficit is
890 <     * at least twice the surplus.
891 <     *
892 <     * @param wc workerCounts value on invocation of this method
893 <     */
894 <    private void tryAddWorkerIfBusy(int wc) {
962 <        int tc, rc, rs;
963 <        int pc = parallelism;
964 <        if ((tc = wc >>> TOTAL_COUNT_SHIFT) < MAX_WORKERS &&
965 <            ((rc = wc & RUNNING_COUNT_MASK) == 0 ||
966 <             rc < pc - ((tc - pc) << 1)) &&
967 <            (rs = runState) < TERMINATING &&
968 <            (rs & ACTIVE_COUNT_MASK) == tc) {
969 <            // Since all workers busy, heuristically back off to let settle
970 <            Thread.yield();
971 <            if (eventWaiters == 0L && spareWaiters == 0 && // recheck
972 <                runState == rs && workerCounts == wc &&
973 <                UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
974 <                                         wc + (ONE_RUNNING|ONE_TOTAL)))
975 <                addWorker();
976 <        }
977 <    }
978 <
979 <    /**
980 <     * Does at most one of:
981 <     *
982 <     * 1. Help wake up existing workers waiting for work via
983 <     *    releaseEventWaiters. (If any exist, then it doesn't
984 <     *    matter right now if under target parallelism level.)
985 <     *
986 <     * 2. If a spare exists, try (once) to resume it via tryResumeSpare.
987 <     *
988 <     * 3. If there are not enough total workers, add some
989 <     *    via addWorkerIfBelowTarget;
888 >     * Callback from the oldest waiter in awaitEvent waking up after a
889 >     * period of non-use. If all workers are idle, tries (once) to
890 >     * shutdown an event waiter or a spare, if one exists. Note that
891 >     * we don't need CAS or locks here because the method is called
892 >     * only from one thread occasionally waking (and even misfires are
893 >     * OK). Note that until the shutdown worker fully terminates,
894 >     * workerCounts will overestimate total count, which is tolerable.
895       *
896 <     * 4. Try (once) to add a new worker if all existing workers
897 <     *     are busy, via tryAddWorkerIfBusy
896 >     * @param ec the event count waited on by caller (to abort
897 >     * attempt if count has since changed).
898       */
899 <    private void helpMaintainParallelism() {
900 <        long h; int pc, wc;
901 <        if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0) {
902 <            if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
903 <                releaseEventWaiters(); // avoid useless call
904 <        }
905 <        else if ((pc = parallelism) >
906 <                 ((wc = workerCounts) & RUNNING_COUNT_MASK)) {
907 <            if (spareWaiters != 0)
908 <                tryResumeSpare(wc);
909 <            else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
910 <                addWorkerIfBelowTarget();
911 <            else
912 <                tryAddWorkerIfBusy(wc);
899 >    private void tryShutdownUnusedWorker(int ec) {
900 >        if (runState == 0 && eventCount == ec) { // only trigger if all idle
901 >            ForkJoinWorkerThread[] ws = workers;
902 >            int n = ws.length;
903 >            ForkJoinWorkerThread w = null;
904 >            boolean shutdown = false;
905 >            int sw;
906 >            long h;
907 >            if ((sw = spareWaiters) != 0) { // prefer killing spares
908 >                int id = (sw & SPARE_ID_MASK) - 1;
909 >                if (id >= 0 && id < n && (w = ws[id]) != null &&
910 >                    UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
911 >                                             sw, w.nextSpare))
912 >                    shutdown = true;
913 >            }
914 >            else if ((h = eventWaiters) != 0L) {
915 >                long nh;
916 >                int id = ((int)(h & WAITER_ID_MASK)) - 1;
917 >                if (id >= 0 && id < n && (w = ws[id]) != null &&
918 >                    (nh = w.nextWaiter) != 0L && // keep at least one worker
919 >                    UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
920 >                    shutdown = true;
921 >            }
922 >            if (w != null && shutdown) {
923 >                w.shutdown();
924 >                LockSupport.unpark(w);
925 >            }
926          }
927 +        releaseEventWaiters(); // in case of interference
928      }
929  
930      /**
# Line 1013 | Line 932 | public class ForkJoinPool extends Abstra
932       * stealing a task or taking a submission and running it).
933       * Performs one or more of the following:
934       *
935 <     * 1. If the worker is active, try to set its active status to
936 <     *    inactive and update activeCount. On contention, we may try
937 <     *    again on this or subsequent call.
938 <     *
939 <     * 2. Release any existing event waiters that are now relesable
940 <     *
941 <     * 3. If there are too many running threads, suspend this worker
942 <     *    (first forcing inactive if necessary).  If it is not
943 <     *    needed, it may be killed while suspended via
944 <     *    tryShutdownSpare. Otherwise, upon resume it rechecks to make
945 <     *    sure that it is still needed.
946 <     *
947 <     * 4. If more than 1 miss, await the next task event via
948 <     *    eventSync (first forcing inactivation if necessary), upon
949 <     *    which worker may also be killed, via tryShutdownWaiter.
950 <     *
951 <     * 5. Help reactivate other workers via helpMaintainParallelism
935 >     * 1. If the worker is active and either did not run a task
936 >     *    or there are too many workers, try to set its active status
937 >     *    to inactive and update activeCount. On contention, we may
938 >     *    try again in this or a subsequent call.
939 >     *
940 >     * 2. If not enough total workers, help create some.
941 >     *
942 >     * 3. If there are too many running workers, suspend this worker
943 >     *    (first forcing inactive if necessary).  If it is not needed,
944 >     *    it may be shutdown while suspended (via
945 >     *    tryShutdownUnusedWorker).  Otherwise, upon resume it
946 >     *    rechecks running thread count and need for event sync.
947 >     *
948 >     * 4. If worker did not run a task, await the next task event via
949 >     *    eventSync if necessary (first forcing inactivation), upon
950 >     *    which the worker may be shutdown via
951 >     *    tryShutdownUnusedWorker.  Otherwise, help release any
952 >     *    existing event waiters that are now releasable.
953       *
954       * @param w the worker
955 <     * @param misses the number of scans by caller failing to find work
1036 <     * (saturating at 2 to avoid wraparound)
955 >     * @param ran true if worker ran a task since last call to this method
956       */
957 <    final void preStep(ForkJoinWorkerThread w, int misses) {
957 >    final void preStep(ForkJoinWorkerThread w, boolean ran) {
958 >        int wec = w.lastEventCount;
959          boolean active = w.active;
960 +        boolean inactivate = false;
961          int pc = parallelism;
962 <        for (;;) {
963 <            int rs, wc, rc, ec; long h;
964 <            if (active && UNSAFE.compareAndSwapInt(this, runStateOffset,
965 <                                                   rs = runState, rs - 1))
966 <                active = w.active = false;
967 <            if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0 &&
968 <                (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) {
969 <                releaseEventWaiters();
1049 <                if (misses > 1)
1050 <                    continue;                  // clear before sync below
1051 <            }
1052 <            if ((rc = ((wc = workerCounts) & RUNNING_COUNT_MASK)) > pc) {
1053 <                if (!active &&                 // must inactivate to suspend
962 >        int rs;
963 >        while (w.runState == 0 && (rs = runState) < TERMINATING) {
964 >            if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
965 >                UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
966 >                inactivate = active = w.active = false;
967 >            int wc = workerCounts;
968 >            if ((wc & RUNNING_COUNT_MASK) > pc) {
969 >                if (!(inactivate |= active) && // must inactivate to suspend
970                      workerCounts == wc &&      // try to suspend as spare
971                      UNSAFE.compareAndSwapInt(this, workerCountsOffset,
972 <                                             wc, wc - ONE_RUNNING)) {
972 >                                             wc, wc - ONE_RUNNING))
973                      w.suspendAsSpare();
1058                    if (!w.isRunning())
1059                        break;                 // was killed while spare
1060                }
1061                continue;
974              }
975 <            if (misses > 0) {
976 <                if ((ec = eventCount) == w.lastEventCount && misses > 1) {
977 <                    if (!active) {             // must inactivate to sync
978 <                        eventSync(w);
979 <                        if (w.isRunning())
980 <                            misses = 1;        // don't re-sync
981 <                        else
982 <                            break;             // was killed while waiting
983 <                    }
984 <                    continue;
975 >            else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
976 >                helpMaintainParallelism();     // not enough workers
977 >            else if (!ran) {
978 >                long h = eventWaiters;
979 >                int ec = eventCount;
980 >                if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
981 >                    releaseEventWaiters();     // release others before waiting
982 >                else if (ec != wec) {
983 >                    w.lastEventCount = ec;     // no need to wait
984 >                    break;
985                  }
986 <                w.lastEventCount = ec;
986 >                else if (!(inactivate |= active))  
987 >                    eventSync(w, wec);         // must inactivate before sync
988              }
989 <            if (rc < pc)
990 <                helpMaintainParallelism();
1078 <            break;
989 >            else
990 >                break;
991          }
992      }
993  
994      /**
995       * Helps and/or blocks awaiting join of the given task.
996 <     * Alternates between helpJoinTask() and helpMaintainParallelism()
1085 <     * as many times as there is a deficit in running count (or longer
1086 <     * if running count would become zero), then blocks if task still
1087 <     * not done.
996 >     * See above for explanation.
997       *
998       * @param joinMe the task to join
999 +     * @param worker the current worker thread
1000       */
1001      final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
1002 <        int threshold = parallelism;         // descend blocking thresholds
1002 >        int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
1003          while (joinMe.status >= 0) {
1004 <            boolean block; int wc;
1004 >            int wc;
1005              worker.helpJoinTask(joinMe);
1006              if (joinMe.status < 0)
1007                  break;
1008 <            if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
1009 <                if (threshold > 0)
1010 <                    --threshold;
1011 <                else
1012 <                    advanceEventCount(); // force release
1013 <                block = false;
1014 <            }
1015 <            else
1016 <                block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1017 <                                                 wc, wc - ONE_RUNNING);
1018 <            helpMaintainParallelism();
1019 <            if (block) {
1020 <                int c;
1021 <                joinMe.internalAwaitDone();
1008 >            else if (retries > 0)
1009 >                --retries;
1010 >            else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
1011 >                     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1012 >                                              wc, wc - ONE_RUNNING)) {
1013 >                int stat, c; long h;
1014 >                while ((stat = joinMe.status) >= 0 &&
1015 >                       (h = eventWaiters) != 0L && // help release others
1016 >                       (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1017 >                    releaseEventWaiters();
1018 >                if (stat >= 0 &&
1019 >                    ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
1020 >                     (stat =
1021 >                      joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
1022 >                    helpMaintainParallelism(); // timeout or no running workers
1023                  do {} while (!UNSAFE.compareAndSwapInt
1024                               (this, workerCountsOffset,
1025                                c = workerCounts, c + ONE_RUNNING));
1026 <                break;
1026 >                if (stat < 0)
1027 >                    break;   // else restart
1028              }
1029          }
1030      }
1031  
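[Editor's note: for reference, the public-API shape that lands in awaitJoin when a worker calls join(): a standard fork/compute/join split. The SumTask example is illustrative.]

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;

    class SumTask extends RecursiveTask<Long> {
        final long[] a; final int lo, hi;
        SumTask(long[] a, int lo, int hi) {
            this.a = a; this.lo = lo; this.hi = hi;
        }
        protected Long compute() {
            if (hi - lo <= 1024) {
                long s = 0;
                for (int i = lo; i < hi; ++i) s += a[i];
                return s;
            }
            int mid = (lo + hi) >>> 1;
            SumTask left = new SumTask(a, lo, mid);
            left.fork();                        // made stealable
            long right = new SumTask(a, mid, hi).compute();
            return right + left.join();         // helping, then timed blocking
        }
    }
    // usage: new ForkJoinPool().invoke(new SumTask(data, 0, data.length))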
1032      /**
1033 <     * Same idea as awaitJoin, but no helping
1033 >     * Same idea as awaitJoin, but no helping, retries, or timeouts.
1034       */
1035      final void awaitBlocker(ManagedBlocker blocker)
1036          throws InterruptedException {
1125        int threshold = parallelism;
1037          while (!blocker.isReleasable()) {
1038 <            boolean block; int wc;
1039 <            if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
1040 <                if (threshold > 0)
1041 <                    --threshold;
1131 <                else
1132 <                    advanceEventCount();
1133 <                block = false;
1134 <            }
1135 <            else
1136 <                block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1137 <                                                 wc, wc - ONE_RUNNING);
1138 <            helpMaintainParallelism();
1139 <            if (block) {
1038 >            int wc = workerCounts;
1039 >            if ((wc & RUNNING_COUNT_MASK) != 0 &&
1040 >                UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1041 >                                         wc, wc - ONE_RUNNING)) {
1042                  try {
1043 <                    do {} while (!blocker.isReleasable() && !blocker.block());
1043 >                    while (!blocker.isReleasable()) {
1044 >                        long h = eventWaiters;
1045 >                        if (h != 0L &&
1046 >                            (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1047 >                            releaseEventWaiters();
1048 >                        else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
1049 >                                 runState < TERMINATING)
1050 >                            helpMaintainParallelism();
1051 >                        else if (blocker.block())
1052 >                            break;
1053 >                    }
1054                  } finally {
1055                      int c;
1056                      do {} while (!UNSAFE.compareAndSwapInt
# Line 1188 | Line 1100 | public class ForkJoinPool extends Abstra
1100      private void startTerminating() {
1101          cancelSubmissions();
1102          for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1103 <            advanceEventCount();
1103 >            int c; // advance event count
1104 >            UNSAFE.compareAndSwapInt(this, eventCountOffset,
1105 >                                     c = eventCount, c+1);
1106              eventWaiters = 0L; // clobber lists
1107              spareWaiters = 0;
1108              ForkJoinWorkerThread[] ws = workers;
# Line 1351 | Line 1265 | public class ForkJoinPool extends Abstra
1265       * @param pc the initial parallelism level
1266       */
1267      private static int initialArraySizeFor(int pc) {
1268 <        // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 >>> 16)
1268 >        // If possible, initially allocate enough space for one spare
1269          int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1270 +        // See Hacker's Delight, sec 3.2. We know MAX_WORKERS < (1 << 16)
1271          size |= size >>> 1;
1272          size |= size >>> 2;
1273          size |= size >>> 4;
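[Editor's note: the standard version of the bit-smearing idiom, shown in full as a sketch; it yields the smallest power of two strictly greater than its argument. Since MAX_WORKERS < 2^16, shifts beyond 8 are unnecessary in the pool's own case.]

    class ArraySizing {
        static int nextPowerOfTwo(int size) { // for 1 <= size < 2^30
            size |= size >>> 1;
            size |= size >>> 2;
            size |= size >>> 4;
            size |= size >>> 8;
            size |= size >>> 16;
            return size + 1;
        }
    }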
# Line 1371 | Line 1286 | public class ForkJoinPool extends Abstra
1286          if (runState >= SHUTDOWN)
1287              throw new RejectedExecutionException();
1288          submissionQueue.offer(task);
1289 <        advanceEventCount();
1290 <        if (eventWaiters != 0L)
1291 <            releaseEventWaiters();
1377 <        if ((workerCounts >>> TOTAL_COUNT_SHIFT) < parallelism)
1378 <            addWorkerIfBelowTarget();
1289 >        int c; // try to increment event count -- CAS failure OK
1290 >        UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1291 >        helpMaintainParallelism(); // create, start, or resume some workers
1292      }
1293  
1294      /**

Diff Legend

- Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)