ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.33 by dl, Fri Jul 31 16:27:08 2009 UTC vs.
Revision 1.51 by dl, Fri Dec 4 15:46:38 2009 UTC

# Line 21 | Line 21 | import java.util.concurrent.atomic.Atomi
21  
22   /**
23   * An {@link ExecutorService} for running {@link ForkJoinTask}s.
24 < * A ForkJoinPool provides the entry point for submissions from
25 < * non-ForkJoinTasks, as well as management and monitoring operations.
26 < * Normally a single ForkJoinPool is used for a large number of
27 < * submitted tasks. Otherwise, use would not usually outweigh the
28 < * construction and bookkeeping overhead of creating a large set of
29 < * threads.
24 > * A {@code ForkJoinPool} provides the entry point for submissions
25 > * from non-{@code ForkJoinTask}s, as well as management and
26 > * monitoring operations.
27   *
28 < * <p>ForkJoinPools differ from other kinds of Executors mainly in
29 < * that they provide <em>work-stealing</em>: all threads in the pool
30 < * attempt to find and execute subtasks created by other active tasks
31 < * (eventually blocking if none exist). This makes them efficient when
32 < * most tasks spawn other subtasks (as do most ForkJoinTasks), as well
33 < * as the mixed execution of some plain Runnable- or Callable- based
34 < * activities along with ForkJoinTasks. When setting {@linkplain
35 < * #setAsyncMode async mode}, a ForkJoinPool may also be appropriate
36 < * for use with fine-grained tasks that are never joined. Otherwise,
37 < * other ExecutorService implementations are typically more
38 < * appropriate choices.
28 > * <p>A {@code ForkJoinPool} differs from other kinds of {@link
29 > * ExecutorService} mainly by virtue of employing
30 > * <em>work-stealing</em>: all threads in the pool attempt to find and
31 > * execute subtasks created by other active tasks (eventually blocking
32 > * waiting for work if none exist). This enables efficient processing
33 > * when most tasks spawn other subtasks (as do most {@code
34 > * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
35 > * execution of some plain {@code Runnable}- or {@code Callable}-
36 > * based activities along with {@code ForkJoinTask}s. When setting
37 > * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
38 > * also be appropriate for use with fine-grained tasks of any form
39 > * that are never joined. Otherwise, other {@code ExecutorService}
40 > * implementations are typically more appropriate choices.
41   *
42 < * <p>A ForkJoinPool may be constructed with a given parallelism level
43 < * (target pool size), which it attempts to maintain by dynamically
44 < * adding, suspending, or resuming threads, even if some tasks are
45 < * waiting to join others. However, no such adjustments are performed
46 < * in the face of blocked IO or other unmanaged synchronization. The
47 < * nested {@link ManagedBlocker} interface enables extension of
48 < * the kinds of synchronization accommodated.  The target parallelism
49 < * level may also be changed dynamically ({@link #setParallelism})
50 < * and thread construction can be limited using methods
51 < * {@link #setMaximumPoolSize} and/or
52 < * {@link #setMaintainsParallelism}.
42 > * <p>A {@code ForkJoinPool} is constructed with a given target
43 > * parallelism level; by default, equal to the number of available
44 > * processors. Unless configured otherwise via {@link
45 > * #setMaintainsParallelism}, the pool attempts to maintain this
46 > * number of active (or available) threads by dynamically adding,
47 > * suspending, or resuming internal worker threads, even if some tasks
48 > * are stalled waiting to join others. However, no such adjustments
49 > * are performed in the face of blocked IO or other unmanaged
50 > * synchronization. The nested {@link ManagedBlocker} interface
51 > * enables extension of the kinds of synchronization accommodated.
52 > * The target parallelism level may also be changed dynamically
53 > * ({@link #setParallelism}). The total number of threads may be
54 > * limited using method {@link #setMaximumPoolSize}, in which case it
55 > * may become possible for the activities of a pool to stall due to
56 > * the lack of available threads to process new tasks.
57   *
58   * <p>In addition to execution and lifecycle control methods, this
59   * class provides status check methods (for example
# Line 59 | Line 62 | import java.util.concurrent.atomic.Atomi
62   * {@link #toString} returns indications of pool state in a
63   * convenient form for informal monitoring.
64   *
65 + * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
66 + * used for all parallel task execution in a program or subsystem.
67 + * Otherwise, use would not usually outweigh the construction and
68 + * bookkeeping overhead of creating a large set of threads. For
69 + * example, a common pool could be used for the {@code SortTasks}
70 + * illustrated in {@link RecursiveAction}. Because {@code
71 + * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
72 + * daemon} mode, there is typically no need to explicitly {@link
73 + * #shutdown} such a pool upon program exit.
74 + *
75 + * <pre>
76 + * static final ForkJoinPool mainPool = new ForkJoinPool();
77 + * ...
78 + * public void sort(long[] array) {
79 + *   mainPool.invoke(new SortTask(array, 0, array.length));
80 + * }
81 + * </pre>
82 + *
83   * <p><b>Implementation notes</b>: This implementation restricts the
84   * maximum number of running threads to 32767. Attempts to create
85 < * pools with greater than the maximum result in
86 < * IllegalArgumentExceptions.
85 > * pools with greater than the maximum number result in
86 > * {@code IllegalArgumentException}.
87 > *
88 > * <p>This implementation rejects submitted tasks (that is, by throwing
89 > * {@link RejectedExecutionException}) only when the pool is shut down.
90   *
91   * @since 1.7
92   * @author Doug Lea
# Line 81 | Line 105 | public class ForkJoinPool extends Abstra
105      private static final int MAX_THREADS =  0x7FFF;
106  
107      /**
108 <     * Factory for creating new ForkJoinWorkerThreads.  A
109 <     * ForkJoinWorkerThreadFactory must be defined and used for
110 <     * ForkJoinWorkerThread subclasses that extend base functionality
111 <     * or initialize threads with different contexts.
108 >     * Factory for creating new {@link ForkJoinWorkerThread}s.
109 >     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
110 >     * for {@code ForkJoinWorkerThread} subclasses that extend base
111 >     * functionality or initialize threads with different contexts.
112       */
113      public static interface ForkJoinWorkerThreadFactory {
114          /**
115           * Returns a new worker thread operating in the given pool.
116           *
117           * @param pool the pool this thread works in
118 <         * @throws NullPointerException if pool is null
118 >         * @throws NullPointerException if the pool is null
119           */
120          public ForkJoinWorkerThread newThread(ForkJoinPool pool);
121      }
# Line 342 | Line 366 | public class ForkJoinPool extends Abstra
366      // Constructors
367  
368      /**
369 <     * Creates a ForkJoinPool with a pool size equal to the number of
370 <     * processors available on the system, using the default
371 <     * ForkJoinWorkerThreadFactory.
369 >     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
370 >     * java.lang.Runtime#availableProcessors}, and using the {@linkplain
371 >     * #defaultForkJoinWorkerThreadFactory default thread factory}.
372       *
373       * @throws SecurityException if a security manager exists and
374       *         the caller is not permitted to modify threads
# Line 357 | Line 381 | public class ForkJoinPool extends Abstra
381      }
382  
383      /**
384 <     * Creates a ForkJoinPool with the indicated parallelism level
385 <     * threads and using the default ForkJoinWorkerThreadFactory.
384 >     * Creates a {@code ForkJoinPool} with the indicated parallelism
385 >     * level and using the {@linkplain
386 >     * #defaultForkJoinWorkerThreadFactory default thread factory}.
387       *
388 <     * @param parallelism the number of worker threads
388 >     * @param parallelism the parallelism level
389       * @throws IllegalArgumentException if parallelism less than or
390 <     * equal to zero
390 >     *         equal to zero, or greater than implementation limit
391       * @throws SecurityException if a security manager exists and
392       *         the caller is not permitted to modify threads
393       *         because it does not hold {@link
# Line 373 | Line 398 | public class ForkJoinPool extends Abstra
398      }
399  
400      /**
401 <     * Creates a ForkJoinPool with parallelism equal to the number of
402 <     * processors available on the system and using the given
403 <     * ForkJoinWorkerThreadFactory.
401 >     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
402 >     * java.lang.Runtime#availableProcessors}, and using the given
403 >     * thread factory.
404       *
405       * @param factory the factory for creating new threads
406 <     * @throws NullPointerException if factory is null
406 >     * @throws NullPointerException if the factory is null
407       * @throws SecurityException if a security manager exists and
408       *         the caller is not permitted to modify threads
409       *         because it does not hold {@link
# Line 389 | Line 414 | public class ForkJoinPool extends Abstra
414      }
415  
416      /**
417 <     * Creates a ForkJoinPool with the given parallelism and factory.
417 >     * Creates a {@code ForkJoinPool} with the given parallelism and
418 >     * thread factory.
419       *
420 <     * @param parallelism the targeted number of worker threads
420 >     * @param parallelism the parallelism level
421       * @param factory the factory for creating new threads
422       * @throws IllegalArgumentException if parallelism less than or
423 <     * equal to zero, or greater than implementation limit
424 <     * @throws NullPointerException if factory is null
423 >     *         equal to zero, or greater than implementation limit
424 >     * @throws NullPointerException if the factory is null
425       * @throws SecurityException if a security manager exists and
426       *         the caller is not permitted to modify threads
427       *         because it does not hold {@link
# Line 423 | Line 449 | public class ForkJoinPool extends Abstra
449       * Creates a new worker thread using factory.
450       *
451       * @param index the index to assign worker
452 <     * @return new worker, or null of factory failed
452 >     * @return new worker, or null if factory failed
453       */
454      private ForkJoinWorkerThread createWorker(int index) {
455          Thread.UncaughtExceptionHandler h = ueh;
# Line 444 | Line 470 | public class ForkJoinPool extends Abstra
470       * Currently requires size to be a power of two.
471       */
472      private static int arraySizeFor(int poolSize) {
473 <        return (poolSize <= 1) ? 1 :
474 <            (1 << (32 - Integer.numberOfLeadingZeros(poolSize-1)));
473 >        if (poolSize <= 1)
474 >            return 1;
475 >        // See Hackers Delight, sec 3.2
476 >        int c = poolSize >= MAX_THREADS ? MAX_THREADS : (poolSize - 1);
477 >        c |= c >>>  1;
478 >        c |= c >>>  2;
479 >        c |= c >>>  4;
480 >        c |= c >>>  8;
481 >        c |= c >>> 16;
482 >        return c + 1;
483      }
484  
485      /**
# Line 492 | Line 526 | public class ForkJoinPool extends Abstra
526                  ws = workers;
527                  if (ws == null) {
528                      int ps = parallelism;
529 +                    updateWorkerCount(ps);
530                      ws = ensureWorkerArrayCapacity(ps);
531                      for (int i = 0; i < ps; ++i) {
532                          ForkJoinWorkerThread w = createWorker(i);
533                          if (w != null) {
534                              ws[i] = w;
535                              w.start();
501                            updateWorkerCount(1);
536                          }
537 +                        else
538 +                            updateWorkerCount(-1);
539                      }
540                  }
541              } finally {
# Line 563 | Line 599 | public class ForkJoinPool extends Abstra
599       *
600       * @param task the task
601       * @return the task's result
602 <     * @throws NullPointerException if task is null
603 <     * @throws RejectedExecutionException if pool is shut down
602 >     * @throws NullPointerException if the task is null
603 >     * @throws RejectedExecutionException if the task cannot be
604 >     *         scheduled for execution
605       */
606      public <T> T invoke(ForkJoinTask<T> task) {
607          doSubmit(task);
# Line 575 | Line 612 | public class ForkJoinPool extends Abstra
612       * Arranges for (asynchronous) execution of the given task.
613       *
614       * @param task the task
615 <     * @throws NullPointerException if task is null
616 <     * @throws RejectedExecutionException if pool is shut down
615 >     * @throws NullPointerException if the task is null
616 >     * @throws RejectedExecutionException if the task cannot be
617 >     *         scheduled for execution
618       */
619 <    public <T> void execute(ForkJoinTask<T> task) {
619 >    public void execute(ForkJoinTask<?> task) {
620          doSubmit(task);
621      }
622  
623      // AbstractExecutorService methods
624  
625 +    /**
626 +     * @throws NullPointerException if the task is null
627 +     * @throws RejectedExecutionException if the task cannot be
628 +     *         scheduled for execution
629 +     */
630      public void execute(Runnable task) {
631          ForkJoinTask<?> job;
632          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
# Line 593 | Line 636 | public class ForkJoinPool extends Abstra
636          doSubmit(job);
637      }
638  
639 +    /**
640 +     * @throws NullPointerException if the task is null
641 +     * @throws RejectedExecutionException if the task cannot be
642 +     *         scheduled for execution
643 +     */
644      public <T> ForkJoinTask<T> submit(Callable<T> task) {
645          ForkJoinTask<T> job = ForkJoinTask.adapt(task);
646          doSubmit(job);
647          return job;
648      }
649  
650 +    /**
651 +     * @throws NullPointerException if the task is null
652 +     * @throws RejectedExecutionException if the task cannot be
653 +     *         scheduled for execution
654 +     */
655      public <T> ForkJoinTask<T> submit(Runnable task, T result) {
656          ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
657          doSubmit(job);
658          return job;
659      }
660  
661 +    /**
662 +     * @throws NullPointerException if the task is null
663 +     * @throws RejectedExecutionException if the task cannot be
664 +     *         scheduled for execution
665 +     */
666      public ForkJoinTask<?> submit(Runnable task) {
667          ForkJoinTask<?> job;
668          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
# Line 620 | Line 678 | public class ForkJoinPool extends Abstra
678       *
679       * @param task the task to submit
680       * @return the task
681 +     * @throws NullPointerException if the task is null
682       * @throws RejectedExecutionException if the task cannot be
683       *         scheduled for execution
625     * @throws NullPointerException if the task is null
684       */
685      public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
686          doSubmit(task);
# Line 630 | Line 688 | public class ForkJoinPool extends Abstra
688      }
689  
690  
691 +    /**
692 +     * @throws NullPointerException       {@inheritDoc}
693 +     * @throws RejectedExecutionException {@inheritDoc}
694 +     */
695      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
696          ArrayList<ForkJoinTask<T>> forkJoinTasks =
697              new ArrayList<ForkJoinTask<T>>(tasks.size());
# Line 736 | Line 798 | public class ForkJoinPool extends Abstra
798          final ReentrantLock lock = this.workerLock;
799          lock.lock();
800          try {
801 <            if (!isTerminating()) {
801 >            if (isProcessingTasks()) {
802                  int p = this.parallelism;
803                  this.parallelism = parallelism;
804 <                if (parallelism > p)
805 <                    createAndStartAddedWorkers();
806 <                else
807 <                    trimSpares();
804 >                if (workers != null) {
805 >                    if (parallelism > p)
806 >                        createAndStartAddedWorkers();
807 >                    else
808 >                        trimSpares();
809 >                }
810              }
811          } finally {
812              lock.unlock();
# Line 751 | Line 815 | public class ForkJoinPool extends Abstra
815      }
816  
817      /**
818 <     * Returns the targeted number of worker threads in this pool.
818 >     * Returns the targeted parallelism level of this pool.
819       *
820 <     * @return the targeted number of worker threads in this pool
820 >     * @return the targeted parallelism level of this pool
821       */
822      public int getParallelism() {
823          return parallelism;
# Line 773 | Line 837 | public class ForkJoinPool extends Abstra
837  
838      /**
839       * Returns the maximum number of threads allowed to exist in the
840 <     * pool, even if there are insufficient unblocked running threads.
840 >     * pool. Unless set using {@link #setMaximumPoolSize}, the
841 >     * maximum is an implementation-defined value designed only to
842 >     * prevent runaway growth.
843       *
844       * @return the maximum
845       */
# Line 783 | Line 849 | public class ForkJoinPool extends Abstra
849  
850      /**
851       * Sets the maximum number of threads allowed to exist in the
852 <     * pool, even if there are insufficient unblocked running threads.
853 <     * Setting this value has no effect on current pool size. It
854 <     * controls construction of new threads.
852 >     * pool. The given value should normally be greater than or equal
853 >     * to the {@link #getParallelism parallelism} level. Setting this
854 >     * value has no effect on current pool size. It controls
855 >     * construction of new threads.
856       *
857 <     * @throws IllegalArgumentException if negative or greater then
857 >     * @throws IllegalArgumentException if negative or greater than
858       * internal implementation limit
859       */
860      public void setMaximumPoolSize(int newMax) {
# Line 955 | Line 1022 | public class ForkJoinPool extends Abstra
1022      }
1023  
1024      /**
1025 <     * Returns an estimate of the number tasks submitted to this pool
1026 <     * that have not yet begun executing. This method takes time
1025 >     * Returns an estimate of the number of tasks submitted to this
1026 >     * pool that have not yet begun executing.  This method takes time
1027       * proportional to the number of submissions.
1028       *
1029       * @return the number of queued submissions
# Line 990 | Line 1057 | public class ForkJoinPool extends Abstra
1057       * Removes all available unexecuted submitted and forked tasks
1058       * from scheduling queues and adds them to the given collection,
1059       * without altering their execution status. These may include
1060 <     * artificially generated or wrapped tasks. This method is designed
1061 <     * to be invoked only when the pool is known to be
1060 >     * artificially generated or wrapped tasks. This method is
1061 >     * designed to be invoked only when the pool is known to be
1062       * quiescent. Invocations at other times may not remove all
1063       * tasks. A failure encountered while attempting to add elements
1064       * to collection {@code c} may result in elements being in
# Line 1043 | Line 1110 | public class ForkJoinPool extends Abstra
1110      }
1111  
1112      private static String runStateToString(int rs) {
1113 <        switch(rs) {
1113 >        switch (rs) {
1114          case RUNNING: return "Running";
1115          case SHUTDOWN: return "Shutting down";
1116          case TERMINATING: return "Terminating";
# Line 1088 | Line 1155 | public class ForkJoinPool extends Abstra
1155      }
1156  
1157      /**
1158 <     * Attempts to stop all actively executing tasks, and cancels all
1159 <     * waiting tasks.  Tasks that are in the process of being
1160 <     * submitted or executed concurrently during the course of this
1161 <     * method may or may not be rejected. Unlike some other executors,
1162 <     * this method cancels rather than collects non-executed tasks
1163 <     * upon termination, so always returns an empty list. However, you
1164 <     * can use method {@link #drainTasksTo} before invoking this
1165 <     * method to transfer unexecuted tasks to another collection.
1158 >     * Attempts to cancel and/or stop all tasks, and reject all
1159 >     * subsequently submitted tasks.  Tasks that are in the process of
1160 >     * being submitted or executed concurrently during the course of
1161 >     * this method may or may not be rejected. This method cancels
1162 >     * both existing and unexecuted tasks, in order to permit
1163 >     * termination in the presence of task dependencies. So the method
1164 >     * always returns an empty list (unlike the case for some other
1165 >     * Executors).
1166       *
1167       * @return an empty list
1168       * @throws SecurityException if a security manager exists and
# Line 1120 | Line 1187 | public class ForkJoinPool extends Abstra
1187  
1188      /**
1189       * Returns {@code true} if the process of termination has
1190 <     * commenced but possibly not yet completed.
1190 >     * commenced but not yet completed.  This method may be useful for
1191 >     * debugging. A return of {@code true} reported a sufficient
1192 >     * period after shutdown may indicate that submitted tasks have
1193 >     * ignored or suppressed interruption, causing this executor not
1194 >     * to properly terminate.
1195       *
1196 <     * @return {@code true} if terminating
1196 >     * @return {@code true} if terminating but not yet terminated
1197       */
1198      public boolean isTerminating() {
1199 <        return runStateOf(runControl) >= TERMINATING;
1199 >        return runStateOf(runControl) == TERMINATING;
1200      }
1201  
1202      /**
# Line 1138 | Line 1209 | public class ForkJoinPool extends Abstra
1209      }
1210  
1211      /**
1212 +     * Returns true if pool is not terminating or terminated.
1213 +     * Used internally to suppress execution when terminating.
1214 +     */
1215 +    final boolean isProcessingTasks() {
1216 +        return runStateOf(runControl) < TERMINATING;
1217 +    }
1218 +
1219 +    /**
1220       * Blocks until all tasks have completed execution after a shutdown
1221       * request, or the timeout occurs, or the current thread is
1222       * interrupted, whichever happens first.
# Line 1191 | Line 1270 | public class ForkJoinPool extends Abstra
1270                      transitionRunStateTo(TERMINATED);
1271                      termination.signalAll();
1272                  }
1273 <                else if (!isTerminating()) {
1273 >                else if (isProcessingTasks()) {
1274                      tryShrinkWorkerArray();
1275                      tryResumeSpare(true); // allow replacement
1276                  }
# Line 1301 | Line 1380 | public class ForkJoinPool extends Abstra
1380          }
1381      }
1382  
1304
1383      /*
1384       * Nodes for event barrier to manage idle threads.  Queue nodes
1385       * are basic Treiber stack nodes, also used for spare stack.
# Line 1325 | Line 1403 | public class ForkJoinPool extends Abstra
1403       * handling: Method signalWork returns without advancing count if
1404       * the queue appears to be empty.  This would ordinarily result in
1405       * races causing some queued waiters not to be woken up. To avoid
1406 <     * this, the first worker enqueued in method sync (see
1407 <     * syncIsReleasable) rescans for tasks after being enqueued, and
1408 <     * helps signal if any are found. This works well because the
1409 <     * worker has nothing better to do, and so might as well help
1410 <     * alleviate the overhead and contention on the threads actually
1411 <     * doing work.  Also, since event counts increments on task
1412 <     * availability exist to maintain liveness (rather than to force
1413 <     * refreshes etc), it is OK for callers to exit early if
1336 <     * contending with another signaller.
1406 >     * this, the first worker enqueued in method sync rescans for
1407 >     * tasks after being enqueued, and helps signal if any are
1408 >     * found. This works well because the worker has nothing better to
1409 >     * do, and so might as well help alleviate the overhead and
1410 >     * contention on the threads actually doing work.  Also, since
1411 >     * event counts increments on task availability exist to maintain
1412 >     * liveness (rather than to force refreshes etc), it is OK for
1413 >     * callers to exit early if contending with another signaller.
1414       */
1415      static final class WaitQueueNode {
1416          WaitQueueNode next; // only written before enqueued
# Line 1346 | Line 1423 | public class ForkJoinPool extends Abstra
1423          }
1424  
1425          /**
1426 <         * Wakes up waiter, returning false if known to already
1426 >         * Wakes up waiter, returning false if known to already be awake
1427           */
1428          boolean signal() {
1429              ForkJoinWorkerThread t = thread;
# Line 1356 | Line 1433 | public class ForkJoinPool extends Abstra
1433              LockSupport.unpark(t);
1434              return true;
1435          }
1359
1360        /**
1361         * Awaits release on sync.
1362         */
1363        void awaitSyncRelease(ForkJoinPool p) {
1364            while (thread != null && !p.syncIsReleasable(this))
1365                LockSupport.park(this);
1366        }
1367
1368        /**
1369         * Awaits resumption as spare.
1370         */
1371        void awaitSpareRelease() {
1372            while (thread != null) {
1373                if (!Thread.interrupted())
1374                    LockSupport.park(this);
1375            }
1376        }
1436      }
1437  
1438      /**
1439       * Ensures that no thread is waiting for count to advance from the
1440       * current value of eventCount read on entry to this method, by
1441       * releasing waiting threads if necessary.
1383     *
1384     * @return the count
1442       */
1443 <    final long ensureSync() {
1443 >    final void ensureSync() {
1444          long c = eventCount;
1445          WaitQueueNode q;
1446          while ((q = syncStack) != null && q.count < c) {
# Line 1394 | Line 1451 | public class ForkJoinPool extends Abstra
1451                  break;
1452              }
1453          }
1397        return c;
1454      }
1455  
1456      /**
# Line 1409 | Line 1465 | public class ForkJoinPool extends Abstra
1465      /**
1466       * Signals threads waiting to poll a task. Because method sync
1467       * rechecks availability, it is OK to only proceed if queue
1468 <     * appears to be non-empty, and OK to skip under contention to
1469 <     * increment count (since some other thread succeeded).
1468 >     * appears to be non-empty, and OK if CAS to increment count
1469 >     * fails (since some other thread succeeded).
1470       */
1471      final void signalWork() {
1472 <        long c;
1473 <        WaitQueueNode q;
1474 <        if (syncStack != null &&
1475 <            casEventCount(c = eventCount, c+1) &&
1476 <            (((q = syncStack) != null && q.count <= c) &&
1477 <             (!casBarrierStack(q, q.next) || !q.signal())))
1478 <            ensureSync();
1472 >        if (syncStack != null) {
1473 >            long c;
1474 >            casEventCount(c = eventCount, c+1);
1475 >            WaitQueueNode q = syncStack;
1476 >            if (q != null && q.count <= c &&
1477 >                (!casBarrierStack(q, q.next) || !q.signal()))
1478 >                ensureSync();
1479 >        }
1480      }
1481  
1482      /**
# Line 1432 | Line 1489 | public class ForkJoinPool extends Abstra
1489      final void sync(ForkJoinWorkerThread w) {
1490          updateStealCount(w); // Transfer w's count while it is idle
1491  
1492 <        while (!w.isShutdown() && !isTerminating() && !suspendIfSpare(w)) {
1492 >        if (!w.isShutdown() && isProcessingTasks() && !suspendIfSpare(w)) {
1493              long prev = w.lastEventCount;
1494              WaitQueueNode node = null;
1495              WaitQueueNode h;
1496 <            while (eventCount == prev &&
1496 >            long ec;
1497 >            while ((ec = eventCount) == prev &&
1498                     ((h = syncStack) == null || h.count == prev)) {
1499                  if (node == null)
1500                      node = new WaitQueueNode(prev, w);
1501                  if (casBarrierStack(node.next = h, node)) {
1502 <                    node.awaitSyncRelease(this);
1502 >                    if (!Thread.interrupted() &&
1503 >                        node.thread != null &&
1504 >                        eventCount == prev &&
1505 >                        (h != null || // cover signalWork race
1506 >                         (!ForkJoinWorkerThread.hasQueuedTasks(workers) &&
1507 >                          eventCount == prev)))
1508 >                        LockSupport.park(this);
1509 >                    ec = eventCount;
1510 >                    if (node.thread != null) {
1511 >                        node.thread = null;
1512 >                        if (ec == prev)
1513 >                            casEventCount(prev, prev + 1); // help signal
1514 >                    }
1515                      break;
1516                  }
1517              }
1518 <            long ec = ensureSync();
1519 <            if (ec != prev) {
1450 <                w.lastEventCount = ec;
1451 <                break;
1452 <            }
1453 <        }
1454 <    }
1455 <
1456 <    /**
1457 <     * Returns {@code true} if worker waiting on sync can proceed:
1458 <     *  - on signal (thread == null)
1459 <     *  - on event count advance (winning race to notify vs signaller)
1460 <     *  - on interrupt
1461 <     *  - if the first queued node, we find work available
1462 <     * If node was not signalled and event count not advanced on exit,
1463 <     * then we also help advance event count.
1464 <     *
1465 <     * @return {@code true} if node can be released
1466 <     */
1467 <    final boolean syncIsReleasable(WaitQueueNode node) {
1468 <        long prev = node.count;
1469 <        if (!Thread.interrupted() && node.thread != null &&
1470 <            (node.next != null ||
1471 <             !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1472 <            eventCount == prev)
1473 <            return false;
1474 <        if (node.thread != null) {
1475 <            node.thread = null;
1476 <            long ec = eventCount;
1477 <            if (prev <= ec) // help signal
1478 <                casEventCount(ec, ec+1);
1518 >            w.lastEventCount = ec;
1519 >            ensureSync();
1520          }
1480        return true;
1521      }
1522  
1523      /**
# Line 1486 | Line 1526 | public class ForkJoinPool extends Abstra
1526       */
1527      final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1528          long lc = w.lastEventCount;
1529 <        long ec = ensureSync();
1530 <        if (ec == lc)
1531 <            return false;
1532 <        w.lastEventCount = ec;
1533 <        return true;
1529 >        long ec = eventCount;
1530 >        if (lc != ec)
1531 >            w.lastEventCount = ec;
1532 >        ensureSync();
1533 >        return lc != ec || lc != eventCount;
1534      }
1535  
1536      //  Parallelism maintenance
# Line 1532 | Line 1572 | public class ForkJoinPool extends Abstra
1572          while (spareStack == null || !tryResumeSpare(dec)) {
1573              int counts = workerCounts;
1574              if (dec || (dec = casWorkerCounts(counts, --counts))) {
1535                // CAS cheat
1575                  if (!needSpare(counts, maintainParallelism))
1576                      break;
1577                  if (joinMe.status < 0)
# Line 1637 | Line 1676 | public class ForkJoinPool extends Abstra
1676              for (k = 0; k < len && ws[k] != null; ++k)
1677                  ;
1678          }
1679 <        if (k < len && !isTerminating() && (w = createWorker(k)) != null) {
1679 >        if (k < len && isProcessingTasks() && (w = createWorker(k)) != null) {
1680              ws[k] = w;
1681              w.start();
1682          }
# Line 1657 | Line 1696 | public class ForkJoinPool extends Abstra
1696       */
1697      private boolean suspendIfSpare(ForkJoinWorkerThread w) {
1698          WaitQueueNode node = null;
1699 <        int s;
1700 <        while (parallelism < runningCountOf(s = workerCounts)) {
1699 >        for (;;) {
1700 >            int p = parallelism;
1701 >            int s = workerCounts;
1702 >            int r = runningCountOf(s);
1703 >            int t = totalCountOf(s);
1704 >            // use t as bound if r transiently out of sync
1705 >            if (t <= p || r <= p)
1706 >                return false; // not a spare
1707              if (node == null)
1708                  node = new WaitQueueNode(0, w);
1709 <            if (casWorkerCounts(s, s-1)) { // representation-dependent
1710 <                // push onto stack
1666 <                do {} while (!casSpareStack(node.next = spareStack, node));
1667 <                // block until released by resumeSpare
1668 <                node.awaitSpareRelease();
1669 <                return true;
1670 <            }
1709 >            if (casWorkerCounts(s, workerCountsFor(t, r - 1)))
1710 >                break;
1711          }
1712 <        return false;
1712 >        // push onto stack
1713 >        do {} while (!casSpareStack(node.next = spareStack, node));
1714 >        // block until released by resumeSpare
1715 >        while (!Thread.interrupted() && node.thread != null)
1716 >            LockSupport.park(this);
1717 >        return true;
1718      }
1719  
1720      /**
# Line 1735 | Line 1780 | public class ForkJoinPool extends Abstra
1780  
1781      /**
1782       * Interface for extending managed parallelism for tasks running
1783 <     * in ForkJoinPools. A ManagedBlocker provides two methods.
1783 >     * in {@link ForkJoinPool}s.
1784 >     *
1785 >     * <p>A {@code ManagedBlocker} provides two methods.
1786       * Method {@code isReleasable} must return {@code true} if
1787       * blocking is not necessary. Method {@code block} blocks the
1788       * current thread if necessary (perhaps internally invoking
1789 <     * {@code isReleasable} before actually blocking.).
1789 >     * {@code isReleasable} before actually blocking).
1790       *
1791       * <p>For example, here is a ManagedBlocker based on a
1792       * ReentrantLock:
# Line 1778 | Line 1825 | public class ForkJoinPool extends Abstra
1825  
1826      /**
1827       * Blocks in accord with the given blocker.  If the current thread
1828 <     * is a ForkJoinWorkerThread, this method possibly arranges for a
1829 <     * spare thread to be activated if necessary to ensure parallelism
1830 <     * while the current thread is blocked.  If
1831 <     * {@code maintainParallelism} is {@code true} and the pool supports
1832 <     * it ({@link #getMaintainsParallelism}), this method attempts to
1833 <     * maintain the pool's nominal parallelism. Otherwise it activates
1834 <     * a thread only if necessary to avoid complete starvation. This
1835 <     * option may be preferable when blockages use timeouts, or are
1836 <     * almost always brief.
1828 >     * is a {@link ForkJoinWorkerThread}, this method possibly
1829 >     * arranges for a spare thread to be activated if necessary to
1830 >     * ensure parallelism while the current thread is blocked.
1831 >     *
1832 >     * <p>If {@code maintainParallelism} is {@code true} and the pool
1833 >     * supports it ({@link #getMaintainsParallelism}), this method
1834 >     * attempts to maintain the pool's nominal parallelism. Otherwise
1835 >     * it activates a thread only if necessary to avoid complete
1836 >     * starvation. This option may be preferable when blockages use
1837 >     * timeouts, or are almost always brief.
1838       *
1839 <     * <p> If the caller is not a ForkJoinTask, this method is behaviorally
1840 <     * equivalent to
1839 >     * <p>If the caller is not a {@link ForkJoinTask}, this method is
1840 >     * behaviorally equivalent to
1841       *  <pre> {@code
1842       * while (!blocker.isReleasable())
1843       *   if (blocker.block())
1844       *     return;
1845       * }</pre>
1846 <     * If the caller is a ForkJoinTask, then the pool may first
1847 <     * be expanded to ensure parallelism, and later adjusted.
1846 >     *
1847 >     * If the caller is a {@code ForkJoinTask}, then the pool may
1848 >     * first be expanded to ensure parallelism, and later adjusted.
1849       *
1850       * @param blocker the blocker
1851       * @param maintainParallelism if {@code true} and supported by
# Line 1833 | Line 1882 | public class ForkJoinPool extends Abstra
1882      // implement RunnableFuture.
1883  
1884      protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1885 <        return (RunnableFuture<T>)ForkJoinTask.adapt(runnable, value);
1885 >        return (RunnableFuture<T>) ForkJoinTask.adapt(runnable, value);
1886      }
1887  
1888      protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1889 <        return (RunnableFuture<T>)ForkJoinTask.adapt(callable);
1889 >        return (RunnableFuture<T>) ForkJoinTask.adapt(callable);
1890      }
1891  
1892      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines