ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.39 by jsr166, Sun Aug 2 17:55:51 2009 UTC vs.
Revision 1.50 by dl, Fri Dec 4 12:09:46 2009 UTC

# Line 23 | Line 23 | import java.util.concurrent.atomic.Atomi
23   * An {@link ExecutorService} for running {@link ForkJoinTask}s.
24   * A {@code ForkJoinPool} provides the entry point for submissions
25   * from non-{@code ForkJoinTask}s, as well as management and
26 < * monitoring operations.  Normally a single {@code ForkJoinPool} is
27 < * used for a large number of submitted tasks. Otherwise, use would
28 < * not usually outweigh the construction and bookkeeping overhead of
29 < * creating a large set of threads.
26 > * monitoring operations.
27   *
28 < * <p>{@code ForkJoinPool}s differ from other kinds of {@link
29 < * Executor}s mainly in that they provide <em>work-stealing</em>: all
30 < * threads in the pool attempt to find and execute subtasks created by
31 < * other active tasks (eventually blocking if none exist). This makes
32 < * them efficient when most tasks spawn other subtasks (as do most
33 < * {@code ForkJoinTask}s), as well as the mixed execution of some
34 < * plain {@code Runnable}- or {@code Callable}- based activities along
35 < * with {@code ForkJoinTask}s. When setting {@linkplain #setAsyncMode
36 < * async mode}, a {@code ForkJoinPool} may also be appropriate for use
37 < * with fine-grained tasks that are never joined. Otherwise, other
38 < * {@code ExecutorService} implementations are typically more
39 < * appropriate choices.
28 > * <p>A {@code ForkJoinPool} differs from other kinds of {@link
29 > * ExecutorService} mainly by virtue of employing
30 > * <em>work-stealing</em>: all threads in the pool attempt to find and
31 > * execute subtasks created by other active tasks (eventually blocking
32 > * waiting for work if none exist). This enables efficient processing
33 > * when most tasks spawn other subtasks (as do most {@code
34 > * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
35 > * execution of some plain {@code Runnable}- or {@code Callable}-
36 > * based activities along with {@code ForkJoinTask}s. When setting
37 > * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
38 > * also be appropriate for use with fine-grained tasks of any form
39 > * that are never joined. Otherwise, other {@code ExecutorService}
40 > * implementations are typically more appropriate choices.
41   *
42 < * <p>A {@code ForkJoinPool} may be constructed with a given
43 < * parallelism level (target pool size), which it attempts to maintain
44 < * by dynamically adding, suspending, or resuming threads, even if
45 < * some tasks are waiting to join others. However, no such adjustments
42 > * <p>A {@code ForkJoinPool} is constructed with a given target
43 > * parallelism level; by default, equal to the number of available
44 > * processors. Unless configured otherwise via {@link
45 > * #setMaintainsParallelism}, the pool attempts to maintain this
46 > * number of active (or available) threads by dynamically adding,
47 > * suspending, or resuming internal worker threads, even if some tasks
48 > * are stalled waiting to join others. However, no such adjustments
49   * are performed in the face of blocked IO or other unmanaged
50   * synchronization. The nested {@link ManagedBlocker} interface
51   * enables extension of the kinds of synchronization accommodated.
52   * The target parallelism level may also be changed dynamically
53 < * ({@link #setParallelism}) and thread construction can be limited
54 < * using methods {@link #setMaximumPoolSize} and/or {@link
55 < * #setMaintainsParallelism}.
53 > * ({@link #setParallelism}). The total number of threads may be
54 > * limited using method {@link #setMaximumPoolSize}, in which case it
55 > * may become possible for the activities of a pool to stall due to
56 > * the lack of available threads to process new tasks.
57   *
58   * <p>In addition to execution and lifecycle control methods, this
59   * class provides status check methods (for example
# Line 60 | Line 62 | import java.util.concurrent.atomic.Atomi
62   * {@link #toString} returns indications of pool state in a
63   * convenient form for informal monitoring.
64   *
65 + * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
66 + * used for all parallel task execution in a program or subsystem.
67 + * Otherwise, use would not usually outweigh the construction and
68 + * bookkeeping overhead of creating a large set of threads. For
69 + * example, a common pool could be used for the {@code SortTasks}
70 + * illustrated in {@link RecursiveAction}. Because {@code
71 + * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
72 + * daemon} mode, there is typically no need to explicitly {@link
73 + * #shutdown} such a pool upon program exit.
74 + *
75 + * <pre>
76 + * static final ForkJoinPool mainPool = new ForkJoinPool();
77 + * ...
78 + * public void sort(long[] array) {
79 + *   mainPool.invoke(new SortTask(array, 0, array.length));
80 + * }
81 + * </pre>
82 + *
83   * <p><b>Implementation notes</b>: This implementation restricts the
84   * maximum number of running threads to 32767. Attempts to create
85 < * pools with greater than the maximum result in
85 > * pools with greater than the maximum number result in
86   * {@code IllegalArgumentException}.
87   *
88 + * <p>This implementation rejects submitted tasks (that is, by throwing
89 + * {@link RejectedExecutionException}) only when the pool is shut down.
90 + *
91   * @since 1.7
92   * @author Doug Lea
93   */
# Line 92 | Line 115 | public class ForkJoinPool extends Abstra
115           * Returns a new worker thread operating in the given pool.
116           *
117           * @param pool the pool this thread works in
118 <         * @throws NullPointerException if pool is null
118 >         * @throws NullPointerException if the pool is null
119           */
120          public ForkJoinWorkerThread newThread(ForkJoinPool pool);
121      }
# Line 343 | Line 366 | public class ForkJoinPool extends Abstra
366      // Constructors
367  
368      /**
369 <     * Creates a ForkJoinPool with a pool size equal to the number of
370 <     * processors available on the system, using the default
371 <     * ForkJoinWorkerThreadFactory.
369 >     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
370 >     * java.lang.Runtime#availableProcessors}, and using the {@linkplain
371 >     * #defaultForkJoinWorkerThreadFactory default thread factory}.
372       *
373       * @throws SecurityException if a security manager exists and
374       *         the caller is not permitted to modify threads
# Line 358 | Line 381 | public class ForkJoinPool extends Abstra
381      }
382  
383      /**
384 <     * Creates a ForkJoinPool with the indicated parallelism level
385 <     * threads and using the default ForkJoinWorkerThreadFactory.
384 >     * Creates a {@code ForkJoinPool} with the indicated parallelism
385 >     * level and using the {@linkplain
386 >     * #defaultForkJoinWorkerThreadFactory default thread factory}.
387       *
388 <     * @param parallelism the number of worker threads
388 >     * @param parallelism the parallelism level
389       * @throws IllegalArgumentException if parallelism less than or
390 <     * equal to zero
390 >     *         equal to zero, or greater than implementation limit
391       * @throws SecurityException if a security manager exists and
392       *         the caller is not permitted to modify threads
393       *         because it does not hold {@link
# Line 374 | Line 398 | public class ForkJoinPool extends Abstra
398      }
399  
400      /**
401 <     * Creates a ForkJoinPool with parallelism equal to the number of
402 <     * processors available on the system and using the given
403 <     * ForkJoinWorkerThreadFactory.
401 >     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
402 >     * java.lang.Runtime#availableProcessors}, and using the given
403 >     * thread factory.
404       *
405       * @param factory the factory for creating new threads
406 <     * @throws NullPointerException if factory is null
406 >     * @throws NullPointerException if the factory is null
407       * @throws SecurityException if a security manager exists and
408       *         the caller is not permitted to modify threads
409       *         because it does not hold {@link
# Line 390 | Line 414 | public class ForkJoinPool extends Abstra
414      }
415  
416      /**
417 <     * Creates a ForkJoinPool with the given parallelism and factory.
417 >     * Creates a {@code ForkJoinPool} with the given parallelism and
418 >     * thread factory.
419       *
420 <     * @param parallelism the targeted number of worker threads
420 >     * @param parallelism the parallelism level
421       * @param factory the factory for creating new threads
422       * @throws IllegalArgumentException if parallelism less than or
423 <     * equal to zero, or greater than implementation limit
424 <     * @throws NullPointerException if factory is null
423 >     *         equal to zero, or greater than implementation limit
424 >     * @throws NullPointerException if the factory is null
425       * @throws SecurityException if a security manager exists and
426       *         the caller is not permitted to modify threads
427       *         because it does not hold {@link
# Line 424 | Line 449 | public class ForkJoinPool extends Abstra
449       * Creates a new worker thread using factory.
450       *
451       * @param index the index to assign worker
452 <     * @return new worker, or null of factory failed
452 >     * @return new worker, or null if factory failed
453       */
454      private ForkJoinWorkerThread createWorker(int index) {
455          Thread.UncaughtExceptionHandler h = ueh;
# Line 445 | Line 470 | public class ForkJoinPool extends Abstra
470       * Currently requires size to be a power of two.
471       */
472      private static int arraySizeFor(int poolSize) {
473 <        return (poolSize <= 1) ? 1 :
474 <            (1 << (32 - Integer.numberOfLeadingZeros(poolSize-1)));
473 >        if (poolSize <= 1)
474 >            return 1;
475 >        // See Hackers Delight, sec 3.2
476 >        int c = poolSize >= MAX_THREADS ? MAX_THREADS : (poolSize - 1);
477 >        c |= c >>>  1;
478 >        c |= c >>>  2;
479 >        c |= c >>>  4;
480 >        c |= c >>>  8;
481 >        c |= c >>> 16;
482 >        return c + 1;
483      }
484  
485      /**
# Line 493 | Line 526 | public class ForkJoinPool extends Abstra
526                  ws = workers;
527                  if (ws == null) {
528                      int ps = parallelism;
529 +                    updateWorkerCount(ps);
530                      ws = ensureWorkerArrayCapacity(ps);
531                      for (int i = 0; i < ps; ++i) {
532                          ForkJoinWorkerThread w = createWorker(i);
533                          if (w != null) {
534                              ws[i] = w;
535                              w.start();
502                            updateWorkerCount(1);
536                          }
537 +                        else
538 +                            updateWorkerCount(-1);
539                      }
540                  }
541              } finally {
# Line 564 | Line 599 | public class ForkJoinPool extends Abstra
599       *
600       * @param task the task
601       * @return the task's result
602 <     * @throws NullPointerException if task is null
603 <     * @throws RejectedExecutionException if pool is shut down
602 >     * @throws NullPointerException if the task is null
603 >     * @throws RejectedExecutionException if the task cannot be
604 >     *         scheduled for execution
605       */
606      public <T> T invoke(ForkJoinTask<T> task) {
607          doSubmit(task);
# Line 576 | Line 612 | public class ForkJoinPool extends Abstra
612       * Arranges for (asynchronous) execution of the given task.
613       *
614       * @param task the task
615 <     * @throws NullPointerException if task is null
616 <     * @throws RejectedExecutionException if pool is shut down
615 >     * @throws NullPointerException if the task is null
616 >     * @throws RejectedExecutionException if the task cannot be
617 >     *         scheduled for execution
618       */
619      public void execute(ForkJoinTask<?> task) {
620          doSubmit(task);
# Line 585 | Line 622 | public class ForkJoinPool extends Abstra
622  
623      // AbstractExecutorService methods
624  
625 +    /**
626 +     * @throws NullPointerException if the task is null
627 +     * @throws RejectedExecutionException if the task cannot be
628 +     *         scheduled for execution
629 +     */
630      public void execute(Runnable task) {
631          ForkJoinTask<?> job;
632          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
# Line 594 | Line 636 | public class ForkJoinPool extends Abstra
636          doSubmit(job);
637      }
638  
639 +    /**
640 +     * @throws NullPointerException if the task is null
641 +     * @throws RejectedExecutionException if the task cannot be
642 +     *         scheduled for execution
643 +     */
644      public <T> ForkJoinTask<T> submit(Callable<T> task) {
645          ForkJoinTask<T> job = ForkJoinTask.adapt(task);
646          doSubmit(job);
647          return job;
648      }
649  
650 +    /**
651 +     * @throws NullPointerException if the task is null
652 +     * @throws RejectedExecutionException if the task cannot be
653 +     *         scheduled for execution
654 +     */
655      public <T> ForkJoinTask<T> submit(Runnable task, T result) {
656          ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
657          doSubmit(job);
658          return job;
659      }
660  
661 +    /**
662 +     * @throws NullPointerException if the task is null
663 +     * @throws RejectedExecutionException if the task cannot be
664 +     *         scheduled for execution
665 +     */
666      public ForkJoinTask<?> submit(Runnable task) {
667          ForkJoinTask<?> job;
668          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
# Line 621 | Line 678 | public class ForkJoinPool extends Abstra
678       *
679       * @param task the task to submit
680       * @return the task
681 +     * @throws NullPointerException if the task is null
682       * @throws RejectedExecutionException if the task cannot be
683       *         scheduled for execution
626     * @throws NullPointerException if the task is null
684       */
685      public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
686          doSubmit(task);
# Line 631 | Line 688 | public class ForkJoinPool extends Abstra
688      }
689  
690  
691 +    /**
692 +     * @throws NullPointerException       {@inheritDoc}
693 +     * @throws RejectedExecutionException {@inheritDoc}
694 +     */
695      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
696          ArrayList<ForkJoinTask<T>> forkJoinTasks =
697              new ArrayList<ForkJoinTask<T>>(tasks.size());
# Line 737 | Line 798 | public class ForkJoinPool extends Abstra
798          final ReentrantLock lock = this.workerLock;
799          lock.lock();
800          try {
801 <            if (!isTerminating()) {
801 >            if (isProcessingTasks()) {
802                  int p = this.parallelism;
803                  this.parallelism = parallelism;
804 <                if (parallelism > p)
805 <                    createAndStartAddedWorkers();
806 <                else
807 <                    trimSpares();
804 >                if (workers != null) {
805 >                    if (parallelism > p)
806 >                        createAndStartAddedWorkers();
807 >                    else
808 >                        trimSpares();
809 >                }
810              }
811          } finally {
812              lock.unlock();
# Line 752 | Line 815 | public class ForkJoinPool extends Abstra
815      }
816  
817      /**
818 <     * Returns the targeted number of worker threads in this pool.
818 >     * Returns the targeted parallelism level of this pool.
819       *
820 <     * @return the targeted number of worker threads in this pool
820 >     * @return the targeted parallelism level of this pool
821       */
822      public int getParallelism() {
823          return parallelism;
# Line 774 | Line 837 | public class ForkJoinPool extends Abstra
837  
838      /**
839       * Returns the maximum number of threads allowed to exist in the
840 <     * pool, even if there are insufficient unblocked running threads.
840 >     * pool. Unless set using {@link #setMaximumPoolSize}, the
841 >     * maximum is an implementation-defined value designed only to
842 >     * prevent runaway growth.
843       *
844       * @return the maximum
845       */
# Line 784 | Line 849 | public class ForkJoinPool extends Abstra
849  
850      /**
851       * Sets the maximum number of threads allowed to exist in the
852 <     * pool, even if there are insufficient unblocked running threads.
853 <     * Setting this value has no effect on current pool size. It
854 <     * controls construction of new threads.
852 >     * pool. The given value should normally be greater than or equal
853 >     * to the {@link #getParallelism parallelism} level. Setting this
854 >     * value has no effect on current pool size. It controls
855 >     * construction of new threads.
856       *
857       * @throws IllegalArgumentException if negative or greater than
858       * internal implementation limit
# Line 956 | Line 1022 | public class ForkJoinPool extends Abstra
1022      }
1023  
1024      /**
1025 <     * Returns an estimate of the number tasks submitted to this pool
1026 <     * that have not yet begun executing. This method takes time
1025 >     * Returns an estimate of the number of tasks submitted to this
1026 >     * pool that have not yet begun executing.  This method takes time
1027       * proportional to the number of submissions.
1028       *
1029       * @return the number of queued submissions
# Line 991 | Line 1057 | public class ForkJoinPool extends Abstra
1057       * Removes all available unexecuted submitted and forked tasks
1058       * from scheduling queues and adds them to the given collection,
1059       * without altering their execution status. These may include
1060 <     * artificially generated or wrapped tasks. This method is designed
1061 <     * to be invoked only when the pool is known to be
1060 >     * artificially generated or wrapped tasks. This method is
1061 >     * designed to be invoked only when the pool is known to be
1062       * quiescent. Invocations at other times may not remove all
1063       * tasks. A failure encountered while attempting to add elements
1064       * to collection {@code c} may result in elements being in
# Line 1044 | Line 1110 | public class ForkJoinPool extends Abstra
1110      }
1111  
1112      private static String runStateToString(int rs) {
1113 <        switch(rs) {
1113 >        switch (rs) {
1114          case RUNNING: return "Running";
1115          case SHUTDOWN: return "Shutting down";
1116          case TERMINATING: return "Terminating";
# Line 1089 | Line 1155 | public class ForkJoinPool extends Abstra
1155      }
1156  
1157      /**
1158 <     * Attempts to stop all actively executing tasks, and cancels all
1159 <     * waiting tasks.  Tasks that are in the process of being
1160 <     * submitted or executed concurrently during the course of this
1161 <     * method may or may not be rejected. Unlike some other executors,
1162 <     * this method cancels rather than collects non-executed tasks
1163 <     * upon termination, so always returns an empty list. However, you
1164 <     * can use method {@link #drainTasksTo} before invoking this
1165 <     * method to transfer unexecuted tasks to another collection.
1158 >     * Attempts to cancel and/or stop all tasks, and reject all
1159 >     * subsequently submitted tasks.  Tasks that are in the process of
1160 >     * being submitted or executed concurrently during the course of
1161 >     * this method may or may not be rejected. This method cancels
1162 >     * both existing and unexecuted tasks, in order to permit
1163 >     * termination in the presence of task dependencies. So the method
1164 >     * always returns an empty list (unlike the case for some other
1165 >     * Executors).
1166       *
1167       * @return an empty list
1168       * @throws SecurityException if a security manager exists and
# Line 1121 | Line 1187 | public class ForkJoinPool extends Abstra
1187  
1188      /**
1189       * Returns {@code true} if the process of termination has
1190 <     * commenced but possibly not yet completed.
1190 >     * commenced but not yet completed.  This method may be useful for
1191 >     * debugging. A return of {@code true} reported a sufficient
1192 >     * period after shutdown may indicate that submitted tasks have
1193 >     * ignored or suppressed interruption, causing this executor not
1194 >     * to properly terminate.
1195       *
1196 <     * @return {@code true} if terminating
1196 >     * @return {@code true} if terminating but not yet terminated
1197       */
1198      public boolean isTerminating() {
1199 <        return runStateOf(runControl) >= TERMINATING;
1199 >        return runStateOf(runControl) == TERMINATING;
1200      }
1201  
1202      /**
# Line 1139 | Line 1209 | public class ForkJoinPool extends Abstra
1209      }
1210  
1211      /**
1212 +     * Returns true if pool is not terminating or terminated.
1213 +     * Used internally to suppress execution when terminating.
1214 +     */
1215 +    final boolean isProcessingTasks() {
1216 +        return runStateOf(runControl) < TERMINATING;
1217 +    }
1218 +
1219 +    /**
1220       * Blocks until all tasks have completed execution after a shutdown
1221       * request, or the timeout occurs, or the current thread is
1222       * interrupted, whichever happens first.
# Line 1192 | Line 1270 | public class ForkJoinPool extends Abstra
1270                      transitionRunStateTo(TERMINATED);
1271                      termination.signalAll();
1272                  }
1273 <                else if (!isTerminating()) {
1273 >                else if (isProcessingTasks()) {
1274                      tryShrinkWorkerArray();
1275                      tryResumeSpare(true); // allow replacement
1276                  }
# Line 1302 | Line 1380 | public class ForkJoinPool extends Abstra
1380          }
1381      }
1382  
1305
1383      /*
1384       * Nodes for event barrier to manage idle threads.  Queue nodes
1385       * are basic Treiber stack nodes, also used for spare stack.
# Line 1326 | Line 1403 | public class ForkJoinPool extends Abstra
1403       * handling: Method signalWork returns without advancing count if
1404       * the queue appears to be empty.  This would ordinarily result in
1405       * races causing some queued waiters not to be woken up. To avoid
1406 <     * this, the first worker enqueued in method sync (see
1407 <     * syncIsReleasable) rescans for tasks after being enqueued, and
1408 <     * helps signal if any are found. This works well because the
1409 <     * worker has nothing better to do, and so might as well help
1410 <     * alleviate the overhead and contention on the threads actually
1411 <     * doing work.  Also, since event counts increments on task
1412 <     * availability exist to maintain liveness (rather than to force
1413 <     * refreshes etc), it is OK for callers to exit early if
1337 <     * contending with another signaller.
1406 >     * this, the first worker enqueued in method sync rescans for
1407 >     * tasks after being enqueued, and helps signal if any are
1408 >     * found. This works well because the worker has nothing better to
1409 >     * do, and so might as well help alleviate the overhead and
1410 >     * contention on the threads actually doing work.  Also, since
1411 >     * event counts increments on task availability exist to maintain
1412 >     * liveness (rather than to force refreshes etc), it is OK for
1413 >     * callers to exit early if contending with another signaller.
1414       */
1415      static final class WaitQueueNode {
1416          WaitQueueNode next; // only written before enqueued
# Line 1347 | Line 1423 | public class ForkJoinPool extends Abstra
1423          }
1424  
1425          /**
1426 <         * Wakes up waiter, returning false if known to already
1426 >         * Wakes up waiter, returning false if known to already be awake
1427           */
1428          boolean signal() {
1429              ForkJoinWorkerThread t = thread;
# Line 1357 | Line 1433 | public class ForkJoinPool extends Abstra
1433              LockSupport.unpark(t);
1434              return true;
1435          }
1360
1361        /**
1362         * Awaits release on sync.
1363         */
1364        void awaitSyncRelease(ForkJoinPool p) {
1365            while (thread != null && !p.syncIsReleasable(this))
1366                LockSupport.park(this);
1367        }
1368
1369        /**
1370         * Awaits resumption as spare.
1371         */
1372        void awaitSpareRelease() {
1373            while (thread != null) {
1374                if (!Thread.interrupted())
1375                    LockSupport.park(this);
1376            }
1377        }
1436      }
1437  
1438      /**
1439       * Ensures that no thread is waiting for count to advance from the
1440       * current value of eventCount read on entry to this method, by
1441       * releasing waiting threads if necessary.
1384     *
1385     * @return the count
1442       */
1443 <    final long ensureSync() {
1443 >    final void ensureSync() {
1444          long c = eventCount;
1445          WaitQueueNode q;
1446          while ((q = syncStack) != null && q.count < c) {
# Line 1395 | Line 1451 | public class ForkJoinPool extends Abstra
1451                  break;
1452              }
1453          }
1398        return c;
1454      }
1455  
1456      /**
# Line 1410 | Line 1465 | public class ForkJoinPool extends Abstra
1465      /**
1466       * Signals threads waiting to poll a task. Because method sync
1467       * rechecks availability, it is OK to only proceed if queue
1468 <     * appears to be non-empty, and OK to skip under contention to
1469 <     * increment count (since some other thread succeeded).
1468 >     * appears to be non-empty, and OK if CAS to increment count
1469 >     * fails (since some other thread succeeded).
1470       */
1471      final void signalWork() {
1472 <        long c;
1473 <        WaitQueueNode q;
1474 <        if (syncStack != null &&
1475 <            casEventCount(c = eventCount, c+1) &&
1476 <            (((q = syncStack) != null && q.count <= c) &&
1477 <             (!casBarrierStack(q, q.next) || !q.signal())))
1478 <            ensureSync();
1472 >        if (syncStack != null) {
1473 >            long c;
1474 >            casEventCount(c = eventCount, c+1);
1475 >            WaitQueueNode q = syncStack;
1476 >            if (q != null && q.count <= c &&
1477 >                (!casBarrierStack(q, q.next) || !q.signal()))
1478 >                ensureSync();
1479 >        }
1480      }
1481  
1482      /**
# Line 1433 | Line 1489 | public class ForkJoinPool extends Abstra
1489      final void sync(ForkJoinWorkerThread w) {
1490          updateStealCount(w); // Transfer w's count while it is idle
1491  
1492 <        while (!w.isShutdown() && !isTerminating() && !suspendIfSpare(w)) {
1492 >        if (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1493              long prev = w.lastEventCount;
1494              WaitQueueNode node = null;
1495              WaitQueueNode h;
1496 +            boolean helpSignal = false;
1497              while (eventCount == prev &&
1498                     ((h = syncStack) == null || h.count == prev)) {
1499                  if (node == null)
1500                      node = new WaitQueueNode(prev, w);
1501                  if (casBarrierStack(node.next = h, node)) {
1502 <                    node.awaitSyncRelease(this);
1502 >                    if (!Thread.interrupted() && node.thread != null &&
1503 >                        eventCount == prev) {
1504 >                        if (h == null && // cover signalWork race
1505 >                            ForkJoinWorkerThread.hasQueuedTasks(workers))
1506 >                            helpSignal = true;
1507 >                        else
1508 >                            LockSupport.park(this);
1509 >                    }
1510 >                    if (node.thread != null)
1511 >                        node.thread = null;
1512                      break;
1513                  }
1514              }
1515 <            long ec = ensureSync();
1516 <            if (ec != prev) {
1515 >            long ec = eventCount;
1516 >            if (ec != prev)
1517                  w.lastEventCount = ec;
1518 <                break;
1519 <            }
1518 >            else if (helpSignal)
1519 >                casEventCount(ec, ec + 1);
1520 >            ensureSync();
1521          }
1522      }
1523  
1457    /**
1458     * Returns {@code true} if worker waiting on sync can proceed:
1459     *  - on signal (thread == null)
1460     *  - on event count advance (winning race to notify vs signaller)
1461     *  - on interrupt
1462     *  - if the first queued node, we find work available
1463     * If node was not signalled and event count not advanced on exit,
1464     * then we also help advance event count.
1465     *
1466     * @return {@code true} if node can be released
1467     */
1468    final boolean syncIsReleasable(WaitQueueNode node) {
1469        long prev = node.count;
1470        if (!Thread.interrupted() && node.thread != null &&
1471            (node.next != null ||
1472             !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1473            eventCount == prev)
1474            return false;
1475        if (node.thread != null) {
1476            node.thread = null;
1477            long ec = eventCount;
1478            if (prev <= ec) // help signal
1479                casEventCount(ec, ec+1);
1480        }
1481        return true;
1482    }
1524  
1525      /**
1526       * Returns {@code true} if a new sync event occurred since last
# Line 1487 | Line 1528 | public class ForkJoinPool extends Abstra
1528       */
1529      final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1530          long lc = w.lastEventCount;
1531 <        long ec = ensureSync();
1532 <        if (ec == lc)
1533 <            return false;
1534 <        w.lastEventCount = ec;
1535 <        return true;
1531 >        long ec = eventCount;
1532 >        if (lc != ec)
1533 >            w.lastEventCount = ec;
1534 >        ensureSync();
1535 >        return lc != ec || lc != eventCount;
1536      }
1537  
1538      //  Parallelism maintenance
# Line 1533 | Line 1574 | public class ForkJoinPool extends Abstra
1574          while (spareStack == null || !tryResumeSpare(dec)) {
1575              int counts = workerCounts;
1576              if (dec || (dec = casWorkerCounts(counts, --counts))) {
1536                // CAS cheat
1577                  if (!needSpare(counts, maintainParallelism))
1578                      break;
1579                  if (joinMe.status < 0)
# Line 1638 | Line 1678 | public class ForkJoinPool extends Abstra
1678              for (k = 0; k < len && ws[k] != null; ++k)
1679                  ;
1680          }
1681 <        if (k < len && !isTerminating() && (w = createWorker(k)) != null) {
1681 >        if (k < len && isProcessingTasks() && (w = createWorker(k)) != null) {
1682              ws[k] = w;
1683              w.start();
1684          }
# Line 1658 | Line 1698 | public class ForkJoinPool extends Abstra
1698       */
1699      private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1700          WaitQueueNode node = null;
1701 <        int s;
1702 <        while (parallelism < runningCountOf(s = workerCounts)) {
1701 >        for (;;) {
1702 >            int p = parallelism;
1703 >            int s = workerCounts;
1704 >            int r = runningCountOf(s);
1705 >            int t = totalCountOf(s);
1706 >            // use t as bound if r transiently out of sync
1707 >            if (t <= p || r <= p)
1708 >                return false; // not a spare
1709              if (node == null)
1710                  node = new WaitQueueNode(0, w);
1711 <            if (casWorkerCounts(s, s-1)) { // representation-dependent
1712 <                // push onto stack
1667 <                do {} while (!casSpareStack(node.next = spareStack, node));
1668 <                // block until released by resumeSpare
1669 <                node.awaitSpareRelease();
1670 <                return true;
1671 <            }
1711 >            if (casWorkerCounts(s, workerCountsFor(t, r - 1)))
1712 >                break;
1713          }
1714 <        return false;
1714 >        // push onto stack
1715 >        do {} while (!casSpareStack(node.next = spareStack, node));
1716 >        // block until released by resumeSpare
1717 >        while (!Thread.interrupted() && node.thread != null)
1718 >            LockSupport.park(this);
1719 >        return true;
1720      }
1721  
1722      /**
# Line 1742 | Line 1788 | public class ForkJoinPool extends Abstra
1788       * Method {@code isReleasable} must return {@code true} if
1789       * blocking is not necessary. Method {@code block} blocks the
1790       * current thread if necessary (perhaps internally invoking
1791 <     * {@code isReleasable} before actually blocking.).
1791 >     * {@code isReleasable} before actually blocking).
1792       *
1793       * <p>For example, here is a ManagedBlocker based on a
1794       * ReentrantLock:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines