root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.65 by dl, Wed Aug 18 14:05:27 2010 UTC vs.
Revision 1.66 by dl, Sun Aug 29 23:34:46 2010 UTC

# Line 161 | Line 161 | public class ForkJoinPool extends Abstra
161       *      re-activate a spare thread to compensate for blocked
162       *      joiners until they unblock.
163       *
164 <     * Because the determining existence of conservatively safe
165 <     * helping targets, the availability of already-created spares,
166 <     * and the apparent need to create new spares are all racy and
167 <     * require heuristic guidance, we rely on multiple retries of
168 <     * each. Further, because it is impossible to keep exactly the
169 <     * target (parallelism) number of threads running at any given
170 <     * time, we allow compensation during joins to fail, and enlist
171 <     * all other threads to help out whenever they are not otherwise
172 <     * occupied (i.e., mainly in method preStep).
164 >     * It is impossible to keep exactly the target (parallelism)
165 >     * number of threads running at any given time.  Determining
166 >     * existence of conservatively safe helping targets, the
167 >     * availability of already-created spares, and the apparent need
168 >     * to create new spares are all racy and require heuristic
169 >     * guidance, so we rely on multiple retries of each.  Compensation
170 >     * occurs in slow-motion. It is triggered only upon timeouts of
171 >     * Object.wait used for joins. This reduces poor decisions that
172 >     * would otherwise be made when threads are waiting for others
173 >     * that are stalled because of unrelated activities such as
174 >     * garbage collection.
175       *
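[Editor's sketch] The "slow-motion" compensation described above can be illustrated by a small standalone sketch. This is not the actual ForkJoinPool code — `SlowMotionJoin`, `shouldCompensate`, and the `BooleanSupplier` probe are hypothetical names — but it shows the key idea: block in a timed `Object.wait`, and only consider compensating when the timeout elapses with the join still incomplete, so transient stalls (such as GC pauses) do not trigger spurious thread creation.

```java
// Hypothetical sketch of timeout-gated ("slow-motion") compensation.
// Not the real ForkJoinPool internals; names are illustrative.
final class SlowMotionJoin {
    /**
     * Waits up to timeoutMillis for the join to complete.
     *
     * @return true if compensation should be attempted, i.e. the wait
     *         timed out while the join was still incomplete
     */
    static boolean shouldCompensate(Object lock,
                                    java.util.function.BooleanSupplier done,
                                    long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (lock) {
            while (!done.getAsBoolean()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0L)
                    return true;      // timed out: consider compensating now
                lock.wait(remaining); // releases lock while blocked
            }
        }
        return false;                 // join completed within the timeout
    }
}
```

A quickly-completing join returns false without ever compensating; only a join that outlives the full timeout reports that compensation is worth attempting.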
176       * The ManagedBlocker extension API can't use helping so relies
177       * only on compensation in method awaitBlocker.
# Line 271 | Line 273 | public class ForkJoinPool extends Abstra
273       * In addition to allowing simpler decisions about need for
274       * wakeup, the event count bits in eventWaiters serve the role of
275       * tags to avoid ABA errors in Treiber stacks. Upon any wakeup,
276 <     * released threads also try to release others (but give up upon
277 <     * contention to reduce useless flailing).  The net effect is a
278 <     * tree-like diffusion of signals, where released threads (and
279 <     * possibly others) help with unparks.  To further reduce
280 <     * contention effects a bit, failed CASes to increment field
279 <     * eventCount are tolerated without retries in signalWork.
276 >     * released threads also try to release at most two others.  The
277 >     * net effect is a tree-like diffusion of signals, where released
278 >     * threads (and possibly others) help with unparks.  To further
279 >     * reduce contention effects a bit, failed CASes to increment
280 >     * field eventCount are tolerated without retries in signalWork.
281       * Conceptually they are merged into the same event, which is OK
282       * when their only purpose is to enable workers to scan for work.
283       *
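[Editor's sketch] The eventWaiters encoding described above — waiter identity in the low bits, the event count the waiter saw in the high bits serving as an ABA tag for the Treiber stack — can be illustrated as follows. The shift and mask values here are assumptions for illustration, not the actual constants:

```java
// Hypothetical illustration of the eventWaiters bit layout: the low
// half holds (poolIndex + 1) of the top waiter (0 means empty stack),
// the high half holds the event count that tags the entry against ABA.
final class WaiterEncoding {
    static final int  EVENT_COUNT_SHIFT = 32;           // assumed value
    static final long WAITER_ID_MASK    = (1L << 32) - 1;

    static long encode(int eventCount, int poolIndex) {
        return (((long) eventCount) << EVENT_COUNT_SHIFT)
             | (long) (poolIndex + 1);
    }
    static int waiterIndex(long h) {  // -1 when the stack is empty
        return ((int) (h & WAITER_ID_MASK)) - 1;
    }
    static int taggedCount(long h) {  // event count the waiter recorded
        return (int) (h >>> EVENT_COUNT_SHIFT);
    }
}
```

Because a re-pushed waiter carries a newer event count, a stale CAS on the head loses even if the same index is back on top — which is exactly the ABA protection the comment describes.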
284 <     * 5. Managing suspension of extra workers. When a worker is about
285 <     * to block waiting for a join (or via ManagedBlockers), we may
286 <     * create a new thread to maintain parallelism level, or at least
287 <     * avoid starvation. Usually, extra threads are needed for only
288 <     * very short periods, yet join dependencies are such that we
289 <     * sometimes need them in bursts. Rather than create new threads
290 <     * each time this happens, we suspend no-longer-needed extra ones
291 <     * as "spares". For most purposes, we don't distinguish "extra"
292 <     * spare threads from normal "core" threads: On each call to
293 <     * preStep (the only point at which we can do this) a worker
294 <     * checks to see if there are now too many running workers, and if
295 <     * so, suspends itself.  Method helpMaintainParallelism looks for
296 <     * suspended threads to resume before considering creating a new
297 <     * replacement. The spares themselves are encoded on another
298 <     * variant of a Treiber Stack, headed at field "spareWaiters".
299 <     * Note that the use of spares is intrinsically racy.  One thread
300 <     * may become a spare at about the same time as another is
301 <     * needlessly being created. We counteract this and related slop
302 <     * in part by requiring resumed spares to immediately recheck (in
303 <     * preStep) to see whether they they should re-suspend.
304 <     *
305 <     * 6. Killing off unneeded workers. The Spare and Event queues use
306 <     * similar mechanisms to shed unused workers: The oldest (first)
307 <     * waiter uses a timed rather than hard wait. When this wait times
308 <     * out without a normal wakeup, it tries to shutdown any one (for
309 <     * convenience the newest) other waiter via tryShutdownSpare or
310 <     * tryShutdownWaiter, respectively. The wakeup rates for spares
311 <     * are much shorter than for waiters. Together, they will
312 <     * eventually reduce the number of worker threads to a minimum of
313 <     * one after a long enough period without use.
284 >     * 5. Managing suspension of extra workers. When a worker notices
285 >     * (usually upon timeout of a wait()) that there are too few
286 >     * running threads, we may create a new thread to maintain
287 >     * parallelism level, or at least avoid starvation. Usually, extra
288 >     * threads are needed for only very short periods, yet join
289 >     * dependencies are such that we sometimes need them in
290 >     * bursts. Rather than create new threads each time this happens,
291 >     * we suspend no-longer-needed extra ones as "spares". For most
292 >     * purposes, we don't distinguish "extra" spare threads from
293 >     * normal "core" threads: On each call to preStep (the only point
294 >     * at which we can do this) a worker checks to see if there are
295 >     * now too many running workers, and if so, suspends itself.
296 >     * Method helpMaintainParallelism looks for suspended threads to
297 >     * resume before considering creating a new replacement. The
298 >     * spares themselves are encoded on another variant of a Treiber
299 >     * Stack, headed at field "spareWaiters".  Note that the use of
300 >     * spares is intrinsically racy.  One thread may become a spare at
301 >     * about the same time as another is needlessly being created. We
302 >     * counteract this and related slop in part by requiring resumed
 303 >     * spares to immediately recheck (in preStep) to see whether
 304 >     * they should re-suspend.
305 >     *
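[Editor's sketch] The "another variant of a Treiber Stack, headed at field spareWaiters" can be sketched in simplified form. This hypothetical version uses an AtomicInteger head and plain worker objects rather than the real packed-int field, but it shows the same push/pop CAS discipline, with (index + 1) encoding so that 0 means "no spares":

```java
// Simplified, hypothetical Treiber stack of spare workers.
// The head holds (index + 1) of the top spare; each worker records
// its successor in nextSpare. Not the real field encoding.
final class SpareStack {
    static final class Worker {
        final int index;
        int nextSpare;                 // head value of the rest of the stack
        Worker(int i) { index = i; }
    }

    final java.util.concurrent.atomic.AtomicInteger head =
        new java.util.concurrent.atomic.AtomicInteger();
    final Worker[] workers;

    SpareStack(Worker[] ws) { workers = ws; }

    void pushSpare(Worker w) {         // CAS loop: make w the new top
        int h;
        do {
            h = head.get();
            w.nextSpare = h;
        } while (!head.compareAndSet(h, w.index + 1));
    }

    Worker popSpare() {                // returns null if no spare available
        for (;;) {
            int h = head.get();
            if (h == 0)
                return null;
            Worker w = workers[h - 1];
            if (head.compareAndSet(h, w.nextSpare))
                return w;
        }
    }
}
```

Pops are LIFO, so the most recently suspended spare is resumed first — the racy slop the comment mentions arises because pushSpare and popSpare from different threads can interleave with worker creation.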
306 >     * 6. Killing off unneeded workers. A timeout mechanism is used to
307 >     * shed unused workers: The oldest (first) event queue waiter uses
308 >     * a timed rather than hard wait. When this wait times out without
309 >     * a normal wakeup, it tries to shutdown any one (for convenience
310 >     * the newest) other spare or event waiter via
311 >     * tryShutdownUnusedWorker. This eventually reduces the number of
312 >     * worker threads to a minimum of one after a long enough period
313 >     * without use.
314       *
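[Editor's sketch] The shedding policy above implies a simple shrink curve: each timeout of the oldest waiter shuts down at most one other unused worker, so an idle pool loses one worker per SHRINK_RATE interval until only one remains. A hypothetical simulation of that policy (illustrative names, not pool code):

```java
// Hypothetical simulation of timeout-driven worker shedding: one
// tryShutdownUnusedWorker per elapsed shrink interval, floored at one.
final class ShrinkSim {
    static int workersAfter(int initialWorkers, int elapsedIntervals) {
        int w = initialWorkers;
        for (int i = 0; i < elapsedIntervals && w > 1; i++)
            w--;                       // one shutdown per timed-out wait
        return w;                      // never drops below one worker
    }
}
```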
315       * 7. Deciding when to create new workers. The main dynamic
316       * control in this class is deciding when to create extra threads
# Line 322 | Line 323 | public class ForkJoinPool extends Abstra
323       * compilation, and wake-up lags. These transients are extremely
324       * common -- we are normally trying to fully saturate the CPUs on
325       * a machine, so almost any activity other than running tasks
326 <     * impedes accuracy. Our main defense is to allow some slack in
327 <     * creation thresholds, using rules that reflect the fact that the
328 <     * more threads we have running, the more likely that we are
329 <     * underestimating the number running threads. (We also include
330 <     * some heuristic use of Thread.yield when all workers appear to
331 <     * be busy, to improve likelihood of counts settling.) The rules
332 <     * also better cope with the fact that some of the methods in this
333 <     * class tend to never become compiled (but are interpreted), so
334 <     * some components of the entire set of controls might execute 100
334 <     * times faster than others. And similarly for cases where the
335 <     * apparent lack of work is just due to GC stalls and other
336 <     * transient system activity.
326 >     * impedes accuracy. Our main defense is to allow parallelism to
327 >     * lapse for a while during joins, and use a timeout to see if,
328 >     * after the resulting settling, there is still a need for
329 >     * additional workers.  This also better copes with the fact that
330 >     * some of the methods in this class tend to never become compiled
331 >     * (but are interpreted), so some components of the entire set of
332 >     * controls might execute 100 times faster than others. And
333 >     * similarly for cases where the apparent lack of work is just due
334 >     * to GC stalls and other transient system activity.
335       *
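[Editor's sketch] The settle-then-create rule above — let parallelism lapse, wait out a timeout, and only add a worker if the deficit persists — can be sketched as a hypothetical decision helper (the names and the sleep-based settling are illustrative assumptions, not the pool's mechanism, which piggybacks on join wait timeouts):

```java
// Hypothetical sketch of the "wait for settling" creation rule:
// react to a low running count only if it is still low after a delay,
// so transients (GC, JIT, wake-up lag) do not trigger extra threads.
final class SettleCheck {
    static boolean needWorker(java.util.function.IntSupplier runningCount,
                              int targetParallelism,
                              long settleMillis)
            throws InterruptedException {
        if (runningCount.getAsInt() >= targetParallelism)
            return false;              // no deficit to begin with
        Thread.sleep(settleMillis);    // let transient stalls pass
        return runningCount.getAsInt() < targetParallelism;
    }
}
```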
336       * Beware that there is a lot of representation-level coupling
337       * among classes ForkJoinPool, ForkJoinWorkerThread, and
# Line 420 | Line 418 | public class ForkJoinPool extends Abstra
418          new AtomicInteger();
419  
420      /**
421 +     * The time to block in a join (see awaitJoin) before checking if
422 +     * a new worker should be (re)started to maintain parallelism
 423 +     * level. The value should be short enough to maintain global
424 +     * responsiveness and progress but long enough to avoid
425 +     * counterproductive firings during GC stalls or unrelated system
426 +     * activity, and to not bog down systems with continual re-firings
427 +     * on GCs or legitimately long waits.
428 +     */
429 +    private static final long JOIN_TIMEOUT_MILLIS = 250L; // 4 per second
430 +
431 +    /**
 432       * The wakeup interval (in nanoseconds) after which the oldest
433 <     * worker waiting for an event invokes tryShutdownWaiter to shrink
433 >     * worker waiting for an event invokes tryShutdownUnusedWorker to shrink
434       * the number of workers.  The exact value does not matter too
435       * much, but should be long enough to slowly release resources
436       * during long periods without use without disrupting normal use.
437       */
438      private static final long SHRINK_RATE_NANOS =
439 <        60L * 1000L * 1000L * 1000L; // one minute
439 >        30L * 1000L * 1000L * 1000L; // 2 per minute
440  
441      /**
442       * Absolute bound for parallelism level. Twice this number plus
# Line 567 | Line 576 | public class ForkJoinPool extends Abstra
576       */
577      private final int poolNumber;
578  
570
579      // Utilities for CASing fields. Note that most of these
580      // are usually manually inlined by callers
581  
# Line 615 | Line 623 | public class ForkJoinPool extends Abstra
623      }
624  
625      /**
618     * Increments event count
619     */
620    private void advanceEventCount() {
621        int c;
622        do {} while(!UNSAFE.compareAndSwapInt(this, eventCountOffset,
623                                              c = eventCount, c+1));
624    }
625
626    /**
627     * Tries incrementing active count; fails on contention.
628     * Called by workers before executing tasks.
629     *
630     * @return true on success
631     */
632    final boolean tryIncrementActiveCount() {
633        int c;
634        return UNSAFE.compareAndSwapInt(this, runStateOffset,
635                                        c = runState, c + 1);
636    }
637
638    /**
626       * Tries decrementing active count; fails on contention.
627       * Called when workers cannot find tasks to run.
628       */
# Line 703 | Line 690 | public class ForkJoinPool extends Abstra
690          }
691      }
692  
706    // adding and removing workers
707
708    /**
709     * Tries to create and add new worker. Assumes that worker counts
710     * are already updated to accommodate the worker, so adjusts on
711     * failure.
712     *
713     * @return the worker, or null on failure
714     */
715    private ForkJoinWorkerThread addWorker() {
716        ForkJoinWorkerThread w = null;
717        try {
718            w = factory.newThread(this);
719        } finally { // Adjust on either null or exceptional factory return
720            if (w == null) {
721                decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
722                tryTerminate(false); // in case of failure during shutdown
723            }
724        }
725        if (w != null) {
726            w.start(recordWorker(w), ueh);
727            advanceEventCount();
728        }
729        return w;
730    }
731
693      /**
694       * Final callback from terminating worker.  Removes record of
695       * worker from array, and adjusts counts. If pool is shutting
# Line 750 | Line 711 | public class ForkJoinPool extends Abstra
711       * Releases workers blocked on a count not equal to current count.
712       * Normally called after precheck that eventWaiters isn't zero to
713       * avoid wasted array checks. Gives up upon a change in count or
714 <     * contention, letting other workers take over.
714 >     * upon releasing two workers, letting others take over.
715       */
716      private void releaseEventWaiters() {
717          ForkJoinWorkerThread[] ws = workers;
718          int n = ws.length;
719          long h = eventWaiters;
720          int ec = eventCount;
721 +        boolean releasedOne = false;
722          ForkJoinWorkerThread w; int id;
723 <        while ((int)(h >>> EVENT_COUNT_SHIFT) != ec &&
724 <               (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
725 <               id < n && (w = ws[id]) != null &&
726 <               UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
727 <                                         h,  h = w.nextWaiter)) {
728 <            LockSupport.unpark(w);
729 <            if (eventWaiters != h || eventCount != ec)
723 >        while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
724 >               (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
725 >               id < n && (w = ws[id]) != null) {
726 >            if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
727 >                                          h,  w.nextWaiter)) {
728 >                LockSupport.unpark(w);
729 >                if (releasedOne) // exit on second release
730 >                    break;
731 >                releasedOne = true;
732 >            }
733 >            if (eventCount != ec)
734                  break;
735 +            h = eventWaiters;
736          }
737      }
738  
# Line 782 | Line 749 | public class ForkJoinPool extends Abstra
749  
750      /**
751       * Adds the given worker to event queue and blocks until
752 <     * terminating or event count advances from the workers
786 <     * lastEventCount value
752 >     * terminating or event count advances from the given value
753       *
754       * @param w the calling worker thread
755 +     * @param ec the count
756       */
757 <    private void eventSync(ForkJoinWorkerThread w) {
791 <        int ec = w.lastEventCount;
757 >    private void eventSync(ForkJoinWorkerThread w, int ec) {
758          long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
759          long h;
760          while ((runState < SHUTDOWN || !tryTerminate(false)) &&
# Line 808 | Line 774 | public class ForkJoinPool extends Abstra
774       * event waiter) until terminating or event count advances from
775       * the given value. The oldest (first) waiter uses a timed wait to
776       * occasionally one-by-one shrink the number of workers (to a
777 <     * minumum of one) if the pool has not been used for extended
777 >     * minimum of one) if the pool has not been used for extended
778       * periods.
779       *
780       * @param w the calling worker thread
# Line 821 | Line 787 | public class ForkJoinPool extends Abstra
787                                     (workerCounts & RUNNING_COUNT_MASK) <= 1);
788                  long startTime = untimed? 0 : System.nanoTime();
789                  Thread.interrupted();         // clear/ignore interrupt
790 <                if (eventCount != ec || !w.isRunning() ||
790 >                if (eventCount != ec || w.runState != 0 ||
791                      runState >= TERMINATING)  // recheck after clear
792                      break;
793                  if (untimed)
794                      LockSupport.park(w);
795                  else {
796                      LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
797 <                    if (eventCount != ec || !w.isRunning() ||
797 >                    if (eventCount != ec || w.runState != 0 ||
798                          runState >= TERMINATING)
799                          break;
800                      if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
801 <                        tryShutdownWaiter(ec);
801 >                        tryShutdownUnusedWorker(ec);
802                  }
803              }
804          }
805      }
806  
807 <    /**
842 <     * Callback from the oldest waiter in awaitEvent waking up after a
843 <     * period of non-use. Tries (once) to shutdown an event waiter (or
844 <     * a spare, if one exists). Note that we don't need CAS or locks
845 <     * here because the method is called only from one thread
846 <     * occasionally waking (and even misfires are OK). Note that
847 <     * until the shutdown worker fully terminates, workerCounts
848 <     * will overestimate total count, which is tolerable.
849 <     *
850 <     * @param ec the event count waited on by caller (to abort
851 <     * attempt if count has since changed).
852 <     */
853 <    private void tryShutdownWaiter(int ec) {
854 <        if (spareWaiters != 0) { // prefer killing spares
855 <            tryShutdownSpare();
856 <            return;
857 <        }
858 <        ForkJoinWorkerThread[] ws = workers;
859 <        int n = ws.length;
860 <        long h = eventWaiters;
861 <        ForkJoinWorkerThread w; int id; long nh;
862 <        if (runState == 0 &&
863 <            submissionQueue.isEmpty() &&
864 <            eventCount == ec &&
865 <            (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
866 <            id < n && (w = ws[id]) != null &&
867 <            (nh = w.nextWaiter) != 0L && // keep at least one worker
868 <            UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) {
869 <            w.shutdown();
870 <            LockSupport.unpark(w);
871 <        }
872 <        releaseEventWaiters();
873 <    }
874 <
875 <    // Maintaining spares
807 >    // Maintaining parallelism
808  
809      /**
810       * Pushes worker onto the spare stack
# Line 884 | Line 816 | public class ForkJoinPool extends Abstra
816      }
817  
818      /**
819 <     * Callback from oldest spare occasionally waking up.  Tries
820 <     * (once) to shutdown a spare. Same idea as tryShutdownWaiter.
819 >     * Tries (once) to resume a spare if the number of running
820 >     * threads is less than target.
821       */
822 <    final void tryShutdownSpare() {
822 >    private void tryResumeSpare() {
823          int sw, id;
892        ForkJoinWorkerThread w;
893        ForkJoinWorkerThread[] ws;
894        if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
895            id < (ws = workers).length && (w = ws[id]) != null &&
896            (workerCounts & RUNNING_COUNT_MASK) >= parallelism &&
897            UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
898                                     sw, w.nextSpare)) {
899            w.shutdown();
900            LockSupport.unpark(w);
901            advanceEventCount();
902        }
903    }
904
905    /**
906     * Tries (once) to resume a spare if worker counts match
907     * the given count.
908     *
909     * @param wc workerCounts value on invocation of this method
910     */
911    private void tryResumeSpare(int wc) {
824          ForkJoinWorkerThread[] ws = workers;
825          int n = ws.length;
826 <        int sw, id, rs;  ForkJoinWorkerThread w;
827 <        if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
826 >        ForkJoinWorkerThread w;
827 >        if ((sw = spareWaiters) != 0 &&
828 >            (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
829              id < n && (w = ws[id]) != null &&
830 <            (rs = runState) < TERMINATING &&
831 <            eventWaiters == 0L && workerCounts == wc) {
832 <            // In case all workers busy, heuristically back off to let settle
833 <            Thread.yield();
834 <            if (eventWaiters == 0L && runState == rs && // recheck
835 <                workerCounts == wc && spareWaiters == sw &&
836 <                UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
837 <                                         sw, w.nextSpare)) {
838 <                int c;              // increment running count before resume
839 <                do {} while(!UNSAFE.compareAndSwapInt
840 <                            (this, workerCountsOffset,
841 <                             c = workerCounts, c + ONE_RUNNING));
929 <                if (w.tryUnsuspend())
930 <                    LockSupport.unpark(w);
931 <                else               // back out if w was shutdown
932 <                    decrementWorkerCounts(ONE_RUNNING, 0);
933 <            }
830 >            (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
831 >            spareWaiters == sw &&
832 >            UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
833 >                                     sw, w.nextSpare)) {
834 >            int c; // increment running count before resume
835 >            do {} while(!UNSAFE.compareAndSwapInt
836 >                        (this, workerCountsOffset,
837 >                         c = workerCounts, c + ONE_RUNNING));
838 >            if (w.tryUnsuspend())
839 >                LockSupport.unpark(w);
840 >            else   // back out if w was shutdown
841 >                decrementWorkerCounts(ONE_RUNNING, 0);
842          }
843      }
844  
937    // adding workers on demand
938
845      /**
846 <     * Adds one or more workers if needed to establish target parallelism.
847 <     * Retries upon contention.
846 >     * Tries to increase the number of running workers if below target
847 >     * parallelism: If a spare exists tries to resume it via
848 >     * tryResumeSpare.  Otherwise, if not enough total workers or all
 849 >     * existing workers are busy, adds a new worker. In all cases it
 850 >     * also helps wake up releasable workers waiting for work.
851       */
852 <    private void addWorkerIfBelowTarget() {
852 >    private void helpMaintainParallelism() {
853          int pc = parallelism;
854 <        int wc;
855 <        while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < pc &&
856 <               runState < TERMINATING) {
857 <            if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
858 <                                         wc + (ONE_RUNNING|ONE_TOTAL))) {
859 <                if (addWorker() == null)
854 >        int wc, rs, tc;
855 >        while (((wc = workerCounts) & RUNNING_COUNT_MASK) < pc &&
856 >               (rs = runState) < TERMINATING) {
857 >            if (spareWaiters != 0)
858 >                tryResumeSpare();
859 >            else if ((tc = wc >>> TOTAL_COUNT_SHIFT) >= MAX_WORKERS ||
860 >                     (tc >= pc && (rs & ACTIVE_COUNT_MASK) != tc))
861 >                break;   // enough total
862 >            else if (runState == rs && workerCounts == wc &&
863 >                     UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
864 >                                              wc + (ONE_RUNNING|ONE_TOTAL))) {
865 >                ForkJoinWorkerThread w = null;
866 >                try {
867 >                    w = factory.newThread(this);
868 >                } finally { // adjust on null or exceptional factory return
869 >                    if (w == null) {
870 >                        decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
871 >                        tryTerminate(false); // handle failure during shutdown
872 >                    }
873 >                }
874 >                if (w == null)
875                      break;
876 +                w.start(recordWorker(w), ueh);
877 +                if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
878 +                    int c; // advance event count
879 +                    UNSAFE.compareAndSwapInt(this, eventCountOffset,
880 +                                             c = eventCount, c+1);
881 +                    break; // add at most one unless total below target
882 +                }
883              }
884          }
885 +        if (eventWaiters != 0L)
886 +            releaseEventWaiters();
887      }
888  
889      /**
890 <     * Tries (once) to add a new worker if all existing workers are
891 <     * busy, and there are either no running workers or the deficit is
892 <     * at least twice the surplus.
893 <     *
894 <     * @param wc workerCounts value on invocation of this method
895 <     */
896 <    private void tryAddWorkerIfBusy(int wc) {
964 <        int tc, rc, rs;
965 <        int pc = parallelism;
966 <        if ((tc = wc >>> TOTAL_COUNT_SHIFT) < MAX_WORKERS &&
967 <            ((rc = wc & RUNNING_COUNT_MASK) == 0 ||
968 <             rc < pc - ((tc - pc) << 1)) &&
969 <            (rs = runState) < TERMINATING &&
970 <            (rs & ACTIVE_COUNT_MASK) == tc) {
971 <            // Since all workers busy, heuristically back off to let settle
972 <            Thread.yield();
973 <            if (eventWaiters == 0L && spareWaiters == 0 && // recheck
974 <                runState == rs && workerCounts == wc &&
975 <                UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
976 <                                         wc + (ONE_RUNNING|ONE_TOTAL)))
977 <                addWorker();
978 <        }
979 <    }
980 <
981 <    /**
982 <     * Does at most one of:
983 <     *
984 <     * 1. Help wake up existing workers waiting for work via
985 <     *    releaseEventWaiters. (If any exist, then it doesn't
986 <     *    matter right now if under target parallelism level.)
987 <     *
988 <     * 2. If a spare exists, try (once) to resume it via tryResumeSpare.
989 <     *
990 <     * 3. If there are not enough total workers, add some
991 <     *    via addWorkerIfBelowTarget;
890 >     * Callback from the oldest waiter in awaitEvent waking up after a
891 >     * period of non-use. If all workers are idle, tries (once) to
892 >     * shutdown an event waiter or a spare, if one exists. Note that
893 >     * we don't need CAS or locks here because the method is called
894 >     * only from one thread occasionally waking (and even misfires are
895 >     * OK). Note that until the shutdown worker fully terminates,
896 >     * workerCounts will overestimate total count, which is tolerable.
897       *
898 <     * 4. Try (once) to add a new worker if all existing workers
899 <     *     are busy, via tryAddWorkerIfBusy
898 >     * @param ec the event count waited on by caller (to abort
899 >     * attempt if count has since changed).
900       */
901 <    private void helpMaintainParallelism() {
902 <        long h; int pc, wc;
903 <        if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0) {
904 <            if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
905 <                releaseEventWaiters(); // avoid useless call
906 <        }
907 <        else if ((pc = parallelism) >
908 <                 ((wc = workerCounts) & RUNNING_COUNT_MASK)) {
909 <            if (spareWaiters != 0)
910 <                tryResumeSpare(wc);
911 <            else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
912 <                addWorkerIfBelowTarget();
913 <            else
914 <                tryAddWorkerIfBusy(wc);
901 >    private void tryShutdownUnusedWorker(int ec) {
902 >        if (runState == 0 && eventCount == ec) { // only trigger if all idle
903 >            ForkJoinWorkerThread[] ws = workers;
904 >            int n = ws.length;
905 >            ForkJoinWorkerThread w = null;
906 >            boolean shutdown = false;
907 >            int sw;
908 >            long h;
909 >            if ((sw = spareWaiters) != 0) { // prefer killing spares
910 >                int id = (sw & SPARE_ID_MASK) - 1;
911 >                if (id >= 0 && id < n && (w = ws[id]) != null &&
912 >                    UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
913 >                                             sw, w.nextSpare))
914 >                    shutdown = true;
915 >            }
916 >            else if ((h = eventWaiters) != 0L) {
917 >                long nh;
918 >                int id = ((int)(h & WAITER_ID_MASK)) - 1;
919 >                if (id >= 0 && id < n && (w = ws[id]) != null &&
920 >                    (nh = w.nextWaiter) != 0L && // keep at least one worker
921 >                    UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
922 >                    shutdown = true;
923 >            }
924 >            if (w != null && shutdown) {
925 >                w.shutdown();
926 >                LockSupport.unpark(w);
927 >            }
928          }
929 +        releaseEventWaiters(); // in case of interference
930      }
931  
932      /**
# Line 1015 | Line 934 | public class ForkJoinPool extends Abstra
934       * stealing a task or taking a submission and running it).
935       * Performs one or more of the following:
936       *
937 <     * 1. If the worker is active, try to set its active status to
938 <     *    inactive and update activeCount. On contention, we may try
939 <     *    again on this or subsequent call.
940 <     *
941 <     * 2. Release any existing event waiters that are now relesable
942 <     *
943 <     * 3. If there are too many running threads, suspend this worker
944 <     *    (first forcing inactive if necessary).  If it is not
945 <     *    needed, it may be killed while suspended via
946 <     *    tryShutdownSpare. Otherwise, upon resume it rechecks to make
947 <     *    sure that it is still needed.
948 <     *
949 <     * 4. If more than 1 miss, await the next task event via
950 <     *    eventSync (first forcing inactivation if necessary), upon
951 <     *    which worker may also be killed, via tryShutdownWaiter.
952 <     *
953 <     * 5. Help reactivate other workers via helpMaintainParallelism
937 >     * 1. If the worker is active and either did not run a task
938 >     *    or there are too many workers, try to set its active status
939 >     *    to inactive and update activeCount. On contention, we may
940 >     *    try again in this or a subsequent call.
941 >     *
942 >     * 2. If not enough total workers, help create some.
943 >     *
944 >     * 3. If there are too many running workers, suspend this worker
945 >     *    (first forcing inactive if necessary).  If it is not needed,
946 >     *    it may be shutdown while suspended (via
947 >     *    tryShutdownUnusedWorker).  Otherwise, upon resume it
948 >     *    rechecks running thread count and need for event sync.
949 >     *
950 >     * 4. If worker did not run a task, await the next task event via
951 >     *    eventSync if necessary (first forcing inactivation), upon
952 >     *    which the worker may be shutdown via
953 >     *    tryShutdownUnusedWorker.  Otherwise, help release any
 954 >     *    existing event waiters that are now releasable.
955       *
956       * @param w the worker
957 <     * @param misses the number of scans by caller failing to find work
1038 <     * (saturating at 2 to avoid wraparound)
957 >     * @param ran true if worker ran a task since last call to this method
958       */
959 <    final void preStep(ForkJoinWorkerThread w, int misses) {
959 >    final void preStep(ForkJoinWorkerThread w, boolean ran) {
960 >        int wec = w.lastEventCount;
961          boolean active = w.active;
962 +        boolean inactivate = false;
963          int pc = parallelism;
964 <        for (;;) {
965 <            int rs, wc, rc, ec; long h;
966 <            if (active && UNSAFE.compareAndSwapInt(this, runStateOffset,
967 <                                                   rs = runState, rs - 1))
968 <                active = w.active = false;
969 <            if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0 &&
970 <                (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) {
971 <                releaseEventWaiters();
1051 <                if (misses > 1)
1052 <                    continue;                  // clear before sync below
1053 <            }
1054 <            if ((rc = ((wc = workerCounts) & RUNNING_COUNT_MASK)) > pc) {
1055 <                if (!active &&                 // must inactivate to suspend
964 >        int rs;
965 >        while (w.runState == 0 && (rs = runState) < TERMINATING) {
966 >            if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
967 >                UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
968 >                inactivate = active = w.active = false;
969 >            int wc = workerCounts;
970 >            if ((wc & RUNNING_COUNT_MASK) > pc) {
971 >                if (!(inactivate |= active) && // must inactivate to suspend
972                      workerCounts == wc &&      // try to suspend as spare
973                      UNSAFE.compareAndSwapInt(this, workerCountsOffset,
974 <                                             wc, wc - ONE_RUNNING)) {
974 >                                             wc, wc - ONE_RUNNING))
975                      w.suspendAsSpare();
1060                    if (!w.isRunning())
1061                        break;                 // was killed while spare
1062                }
1063                continue;
976              }
977 <            if (misses > 0) {
978 <                if ((ec = eventCount) == w.lastEventCount && misses > 1) {
979 <                    if (!active) {             // must inactivate to sync
980 <                        eventSync(w);
981 <                        if (w.isRunning())
982 <                            misses = 1;        // don't re-sync
983 <                        else
984 <                            break;             // was killed while waiting
985 <                    }
986 <                    continue;
977 >            else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
978 >                helpMaintainParallelism();     // not enough workers
979 >            else if (!ran) {
980 >                long h = eventWaiters;
981 >                int ec = eventCount;
982 >                if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
983 >                    releaseEventWaiters();     // release others before waiting
984 >                else if (ec != wec) {
985 >                    w.lastEventCount = ec;     // no need to wait
986 >                    break;
987                  }
988 <                w.lastEventCount = ec;
988 >                else if (!(inactivate |= active))
989 >                    eventSync(w, wec);         // must inactivate before sync
990              }
991 <            if (rc < pc)
992 <                helpMaintainParallelism();
1080 <            break;
991 >            else
992 >                break;
993          }
994      }
995  
996      /**
997       * Helps and/or blocks awaiting join of the given task.
998 <     * Alternates between helpJoinTask() and helpMaintainParallelism()
1087 <     * as many times as there is a deficit in running count (or longer
1088 <     * if running count would become zero), then blocks if task still
1089 <     * not done.
998 >     * See above for explanation.
999       *
1000       * @param joinMe the task to join
1001 +     * @param worker the current worker thread
1002       */
1003      final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
1004 <        int threshold = parallelism;         // descend blocking thresholds
1004 >        int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
1005          while (joinMe.status >= 0) {
1006 <            boolean block; int wc;
1006 >            int wc;
1007              worker.helpJoinTask(joinMe);
1008              if (joinMe.status < 0)
1009                  break;
1010 <            if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
1011 <                if (threshold > 0)
1012 <                    --threshold;
1013 <                else
1014 <                    advanceEventCount(); // force release
1015 <                block = false;
1016 <            }
1017 <            else
1018 <                block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1019 <                                                 wc, wc - ONE_RUNNING);
1020 <            helpMaintainParallelism();
1021 <            if (block) {
1022 <                int c;
1023 <                joinMe.internalAwaitDone();
1010 >            else if (retries > 0)
1011 >                --retries;
1012 >            else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
1013 >                     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1014 >                                              wc, wc - ONE_RUNNING)) {
1015 >                int stat, c; long h;
1016 >                while ((stat = joinMe.status) >= 0 &&
1017 >                       (h = eventWaiters) != 0L && // help release others
1018 >                       (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1019 >                    releaseEventWaiters();
1020 >                if (stat >= 0 &&
1021 >                    ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
1022 >                     (stat =
1023 >                      joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
1024 >                    helpMaintainParallelism(); // timeout or no running workers
1025                  do {} while (!UNSAFE.compareAndSwapInt
1026                               (this, workerCountsOffset,
1027                                c = workerCounts, c + ONE_RUNNING));
1028 <                break;
1028 >                if (stat < 0)
1029 >                    break;   // else restart
1030              }
1031          }
1032      }
1033  
1034      /**
1035 <     * Same idea as awaitJoin, but no helping
1035 >     * Same idea as awaitJoin, but no helping, retries, or timeouts.
1036       */
1037      final void awaitBlocker(ManagedBlocker blocker)
1038          throws InterruptedException {
1127        int threshold = parallelism;
1039          while (!blocker.isReleasable()) {
1040 <            boolean block; int wc;
1041 <            if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
1042 <                if (threshold > 0)
1043 <                    --threshold;
1133 <                else
1134 <                    advanceEventCount();
1135 <                block = false;
1136 <            }
1137 <            else
1138 <                block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1139 <                                                 wc, wc - ONE_RUNNING);
1140 <            helpMaintainParallelism();
1141 <            if (block) {
1040 >            int wc = workerCounts;
1041 >            if ((wc & RUNNING_COUNT_MASK) != 0 &&
1042 >                UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1043 >                                         wc, wc - ONE_RUNNING)) {
1044                  try {
1045 <                    do {} while (!blocker.isReleasable() && !blocker.block());
1045 >                    while (!blocker.isReleasable()) {
1046 >                        long h = eventWaiters;
1047 >                        if (h != 0L &&
1048 >                            (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1049 >                            releaseEventWaiters();
1050 >                        else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
1051 >                                 runState < TERMINATING)
1052 >                            helpMaintainParallelism();
1053 >                        else if (blocker.block())
1054 >                            break;
1055 >                    }
1056                  } finally {
1057                      int c;
1058                      do {} while (!UNSAFE.compareAndSwapInt
# Line 1190 | Line 1102 | public class ForkJoinPool extends Abstra
1102      private void startTerminating() {
1103          cancelSubmissions();
1104          for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1105 <            advanceEventCount();
1105 >            int c; // advance event count
1106 >            UNSAFE.compareAndSwapInt(this, eventCountOffset,
1107 >                                     c = eventCount, c+1);
1108              eventWaiters = 0L; // clobber lists
1109              spareWaiters = 0;
1110              ForkJoinWorkerThread[] ws = workers;
# Line 1353 | Line 1267 | public class ForkJoinPool extends Abstra
1267       * @param pc the initial parallelism level
1268       */
1269      private static int initialArraySizeFor(int pc) {
1270 <        // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 >>> 16)
1270 >        // If possible, initially allocate enough space for one spare
1271          int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1272 +        // See Hacker's Delight, sec 3.2. We know MAX_WORKERS < (1 << 16)
1273          size |= size >>> 1;
1274          size |= size >>> 2;
1275          size |= size >>> 4;
# Line 1373 | Line 1288 | public class ForkJoinPool extends Abstra
1288          if (runState >= SHUTDOWN)
1289              throw new RejectedExecutionException();
1290          submissionQueue.offer(task);
1291 <        advanceEventCount();
1292 <        if (eventWaiters != 0L)
1293 <            releaseEventWaiters();
1379 <        if ((workerCounts >>> TOTAL_COUNT_SHIFT) < parallelism)
1380 <            addWorkerIfBelowTarget();
1291 >        int c; // try to increment event count -- CAS failure OK
1292 >        UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1293 >        helpMaintainParallelism(); // create, start, or resume some workers
1294      }
1295  
1296      /**
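
The "CAS failure OK" event-count bump that this revision inlines into submitTask and startTerminating can be sketched with a plain AtomicInteger. This is a hypothetical stand-alone class, not the pool's actual `eventCount` field or Unsafe-based CAS:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the one-shot event-count advance: the CAS is attempted once
// and a failure is benign -- losing the race means some other thread has
// already advanced the count, which serves the same purpose.
public class EventCountBump {
    static final AtomicInteger eventCount = new AtomicInteger();

    // One-shot advance: succeed, or defer to the concurrent winner.
    static void advanceEventCount() {
        int c = eventCount.get();
        eventCount.compareAndSet(c, c + 1); // CAS failure OK
    }

    public static void main(String[] args) {
        advanceEventCount();
        // Single-threaded here, so the CAS cannot have lost a race.
        System.out.println("eventCount=" + eventCount.get()); // prints eventCount=1
    }
}
```

The same pattern appears twice in the diff because the old helper `advanceEventCount()` was removed in favor of the inline CAS at each call site.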

Diff Legend

- Removed lines
+ Added lines
< Changed lines (old file)
> Changed lines (new file)
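
The revision's central change in awaitJoin — compensating in "slow motion", only when a timed wait expires, rather than eagerly on every pass — can be sketched as follows. The class name, the 50 ms timeout value, and the compensations counter are illustrative stand-ins, not ForkJoinPool internals (the real code uses JOIN_TIMEOUT_MILLIS and helpMaintainParallelism()):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of slow-motion compensation: a joiner blocks with a
// timeout, and only when the timed wait expires does it ask the pool to
// compensate (create or resume a spare).  Short stalls of the awaited
// task therefore trigger no compensation at all.
public class TimedJoinSketch {
    static final long JOIN_TIMEOUT_MILLIS = 50L;  // assumed value
    static final AtomicInteger compensations = new AtomicInteger();

    // Wait for joinMe to complete; compensate only on each timeout.
    static void awaitWithCompensation(CountDownLatch joinMe)
            throws InterruptedException {
        while (!joinMe.await(JOIN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
            compensations.incrementAndGet();      // timeout: add a spare
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        Thread stalled = new Thread(() -> {
            try { Thread.sleep(200); } catch (InterruptedException e) { }
            done.countDown();
        });
        stalled.start();
        awaitWithCompensation(done);
        stalled.join();
        // The 200 ms stall outlasts the 50 ms timeout, so the joiner
        // compensated at least once rather than doing so eagerly.
        System.out.println("compensations=" + compensations.get());
    }
}
```

This mirrors the design rationale quoted in the class comment: deferring compensation to a timeout avoids poor decisions when the awaited thread is merely stalled by unrelated activity such as garbage collection.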