ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.17 by dl, Thu May 27 16:47:21 2010 UTC vs.
Revision 1.18 by dl, Wed Jul 7 20:41:24 2010 UTC

# Line 19 | Line 19 | import java.util.concurrent.CountDownLat
19   /**
20   * An {@link ExecutorService} for running {@link ForkJoinTask}s.
21   * A {@code ForkJoinPool} provides the entry point for submissions
22 < * from non-{@code ForkJoinTask}s, as well as management and
22 > * from non-{@code ForkJoinTask} clients, as well as management and
23   * monitoring operations.
24   *
25   * <p>A {@code ForkJoinPool} differs from other kinds of {@link
# Line 28 | Line 28 | import java.util.concurrent.CountDownLat
28   * execute subtasks created by other active tasks (eventually blocking
29   * waiting for work if none exist). This enables efficient processing
30   * when most tasks spawn other subtasks (as do most {@code
31 < * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
32 < * execution of some plain {@code Runnable}- or {@code Callable}-
33 < * based activities along with {@code ForkJoinTask}s. When setting
34 < * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
35 < * also be appropriate for use with fine-grained tasks of any form
36 < * that are never joined. Otherwise, other {@code ExecutorService}
37 < * implementations are typically more appropriate choices.
31 > * ForkJoinTask}s). When setting <em>asyncMode</em> to true in
32 > * constructors, {@code ForkJoinPool}s may also be appropriate for use
33 > * with event-style tasks that are never joined.
34   *
35   * <p>A {@code ForkJoinPool} is constructed with a given target
36   * parallelism level; by default, equal to the number of available
37 < * processors. Unless configured otherwise via {@link
38 < * #setMaintainsParallelism}, the pool attempts to maintain this
39 < * number of active (or available) threads by dynamically adding,
40 < * suspending, or resuming internal worker threads, even if some tasks
41 < * are stalled waiting to join others. However, no such adjustments
42 < * are performed in the face of blocked IO or other unmanaged
43 < * synchronization. The nested {@link ManagedBlocker} interface
48 < * enables extension of the kinds of synchronization accommodated.
49 < * The target parallelism level may also be changed dynamically
50 < * ({@link #setParallelism}). The total number of threads may be
51 < * limited using method {@link #setMaximumPoolSize}, in which case it
52 < * may become possible for the activities of a pool to stall due to
53 < * the lack of available threads to process new tasks. When the pool
54 < * is executing tasks, these and other configuration setting methods
55 < * may only gradually affect actual pool sizes. It is normally best
56 < * practice to invoke these methods only when the pool is known to be
57 < * quiescent.
37 > * processors. The pool attempts to maintain enough active (or
38 > * available) threads by dynamically adding, suspending, or resuming
39 > * internal worker threads, even if some tasks are stalled waiting to
40 > * join others. However, no such adjustments are guaranteed in the
41 > * face of blocked IO or other unmanaged synchronization. The nested
42 > * {@link ManagedBlocker} interface enables extension of the kinds of
43 > * synchronization accommodated.
44   *
45   * <p>In addition to execution and lifecycle control methods, this
46   * class provides status check methods (for example
# Line 63 | Line 49 | import java.util.concurrent.CountDownLat
49   * {@link #toString} returns indications of pool state in a
50   * convenient form for informal monitoring.
51   *
52 + * <p> As is the case with other ExecutorServices, there are three
53 + * main task execution methods summarized in the follwoing
54 + * table. These are designed to be used by clients not already engaged
55 + * in fork/join computations in the current pool.  The main forms of
56 + * these methods accept instances of {@code ForkJoinTask}, but
57 + * overloaded forms also allow mixed execution of plain {@code
58 + * Runnable}- or {@code Callable}- based activities as well.  However,
59 + * tasks that are already executing in a pool should normally
60 + * <em>NOT</em> use these pool execution methods, but instead use the
61 + * within-computation forms listed in the table. To avoid inadvertant
62 + * cyclic task dependencies and to improve performance, task
63 + * submissions to the current pool by an ongoing fork/join
64 + * computations may be implicitly translated to the corresponding
65 + * ForkJoinTask forms.
66 + *
67 + * <table BORDER CELLPADDING=3 CELLSPACING=1>
68 + *  <tr>
69 + *    <td></td>
70 + *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
71 + *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
72 + *  </tr>
73 + *  <tr>
74 + *    <td> <b>Arange async execution</td>
75 + *    <td> {@link #execute(ForkJoinTask)}</td>
76 + *    <td> {@link ForkJoinTask#fork}</td>
77 + *  </tr>
78 + *  <tr>
79 + *    <td> <b>Await and obtain result</td>
80 + *    <td> {@link #invoke(ForkJoinTask)}</td>
81 + *    <td> {@link ForkJoinTask#invoke}</td>
82 + *  </tr>
83 + *  <tr>
84 + *    <td> <b>Arrange exec and obtain Future</td>
85 + *    <td> {@link #submit(ForkJoinTask)}</td>
86 + *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
87 + *  </tr>
88 + * </table>
89 + *
90   * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
91   * used for all parallel task execution in a program or subsystem.
92   * Otherwise, use would not usually outweigh the construction and
# Line 145 | Line 169 | public class ForkJoinPool extends Abstra
169       * work with other policies.
170       *
171       * 2. Bookkeeping for dynamically adding and removing workers. We
172 <     * maintain a given level of parallelism (or, if
173 <     * maintainsParallelism is false, at least avoid starvation). When
150 <     * some workers are known to be blocked (on joins or via
172 >     * aim to approximately maintain the given level of parallelism.
173 >     * When some workers are known to be blocked (on joins or via
174       * ManagedBlocker), we may create or resume others to take their
175       * place until they unblock (see below). Implementing this
176       * requires counts of the number of "running" threads (i.e., those
# Line 165 | Line 188 | public class ForkJoinPool extends Abstra
188       * threads Updates to the workerCounts field sometimes transiently
189       * encounter a fair amount of contention when join dependencies
190       * are such that many threads block or unblock at about the same
191 <     * time. We alleviate this by sometimes bundling updates (for
192 <     * example blocking one thread on join and resuming a spare cancel
170 <     * each other out), and in most other cases performing an
171 <     * alternative action like releasing waiters or locating spares.
191 >     * time. We alleviate this by sometimes performing an alternative
192 >     * action on contention like releasing waiters or locating spares.
193       *
194       * 3. Maintaining global run state. The run state of the pool
195       * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
# Line 256 | Line 277 | public class ForkJoinPool extends Abstra
277       *
278       * 6. Deciding when to create new workers. The main dynamic
279       * control in this class is deciding when to create extra threads,
280 <     * in methods awaitJoin and awaitBlocker. We always
281 <     * need to create one when the number of running threads becomes
282 <     * zero. But because blocked joins are typically dependent, we
283 <     * don't necessarily need or want one-to-one replacement. Using a
284 <     * one-to-one compensation rule often leads to enough useless
285 <     * overhead creating, suspending, resuming, and/or killing threads
286 <     * to signficantly degrade throughput.  We use a rule reflecting
287 <     * the idea that, the more spare threads you already have, the
288 <     * more evidence you need to create another one. The "evidence"
289 <     * here takes two forms: (1) Using a creation threshold expressed
290 <     * in terms of the current deficit -- target minus running
291 <     * threads. To reduce flickering and drift around target values,
292 <     * the relation is quadratic: adding a spare if (dc*dc)>=(sc*pc)
293 <     * (where dc is deficit, sc is number of spare threads and pc is
294 <     * target parallelism.)  (2) Using a form of adaptive
295 <     * spionning. requiring a number of threshold checks proportional
296 <     * to the number of spare threads.  This effectively reduces churn
276 <     * at the price of systematically undershooting target parallelism
277 <     * when many threads are blocked.  However, biasing toward
278 <     * undeshooting partially compensates for the above mechanics to
279 <     * suspend extra threads, that normally lead to overshoot because
280 <     * we can only suspend workers in-between top-level actions. It
281 <     * also better copes with the fact that some of the methods in
282 <     * this class tend to never become compiled (but are interpreted),
283 <     * so some components of the entire set of controls might execute
284 <     * many times faster than others. And similarly for cases where
285 <     * the apparent lack of work is just due to GC stalls and other
286 <     * transient system activity.
287 <     *
288 <     * 7. Maintaining other configuration parameters and monitoring
289 <     * statistics. Updates to fields controlling parallelism level,
290 <     * max size, etc can only meaningfully take effect for individual
291 <     * threads upon their next top-level actions; i.e., between
292 <     * stealing/running tasks/submission, which are separated by calls
293 <     * to preStep.  Memory ordering for these (assumed infrequent)
294 <     * reconfiguration calls is ensured by using reads and writes to
295 <     * volatile field workerCounts (that must be read in preStep anyway)
296 <     * as "fences" -- user-level reads are preceded by reads of
297 <     * workCounts, and writes are followed by no-op CAS to
298 <     * workerCounts. The values reported by other management and
299 <     * monitoring methods are either computed on demand, or are kept
300 <     * in fields that are only updated when threads are otherwise
301 <     * idle.
280 >     * in methods awaitJoin and awaitBlocker. We always need to create
281 >     * one when the number of running threads becomes zero. But
282 >     * because blocked joins are typically dependent, we don't
283 >     * necessarily need or want one-to-one replacement. Instead, we
284 >     * use a combination of heuristics that adds threads only when the
285 >     * pool appears to be approaching starvation.  These effectively
286 >     * reduce churn at the price of systematically undershooting
287 >     * target parallelism when many threads are blocked.  However,
288 >     * biasing toward undeshooting partially compensates for the above
289 >     * mechanics to suspend extra threads, that normally lead to
290 >     * overshoot because we can only suspend workers in-between
291 >     * top-level actions. It also better copes with the fact that some
292 >     * of the methods in this class tend to never become compiled (but
293 >     * are interpreted), so some components of the entire set of
294 >     * controls might execute many times faster than others. And
295 >     * similarly for cases where the apparent lack of work is just due
296 >     * to GC stalls and other transient system activity.
297       *
298       * Beware that there is a lot of representation-level coupling
299       * among classes ForkJoinPool, ForkJoinWorkerThread, and
# Line 344 | Line 339 | public class ForkJoinPool extends Abstra
339       * Default ForkJoinWorkerThreadFactory implementation; creates a
340       * new ForkJoinWorkerThread.
341       */
342 <    static class  DefaultForkJoinWorkerThreadFactory
342 >    static class DefaultForkJoinWorkerThreadFactory
343          implements ForkJoinWorkerThreadFactory {
344          public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
345              return new ForkJoinWorkerThread(pool);
# Line 412 | Line 407 | public class ForkJoinPool extends Abstra
407      /**
408       * Latch released upon termination.
409       */
410 <    private final CountDownLatch terminationLatch;
410 >    private final Phaser termination;
411  
412      /**
413       * Creation factory for worker threads.
# Line 483 | Line 478 | public class ForkJoinPool extends Abstra
478      private static final int ONE_RUNNING        = 1;
479      private static final int ONE_TOTAL          = 1 << TOTAL_COUNT_SHIFT;
480  
486    /*
487     * Fields parallelism. maxPoolSize, and maintainsParallelism are
488     * non-volatile, but external reads/writes use workerCount fences
489     * to ensure visability.
490     */
491
481      /**
482       * The target parallelism level.
483 +     * Accessed directly by ForkJoinWorkerThreads.
484       */
485 <    private int parallelism;
496 <
497 <    /**
498 <     * The maximum allowed pool size.
499 <     */
500 <    private int maxPoolSize;
485 >    final int parallelism;
486  
487      /**
488       * True if use local fifo, not default lifo, for local polling
489 <     * Replicated by ForkJoinWorkerThreads
489 >     * Read by, and replicated by ForkJoinWorkerThreads
490       */
491 <    private volatile boolean locallyFifo;
491 >    final boolean locallyFifo;
492  
493      /**
494 <     * Controls whether to add spares to maintain parallelism
494 >     * The uncaught exception handler used when any worker abruptly
495 >     * terminates.
496       */
497 <    private boolean maintainsParallelism;
512 <
513 <    /**
514 <     * The uncaught exception handler used when any worker
515 <     * abruptly terminates
516 <     */
517 <    private volatile Thread.UncaughtExceptionHandler ueh;
497 >    private final Thread.UncaughtExceptionHandler ueh;
498  
499      /**
500       * Pool number, just for assigning useful names to worker threads
# Line 524 | Line 504 | public class ForkJoinPool extends Abstra
504      // utilities for updating fields
505  
506      /**
507 <     * Adds delta to running count.  Used mainly by ForkJoinTask.
507 >     * Increments running count.  Also used by ForkJoinTask.
508       */
509 <    final void updateRunningCount(int delta) {
510 <        int wc;
509 >    final void incrementRunningCount() {
510 >        int c;
511          do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
512 <                                               wc = workerCounts,
513 <                                               wc + delta));
512 >                                               c = workerCounts,
513 >                                               c + ONE_RUNNING));
514      }
515 <
515 >    
516      /**
517 <     * Decrements running count unless already zero
517 >     * Tries to decrement running count unless already zero
518       */
519      final boolean tryDecrementRunningCount() {
520          int wc = workerCounts;
# Line 545 | Line 525 | public class ForkJoinPool extends Abstra
525      }
526  
527      /**
548     * Write fence for user modifications of pool parameters
549     * (parallelism. etc).  Note that it doesn't matter if CAS fails.
550     */
551    private void workerCountWriteFence() {
552        int wc;
553        UNSAFE.compareAndSwapInt(this, workerCountsOffset,
554                                 wc = workerCounts, wc);
555    }
556
557    /**
558     * Read fence for external reads of pool parameters
559     * (parallelism. maxPoolSize, etc).
560     */
561    private void workerCountReadFence() {
562        int ignore = workerCounts;
563    }
564
565    /**
528       * Tries incrementing active count; fails on contention.
529       * Called by workers before executing tasks.
530       *
# Line 661 | Line 623 | public class ForkJoinPool extends Abstra
623                  return null;
624              }
625          }
626 <        w.start(recordWorker(w), locallyFifo, ueh);
626 >        w.start(recordWorker(w), ueh);
627          return w;
628      }
629  
# Line 731 | Line 693 | public class ForkJoinPool extends Abstra
693      // Waiting for and signalling events
694  
695      /**
734     * Ensures eventCount on exit is different (mod 2^32) than on
735     * entry.  CAS failures are OK -- any change in count suffices.
736     */
737    private void advanceEventCount() {
738        int c;
739        UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
740    }
741
742    /**
696       * Releases workers blocked on a count not equal to current count.
697       */
698 <    final void releaseWaiters() {
698 >    private void releaseWaiters() {
699          long top;
700          int id;
701          while ((id = (int)((top = eventWaiters) & WAITER_INDEX_MASK)) > 0 &&
# Line 757 | Line 710 | public class ForkJoinPool extends Abstra
710      }
711  
712      /**
713 +     * Ensures eventCount on exit is different (mod 2^32) than on
714 +     * entry and wakes up all waiters
715 +     */
716 +    private void signalEvent() {
717 +        int c;
718 +        do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset,
719 +                                               c = eventCount, c+1));
720 +        releaseWaiters();
721 +    }
722 +
723 +    /**
724       * Advances eventCount and releases waiters until interference by
725       * other releasing threads is detected.
726       */
727      final void signalWork() {
728 +        // EventCount CAS failures are OK -- any change in count suffices.
729          int ec;
730          UNSAFE.compareAndSwapInt(this, eventCountOffset, ec=eventCount, ec+1);
731          outer:for (;;) {
# Line 839 | Line 804 | public class ForkJoinPool extends Abstra
804          boolean inactivate = !worked & active;
805          for (;;) {
806              if (inactivate) {
807 <                int c = runState;
807 >                int rs = runState;
808                  if (UNSAFE.compareAndSwapInt(this, runStateOffset,
809 <                                             c, c - ONE_ACTIVE))
809 >                                             rs, rs - ONE_ACTIVE))
810                      inactivate = active = w.active = false;
811              }
812              int wc = workerCounts;
# Line 859 | Line 824 | public class ForkJoinPool extends Abstra
824      }
825  
826      /**
827 <     * Adjusts counts and creates or resumes compensating threads for
828 <     * a worker blocking on task joinMe.  First tries resuming an
829 <     * existing spare (which usually also avoids any count
830 <     * adjustment), but must then decrement running count to determine
866 <     * whether a new thread is needed. See above for fuller
867 <     * explanation. This code is sprawled out non-modularly mainly
868 <     * because adaptive spinning works best if the entire method is
869 <     * either interpreted or compiled vs having only some pieces of it
870 <     * compiled.
827 >     * Tries to decrement running count, and if so, possibly creates
828 >     * or resumes compensating threads before blocking on task joinMe.
829 >     * This code is sprawled out with manual inlining to evade some
830 >     * JIT oddities.
831       *
832       * @param joinMe the task to join
833 <     * @return task status on exit (to simplify usage by callers)
833 >     * @return task status on exit
834       */
835 <    final int awaitJoin(ForkJoinTask<?> joinMe) {
836 <        int pc = parallelism;
837 <        boolean adj = false;        // true when running count adjusted
838 <        int scans = 0;
839 <
840 <        while (joinMe.status >= 0) {
841 <            ForkJoinWorkerThread spare = null;
842 <            if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
835 >    final int tryAwaitJoin(ForkJoinTask<?> joinMe) {
836 >        int cw = workerCounts; // read now to spoil CAS if counts change as ...
837 >        releaseWaiters();      // ... a byproduct of releaseWaiters
838 >        int stat = joinMe.status;
839 >        if (stat >= 0 && // inline variant of tryDecrementRunningCount
840 >            (cw & RUNNING_COUNT_MASK) > 0 &&
841 >            UNSAFE.compareAndSwapInt(this, workerCountsOffset,
842 >                                     cw, cw - ONE_RUNNING)) {
843 >            int pc = parallelism;
844 >            int scans = 0;  // to require confirming passes to add threads
845 >            outer: while ((workerCounts & RUNNING_COUNT_MASK) < pc) {
846 >                if ((stat = joinMe.status) < 0)
847 >                    break;
848 >                ForkJoinWorkerThread spare = null;
849                  ForkJoinWorkerThread[] ws = workers;
850                  int nws = ws.length;
851                  for (int i = 0; i < nws; ++i) {
# Line 889 | Line 855 | public class ForkJoinPool extends Abstra
855                          break;
856                      }
857                  }
858 <                if (joinMe.status < 0)
858 >                if ((stat = joinMe.status) < 0) // recheck to narrow race
859                      break;
860 <            }
861 <            int wc = workerCounts;
862 <            int rc = wc & RUNNING_COUNT_MASK;
897 <            int dc = pc - rc;
898 <            if (dc > 0 && spare != null && spare.tryUnsuspend()) {
899 <                if (adj) {
900 <                    int c;
901 <                    do {} while (!UNSAFE.compareAndSwapInt
902 <                                 (this, workerCountsOffset,
903 <                                  c = workerCounts, c + ONE_RUNNING));
904 <                }
905 <                adj = true;
906 <                LockSupport.unpark(spare);
907 <            }
908 <            else if (adj) {
909 <                if (dc <= 0)
860 >                int wc = workerCounts;
861 >                int rc = wc & RUNNING_COUNT_MASK;
862 >                if (rc >= pc)
863                      break;
864 <                int tc = wc >>> TOTAL_COUNT_SHIFT;
865 <                if (scans > tc) {
866 <                    int ts = (tc - pc) * pc;
867 <                    if (rc != 0 &&  (dc * dc < ts || !maintainsParallelism))
868 <                        break;
869 <                    if (scans > ts && tc < maxPoolSize &&
870 <                        UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
918 <                                                 wc+(ONE_RUNNING|ONE_TOTAL))){
919 <                        addWorker();
864 >                if (spare != null) {
865 >                    if (spare.tryUnsuspend()) {
866 >                        int c; // inline incrementRunningCount
867 >                        do {} while (!UNSAFE.compareAndSwapInt
868 >                                     (this, workerCountsOffset,
869 >                                      c = workerCounts, c + ONE_RUNNING));
870 >                        LockSupport.unpark(spare);
871                          break;
872                      }
873 +                    continue;
874 +                }
875 +                int tc = wc >>> TOTAL_COUNT_SHIFT;
876 +                int sc = tc - pc;
877 +                if (rc > 0) {
878 +                    int p = pc;
879 +                    int s = sc;
880 +                    while (s-- >= 0) { // try keeping 3/4 live
881 +                        if (rc > (p -= (p >>> 2) + 1))
882 +                            break outer;
883 +                    }
884 +                }
885 +                if (scans++ > sc && tc < MAX_THREADS &&
886 +                    UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
887 +                                             wc + (ONE_RUNNING|ONE_TOTAL))) {
888 +                    addWorker();
889 +                    break;
890                  }
891              }
892 <            else if (rc != 0)
893 <                adj = UNSAFE.compareAndSwapInt (this, workerCountsOffset,
894 <                                                wc, wc - ONE_RUNNING);
927 <            if ((scans++ & 1) == 0)
928 <                releaseWaiters();   // help others progress
929 <            else
930 <                Thread.yield();     // avoid starving productive threads
931 <        }
932 <
933 <        if (adj) {
934 <            joinMe.internalAwaitDone();
935 <            int c;
892 >            if (stat >= 0)
893 >                stat = joinMe.internalAwaitDone();
894 >            int c; // inline incrementRunningCount
895              do {} while (!UNSAFE.compareAndSwapInt
896                           (this, workerCountsOffset,
897                            c = workerCounts, c + ONE_RUNNING));
898          }
899 <        return joinMe.status;
899 >        return stat;
900      }
901  
902      /**
903 <     * Same idea as awaitJoin
903 >     * Same idea as (and mostly pasted from) tryAwaitJoin, but
904 >     * self-contained
905       */
906 <    final void awaitBlocker(ManagedBlocker blocker, boolean maintainPar)
906 >    final void awaitBlocker(ManagedBlocker blocker)
907          throws InterruptedException {
908 <        maintainPar &= maintainsParallelism;
908 >        for (;;) {
909 >            if (blocker.isReleasable())
910 >                return;
911 >            int cw = workerCounts;
912 >            releaseWaiters();
913 >            if ((cw & RUNNING_COUNT_MASK) > 0 &&
914 >                UNSAFE.compareAndSwapInt(this, workerCountsOffset,
915 >                                         cw, cw - ONE_RUNNING))
916 >                break;
917 >        }
918 >        boolean done = false;
919          int pc = parallelism;
950        boolean adj = false;        // true when running count adjusted
920          int scans = 0;
921 <        boolean done;
953 <
954 <        for (;;) {
921 >        outer: while ((workerCounts & RUNNING_COUNT_MASK) < pc) {
922              if (done = blocker.isReleasable())
923                  break;
924              ForkJoinWorkerThread spare = null;
925 <            if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
926 <                ForkJoinWorkerThread[] ws = workers;
927 <                int nws = ws.length;
928 <                for (int i = 0; i < nws; ++i) {
929 <                    ForkJoinWorkerThread w = ws[i];
930 <                    if (w != null && w.isSuspended()) {
964 <                        spare = w;
965 <                        break;
966 <                    }
967 <                }
968 <                if (done = blocker.isReleasable())
925 >            ForkJoinWorkerThread[] ws = workers;
926 >            int nws = ws.length;
927 >            for (int i = 0; i < nws; ++i) {
928 >                ForkJoinWorkerThread w = ws[i];
929 >                if (w != null && w.isSuspended()) {
930 >                    spare = w;
931                      break;
932 +                }
933              }
934 +            if (done = blocker.isReleasable())
935 +                break;
936              int wc = workerCounts;
937              int rc = wc & RUNNING_COUNT_MASK;
938 <            int dc = pc - rc;
939 <            if (dc > 0 && spare != null && spare.tryUnsuspend()) {
940 <                if (adj) {
938 >            if (rc >= pc)
939 >                break;
940 >            if (spare != null) {
941 >                if (spare.tryUnsuspend()) {
942                      int c;
943                      do {} while (!UNSAFE.compareAndSwapInt
944                                   (this, workerCountsOffset,
945                                    c = workerCounts, c + ONE_RUNNING));
946 +                    LockSupport.unpark(spare);
947 +                    break;
948                  }
949 <                adj = true;
982 <                LockSupport.unpark(spare);
949 >                continue;
950              }
951 <            else if (adj) {
952 <                if (dc <= 0)
953 <                    break;
954 <                int tc = wc >>> TOTAL_COUNT_SHIFT;
955 <                if (scans > tc) {
956 <                    int ts = (tc - pc) * pc;
957 <                    if (rc != 0 &&  (dc * dc < ts || !maintainPar))
958 <                        break;
992 <                    if (scans > ts && tc < maxPoolSize &&
993 <                        UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
994 <                                                 wc+(ONE_RUNNING|ONE_TOTAL))){
995 <                        addWorker();
996 <                        break;
997 <                    }
951 >            int tc = wc >>> TOTAL_COUNT_SHIFT;
952 >            int sc = tc - pc;
953 >            if (rc > 0) {
954 >                int p = pc;
955 >                int s = sc;
956 >                while (s-- >= 0) {
957 >                    if (rc > (p -= (p >>> 2) + 1))
958 >                        break outer;
959                  }
960              }
961 <            else if (rc != 0)
962 <                adj = UNSAFE.compareAndSwapInt (this, workerCountsOffset,
963 <                                                wc, wc - ONE_RUNNING);
964 <            if ((++scans & 1) == 0)
965 <                releaseWaiters();   // help others progress
966 <            else
1006 <                Thread.yield();     // avoid starving productive threads
961 >            if (scans++ > sc && tc < MAX_THREADS &&
962 >                UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
963 >                                         wc + (ONE_RUNNING|ONE_TOTAL))) {
964 >                addWorker();
965 >                break;
966 >            }
967          }
1008
968          try {
969              if (!done)
970 <                do {} while (!blocker.isReleasable() && !blocker.block());
970 >                do {} while (!blocker.isReleasable() &&
971 >                             !blocker.block());
972          } finally {
973 <            if (adj) {
974 <                int c;
975 <                do {} while (!UNSAFE.compareAndSwapInt
976 <                             (this, workerCountsOffset,
1017 <                              c = workerCounts, c + ONE_RUNNING));
1018 <            }
1019 <        }
1020 <    }
1021 <
1022 <    /**
1023 <     * Unless there are not enough other running threads, adjusts
1024 <     * counts and blocks a worker performing helpJoin that cannot find
1025 <     * any work.
1026 <     *
1027 <     * @return true if joinMe now done
1028 <     */
1029 <    final boolean tryAwaitBusyJoin(ForkJoinTask<?> joinMe) {
1030 <        int pc = parallelism;
1031 <        outer:for (;;) {
1032 <            releaseWaiters();
1033 <            if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
1034 <                ForkJoinWorkerThread[] ws = workers;
1035 <                int nws = ws.length;
1036 <                for (int i = 0; i < nws; ++i) {
1037 <                    ForkJoinWorkerThread w = ws[i];
1038 <                    if (w != null && w.isSuspended()) {
1039 <                        if (joinMe.status < 0)
1040 <                            return true;
1041 <                        if ((workerCounts & RUNNING_COUNT_MASK) > pc)
1042 <                            break;
1043 <                        if (w.tryUnsuspend()) {
1044 <                            LockSupport.unpark(w);
1045 <                            break outer;
1046 <                        }
1047 <                        continue outer;
1048 <                    }
1049 <                }
1050 <            }
1051 <            if (joinMe.status < 0)
1052 <                return true;
1053 <            int wc = workerCounts;
1054 <            if ((wc & RUNNING_COUNT_MASK) <= 2 ||
1055 <                (wc >>> TOTAL_COUNT_SHIFT) < pc)
1056 <                return false;  // keep this thread alive
1057 <            if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1058 <                                         wc, wc - ONE_RUNNING))
1059 <                break;
973 >            int c;
974 >            do {} while (!UNSAFE.compareAndSwapInt
975 >                         (this, workerCountsOffset,
976 >                          c = workerCounts, c + ONE_RUNNING));
977          }
978 <
1062 <        joinMe.internalAwaitDone();
1063 <        int c;
1064 <        do {} while (!UNSAFE.compareAndSwapInt
1065 <                     (this, workerCountsOffset,
1066 <                      c = workerCounts, c + ONE_RUNNING));
1067 <        return true;
1068 <    }
978 >    }  
979  
980      /**
981       * Possibly initiates and/or completes termination.
# Line 1088 | Line 998 | public class ForkJoinPool extends Abstra
998          // Finish now if all threads terminated; else in some subsequent call
999          if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1000              advanceRunLevel(TERMINATED);
1001 <            terminationLatch.countDown();
1001 >            termination.arrive();
1002          }
1003          return true;
1004      }
# Line 1101 | Line 1011 | public class ForkJoinPool extends Abstra
1011              cancelSubmissions();
1012              shutdownWorkers();
1013              cancelWorkerTasks();
1014 <            advanceEventCount();
1105 <            releaseWaiters();
1014 >            signalEvent();
1015              interruptWorkers();
1016          }
1017      }
# Line 1192 | Line 1101 | public class ForkJoinPool extends Abstra
1101       * active thread.
1102       */
1103      final int idlePerActive() {
1195        int ac = runState;    // no mask -- artifically boosts during shutdown
1104          int pc = parallelism; // use targeted parallelism, not rc
1105 +        int ac = runState;    // no mask -- artifically boosts during shutdown
1106          // Use exact results for small values, saturate past 4
1107          return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
1108      }
# Line 1204 | Line 1113 | public class ForkJoinPool extends Abstra
1113  
1114      /**
1115       * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1116 <     * java.lang.Runtime#availableProcessors}, and using the {@linkplain
1117 <     * #defaultForkJoinWorkerThreadFactory default thread factory}.
1116 >     * java.lang.Runtime#availableProcessors}, using the {@linkplain
1117 >     * #defaultForkJoinWorkerThreadFactory default thread factory},
1118 >     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1119       *
1120       * @throws SecurityException if a security manager exists and
1121       *         the caller is not permitted to modify threads
# Line 1214 | Line 1124 | public class ForkJoinPool extends Abstra
1124       */
1125      public ForkJoinPool() {
1126          this(Runtime.getRuntime().availableProcessors(),
1127 <             defaultForkJoinWorkerThreadFactory);
1127 >             defaultForkJoinWorkerThreadFactory, null, false);
1128      }
1129  
1130      /**
1131       * Creates a {@code ForkJoinPool} with the indicated parallelism
1132 <     * level and using the {@linkplain
1133 <     * #defaultForkJoinWorkerThreadFactory default thread factory}.
1132 >     * level, the {@linkplain
1133 >     * #defaultForkJoinWorkerThreadFactory default thread factory},
1134 >     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
1135       *
1136       * @param parallelism the parallelism level
1137       * @throws IllegalArgumentException if parallelism less than or
# Line 1231 | Line 1142 | public class ForkJoinPool extends Abstra
1142       *         java.lang.RuntimePermission}{@code ("modifyThread")}
1143       */
1144      public ForkJoinPool(int parallelism) {
1145 <        this(parallelism, defaultForkJoinWorkerThreadFactory);
1145 >        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
1146      }
1147  
1148      /**
1149 <     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1239 <     * java.lang.Runtime#availableProcessors}, and using the given
1240 <     * thread factory.
1149 >     * Creates a {@code ForkJoinPool} with the given parameters.
1150       *
1151 <     * @param factory the factory for creating new threads
1152 <     * @throws NullPointerException if the factory is null
1153 <     * @throws SecurityException if a security manager exists and
1154 <     *         the caller is not permitted to modify threads
1155 <     *         because it does not hold {@link
1156 <     *         java.lang.RuntimePermission}{@code ("modifyThread")}
1157 <     */
1158 <    public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
1159 <        this(Runtime.getRuntime().availableProcessors(), factory);
1160 <    }
1161 <
1162 <    /**
1163 <     * Creates a {@code ForkJoinPool} with the given parallelism and
1255 <     * thread factory.
1256 <     *
1257 <     * @param parallelism the parallelism level
1258 <     * @param factory the factory for creating new threads
1151 >     * @param parallelism the parallelism level. For default value,
1152 >     * use {@link java.lang.Runtime#availableProcessors}.
1153 >     * @param factory the factory for creating new threads. For default value,
1154 >     * use {@link #defaultForkJoinWorkerThreadFactory}.
1155 >     * @param handler the handler for internal worker threads that
1156 >     * terminate due to unrecoverable errors encountered while executing
1157 >     * tasks. For default value, use <code>null</code>.
1158 >     * @param asyncMode if true,
1159 >     * establishes local first-in-first-out scheduling mode for forked
1160 >     * tasks that are never joined. This mode may be more appropriate
1161 >     * than default locally stack-based mode in applications in which
1162 >     * worker threads only process event-style asynchronous tasks.
1163 >     * For default value, use <code>false</code>.
1164       * @throws IllegalArgumentException if parallelism less than or
1165       *         equal to zero, or greater than implementation limit
1166       * @throws NullPointerException if the factory is null
# Line 1264 | Line 1169 | public class ForkJoinPool extends Abstra
1169       *         because it does not hold {@link
1170       *         java.lang.RuntimePermission}{@code ("modifyThread")}
1171       */
1172 <    public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
1172 >    public ForkJoinPool(int parallelism,
1173 >                        ForkJoinWorkerThreadFactory factory,
1174 >                        Thread.UncaughtExceptionHandler handler,
1175 >                        boolean asyncMode) {
1176          checkPermission();
1177          if (factory == null)
1178              throw new NullPointerException();
1179          if (parallelism <= 0 || parallelism > MAX_THREADS)
1180              throw new IllegalArgumentException();
1273        this.poolNumber = poolNumberGenerator.incrementAndGet();
1274        int arraySize = initialArraySizeFor(parallelism);
1181          this.parallelism = parallelism;
1182          this.factory = factory;
1183 <        this.maxPoolSize = MAX_THREADS;
1184 <        this.maintainsParallelism = true;
1183 >        this.ueh = handler;
1184 >        this.locallyFifo = asyncMode;
1185 >        int arraySize = initialArraySizeFor(parallelism);
1186          this.workers = new ForkJoinWorkerThread[arraySize];
1187          this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
1188          this.workerLock = new ReentrantLock();
1189 <        this.terminationLatch = new CountDownLatch(1);
1189 >        this.termination = new Phaser(1);
1190 >        this.poolNumber = poolNumberGenerator.incrementAndGet();
1191      }
1192  
1193      /**
# Line 1306 | Line 1214 | public class ForkJoinPool extends Abstra
1214              throw new NullPointerException();
1215          if (runState >= SHUTDOWN)
1216              throw new RejectedExecutionException();
1217 <        submissionQueue.offer(task);
1218 <        advanceEventCount();
1219 <        releaseWaiters();
1220 <        ensureEnoughTotalWorkers();
1217 >        // Convert submissions to current pool into forks
1218 >        Thread t = Thread.currentThread();
1219 >        ForkJoinWorkerThread w;
1220 >        if ((t instanceof ForkJoinWorkerThread) &&
1221 >            (w = (ForkJoinWorkerThread) t).pool == this)
1222 >            w.pushTask(task);
1223 >        else {
1224 >            submissionQueue.offer(task);
1225 >            signalEvent();
1226 >            ensureEnoughTotalWorkers();
1227 >        }
1228      }
1229  
1230      /**
1231       * Performs the given task, returning its result upon completion.
1232 +     * If the caller is already engaged in a fork/join computation in
1233 +     * the current pool, this method is equivalent in effect to
1234 +     * {@link ForkJoinTask#invoke}.
1235       *
1236       * @param task the task
1237       * @return the task's result
# Line 1328 | Line 1246 | public class ForkJoinPool extends Abstra
1246  
1247      /**
1248       * Arranges for (asynchronous) execution of the given task.
1249 +     * If the caller is already engaged in a fork/join computation in
1250 +     * the current pool, this method is equivalent in effect to
1251 +     * {@link ForkJoinTask#fork}.
1252       *
1253       * @param task the task
1254       * @throws NullPointerException if the task is null
# Line 1355 | Line 1276 | public class ForkJoinPool extends Abstra
1276      }
1277  
1278      /**
1279 +     * Submits a ForkJoinTask for execution.
1280 +     * If the caller is already engaged in a fork/join computation in
1281 +     * the current pool, this method is equivalent in effect to
1282 +     * {@link ForkJoinTask#fork}.
1283 +     *
1284 +     * @param task the task to submit
1285 +     * @return the task
1286 +     * @throws NullPointerException if the task is null
1287 +     * @throws RejectedExecutionException if the task cannot be
1288 +     *         scheduled for execution
1289 +     */
1290 +    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1291 +        doSubmit(task);
1292 +        return task;
1293 +    }
1294 +
1295 +    /**
1296       * @throws NullPointerException if the task is null
1297       * @throws RejectedExecutionException if the task cannot be
1298       *         scheduled for execution
# Line 1392 | Line 1330 | public class ForkJoinPool extends Abstra
1330      }
1331  
1332      /**
1395     * Submits a ForkJoinTask for execution.
1396     *
1397     * @param task the task to submit
1398     * @return the task
1399     * @throws NullPointerException if the task is null
1400     * @throws RejectedExecutionException if the task cannot be
1401     *         scheduled for execution
1402     */
1403    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1404        doSubmit(task);
1405        return task;
1406    }
1407
1408    /**
1333       * @throws NullPointerException       {@inheritDoc}
1334       * @throws RejectedExecutionException {@inheritDoc}
1335       */
# Line 1447 | Line 1371 | public class ForkJoinPool extends Abstra
1371       * @return the handler, or {@code null} if none
1372       */
1373      public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
1450        workerCountReadFence();
1374          return ueh;
1375      }
1376  
1377      /**
1455     * Sets the handler for internal worker threads that terminate due
1456     * to unrecoverable errors encountered while executing tasks.
1457     * Unless set, the current default or ThreadGroup handler is used
1458     * as handler.
1459     *
1460     * @param h the new handler
1461     * @return the old handler, or {@code null} if none
1462     * @throws SecurityException if a security manager exists and
1463     *         the caller is not permitted to modify threads
1464     *         because it does not hold {@link
1465     *         java.lang.RuntimePermission}{@code ("modifyThread")}
1466     */
1467    public Thread.UncaughtExceptionHandler
1468        setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
1469        checkPermission();
1470        Thread.UncaughtExceptionHandler old = ueh;
1471        if (h != old) {
1472            ueh = h;
1473            ForkJoinWorkerThread[] ws = workers;
1474            int nws = ws.length;
1475            for (int i = 0; i < nws; ++i) {
1476                ForkJoinWorkerThread w = ws[i];
1477                if (w != null)
1478                    w.setUncaughtExceptionHandler(h);
1479            }
1480        }
1481        return old;
1482    }
1483
1484    /**
1485     * Sets the target parallelism level of this pool.
1486     *
1487     * @param parallelism the target parallelism
1488     * @throws IllegalArgumentException if parallelism less than or
1489     * equal to zero or greater than maximum size bounds
1490     * @throws SecurityException if a security manager exists and
1491     *         the caller is not permitted to modify threads
1492     *         because it does not hold {@link
1493     *         java.lang.RuntimePermission}{@code ("modifyThread")}
1494     */
1495    public void setParallelism(int parallelism) {
1496        checkPermission();
1497        if (parallelism <= 0 || parallelism > maxPoolSize)
1498            throw new IllegalArgumentException();
1499        workerCountReadFence();
1500        int pc = this.parallelism;
1501        if (pc != parallelism) {
1502            this.parallelism = parallelism;
1503            workerCountWriteFence();
1504            // Release spares. If too many, some will die after re-suspend
1505            ForkJoinWorkerThread[] ws = workers;
1506            int nws = ws.length;
1507            for (int i = 0; i < nws; ++i) {
1508                ForkJoinWorkerThread w = ws[i];
1509                if (w != null && w.tryUnsuspend()) {
1510                    int c;
1511                    do {} while (!UNSAFE.compareAndSwapInt
1512                                 (this, workerCountsOffset,
1513                                  c = workerCounts, c + ONE_RUNNING));
1514                    LockSupport.unpark(w);
1515                }
1516            }
1517            ensureEnoughTotalWorkers();
1518            advanceEventCount();
1519            releaseWaiters(); // force config recheck by existing workers
1520        }
1521    }
1522
1523    /**
1378       * Returns the targeted parallelism level of this pool.
1379       *
1380       * @return the targeted parallelism level of this pool
1381       */
1382      public int getParallelism() {
1529        //        workerCountReadFence(); // inlined below
1530        int ignore = workerCounts;
1383          return parallelism;
1384      }
1385  
# Line 1544 | Line 1396 | public class ForkJoinPool extends Abstra
1396      }
1397  
1398      /**
1547     * Returns the maximum number of threads allowed to exist in the
1548     * pool. Unless set using {@link #setMaximumPoolSize}, the
1549     * maximum is an implementation-defined value designed only to
1550     * prevent runaway growth.
1551     *
1552     * @return the maximum
1553     */
1554    public int getMaximumPoolSize() {
1555        workerCountReadFence();
1556        return maxPoolSize;
1557    }
1558
1559    /**
1560     * Sets the maximum number of threads allowed to exist in the
1561     * pool. The given value should normally be greater than or equal
1562     * to the {@link #getParallelism parallelism} level. Setting this
1563     * value has no effect on current pool size. It controls
1564     * construction of new threads. The use of this method may cause
1565     * tasks that intrinsically require extra threads for dependent
1566     * computations to indefinitely stall. If you are instead trying
1567     * to minimize internal thread creation, consider setting {@link
1568     * #setMaintainsParallelism} as false.
1569     *
1570     * @throws IllegalArgumentException if negative or greater than
1571     * internal implementation limit
1572     */
1573    public void setMaximumPoolSize(int newMax) {
1574        if (newMax < 0 || newMax > MAX_THREADS)
1575            throw new IllegalArgumentException();
1576        maxPoolSize = newMax;
1577        workerCountWriteFence();
1578    }
1579
1580    /**
1581     * Returns {@code true} if this pool dynamically maintains its
1582     * target parallelism level. If false, new threads are added only
1583     * to avoid possible starvation.  This setting is by default true.
1584     *
1585     * @return {@code true} if maintains parallelism
1586     */
1587    public boolean getMaintainsParallelism() {
1588        workerCountReadFence();
1589        return maintainsParallelism;
1590    }
1591
1592    /**
1593     * Sets whether this pool dynamically maintains its target
1594     * parallelism level. If false, new threads are added only to
1595     * avoid possible starvation.
1596     *
1597     * @param enable {@code true} to maintain parallelism
1598     */
1599    public void setMaintainsParallelism(boolean enable) {
1600        maintainsParallelism = enable;
1601        workerCountWriteFence();
1602    }
1603
1604    /**
1605     * Establishes local first-in-first-out scheduling mode for forked
1606     * tasks that are never joined. This mode may be more appropriate
1607     * than default locally stack-based mode in applications in which
1608     * worker threads only process asynchronous tasks.  This method is
1609     * designed to be invoked only when the pool is quiescent, and
1610     * typically only before any tasks are submitted. The effects of
1611     * invocations at other times may be unpredictable.
1612     *
1613     * @param async if {@code true}, use locally FIFO scheduling
1614     * @return the previous mode
1615     * @see #getAsyncMode
1616     */
1617    public boolean setAsyncMode(boolean async) {
1618        workerCountReadFence();
1619        boolean oldMode = locallyFifo;
1620        if (oldMode != async) {
1621            locallyFifo = async;
1622            workerCountWriteFence();
1623            ForkJoinWorkerThread[] ws = workers;
1624            int nws = ws.length;
1625            for (int i = 0; i < nws; ++i) {
1626                ForkJoinWorkerThread w = ws[i];
1627                if (w != null)
1628                    w.setAsyncMode(async);
1629            }
1630        }
1631        return oldMode;
1632    }
1633
1634    /**
1399       * Returns {@code true} if this pool uses local first-in-first-out
1400       * scheduling mode for forked tasks that are never joined.
1401       *
1402       * @return {@code true} if this pool uses async mode
1639     * @see #setAsyncMode
1403       */
1404      public boolean getAsyncMode() {
1642        workerCountReadFence();
1405          return locallyFifo;
1406      }
1407  
# Line 1780 | Line 1542 | public class ForkJoinPool extends Abstra
1542      }
1543  
1544      /**
1545 +     * Returns count of total parks by existing workers.
1546 +     * Used during development only since not meaningful to users.
1547 +     */
1548 +    private int collectParkCount() {
1549 +        int count = 0;
1550 +        ForkJoinWorkerThread[] ws = workers;
1551 +        int nws = ws.length;
1552 +        for (int i = 0; i < nws; ++i) {
1553 +            ForkJoinWorkerThread w = ws[i];
1554 +            if (w != null)
1555 +                count += w.parkCount;
1556 +        }
1557 +        return count;
1558 +    }
1559 +
1560 +    /**
1561       * Returns a string identifying this pool, as well as its state,
1562       * including indications of run state, parallelism level, and
1563       * worker and task counts.
# Line 1796 | Line 1574 | public class ForkJoinPool extends Abstra
1574          int pc = parallelism;
1575          int rs = runState;
1576          int ac = rs & ACTIVE_COUNT_MASK;
1577 +        //        int pk = collectParkCount();
1578          return super.toString() +
1579              "[" + runLevelToString(rs) +
1580              ", parallelism = " + pc +
# Line 1805 | Line 1584 | public class ForkJoinPool extends Abstra
1584              ", steals = " + st +
1585              ", tasks = " + qt +
1586              ", submissions = " + qs +
1587 +            //            ", parks = " + pk +
1588              "]";
1589      }
1590  
# Line 1900 | Line 1680 | public class ForkJoinPool extends Abstra
1680       */
1681      public boolean awaitTermination(long timeout, TimeUnit unit)
1682          throws InterruptedException {
1683 <        return terminationLatch.await(timeout, unit);
1683 >        try {
1684 >            return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
1685 >        } catch(TimeoutException ex) {
1686 >            return false;
1687 >        }
1688      }
1689  
1690      /**
# Line 1952 | Line 1736 | public class ForkJoinPool extends Abstra
1736       * Blocks in accord with the given blocker.  If the current thread
1737       * is a {@link ForkJoinWorkerThread}, this method possibly
1738       * arranges for a spare thread to be activated if necessary to
1739 <     * ensure parallelism while the current thread is blocked.
1956 <     *
1957 <     * <p>If {@code maintainParallelism} is {@code true} and the pool
1958 <     * supports it ({@link #getMaintainsParallelism}), this method
1959 <     * attempts to maintain the pool's nominal parallelism. Otherwise
1960 <     * it activates a thread only if necessary to avoid complete
1961 <     * starvation. This option may be preferable when blockages use
1962 <     * timeouts, or are almost always brief.
1739 >     * ensure sufficient parallelism while the current thread is blocked.
1740       *
1741       * <p>If the caller is not a {@link ForkJoinTask}, this method is
1742       * behaviorally equivalent to
# Line 1973 | Line 1750 | public class ForkJoinPool extends Abstra
1750       * first be expanded to ensure parallelism, and later adjusted.
1751       *
1752       * @param blocker the blocker
1976     * @param maintainParallelism if {@code true} and supported by
1977     * this pool, attempt to maintain the pool's nominal parallelism;
1978     * otherwise activate a thread only if necessary to avoid
1979     * complete starvation.
1753       * @throws InterruptedException if blocker.block did so
1754       */
1755 <    public static void managedBlock(ManagedBlocker blocker,
1983 <                                    boolean maintainParallelism)
1755 >    public static void managedBlock(ManagedBlocker blocker)
1756          throws InterruptedException {
1757          Thread t = Thread.currentThread();
1758          if (t instanceof ForkJoinWorkerThread)
1759 <            ((ForkJoinWorkerThread) t).pool.
1760 <                awaitBlocker(blocker, maintainParallelism);
1761 <        else
1762 <            awaitBlocker(blocker);
1991 <    }
1992 <
1993 <    /**
1994 <     * Performs Non-FJ blocking
1995 <     */
1996 <    private static void awaitBlocker(ManagedBlocker blocker)
1997 <        throws InterruptedException {
1998 <        do {} while (!blocker.isReleasable() && !blocker.block());
1759 >            ((ForkJoinWorkerThread) t).pool.awaitBlocker(blocker);
1760 >        else {
1761 >            do {} while (!blocker.isReleasable() && !blocker.block());
1762 >        }
1763      }
1764  
1765      // AbstractExecutorService overrides.  These rely on undocumented

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines