ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.65 by dl, Wed Aug 18 14:05:27 2010 UTC vs.
Revision 1.71 by jsr166, Mon Sep 6 21:36:43 2010 UTC

# Line 7 | Line 7
7   package jsr166y;
8  
9   import java.util.concurrent.*;
10
10   import java.util.ArrayList;
11   import java.util.Arrays;
12   import java.util.Collection;
# Line 69 | Line 68 | import java.util.concurrent.CountDownLat
68   *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
69   *  </tr>
70   *  <tr>
71 < *    <td> <b>Arange async execution</td>
71 > *    <td> <b>Arrange async execution</td>
72   *    <td> {@link #execute(ForkJoinTask)}</td>
73   *    <td> {@link ForkJoinTask#fork}</td>
74   *  </tr>
# Line 140 | Line 139 | public class ForkJoinPool extends Abstra
139       * Beyond work-stealing support and essential bookkeeping, the
140       * main responsibility of this framework is to take actions when
141       * one worker is waiting to join a task stolen (or always held by)
142 <     * another.  Becauae we are multiplexing many tasks on to a pool
142 >     * another.  Because we are multiplexing many tasks on to a pool
143       * of workers, we can't just let them block (as in Thread.join).
144       * We also cannot just reassign the joiner's run-time stack with
145       * another and replace it later, which would be a form of
# Line 157 | Line 156 | public class ForkJoinPool extends Abstra
156       *      links to try to find such a task.
157       *
158       *   Compensating: Unless there are already enough live threads,
159 <     *      method helpMaintainParallelism() may create or or
159 >     *      method helpMaintainParallelism() may create or
160       *      re-activate a spare thread to compensate for blocked
161       *      joiners until they unblock.
162       *
163 <     * Because the determining existence of conservatively safe
164 <     * helping targets, the availability of already-created spares,
165 <     * and the apparent need to create new spares are all racy and
166 <     * require heuristic guidance, we rely on multiple retries of
167 <     * each. Further, because it is impossible to keep exactly the
168 <     * target (parallelism) number of threads running at any given
169 <     * time, we allow compensation during joins to fail, and enlist
170 <     * all other threads to help out whenever they are not otherwise
171 <     * occupied (i.e., mainly in method preStep).
163 >     * It is impossible to keep exactly the target (parallelism)
164 >     * number of threads running at any given time.  Determining
165 >     * existence of conservatively safe helping targets, the
166 >     * availability of already-created spares, and the apparent need
167 >     * to create new spares are all racy and require heuristic
168 >     * guidance, so we rely on multiple retries of each.  Compensation
169 >     * occurs in slow-motion. It is triggered only upon timeouts of
170 >     * Object.wait used for joins. This reduces poor decisions that
171 >     * would otherwise be made when threads are waiting for others
172 >     * that are stalled because of unrelated activities such as
173 >     * garbage collection.
174       *
175       * The ManagedBlocker extension API can't use helping so relies
176       * only on compensation in method awaitBlocker.
# Line 224 | Line 225 | public class ForkJoinPool extends Abstra
225       * ManagedBlocker), we may create or resume others to take their
226       * place until they unblock (see below). Implementing this
227       * requires counts of the number of "running" threads (i.e., those
228 <     * that are neither blocked nor artifically suspended) as well as
228 >     * that are neither blocked nor artificially suspended) as well as
229       * the total number.  These two values are packed into one field,
230       * "workerCounts" because we need accurate snapshots when deciding
231       * to create, resume or suspend.  Note however that the
232 <     * correspondance of these counts to reality is not guaranteed. In
232 >     * correspondence of these counts to reality is not guaranteed. In
233       * particular updates for unblocked threads may lag until they
234       * actually wake up.
235       *
# Line 271 | Line 272 | public class ForkJoinPool extends Abstra
272       * In addition to allowing simpler decisions about need for
273       * wakeup, the event count bits in eventWaiters serve the role of
274       * tags to avoid ABA errors in Treiber stacks. Upon any wakeup,
275 <     * released threads also try to release others (but give up upon
276 <     * contention to reduce useless flailing).  The net effect is a
277 <     * tree-like diffusion of signals, where released threads (and
278 <     * possibly others) help with unparks.  To further reduce
279 <     * contention effects a bit, failed CASes to increment field
279 <     * eventCount are tolerated without retries in signalWork.
275 >     * released threads also try to release at most two others.  The
276 >     * net effect is a tree-like diffusion of signals, where released
277 >     * threads (and possibly others) help with unparks.  To further
278 >     * reduce contention effects a bit, failed CASes to increment
279 >     * field eventCount are tolerated without retries in signalWork.
280       * Conceptually they are merged into the same event, which is OK
281       * when their only purpose is to enable workers to scan for work.
282       *
283 <     * 5. Managing suspension of extra workers. When a worker is about
284 <     * to block waiting for a join (or via ManagedBlockers), we may
285 <     * create a new thread to maintain parallelism level, or at least
286 <     * avoid starvation. Usually, extra threads are needed for only
287 <     * very short periods, yet join dependencies are such that we
288 <     * sometimes need them in bursts. Rather than create new threads
289 <     * each time this happens, we suspend no-longer-needed extra ones
290 <     * as "spares". For most purposes, we don't distinguish "extra"
291 <     * spare threads from normal "core" threads: On each call to
292 <     * preStep (the only point at which we can do this) a worker
293 <     * checks to see if there are now too many running workers, and if
294 <     * so, suspends itself.  Method helpMaintainParallelism looks for
295 <     * suspended threads to resume before considering creating a new
296 <     * replacement. The spares themselves are encoded on another
297 <     * variant of a Treiber Stack, headed at field "spareWaiters".
298 <     * Note that the use of spares is intrinsically racy.  One thread
299 <     * may become a spare at about the same time as another is
300 <     * needlessly being created. We counteract this and related slop
301 <     * in part by requiring resumed spares to immediately recheck (in
302 <     * preStep) to see whether they they should re-suspend.
303 <     *
304 <     * 6. Killing off unneeded workers. The Spare and Event queues use
305 <     * similar mechanisms to shed unused workers: The oldest (first)
306 <     * waiter uses a timed rather than hard wait. When this wait times
307 <     * out without a normal wakeup, it tries to shutdown any one (for
308 <     * convenience the newest) other waiter via tryShutdownSpare or
309 <     * tryShutdownWaiter, respectively. The wakeup rates for spares
310 <     * are much shorter than for waiters. Together, they will
311 <     * eventually reduce the number of worker threads to a minimum of
312 <     * one after a long enough period without use.
283 >     * 5. Managing suspension of extra workers. When a worker notices
284 >     * (usually upon timeout of a wait()) that there are too few
285 >     * running threads, we may create a new thread to maintain
286 >     * parallelism level, or at least avoid starvation. Usually, extra
287 >     * threads are needed for only very short periods, yet join
288 >     * dependencies are such that we sometimes need them in
289 >     * bursts. Rather than create new threads each time this happens,
290 >     * we suspend no-longer-needed extra ones as "spares". For most
291 >     * purposes, we don't distinguish "extra" spare threads from
292 >     * normal "core" threads: On each call to preStep (the only point
293 >     * at which we can do this) a worker checks to see if there are
294 >     * now too many running workers, and if so, suspends itself.
295 >     * Method helpMaintainParallelism looks for suspended threads to
296 >     * resume before considering creating a new replacement. The
297 >     * spares themselves are encoded on another variant of a Treiber
298 >     * Stack, headed at field "spareWaiters".  Note that the use of
299 >     * spares is intrinsically racy.  One thread may become a spare at
300 >     * about the same time as another is needlessly being created. We
301 >     * counteract this and related slop in part by requiring resumed
302 >     * spares to immediately recheck (in preStep) to see whether they
303 >     * they should re-suspend.
304 >     *
305 >     * 6. Killing off unneeded workers. A timeout mechanism is used to
306 >     * shed unused workers: The oldest (first) event queue waiter uses
307 >     * a timed rather than hard wait. When this wait times out without
308 >     * a normal wakeup, it tries to shutdown any one (for convenience
309 >     * the newest) other spare or event waiter via
310 >     * tryShutdownUnusedWorker. This eventually reduces the number of
311 >     * worker threads to a minimum of one after a long enough period
312 >     * without use.
313       *
314       * 7. Deciding when to create new workers. The main dynamic
315       * control in this class is deciding when to create extra threads
316       * in method helpMaintainParallelism. We would like to keep
317 <     * exactly #parallelism threads running, which is an impossble
317 >     * exactly #parallelism threads running, which is an impossible
318       * task. We always need to create one when the number of running
319       * threads would become zero and all workers are busy. Beyond
320 <     * this, we must rely on heuristics that work well in the the
321 <     * presence of transients phenomena such as GC stalls, dynamic
320 >     * this, we must rely on heuristics that work well in the
321 >     * presence of transient phenomena such as GC stalls, dynamic
322       * compilation, and wake-up lags. These transients are extremely
323       * common -- we are normally trying to fully saturate the CPUs on
324       * a machine, so almost any activity other than running tasks
325 <     * impedes accuracy. Our main defense is to allow some slack in
326 <     * creation thresholds, using rules that reflect the fact that the
327 <     * more threads we have running, the more likely that we are
328 <     * underestimating the number running threads. (We also include
329 <     * some heuristic use of Thread.yield when all workers appear to
330 <     * be busy, to improve likelihood of counts settling.) The rules
331 <     * also better cope with the fact that some of the methods in this
332 <     * class tend to never become compiled (but are interpreted), so
333 <     * some components of the entire set of controls might execute 100
334 <     * times faster than others. And similarly for cases where the
335 <     * apparent lack of work is just due to GC stalls and other
336 <     * transient system activity.
325 >     * impedes accuracy. Our main defense is to allow parallelism to
326 >     * lapse for a while during joins, and use a timeout to see if,
327 >     * after the resulting settling, there is still a need for
328 >     * additional workers.  This also better copes with the fact that
329 >     * some of the methods in this class tend to never become compiled
330 >     * (but are interpreted), so some components of the entire set of
331 >     * controls might execute 100 times faster than others. And
332 >     * similarly for cases where the apparent lack of work is just due
333 >     * to GC stalls and other transient system activity.
334       *
335       * Beware that there is a lot of representation-level coupling
336       * among classes ForkJoinPool, ForkJoinWorkerThread, and
# Line 348 | Line 345 | public class ForkJoinPool extends Abstra
345       * "while ((local = field) != 0)") which are usually the simplest
346       * way to ensure the required read orderings (which are sometimes
347       * critical). Also several occurrences of the unusual "do {}
348 <     * while(!cas...)" which is the simplest way to force an update of
348 >     * while (!cas...)" which is the simplest way to force an update of
349       * a CAS'ed variable. There are also other coding oddities that
350       * help some methods perform reasonably even when interpreted (not
351       * compiled), at the expense of some messy constructions that
# Line 420 | Line 417 | public class ForkJoinPool extends Abstra
417          new AtomicInteger();
418  
419      /**
420 +     * The time to block in a join (see awaitJoin) before checking if
421 +     * a new worker should be (re)started to maintain parallelism
422 +     * level. The value should be short enough to maintain global
423 +     * responsiveness and progress but long enough to avoid
424 +     * counterproductive firings during GC stalls or unrelated system
425 +     * activity, and to not bog down systems with continual re-firings
426 +     * on GCs or legitimately long waits.
427 +     */
428 +    private static final long JOIN_TIMEOUT_MILLIS = 250L; // 4 per second
429 +
430 +    /**
431       * The wakeup interval (in nanoseconds) for the oldest worker
432 <     * worker waiting for an event invokes tryShutdownWaiter to shrink
432 >     * worker waiting for an event invokes tryShutdownUnusedWorker to shrink
433       * the number of workers.  The exact value does not matter too
434       * much, but should be long enough to slowly release resources
435       * during long periods without use without disrupting normal use.
436       */
437      private static final long SHRINK_RATE_NANOS =
438 <        60L * 1000L * 1000L * 1000L; // one minute
438 >        30L * 1000L * 1000L * 1000L; // 2 per minute
439  
440      /**
441       * Absolute bound for parallelism level. Twice this number plus
# Line 474 | Line 482 | public class ForkJoinPool extends Abstra
482      private volatile long stealCount;
483  
484      /**
485 <     * Encoded record of top of treiber stack of threads waiting for
485 >     * Encoded record of top of Treiber stack of threads waiting for
486       * events. The top 32 bits contain the count being waited for. The
487       * bottom 16 bits contains one plus the pool index of waiting
488       * worker thread. (Bits 16-31 are unused.)
# Line 493 | Line 501 | public class ForkJoinPool extends Abstra
501      private volatile int eventCount;
502  
503      /**
504 <     * Encoded record of top of treiber stack of spare threads waiting
504 >     * Encoded record of top of Treiber stack of spare threads waiting
505       * for resumption. The top 16 bits contain an arbitrary count to
506       * avoid ABA effects. The bottom 16bits contains one plus the pool
507       * index of waiting worker thread.
# Line 567 | Line 575 | public class ForkJoinPool extends Abstra
575       */
576      private final int poolNumber;
577  
570
578      // Utilities for CASing fields. Note that most of these
579      // are usually manually inlined by callers
580  
# Line 615 | Line 622 | public class ForkJoinPool extends Abstra
622      }
623  
624      /**
618     * Increments event count
619     */
620    private void advanceEventCount() {
621        int c;
622        do {} while(!UNSAFE.compareAndSwapInt(this, eventCountOffset,
623                                              c = eventCount, c+1));
624    }
625
626    /**
627     * Tries incrementing active count; fails on contention.
628     * Called by workers before executing tasks.
629     *
630     * @return true on success
631     */
632    final boolean tryIncrementActiveCount() {
633        int c;
634        return UNSAFE.compareAndSwapInt(this, runStateOffset,
635                                        c = runState, c + 1);
636    }
637
638    /**
625       * Tries decrementing active count; fails on contention.
626       * Called when workers cannot find tasks to run.
627       */
# Line 687 | Line 673 | public class ForkJoinPool extends Abstra
673      }
674  
675      /**
676 <     * Nulls out record of worker in workers array
676 >     * Nulls out record of worker in workers array.
677       */
678      private void forgetWorker(ForkJoinWorkerThread w) {
679          int idx = w.poolIndex;
680 <        // Locking helps method recordWorker avoid unecessary expansion
680 >        // Locking helps method recordWorker avoid unnecessary expansion
681          final ReentrantLock lock = this.workerLock;
682          lock.lock();
683          try {
# Line 703 | Line 689 | public class ForkJoinPool extends Abstra
689          }
690      }
691  
706    // adding and removing workers
707
708    /**
709     * Tries to create and add new worker. Assumes that worker counts
710     * are already updated to accommodate the worker, so adjusts on
711     * failure.
712     *
713     * @return the worker, or null on failure
714     */
715    private ForkJoinWorkerThread addWorker() {
716        ForkJoinWorkerThread w = null;
717        try {
718            w = factory.newThread(this);
719        } finally { // Adjust on either null or exceptional factory return
720            if (w == null) {
721                decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
722                tryTerminate(false); // in case of failure during shutdown
723            }
724        }
725        if (w != null) {
726            w.start(recordWorker(w), ueh);
727            advanceEventCount();
728        }
729        return w;
730    }
731
692      /**
693       * Final callback from terminating worker.  Removes record of
694       * worker from array, and adjusts counts. If pool is shutting
695 <     * down, tries to complete terminatation.
695 >     * down, tries to complete termination.
696       *
697       * @param w the worker
698       */
# Line 750 | Line 710 | public class ForkJoinPool extends Abstra
710       * Releases workers blocked on a count not equal to current count.
711       * Normally called after precheck that eventWaiters isn't zero to
712       * avoid wasted array checks. Gives up upon a change in count or
713 <     * contention, letting other workers take over.
713 >     * upon releasing two workers, letting others take over.
714       */
715      private void releaseEventWaiters() {
716          ForkJoinWorkerThread[] ws = workers;
717          int n = ws.length;
718          long h = eventWaiters;
719          int ec = eventCount;
720 +        boolean releasedOne = false;
721          ForkJoinWorkerThread w; int id;
722 <        while ((int)(h >>> EVENT_COUNT_SHIFT) != ec &&
723 <               (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
724 <               id < n && (w = ws[id]) != null &&
725 <               UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
726 <                                         h,  h = w.nextWaiter)) {
727 <            LockSupport.unpark(w);
728 <            if (eventWaiters != h || eventCount != ec)
722 >        while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
723 >               (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
724 >               id < n && (w = ws[id]) != null) {
725 >            if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
726 >                                          h,  w.nextWaiter)) {
727 >                LockSupport.unpark(w);
728 >                if (releasedOne) // exit on second release
729 >                    break;
730 >                releasedOne = true;
731 >            }
732 >            if (eventCount != ec)
733                  break;
734 +            h = eventWaiters;
735          }
736      }
737  
# Line 782 | Line 748 | public class ForkJoinPool extends Abstra
748  
749      /**
750       * Adds the given worker to event queue and blocks until
751 <     * terminating or event count advances from the workers
786 <     * lastEventCount value
751 >     * terminating or event count advances from the given value
752       *
753       * @param w the calling worker thread
754 +     * @param ec the count
755       */
756 <    private void eventSync(ForkJoinWorkerThread w) {
791 <        int ec = w.lastEventCount;
756 >    private void eventSync(ForkJoinWorkerThread w, int ec) {
757          long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
758          long h;
759          while ((runState < SHUTDOWN || !tryTerminate(false)) &&
# Line 808 | Line 773 | public class ForkJoinPool extends Abstra
773       * event waiter) until terminating or event count advances from
774       * the given value. The oldest (first) waiter uses a timed wait to
775       * occasionally one-by-one shrink the number of workers (to a
776 <     * minumum of one) if the pool has not been used for extended
776 >     * minimum of one) if the pool has not been used for extended
777       * periods.
778       *
779       * @param w the calling worker thread
# Line 821 | Line 786 | public class ForkJoinPool extends Abstra
786                                     (workerCounts & RUNNING_COUNT_MASK) <= 1);
787                  long startTime = untimed? 0 : System.nanoTime();
788                  Thread.interrupted();         // clear/ignore interrupt
789 <                if (eventCount != ec || !w.isRunning() ||
789 >                if (eventCount != ec || w.runState != 0 ||
790                      runState >= TERMINATING)  // recheck after clear
791                      break;
792                  if (untimed)
793                      LockSupport.park(w);
794                  else {
795                      LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
796 <                    if (eventCount != ec || !w.isRunning() ||
796 >                    if (eventCount != ec || w.runState != 0 ||
797                          runState >= TERMINATING)
798                          break;
799                      if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
800 <                        tryShutdownWaiter(ec);
800 >                        tryShutdownUnusedWorker(ec);
801                  }
802              }
803          }
804      }
805  
806 <    /**
842 <     * Callback from the oldest waiter in awaitEvent waking up after a
843 <     * period of non-use. Tries (once) to shutdown an event waiter (or
844 <     * a spare, if one exists). Note that we don't need CAS or locks
845 <     * here because the method is called only from one thread
846 <     * occasionally waking (and even misfires are OK). Note that
847 <     * until the shutdown worker fully terminates, workerCounts
848 <     * will overestimate total count, which is tolerable.
849 <     *
850 <     * @param ec the event count waited on by caller (to abort
851 <     * attempt if count has since changed).
852 <     */
853 <    private void tryShutdownWaiter(int ec) {
854 <        if (spareWaiters != 0) { // prefer killing spares
855 <            tryShutdownSpare();
856 <            return;
857 <        }
858 <        ForkJoinWorkerThread[] ws = workers;
859 <        int n = ws.length;
860 <        long h = eventWaiters;
861 <        ForkJoinWorkerThread w; int id; long nh;
862 <        if (runState == 0 &&
863 <            submissionQueue.isEmpty() &&
864 <            eventCount == ec &&
865 <            (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
866 <            id < n && (w = ws[id]) != null &&
867 <            (nh = w.nextWaiter) != 0L && // keep at least one worker
868 <            UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) {
869 <            w.shutdown();
870 <            LockSupport.unpark(w);
871 <        }
872 <        releaseEventWaiters();
873 <    }
874 <
875 <    // Maintaining spares
806 >    // Maintaining parallelism
807  
808      /**
809       * Pushes worker onto the spare stack
# Line 884 | Line 815 | public class ForkJoinPool extends Abstra
815      }
816  
817      /**
818 <     * Callback from oldest spare occasionally waking up.  Tries
819 <     * (once) to shutdown a spare. Same idea as tryShutdownWaiter.
818 >     * Tries (once) to resume a spare if the number of running
819 >     * threads is less than target.
820       */
821 <    final void tryShutdownSpare() {
821 >    private void tryResumeSpare() {
822          int sw, id;
892        ForkJoinWorkerThread w;
893        ForkJoinWorkerThread[] ws;
894        if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
895            id < (ws = workers).length && (w = ws[id]) != null &&
896            (workerCounts & RUNNING_COUNT_MASK) >= parallelism &&
897            UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
898                                     sw, w.nextSpare)) {
899            w.shutdown();
900            LockSupport.unpark(w);
901            advanceEventCount();
902        }
903    }
904
905    /**
906     * Tries (once) to resume a spare if worker counts match
907     * the given count.
908     *
909     * @param wc workerCounts value on invocation of this method
910     */
911    private void tryResumeSpare(int wc) {
823          ForkJoinWorkerThread[] ws = workers;
824          int n = ws.length;
825 <        int sw, id, rs;  ForkJoinWorkerThread w;
826 <        if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
825 >        ForkJoinWorkerThread w;
826 >        if ((sw = spareWaiters) != 0 &&
827 >            (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
828              id < n && (w = ws[id]) != null &&
829 <            (rs = runState) < TERMINATING &&
830 <            eventWaiters == 0L && workerCounts == wc) {
831 <            // In case all workers busy, heuristically back off to let settle
832 <            Thread.yield();
833 <            if (eventWaiters == 0L && runState == rs && // recheck
834 <                workerCounts == wc && spareWaiters == sw &&
835 <                UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
836 <                                         sw, w.nextSpare)) {
837 <                int c;              // increment running count before resume
838 <                do {} while(!UNSAFE.compareAndSwapInt
839 <                            (this, workerCountsOffset,
840 <                             c = workerCounts, c + ONE_RUNNING));
929 <                if (w.tryUnsuspend())
930 <                    LockSupport.unpark(w);
931 <                else               // back out if w was shutdown
932 <                    decrementWorkerCounts(ONE_RUNNING, 0);
933 <            }
829 >            (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
830 >            spareWaiters == sw &&
831 >            UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
832 >                                     sw, w.nextSpare)) {
833 >            int c; // increment running count before resume
834 >            do {} while (!UNSAFE.compareAndSwapInt
835 >                         (this, workerCountsOffset,
836 >                          c = workerCounts, c + ONE_RUNNING));
837 >            if (w.tryUnsuspend())
838 >                LockSupport.unpark(w);
839 >            else   // back out if w was shutdown
840 >                decrementWorkerCounts(ONE_RUNNING, 0);
841          }
842      }
843  
937    // adding workers on demand
938
844      /**
845 <     * Adds one or more workers if needed to establish target parallelism.
846 <     * Retries upon contention.
845 >     * Tries to increase the number of running workers if below target
846 >     * parallelism: If a spare exists tries to resume it via
847 >     * tryResumeSpare.  Otherwise, if not enough total workers or all
848 >     * existing workers are busy, adds a new worker. In all cases also
849 >     * helps wake up releasable workers waiting for work.
850       */
851 <    private void addWorkerIfBelowTarget() {
851 >    private void helpMaintainParallelism() {
852          int pc = parallelism;
853 <        int wc;
854 <        while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < pc &&
855 <               runState < TERMINATING) {
856 <            if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
857 <                                         wc + (ONE_RUNNING|ONE_TOTAL))) {
858 <                if (addWorker() == null)
853 >        int wc, rs, tc;
854 >        while (((wc = workerCounts) & RUNNING_COUNT_MASK) < pc &&
855 >               (rs = runState) < TERMINATING) {
856 >            if (spareWaiters != 0)
857 >                tryResumeSpare();
858 >            else if ((tc = wc >>> TOTAL_COUNT_SHIFT) >= MAX_WORKERS ||
859 >                     (tc >= pc && (rs & ACTIVE_COUNT_MASK) != tc))
860 >                break;   // enough total
861 >            else if (runState == rs && workerCounts == wc &&
862 >                     UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
863 >                                              wc + (ONE_RUNNING|ONE_TOTAL))) {
864 >                ForkJoinWorkerThread w = null;
865 >                try {
866 >                    w = factory.newThread(this);
867 >                } finally { // adjust on null or exceptional factory return
868 >                    if (w == null) {
869 >                        decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
870 >                        tryTerminate(false); // handle failure during shutdown
871 >                    }
872 >                }
873 >                if (w == null)
874                      break;
875 +                w.start(recordWorker(w), ueh);
876 +                if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
877 +                    int c; // advance event count
878 +                    UNSAFE.compareAndSwapInt(this, eventCountOffset,
879 +                                             c = eventCount, c+1);
880 +                    break; // add at most one unless total below target
881 +                }
882              }
883          }
884 +        if (eventWaiters != 0L)
885 +            releaseEventWaiters();
886      }
887  
888      /**
889 <     * Tries (once) to add a new worker if all existing workers are
890 <     * busy, and there are either no running workers or the deficit is
891 <     * at least twice the surplus.
892 <     *
893 <     * @param wc workerCounts value on invocation of this method
894 <     */
895 <    private void tryAddWorkerIfBusy(int wc) {
964 <        int tc, rc, rs;
965 <        int pc = parallelism;
966 <        if ((tc = wc >>> TOTAL_COUNT_SHIFT) < MAX_WORKERS &&
967 <            ((rc = wc & RUNNING_COUNT_MASK) == 0 ||
968 <             rc < pc - ((tc - pc) << 1)) &&
969 <            (rs = runState) < TERMINATING &&
970 <            (rs & ACTIVE_COUNT_MASK) == tc) {
971 <            // Since all workers busy, heuristically back off to let settle
972 <            Thread.yield();
973 <            if (eventWaiters == 0L && spareWaiters == 0 && // recheck
974 <                runState == rs && workerCounts == wc &&
975 <                UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
976 <                                         wc + (ONE_RUNNING|ONE_TOTAL)))
977 <                addWorker();
978 <        }
979 <    }
980 <
981 <    /**
982 <     * Does at most one of:
983 <     *
984 <     * 1. Help wake up existing workers waiting for work via
985 <     *    releaseEventWaiters. (If any exist, then it doesn't
986 <     *    matter right now if under target parallelism level.)
987 <     *
988 <     * 2. If a spare exists, try (once) to resume it via tryResumeSpare.
989 <     *
990 <     * 3. If there are not enough total workers, add some
991 <     *    via addWorkerIfBelowTarget;
889 >     * Callback from the oldest waiter in awaitEvent waking up after a
890 >     * period of non-use. If all workers are idle, tries (once) to
891 >     * shutdown an event waiter or a spare, if one exists. Note that
892 >     * we don't need CAS or locks here because the method is called
893 >     * only from one thread occasionally waking (and even misfires are
894 >     * OK). Note that until the shutdown worker fully terminates,
895 >     * workerCounts will overestimate total count, which is tolerable.
896       *
897 <     * 4. Try (once) to add a new worker if all existing workers
898 <     *     are busy, via tryAddWorkerIfBusy
897 >     * @param ec the event count waited on by caller (to abort
898 >     * attempt if count has since changed).
899       */
900 <    private void helpMaintainParallelism() {
901 <        long h; int pc, wc;
902 <        if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0) {
903 <            if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
904 <                releaseEventWaiters(); // avoid useless call
905 <        }
906 <        else if ((pc = parallelism) >
907 <                 ((wc = workerCounts) & RUNNING_COUNT_MASK)) {
908 <            if (spareWaiters != 0)
909 <                tryResumeSpare(wc);
910 <            else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
911 <                addWorkerIfBelowTarget();
912 <            else
913 <                tryAddWorkerIfBusy(wc);
900 >    private void tryShutdownUnusedWorker(int ec) {
901 >        if (runState == 0 && eventCount == ec) { // only trigger if all idle
902 >            ForkJoinWorkerThread[] ws = workers;
903 >            int n = ws.length;
904 >            ForkJoinWorkerThread w = null;
905 >            boolean shutdown = false;
906 >            int sw;
907 >            long h;
908 >            if ((sw = spareWaiters) != 0) { // prefer killing spares
909 >                int id = (sw & SPARE_ID_MASK) - 1;
910 >                if (id >= 0 && id < n && (w = ws[id]) != null &&
911 >                    UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
912 >                                             sw, w.nextSpare))
913 >                    shutdown = true;
914 >            }
915 >            else if ((h = eventWaiters) != 0L) {
916 >                long nh;
917 >                int id = ((int)(h & WAITER_ID_MASK)) - 1;
918 >                if (id >= 0 && id < n && (w = ws[id]) != null &&
919 >                    (nh = w.nextWaiter) != 0L && // keep at least one worker
920 >                    UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
921 >                    shutdown = true;
922 >            }
923 >            if (w != null && shutdown) {
924 >                w.shutdown();
925 >                LockSupport.unpark(w);
926 >            }
927          }
928 +        releaseEventWaiters(); // in case of interference
929      }
930  
931      /**
# Line 1015 | Line 933 | public class ForkJoinPool extends Abstra
933       * stealing a task or taking a submission and running it).
934       * Performs one or more of the following:
935       *
936 <     * 1. If the worker is active, try to set its active status to
937 <     *    inactive and update activeCount. On contention, we may try
938 <     *    again on this or subsequent call.
939 <     *
940 <     * 2. Release any existing event waiters that are now relesable
941 <     *
942 <     * 3. If there are too many running threads, suspend this worker
943 <     *    (first forcing inactive if necessary).  If it is not
944 <     *    needed, it may be killed while suspended via
945 <     *    tryShutdownSpare. Otherwise, upon resume it rechecks to make
946 <     *    sure that it is still needed.
947 <     *
948 <     * 4. If more than 1 miss, await the next task event via
949 <     *    eventSync (first forcing inactivation if necessary), upon
950 <     *    which worker may also be killed, via tryShutdownWaiter.
951 <     *
952 <     * 5. Help reactivate other workers via helpMaintainParallelism
936 >     * 1. If the worker is active and either did not run a task
937 >     *    or there are too many workers, try to set its active status
938 >     *    to inactive and update activeCount. On contention, we may
939 >     *    try again in this or a subsequent call.
940 >     *
941 >     * 2. If not enough total workers, help create some.
942 >     *
943 >     * 3. If there are too many running workers, suspend this worker
944 >     *    (first forcing inactive if necessary).  If it is not needed,
945 >     *    it may be shutdown while suspended (via
946 >     *    tryShutdownUnusedWorker).  Otherwise, upon resume it
947 >     *    rechecks running thread count and need for event sync.
948 >     *
949 >     * 4. If worker did not run a task, await the next task event via
950 >     *    eventSync if necessary (first forcing inactivation), upon
951 >     *    which the worker may be shutdown via
952 >     *    tryShutdownUnusedWorker.  Otherwise, help release any
953 >     *    existing event waiters that are now releasable,
954       *
955       * @param w the worker
956 <     * @param misses the number of scans by caller failing to find work
1038 <     * (saturating at 2 to avoid wraparound)
956 >     * @param ran true if worker ran a task since last call to this method
957       */
958 <    final void preStep(ForkJoinWorkerThread w, int misses) {
958 >    final void preStep(ForkJoinWorkerThread w, boolean ran) {
959 >        int wec = w.lastEventCount;
960          boolean active = w.active;
961 +        boolean inactivate = false;
962          int pc = parallelism;
963 <        for (;;) {
964 <            int rs, wc, rc, ec; long h;
965 <            if (active && UNSAFE.compareAndSwapInt(this, runStateOffset,
966 <                                                   rs = runState, rs - 1))
967 <                active = w.active = false;
968 <            if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0 &&
969 <                (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) {
970 <                releaseEventWaiters();
1051 <                if (misses > 1)
1052 <                    continue;                  // clear before sync below
1053 <            }
1054 <            if ((rc = ((wc = workerCounts) & RUNNING_COUNT_MASK)) > pc) {
1055 <                if (!active &&                 // must inactivate to suspend
963 >        int rs;
964 >        while (w.runState == 0 && (rs = runState) < TERMINATING) {
965 >            if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
966 >                UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
967 >                inactivate = active = w.active = false;
968 >            int wc = workerCounts;
969 >            if ((wc & RUNNING_COUNT_MASK) > pc) {
970 >                if (!(inactivate |= active) && // must inactivate to suspend
971                      workerCounts == wc &&      // try to suspend as spare
972                      UNSAFE.compareAndSwapInt(this, workerCountsOffset,
973 <                                             wc, wc - ONE_RUNNING)) {
973 >                                             wc, wc - ONE_RUNNING))
974                      w.suspendAsSpare();
1060                    if (!w.isRunning())
1061                        break;                 // was killed while spare
1062                }
1063                continue;
975              }
976 <            if (misses > 0) {
977 <                if ((ec = eventCount) == w.lastEventCount && misses > 1) {
978 <                    if (!active) {             // must inactivate to sync
979 <                        eventSync(w);
980 <                        if (w.isRunning())
981 <                            misses = 1;        // don't re-sync
982 <                        else
983 <                            break;             // was killed while waiting
984 <                    }
985 <                    continue;
976 >            else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
977 >                helpMaintainParallelism();     // not enough workers
978 >            else if (!ran) {
979 >                long h = eventWaiters;
980 >                int ec = eventCount;
981 >                if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
982 >                    releaseEventWaiters();     // release others before waiting
983 >                else if (ec != wec) {
984 >                    w.lastEventCount = ec;     // no need to wait
985 >                    break;
986                  }
987 <                w.lastEventCount = ec;
987 >                else if (!(inactivate |= active))
988 >                    eventSync(w, wec);         // must inactivate before sync
989              }
990 <            if (rc < pc)
991 <                helpMaintainParallelism();
1080 <            break;
990 >            else
991 >                break;
992          }
993      }
994  
995      /**
996       * Helps and/or blocks awaiting join of the given task.
997 <     * Alternates between helpJoinTask() and helpMaintainParallelism()
1087 <     * as many times as there is a deficit in running count (or longer
1088 <     * if running count would become zero), then blocks if task still
1089 <     * not done.
997 >     * See above for explanation.
998       *
999       * @param joinMe the task to join
1000 +     * @param worker the current worker thread
1001       */
1002      final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
1003 <        int threshold = parallelism;         // descend blocking thresholds
1003 >        int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
1004          while (joinMe.status >= 0) {
1005 <            boolean block; int wc;
1005 >            int wc;
1006              worker.helpJoinTask(joinMe);
1007              if (joinMe.status < 0)
1008                  break;
1009 <            if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
1010 <                if (threshold > 0)
1011 <                    --threshold;
1012 <                else
1013 <                    advanceEventCount(); // force release
1014 <                block = false;
1015 <            }
1016 <            else
1017 <                block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1018 <                                                 wc, wc - ONE_RUNNING);
1019 <            helpMaintainParallelism();
1020 <            if (block) {
1021 <                int c;
1022 <                joinMe.internalAwaitDone();
1009 >            else if (retries > 0)
1010 >                --retries;
1011 >            else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
1012 >                     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1013 >                                              wc, wc - ONE_RUNNING)) {
1014 >                int stat, c; long h;
1015 >                while ((stat = joinMe.status) >= 0 &&
1016 >                       (h = eventWaiters) != 0L && // help release others
1017 >                       (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1018 >                    releaseEventWaiters();
1019 >                if (stat >= 0 &&
1020 >                    ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
1021 >                     (stat =
1022 >                      joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
1023 >                    helpMaintainParallelism(); // timeout or no running workers
1024                  do {} while (!UNSAFE.compareAndSwapInt
1025                               (this, workerCountsOffset,
1026                                c = workerCounts, c + ONE_RUNNING));
1027 <                break;
1027 >                if (stat < 0)
1028 >                    break;   // else restart
1029              }
1030          }
1031      }
1032  
1033      /**
1034 <     * Same idea as awaitJoin, but no helping
1034 >     * Same idea as awaitJoin, but no helping, retries, or timeouts.
1035       */
1036      final void awaitBlocker(ManagedBlocker blocker)
1037          throws InterruptedException {
1127        int threshold = parallelism;
1038          while (!blocker.isReleasable()) {
1039 <            boolean block; int wc;
1040 <            if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
1041 <                if (threshold > 0)
1042 <                    --threshold;
1133 <                else
1134 <                    advanceEventCount();
1135 <                block = false;
1136 <            }
1137 <            else
1138 <                block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1139 <                                                 wc, wc - ONE_RUNNING);
1140 <            helpMaintainParallelism();
1141 <            if (block) {
1039 >            int wc = workerCounts;
1040 >            if ((wc & RUNNING_COUNT_MASK) != 0 &&
1041 >                UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1042 >                                         wc, wc - ONE_RUNNING)) {
1043                  try {
1044 <                    do {} while (!blocker.isReleasable() && !blocker.block());
1044 >                    while (!blocker.isReleasable()) {
1045 >                        long h = eventWaiters;
1046 >                        if (h != 0L &&
1047 >                            (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1048 >                            releaseEventWaiters();
1049 >                        else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
1050 >                                 runState < TERMINATING)
1051 >                            helpMaintainParallelism();
1052 >                        else if (blocker.block())
1053 >                            break;
1054 >                    }
1055                  } finally {
1056                      int c;
1057                      do {} while (!UNSAFE.compareAndSwapInt
# Line 1190 | Line 1101 | public class ForkJoinPool extends Abstra
1101      private void startTerminating() {
1102          cancelSubmissions();
1103          for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1104 <            advanceEventCount();
1104 >            int c; // advance event count
1105 >            UNSAFE.compareAndSwapInt(this, eventCountOffset,
1106 >                                     c = eventCount, c+1);
1107              eventWaiters = 0L; // clobber lists
1108              spareWaiters = 0;
1109 <            ForkJoinWorkerThread[] ws = workers;
1197 <            int n = ws.length;
1198 <            for (int i = 0; i < n; ++i) {
1199 <                ForkJoinWorkerThread w = ws[i];
1109 >            for (ForkJoinWorkerThread w : workers) {
1110                  if (w != null) {
1111                      w.shutdown();
1112                      if (passes > 0 && !w.isTerminated()) {
# Line 1260 | Line 1170 | public class ForkJoinPool extends Abstra
1170       */
1171      final int idlePerActive() {
1172          int pc = parallelism; // use parallelism, not rc
1173 <        int ac = runState;    // no mask -- artifically boosts during shutdown
1173 >        int ac = runState;    // no mask -- artificially boosts during shutdown
1174          // Use exact results for small values, saturate past 4
1175          return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
1176      }
# Line 1353 | Line 1263 | public class ForkJoinPool extends Abstra
1263       * @param pc the initial parallelism level
1264       */
1265      private static int initialArraySizeFor(int pc) {
1266 <        // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 >>> 16)
1266 >        // If possible, initially allocate enough space for one spare
1267          int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1268 +        // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 >>> 16)
1269          size |= size >>> 1;
1270          size |= size >>> 2;
1271          size |= size >>> 4;
# Line 1373 | Line 1284 | public class ForkJoinPool extends Abstra
1284          if (runState >= SHUTDOWN)
1285              throw new RejectedExecutionException();
1286          submissionQueue.offer(task);
1287 <        advanceEventCount();
1288 <        if (eventWaiters != 0L)
1289 <            releaseEventWaiters();
1379 <        if ((workerCounts >>> TOTAL_COUNT_SHIFT) < parallelism)
1380 <            addWorkerIfBelowTarget();
1287 >        int c; // try to increment event count -- CAS failure OK
1288 >        UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1289 >        helpMaintainParallelism(); // create, start, or resume some workers
1290      }
1291  
1292      /**
# Line 1614 | Line 1523 | public class ForkJoinPool extends Abstra
1523       */
1524      public long getQueuedTaskCount() {
1525          long count = 0;
1526 <        ForkJoinWorkerThread[] ws = workers;
1618 <        int n = ws.length;
1619 <        for (int i = 0; i < n; ++i) {
1620 <            ForkJoinWorkerThread w = ws[i];
1526 >        for (ForkJoinWorkerThread w : workers)
1527              if (w != null)
1528                  count += w.getQueueSize();
1623        }
1529          return count;
1530      }
1531  
# Line 1675 | Line 1580 | public class ForkJoinPool extends Abstra
1580       */
1581      protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
1582          int count = submissionQueue.drainTo(c);
1583 <        ForkJoinWorkerThread[] ws = workers;
1679 <        int n = ws.length;
1680 <        for (int i = 0; i < n; ++i) {
1681 <            ForkJoinWorkerThread w = ws[i];
1583 >        for (ForkJoinWorkerThread w : workers)
1584              if (w != null)
1585                  count += w.drainTasksTo(c);
1684        }
1586          return count;
1587      }
1588  
# Line 1808 | Line 1709 | public class ForkJoinPool extends Abstra
1709          throws InterruptedException {
1710          try {
1711              return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
1712 <        } catch(TimeoutException ex) {
1712 >        } catch (TimeoutException ex) {
1713              return false;
1714          }
1715      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines