ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.1 by dl, Tue Jan 6 14:30:31 2009 UTC vs.
Revision 1.4 by dl, Mon Jan 12 17:16:18 2009 UTC

# Line 13 | Line 13 | import sun.misc.Unsafe;
13   import java.lang.reflect.*;
14  
15   /**
16 < * Host for a group of ForkJoinWorkerThreads.  A ForkJoinPool provides
17 < * the entry point for tasks submitted from non-ForkJoinTasks, as well
18 < * as management and monitoring operations.  Normally a single
19 < * ForkJoinPool is used for a large number of submitted
20 < * tasks. Otherwise, use would not usually outweigh the construction
21 < * and bookkeeping overhead of creating a large set of threads.
16 > * An {@link ExecutorService} for running {@link ForkJoinTask}s.  A
17 > * ForkJoinPool provides the entry point for submissions from
18 > * non-ForkJoinTasks, as well as management and monitoring operations.
19 > * Normally a single ForkJoinPool is used for a large number of
20 > * submitted tasks. Otherwise, use would not usually outweigh the
21 > * construction and bookkeeping overhead of creating a large set of
22 > * threads.
23   *
24 < * <p>ForkJoinPools differ from other kinds of Executor mainly in that
25 < * they provide <em>work-stealing</em>: all threads in the pool
24 > * <p>ForkJoinPools differ from other kinds of Executors mainly in
25 > * that they provide <em>work-stealing</em>: all threads in the pool
26   * attempt to find and execute subtasks created by other active tasks
27   * (eventually blocking if none exist). This makes them efficient when
28 < * most tasks spawn other subtasks (as do most ForkJoinTasks) but
29 < * possibly less so otherwise. It is however fine to combine execution
30 < * of some plain Runnable- or Callable- based activities with that of
31 < * ForkJoinTasks.
28 > * most tasks spawn other subtasks (as do most ForkJoinTasks), as well
29 > * as the mixed execution of some plain Runnable- or Callable- based
30 > * activities along with ForkJoinTasks. Otherwise, other
31 > * ExecutorService implementations are typically more appropriate
32 > * choices.
33   *
34   * <p>A ForkJoinPool may be constructed with a given parallelism level
35   * (target pool size), which it attempts to maintain by dynamically
36 < * adding, suspending, or resuming threads, even if some tasks have
37 < * blocked waiting to join others. However, no such adjustments are
38 < * performed in the face of blocked IO or other unmanaged
39 < * synchronization. The nested ManagedBlocker interface enables
40 < * extension of the kinds of synchronization accommodated.
41 < *
42 < * <p>The target parallelism level may also be set dynamically. You
43 < * can limit the number of threads dynamically constructed using
44 < * method <tt>setMaximumPoolSize</tt> and/or
43 < * <tt>setMaintainParallelism</tt>.
36 > * adding, suspending, or resuming threads, even if some tasks are
37 > * waiting to join others. However, no such adjustments are performed
38 > * in the face of blocked IO or other unmanaged synchronization. The
39 > * nested <code>ManagedBlocker</code> interface enables extension of
40 > * the kinds of synchronization accommodated.  The target parallelism
41 > * level may also be changed dynamically (<code>setParallelism</code>)
42 > * and dynamically thread construction can be limited using methods
43 > * <code>setMaximumPoolSize</code> and/or
44 > * <code>setMaintainsParallelism</code>.
45   *
46   * <p>In addition to execution and lifecycle control methods, this
47   * class provides status check methods (for example
48 < * <tt>getStealCount</tt>) that are intended to aid in developing,
48 > * <code>getStealCount</code>) that are intended to aid in developing,
49   * tuning, and monitoring fork/join applications. Also, method
50 < * <tt>toString</tt> returns indications of pool state in a convenient
51 < * form for informal monitoring.
50 > * <code>toString</code> returns indications of pool state in a
51 > * convenient form for informal monitoring.
52   *
53   * <p><b>Implementation notes</b>: This implementation restricts the
54 < * maximum parallelism to 32767. Attempts to create pools with greater
55 < * than the maximum result in IllegalArgumentExceptions.
54 > * maximum number of running threads to 32767. Attempts to create
55 > * pools with greater than the maximum result in
56 > * IllegalArgumentExceptions.
57   */
58 < public class ForkJoinPool extends AbstractExecutorService
57 <    implements ExecutorService {
58 > public class ForkJoinPool extends AbstractExecutorService {
59  
60      /*
61       * See the extended comments interspersed below for design,
# Line 87 | Line 88 | public class ForkJoinPool extends Abstra
88       * Default ForkJoinWorkerThreadFactory implementation, creates a
89       * new ForkJoinWorkerThread.
90       */
91 <    public static class  DefaultForkJoinWorkerThreadFactory
91 >    static class  DefaultForkJoinWorkerThreadFactory
92          implements ForkJoinWorkerThreadFactory {
93          public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
94              try {
# Line 99 | Line 100 | public class ForkJoinPool extends Abstra
100      }
101  
102      /**
103 <     * The default ForkJoinWorkerThreadFactory, used unless overridden
104 <     * in ForkJoinPool constructors.
103 >     * Creates a new ForkJoinWorkerThread. This factory is used unless
104 >     * overridden in ForkJoinPool constructors.
105       */
106 <    private static final DefaultForkJoinWorkerThreadFactory
106 >    public static final ForkJoinWorkerThreadFactory
107          defaultForkJoinWorkerThreadFactory =
108          new DefaultForkJoinWorkerThreadFactory();
109  
109
110      /**
111       * Permission required for callers of methods that may start or
112       * kill threads.
# Line 181 | Line 181 | public class ForkJoinPool extends Abstra
181      /**
182       * Head of Treiber stack for barrier sync. See below for explanation
183       */
184 <    private volatile WaitQueueNode barrierStack;
184 >    private volatile WaitQueueNode syncStack;
185  
186      /**
187       * The count for event barrier
# Line 264 | Line 264 | public class ForkJoinPool extends Abstra
264      private static int runControlFor(int r, int a)   { return (r << 16) + a; }
265  
266      /**
267 <     * Increment active count. Called by workers before/during
268 <     * executing tasks.
267 >     * Try incrementing active count; fail on contention. Called by
268 >     * workers before/during executing tasks.
269 >     * @return true on success;
270       */
271 <    final void incrementActiveCount() {
272 <        int c;
273 <        do;while (!casRunControl(c = runControl, c+1));
271 >    final boolean tryIncrementActiveCount() {
272 >        int c = runControl;
273 >        return casRunControl(c, c+1);
274      }
275  
276      /**
277 <     * Decrement active count; possibly trigger termination.
277 >     * Try decrementing active count; fail on contention.
278 >     * Possibly trigger termination on success
279       * Called by workers when they can't find tasks.
280 +     * @return true on success
281       */
282 <    final void decrementActiveCount() {
283 <        int c, nextc;
284 <        do;while (!casRunControl(c = runControl, nextc = c-1));
282 >    final boolean tryDecrementActiveCount() {
283 >        int c = runControl;
284 >        int nextc = c - 1;
285 >        if (!casRunControl(c, nextc))
286 >            return false;
287          if (canTerminateOnShutdown(nextc))
288              terminateOnShutdown();
289 +        return true;
290      }
291  
292      /**
# Line 320 | Line 326 | public class ForkJoinPool extends Abstra
326       * @throws SecurityException if a security manager exists and
327       *         the caller is not permitted to modify threads
328       *         because it does not hold {@link
329 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
329 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
330       */
331      public ForkJoinPool() {
332          this(Runtime.getRuntime().availableProcessors(),
# Line 336 | Line 342 | public class ForkJoinPool extends Abstra
342       * @throws SecurityException if a security manager exists and
343       *         the caller is not permitted to modify threads
344       *         because it does not hold {@link
345 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
345 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
346       */
347      public ForkJoinPool(int parallelism) {
348          this(parallelism, defaultForkJoinWorkerThreadFactory);
349      }
350  
351      /**
352 <     * Creates a ForkJoinPool with a pool size equal to the number of
352 >     * Creates a ForkJoinPool with parallelism equal to the number of
353       * processors available on the system and using the given
354       * ForkJoinWorkerThreadFactory,
355       * @param factory the factory for creating new threads
# Line 351 | Line 357 | public class ForkJoinPool extends Abstra
357       * @throws SecurityException if a security manager exists and
358       *         the caller is not permitted to modify threads
359       *         because it does not hold {@link
360 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
360 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
361       */
362      public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
363          this(Runtime.getRuntime().availableProcessors(), factory);
364      }
365  
366      /**
367 <     * Creates a ForkJoinPool with the indicated target number of
362 <     * worker threads and the given factory.
367 >     * Creates a ForkJoinPool with the given parallelism and factory.
368       *
369       * @param parallelism the targeted number of worker threads
370       * @param factory the factory for creating new threads
# Line 369 | Line 374 | public class ForkJoinPool extends Abstra
374       * @throws SecurityException if a security manager exists and
375       *         the caller is not permitted to modify threads
376       *         because it does not hold {@link
377 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
377 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
378       */
379      public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
380          if (parallelism <= 0 || parallelism > MAX_THREADS)
# Line 500 | Line 505 | public class ForkJoinPool extends Abstra
505          }
506      }
507  
503    /**
504     * Sets the handler for internal worker threads that terminate due
505     * to unrecoverable errors encountered while executing tasks.
506     * Unless set, the current default or ThreadGroup handler is used
507     * as handler.
508     *
509     * @param h the new handler
510     * @return the old handler, or null if none
511     * @throws SecurityException if a security manager exists and
512     *         the caller is not permitted to modify threads
513     *         because it does not hold {@link
514     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
515     */
516    public Thread.UncaughtExceptionHandler
517        setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
518        checkPermission();
519        Thread.UncaughtExceptionHandler old = null;
520        final ReentrantLock lock = this.workerLock;
521        lock.lock();
522        try {
523            old = ueh;
524            ueh = h;
525            ForkJoinWorkerThread[] ws = workers;
526            for (int i = 0; i < ws.length; ++i) {
527                ForkJoinWorkerThread w = ws[i];
528                if (w != null)
529                    w.setUncaughtExceptionHandler(h);
530            }
531        } finally {
532            lock.unlock();
533        }
534        return old;
535    }
536
537    /**
538     * Returns the handler for internal worker threads that terminate
539     * due to unrecoverable errors encountered while executing tasks.
540     * @return the handler, or null if none
541     */
542    public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
543        Thread.UncaughtExceptionHandler h;
544        final ReentrantLock lock = this.workerLock;
545        lock.lock();
546        try {
547            h = ueh;
548        } finally {
549            lock.unlock();
550        }
551        return h;
552    }
553
508      // Execution methods
509  
510      /**
# Line 560 | Line 514 | public class ForkJoinPool extends Abstra
514          if (isShutdown())
515              throw new RejectedExecutionException();
516          submissionQueue.offer(task);
517 <        signalIdleWorkers(true);
517 >        signalIdleWorkers();
518      }
519  
520      /**
# Line 609 | Line 563 | public class ForkJoinPool extends Abstra
563          return job;
564      }
565  
612    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
613        return new AdaptedRunnable(runnable, value);
614    }
615
616    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
617        return new AdaptedCallable(callable);
618    }
619
566      /**
567       * Adaptor for Runnables. This implements RunnableFuture
568       * to be compliant with AbstractExecutorService constraints
# Line 698 | Line 644 | public class ForkJoinPool extends Abstra
644      }
645  
646      /**
647 +     * Returns the handler for internal worker threads that terminate
648 +     * due to unrecoverable errors encountered while executing tasks.
649 +     * @return the handler, or null if none
650 +     */
651 +    public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
652 +        Thread.UncaughtExceptionHandler h;
653 +        final ReentrantLock lock = this.workerLock;
654 +        lock.lock();
655 +        try {
656 +            h = ueh;
657 +        } finally {
658 +            lock.unlock();
659 +        }
660 +        return h;
661 +    }
662 +
663 +    /**
664 +     * Sets the handler for internal worker threads that terminate due
665 +     * to unrecoverable errors encountered while executing tasks.
666 +     * Unless set, the current default or ThreadGroup handler is used
667 +     * as handler.
668 +     *
669 +     * @param h the new handler
670 +     * @return the old handler, or null if none
671 +     * @throws SecurityException if a security manager exists and
672 +     *         the caller is not permitted to modify threads
673 +     *         because it does not hold {@link
674 +     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
675 +     */
676 +    public Thread.UncaughtExceptionHandler
677 +        setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
678 +        checkPermission();
679 +        Thread.UncaughtExceptionHandler old = null;
680 +        final ReentrantLock lock = this.workerLock;
681 +        lock.lock();
682 +        try {
683 +            old = ueh;
684 +            ueh = h;
685 +            ForkJoinWorkerThread[] ws = workers;
686 +            for (int i = 0; i < ws.length; ++i) {
687 +                ForkJoinWorkerThread w = ws[i];
688 +                if (w != null)
689 +                    w.setUncaughtExceptionHandler(h);
690 +            }
691 +        } finally {
692 +            lock.unlock();
693 +        }
694 +        return old;
695 +    }
696 +
697 +
698 +    /**
699       * Sets the target paralleism level of this pool.
700       * @param parallelism the target parallelism
701       * @throws IllegalArgumentException if parallelism less than or
# Line 705 | Line 703 | public class ForkJoinPool extends Abstra
703       * @throws SecurityException if a security manager exists and
704       *         the caller is not permitted to modify threads
705       *         because it does not hold {@link
706 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
706 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
707       */
708      public void setParallelism(int parallelism) {
709          checkPermission();
# Line 725 | Line 723 | public class ForkJoinPool extends Abstra
723          } finally {
724              lock.unlock();
725          }
726 <        signalIdleWorkers(false);
726 >        signalIdleWorkers();
727      }
728  
729      /**
730       * Returns the targeted number of worker threads in this pool.
733     * This value does not necessarily reflect transient changes as
734     * threads are added, removed, or abruptly terminate.
731       *
732       * @return the targeted number of worker threads in this pool
733       */
# Line 742 | Line 738 | public class ForkJoinPool extends Abstra
738      /**
739       * Returns the number of worker threads that have started but not
740       * yet terminated.  This result returned by this method may differ
741 <     * from <tt>getParallelism</tt> when threads are created to
741 >     * from <code>getParallelism</code> when threads are created to
742       * maintain parallelism when others are cooperatively blocked.
743       *
744       * @return the number of worker threads
# Line 797 | Line 793 | public class ForkJoinPool extends Abstra
793      }
794  
795      /**
796 <     * Returns the approximate number of worker threads that are not
797 <     * blocked waiting to join tasks or for other managed
796 >     * Returns an estimate of the number of worker threads that are
797 >     * not blocked waiting to join tasks or for other managed
798       * synchronization.
799       *
800       * @return the number of worker threads
# Line 808 | Line 804 | public class ForkJoinPool extends Abstra
804      }
805  
806      /**
807 <     * Returns the approximate number of threads that are currently
807 >     * Returns an estimate of the number of threads that are currently
808       * stealing or executing tasks. This method may overestimate the
809       * number of active threads.
810       * @return the number of active threads.
# Line 818 | Line 814 | public class ForkJoinPool extends Abstra
814      }
815  
816      /**
817 <     * Returns the approximate number of threads that are currently
817 >     * Returns an estimate of the number of threads that are currently
818       * idle waiting for tasks. This method may underestimate the
819       * number of idle threads.
820       * @return the number of idle threads.
# Line 867 | Line 863 | public class ForkJoinPool extends Abstra
863      }
864  
865      /**
866 <     * Returns the total number of tasks currently held in queues by
867 <     * worker threads (but not including tasks submitted to the pool
868 <     * that have not begun executing). This value is only an
869 <     * approximation, obtained by iterating across all threads in the
870 <     * pool. This method may be useful for tuning task granularities.
866 >     * Returns an estimate of the total number of tasks currently held
867 >     * in queues by worker threads (but not including tasks submitted
868 >     * to the pool that have not begun executing). This value is only
869 >     * an approximation, obtained by iterating across all threads in
870 >     * the pool. This method may be useful for tuning task
871 >     * granularities.
872       * @return the number of queued tasks.
873       */
874      public long getQueuedTaskCount() {
# Line 886 | Line 883 | public class ForkJoinPool extends Abstra
883      }
884  
885      /**
886 <     * Returns the approximate number tasks submitted to this pool
886 >     * Returns an estimate of the number tasks submitted to this pool
887       * that have not yet begun executing. This method takes time
888       * proportional to the number of submissions.
889       * @return the number of queued submissions.
# Line 898 | Line 895 | public class ForkJoinPool extends Abstra
895      /**
896       * Returns true if there are any tasks submitted to this pool
897       * that have not yet begun executing.
898 <     * @return <tt>true</tt> if there are any queued submissions.
898 >     * @return <code>true</code> if there are any queued submissions.
899       */
900      public boolean hasQueuedSubmissions() {
901          return !submissionQueue.isEmpty();
# Line 961 | Line 958 | public class ForkJoinPool extends Abstra
958       * @throws SecurityException if a security manager exists and
959       *         the caller is not permitted to modify threads
960       *         because it does not hold {@link
961 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
961 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
962       */
963      public void shutdown() {
964          checkPermission();
# Line 981 | Line 978 | public class ForkJoinPool extends Abstra
978       * @throws SecurityException if a security manager exists and
979       *         the caller is not permitted to modify threads
980       *         because it does not hold {@link
981 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
981 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
982       */
983      public List<Runnable> shutdownNow() {
984          checkPermission();
# Line 990 | Line 987 | public class ForkJoinPool extends Abstra
987      }
988  
989      /**
990 <     * Returns <tt>true</tt> if all tasks have completed following shut down.
990 >     * Returns <code>true</code> if all tasks have completed following shut down.
991       *
992 <     * @return <tt>true</tt> if all tasks have completed following shut down
992 >     * @return <code>true</code> if all tasks have completed following shut down
993       */
994      public boolean isTerminated() {
995          return runStateOf(runControl) == TERMINATED;
996      }
997  
998      /**
999 <     * Returns <tt>true</tt> if the process of termination has
999 >     * Returns <code>true</code> if the process of termination has
1000       * commenced but possibly not yet completed.
1001       *
1002 <     * @return <tt>true</tt> if terminating
1002 >     * @return <code>true</code> if terminating
1003       */
1004      public boolean isTerminating() {
1005          return runStateOf(runControl) >= TERMINATING;
1006      }
1007  
1008      /**
1009 <     * Returns <tt>true</tt> if this pool has been shut down.
1009 >     * Returns <code>true</code> if this pool has been shut down.
1010       *
1011 <     * @return <tt>true</tt> if this pool has been shut down
1011 >     * @return <code>true</code> if this pool has been shut down
1012       */
1013      public boolean isShutdown() {
1014          return runStateOf(runControl) >= SHUTDOWN;
# Line 1024 | Line 1021 | public class ForkJoinPool extends Abstra
1021       *
1022       * @param timeout the maximum time to wait
1023       * @param unit the time unit of the timeout argument
1024 <     * @return <tt>true</tt> if this executor terminated and
1025 <     *         <tt>false</tt> if the timeout elapsed before termination
1024 >     * @return <code>true</code> if this executor terminated and
1025 >     *         <code>false</code> if the timeout elapsed before termination
1026       * @throws InterruptedException if interrupted while waiting
1027       */
1028      public boolean awaitTermination(long timeout, TimeUnit unit)
# Line 1076 | Line 1073 | public class ForkJoinPool extends Abstra
1073          } finally {
1074              lock.unlock();
1075          }
1076 <        signalIdleWorkers(false);
1076 >        signalIdleWorkers();
1077      }
1078  
1079      /**
# Line 1086 | Line 1083 | public class ForkJoinPool extends Abstra
1083          if (transitionRunStateTo(TERMINATING)) {
1084              stopAllWorkers();
1085              resumeAllSpares();
1086 <            signalIdleWorkers(true);
1086 >            signalIdleWorkers();
1087              cancelQueuedSubmissions();
1088              cancelQueuedWorkerTasks();
1089              interruptUnterminatedWorkers();
1090 <            signalIdleWorkers(true); // resignal after interrupt
1090 >            signalIdleWorkers(); // resignal after interrupt
1091          }
1092      }
1093  
# Line 1174 | Line 1171 | public class ForkJoinPool extends Abstra
1171  
1172  
1173      /*
1174 <     * Nodes for event barrier to manage idle threads.
1174 >     * Nodes for event barrier to manage idle threads.  Queue nodes
1175 >     * are basic Treiber stack nodes, also used for spare stack.
1176       *
1177       * The event barrier has an event count and a wait queue (actually
1178       * a Treiber stack).  Workers are enabled to look for work when
1179 <     * the eventCount is incremented. If they fail to find some,
1180 <     * they may wait for next count. Synchronization events occur only
1181 <     * in enough contexts to maintain overall liveness:
1179 >     * the eventCount is incremented. If they fail to find work, they
1180 >     * may wait for next count. Upon release, threads help others wake
1181 >     * up.
1182 >     *
1183 >     * Synchronization events occur only in enough contexts to
1184 >     * maintain overall liveness:
1185       *
1186       *   - Submission of a new task to the pool
1187 <     *   - Creation or termination of a worker
1187 >     *   - Resizes or other changes to the workers array
1188       *   - pool termination
1189       *   - A worker pushing a task on an empty queue
1190       *
1191 <     * The last case (pushing a task) occurs often enough, and is
1192 <     * heavy enough compared to simple stack pushes to require some
1193 <     * special handling: Method signalNonEmptyWorkerQueue returns
1194 <     * without advancing count if the queue appears to be empty.  This
1195 <     * would ordinarily result in races causing some queued waiters
1196 <     * not to be woken up. To avoid this, a worker in sync
1197 <     * rescans for tasks after being enqueued if it was the first to
1198 <     * enqueue, and aborts the wait if finding one, also helping to
1199 <     * signal others. This works well because the worker has nothing
1200 <     * better to do anyway, and so might as well help alleviate the
1201 <     * overhead and contention on the threads actually doing work.
1202 <     *
1203 <     * Queue nodes are basic Treiber stack nodes, also used for spare
1204 <     * stack.
1191 >     * The case of pushing a task occurs often enough, and is heavy
1192 >     * enough compared to simple stack pushes, to require special
1193 >     * handling: Method signalWork returns without advancing count if
1194 >     * the queue appears to be empty.  This would ordinarily result in
1195 >     * races causing some queued waiters not to be woken up. To avoid
1196 >     * this, the first worker enqueued in method sync (see
1197 >     * syncIsReleasable) rescans for tasks after being enqueued, and
1198 >     * helps signal if any are found. This works well because the
1199 >     * worker has nothing better to do, and so might as well help
1200 >     * alleviate the overhead and contention on the threads actually
1201 >     * doing work.  Also, since event counts increments on task
1202 >     * availability exist to maintain liveness (rather than to force
1203 >     * refreshes etc), it is OK for callers to exit early if
1204 >     * contending with another signaller.
1205       */
1206      static final class WaitQueueNode {
1207          WaitQueueNode next; // only written before enqueued
1208          volatile ForkJoinWorkerThread thread; // nulled to cancel wait
1209          final long count; // unused for spare stack
1210 <        WaitQueueNode(ForkJoinWorkerThread w, long c) {
1210 >
1211 >        WaitQueueNode(long c, ForkJoinWorkerThread w) {
1212              count = c;
1213              thread = w;
1214          }
1215 <        final boolean signal() {
1215 >
1216 >        /**
1217 >         * Wake up waiter, returning false if known to already
1218 >         */
1219 >        boolean signal() {
1220              ForkJoinWorkerThread t = thread;
1221 +            if (t == null)
1222 +                return false;
1223              thread = null;
1224 <            if (t != null) {
1225 <                LockSupport.unpark(t);
1226 <                return true;
1224 >            LockSupport.unpark(t);
1225 >            return true;
1226 >        }
1227 >
1228 >        /**
1229 >         * Await release on sync
1230 >         */
1231 >        void awaitSyncRelease(ForkJoinPool p) {
1232 >            while (thread != null && !p.syncIsReleasable(this))
1233 >                LockSupport.park(this);
1234 >        }
1235 >
1236 >        /**
1237 >         * Await resumption as spare
1238 >         */
1239 >        void awaitSpareRelease() {
1240 >            while (thread != null) {
1241 >                if (!Thread.interrupted())
1242 >                    LockSupport.park(this);
1243              }
1220            return false;
1244          }
1245      }
1246  
1247      /**
1248 <     * Release at least one thread waiting for event count to advance,
1249 <     * if one exists. If initial attempt fails, release all threads.
1250 <     * @param all if false, at first try to only release one thread
1251 <     * @return current event
1248 >     * Ensures that no thread is waiting for count to advance from the
1249 >     * current value of eventCount read on entry to this method, by
1250 >     * releasing waiting threads if necessary.
1251 >     * @return the count
1252       */
1253 <    private long releaseIdleWorkers(boolean all) {
1254 <        long c;
1255 <        for (;;) {
1256 <            WaitQueueNode q = barrierStack;
1257 <            c = eventCount;
1235 <            long qc;
1236 <            if (q == null || (qc = q.count) >= c)
1237 <                break;
1238 <            if (!all) {
1239 <                if (casBarrierStack(q, q.next) && q.signal())
1240 <                    break;
1241 <                all = true;
1242 <            }
1243 <            else if (casBarrierStack(q, null)) {
1253 >    final long ensureSync() {
1254 >        long c = eventCount;
1255 >        WaitQueueNode q;
1256 >        while ((q = syncStack) != null && q.count < c) {
1257 >            if (casBarrierStack(q, null)) {
1258                  do {
1259 <                 q.signal();
1259 >                    q.signal();
1260                  } while ((q = q.next) != null);
1261                  break;
1262              }
# Line 1251 | Line 1265 | public class ForkJoinPool extends Abstra
1265      }
1266  
1267      /**
1268 <     * Returns current barrier event count
1255 <     * @return current barrier event count
1268 >     * Increments event count and releases waiting threads.
1269       */
1270 <    final long getEventCount() {
1258 <        long ec = eventCount;
1259 <        releaseIdleWorkers(true); // release to ensure accurate result
1260 <        return ec;
1261 <    }
1262 <
1263 <    /**
1264 <     * Increment event count and release at least one waiting thread,
1265 <     * if one exists (released threads will in turn wake up others).
1266 <     * @param all if true, try to wake up all
1267 <     */
1268 <    final void signalIdleWorkers(boolean all) {
1270 >    private void signalIdleWorkers() {
1271          long c;
1272          do;while (!casEventCount(c = eventCount, c+1));
1273 <        releaseIdleWorkers(all);
1273 >        ensureSync();
1274      }
1275  
1276      /**
1277 <     * Wake up threads waiting to steal a task. Because method
1278 <     * sync rechecks availability, it is OK to only proceed if
1279 <     * queue appears to be non-empty.
1277 >     * Signal threads waiting to poll a task. Because method sync
1278 >     * rechecks availability, it is OK to only proceed if queue
1279 >     * appears to be non-empty, and OK to skip under contention to
1280 >     * increment count (since some other thread succeeded).
1281       */
1282 <    final void signalNonEmptyWorkerQueue() {
1280 <        // If CAS fails another signaller must have succeeded
1282 >    final void signalWork() {
1283          long c;
1284 <        if (barrierStack != null && casEventCount(c = eventCount, c+1))
1285 <            releaseIdleWorkers(false);
1284 >        WaitQueueNode q;
1285 >        if (syncStack != null &&
1286 >            casEventCount(c = eventCount, c+1) &&
1287 >            (((q = syncStack) != null && q.count <= c) &&
1288 >             (!casBarrierStack(q, q.next) || !q.signal())))
1289 >            ensureSync();
1290      }
1291  
1292      /**
1293 <     * Waits until event count advances from count, or some thread is
1294 <     * waiting on a previous count, or there is stealable work
1295 <     * available. Help wake up others on release.
1293 >     * Waits until event count advances from last value held by
1294 >     * caller, or if excess threads, caller is resumed as spare, or
1295 >     * caller or pool is terminating. Updates caller's event on exit.
1296       * @param w the calling worker thread
1291     * @param prev previous value returned by sync (or 0)
1292     * @return current event count
1297       */
1298 <    final long sync(ForkJoinWorkerThread w, long prev) {
1299 <        updateStealCount(w);
1298 >    final void sync(ForkJoinWorkerThread w) {
1299 >        updateStealCount(w); // Transfer w's count while it is idle
1300  
1301 <        while (!w.isShutdown() && !isTerminating() &&
1302 <               (parallelism >= runningCountOf(workerCounts) ||
1299 <                !suspendIfSpare(w))) { // prefer suspend to waiting here
1301 >        while (!w.isShutdown() && !isTerminating() && !suspendIfSpare(w)) {
1302 >            long prev = w.lastEventCount;
1303              WaitQueueNode node = null;
1304 <            boolean queued = false;
1305 <            for (;;) {
1306 <                if (!queued) {
1307 <                    if (eventCount != prev)
1308 <                        break;
1309 <                    WaitQueueNode h = barrierStack;
1310 <                    if (h != null && h.count != prev)
1308 <                        break; // release below and maybe retry
1309 <                    if (node == null)
1310 <                        node = new WaitQueueNode(w, prev);
1311 <                    queued = casBarrierStack(node.next = h, node);
1312 <                }
1313 <                else if (Thread.interrupted() ||
1314 <                         node.thread == null ||
1315 <                         (node.next == null && w.prescan()) ||
1316 <                         eventCount != prev) {
1317 <                    node.thread = null;
1318 <                    if (eventCount == prev) // help trigger
1319 <                        casEventCount(prev, prev+1);
1304 >            WaitQueueNode h;
1305 >            while (eventCount == prev &&
1306 >                   ((h = syncStack) == null || h.count == prev)) {
1307 >                if (node == null)
1308 >                    node = new WaitQueueNode(prev, w);
1309 >                if (casBarrierStack(node.next = h, node)) {
1310 >                    node.awaitSyncRelease(this);
1311                      break;
1312                  }
1322                else
1323                    LockSupport.park(this);
1313              }
1314 +            long ec = ensureSync();
1315 +            if (ec != prev) {
1316 +                w.lastEventCount = ec;
1317 +                break;
1318 +            }
1319 +        }
1320 +    }
1321 +
1322 +    /**
1323 +     * Returns true if worker waiting on sync can proceed:
1324 +     *  - on signal (thread == null)
1325 +     *  - on event count advance (winning race to notify vs signaller)
1326 +     *  - on Interrupt
1327 +     *  - if the first queued node, we find work available
1328 +     * If node was not signalled and event count not advanced on exit,
1329 +     * then we also help advance event count.
1330 +     * @return true if node can be released
1331 +     */
1332 +    final boolean syncIsReleasable(WaitQueueNode node) {
1333 +        long prev = node.count;
1334 +        if (!Thread.interrupted() && node.thread != null &&
1335 +            (node.next != null ||
1336 +             !ForkJoinWorkerThread.hasQueuedTasks(workers)) &&
1337 +            eventCount == prev)
1338 +            return false;
1339 +        if (node.thread != null) {
1340 +            node.thread = null;
1341              long ec = eventCount;
1342 <            if (releaseIdleWorkers(false) != prev)
1343 <                return ec;
1342 >            if (prev <= ec) // help signal
1343 >                casEventCount(ec, ec+1);
1344          }
1345 <        return prev; // return old count if aborted
1345 >        return true;
1346 >    }
1347 >
1348 >    /**
1349 >     * Returns true if a new sync event occurred since last call to
1350 >     * sync or this method, if so, updating caller's count.
1351 >     */
1352 >    final boolean hasNewSyncEvent(ForkJoinWorkerThread w) {
1353 >        long lc = w.lastEventCount;
1354 >        long ec = ensureSync();
1355 >        if (ec == lc)
1356 >            return false;
1357 >        w.lastEventCount = ec;
1358 >        return true;
1359      }
1360  
1361      //  Parallelism maintenance
# Line 1417 | Line 1446 | public class ForkJoinPool extends Abstra
1446          return (tc < maxPoolSize &&
1447                  (rc == 0 || totalSurplus < 0 ||
1448                   (maintainParallelism &&
1449 <                  runningDeficit > totalSurplus && mayHaveQueuedWork())));
1450 <    }
1422 <
1423 <    /**
1424 <     * Returns true if at least one worker queue appears to be
1425 <     * nonempty. This is expensive but not often called. It is not
1426 <     * critical that this be accurate, but if not, more or fewer
1427 <     * running threads than desired might be maintained.
1428 <     */
1429 <    private boolean mayHaveQueuedWork() {
1430 <        ForkJoinWorkerThread[] ws = workers;
1431 <        int len = ws.length;
1432 <        ForkJoinWorkerThread v;
1433 <        for (int i = 0; i < len; ++i) {
1434 <            if ((v = ws[i]) != null && v.getRawQueueSize() > 0) {
1435 <                releaseIdleWorkers(false); // help wake up stragglers
1436 <                return true;
1437 <            }
1438 <        }
1439 <        return false;
1449 >                  runningDeficit > totalSurplus &&
1450 >                  ForkJoinWorkerThread.hasQueuedTasks(workers))));
1451      }
1452 <
1452 >    
1453      /**
1454       * Add a spare worker if lock available and no more than the
1455       * expected numbers of threads exist
# Line 1486 | Line 1497 | public class ForkJoinPool extends Abstra
1497              for (k = 0; k < len && ws[k] != null; ++k)
1498                  ;
1499          }
1500 <        if (k < len && (w = createWorker(k)) != null) {
1500 >        if (k < len && !isTerminating() && (w = createWorker(k)) != null) {
1501              ws[k] = w;
1502              w.start();
1503          }
1504          else
1505              updateWorkerCount(-1); // adjust on failure
1506 <        signalIdleWorkers(false);
1506 >        signalIdleWorkers();
1507      }
1508  
1509      /**
# Line 1508 | Line 1519 | public class ForkJoinPool extends Abstra
1519          int s;
1520          while (parallelism < runningCountOf(s = workerCounts)) {
1521              if (node == null)
1522 <                node = new WaitQueueNode(w, 0);
1522 >                node = new WaitQueueNode(0, w);
1523              if (casWorkerCounts(s, s-1)) { // representation-dependent
1524                  // push onto stack
1525                  do;while (!casSpareStack(node.next = spareStack, node));
1515
1526                  // block until released by resumeSpare
1527 <                while (node.thread != null) {
1518 <                    if (!Thread.interrupted())
1519 <                        LockSupport.park(this);
1520 <                }
1521 <                w.activate(); // help warm up
1527 >                node.awaitSpareRelease();
1528                  return true;
1529              }
1530          }
# Line 1544 | Line 1550 | public class ForkJoinPool extends Abstra
1550      }
1551  
1552      /**
1553 <     * Pop and resume all spare threads. Same idea as
1548 <     * releaseIdleWorkers.
1553 >     * Pop and resume all spare threads. Same idea as ensureSync.
1554       * @return true if any spares released
1555       */
1556      private boolean resumeAllSpares() {
# Line 1586 | Line 1591 | public class ForkJoinPool extends Abstra
1591      }
1592  
1593      /**
1589     * Returns approximate number of spares, just for diagnostics.
1590     */
1591    private int countSpares() {
1592        int sum = 0;
1593        for (WaitQueueNode q = spareStack; q != null; q = q.next)
1594            ++sum;
1595        return sum;
1596    }
1597
1598    /**
1594       * Interface for extending managed parallelism for tasks running
1595       * in ForkJoinPools. A ManagedBlocker provides two methods.
1596 <     * Method <tt>isReleasable</tt> must return true if blocking is not
1597 <     * necessary. Method <tt>block</tt> blocks the current thread
1596 >     * Method <code>isReleasable</code> must return true if blocking is not
1597 >     * necessary. Method <code>block</code> blocks the current thread
1598       * if necessary (perhaps internally invoking isReleasable before
1599       * actually blocking.).
1600       * <p>For example, here is a ManagedBlocker based on a
# Line 1642 | Line 1637 | public class ForkJoinPool extends Abstra
1637       * is a ForkJoinWorkerThread, this method possibly arranges for a
1638       * spare thread to be activated if necessary to ensure parallelism
1639       * while the current thread is blocked.  If
1640 <     * <tt>maintainParallelism</tt> is true and the pool supports it
1641 <     * (see <tt>getMaintainsParallelism</tt>), this method attempts to
1640 >     * <code>maintainParallelism</code> is true and the pool supports
1641 >     * it ({@link #getMaintainsParallelism}), this method attempts to
1642       * maintain the pool's nominal parallelism. Otherwise if activates
1643       * a thread only if necessary to avoid complete starvation. This
1644       * option may be preferable when blockages use timeouts, or are
# Line 1689 | Line 1684 | public class ForkJoinPool extends Abstra
1684          do;while (!blocker.isReleasable() && !blocker.block());
1685      }
1686  
1687 +    // AbstractExecutorService overrides
1688 +
1689 +    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1690 +        return new AdaptedRunnable(runnable, value);
1691 +    }
1692 +
1693 +    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1694 +        return new AdaptedCallable(callable);
1695 +    }
1696 +
1697  
1698      // Temporary Unsafe mechanics for preliminary release
1699  
# Line 1696 | Line 1701 | public class ForkJoinPool extends Abstra
1701      static final long eventCountOffset;
1702      static final long workerCountsOffset;
1703      static final long runControlOffset;
1704 <    static final long barrierStackOffset;
1704 >    static final long syncStackOffset;
1705      static final long spareStackOffset;
1706  
1707      static {
# Line 1714 | Line 1719 | public class ForkJoinPool extends Abstra
1719                  (ForkJoinPool.class.getDeclaredField("workerCounts"));
1720              runControlOffset = _unsafe.objectFieldOffset
1721                  (ForkJoinPool.class.getDeclaredField("runControl"));
1722 <            barrierStackOffset = _unsafe.objectFieldOffset
1723 <                (ForkJoinPool.class.getDeclaredField("barrierStack"));
1722 >            syncStackOffset = _unsafe.objectFieldOffset
1723 >                (ForkJoinPool.class.getDeclaredField("syncStack"));
1724              spareStackOffset = _unsafe.objectFieldOffset
1725                  (ForkJoinPool.class.getDeclaredField("spareStack"));
1726          } catch (Exception e) {
# Line 1736 | Line 1741 | public class ForkJoinPool extends Abstra
1741          return _unsafe.compareAndSwapObject(this, spareStackOffset, cmp, val);
1742      }
1743      private boolean casBarrierStack(WaitQueueNode cmp, WaitQueueNode val) {
1744 <        return _unsafe.compareAndSwapObject(this, barrierStackOffset, cmp, val);
1744 >        return _unsafe.compareAndSwapObject(this, syncStackOffset, cmp, val);
1745      }
1746   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines