ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.1 by dl, Tue Jan 6 14:30:31 2009 UTC vs.
Revision 1.2 by dl, Wed Jan 7 16:07:37 2009 UTC

# Line 13 | Line 13 | import sun.misc.Unsafe;
13   import java.lang.reflect.*;
14  
15   /**
16 < * Host for a group of ForkJoinWorkerThreads.  A ForkJoinPool provides
17 < * the entry point for tasks submitted from non-ForkJoinTasks, as well
18 < * as management and monitoring operations.  Normally a single
19 < * ForkJoinPool is used for a large number of submitted
20 < * tasks. Otherwise, use would not usually outweigh the construction
21 < * and bookkeeping overhead of creating a large set of threads.
16 > * An {@link ExecutorService} for running {@link ForkJoinTask}s.  A
17 > * ForkJoinPool provides the entry point for submissions from
18 > * non-ForkJoinTasks, as well as management and monitoring operations.
19 > * Normally a single ForkJoinPool is used for a large number of
20 > * submitted tasks. Otherwise, use would not usually outweigh the
21 > * construction and bookkeeping overhead of creating a large set of
22 > * threads.
23   *
24 < * <p>ForkJoinPools differ from other kinds of Executor mainly in that
25 < * they provide <em>work-stealing</em>: all threads in the pool
24 > * <p>ForkJoinPools differ from other kinds of Executors mainly in
25 > * that they provide <em>work-stealing</em>: all threads in the pool
26   * attempt to find and execute subtasks created by other active tasks
27   * (eventually blocking if none exist). This makes them efficient when
28 < * most tasks spawn other subtasks (as do most ForkJoinTasks) but
29 < * possibly less so otherwise. It is however fine to combine execution
30 < * of some plain Runnable- or Callable- based activities with that of
31 < * ForkJoinTasks.
28 > * most tasks spawn other subtasks (as do most ForkJoinTasks), as well
29 > * as the mixed execution of some plain Runnable- or Callable- based
30 > * activities along with ForkJoinTasks. Otherwise, other
31 > * ExecutorService implementations are typically more appropriate
32 > * choices.
33   *
34   * <p>A ForkJoinPool may be constructed with a given parallelism level
35   * (target pool size), which it attempts to maintain by dynamically
36 < * adding, suspending, or resuming threads, even if some tasks have
37 < * blocked waiting to join others. However, no such adjustments are
38 < * performed in the face of blocked IO or other unmanaged
39 < * synchronization. The nested ManagedBlocker interface enables
40 < * extension of the kinds of synchronization accommodated.
41 < *
42 < * <p>The target parallelism level may also be set dynamically. You
43 < * can limit the number of threads dynamically constructed using
44 < * method <tt>setMaximumPoolSize</tt> and/or
43 < * <tt>setMaintainParallelism</tt>.
36 > * adding, suspending, or resuming threads, even if some tasks are
37 > * waiting to join others. However, no such adjustments are performed
38 > * in the face of blocked IO or other unmanaged synchronization. The
39 > * nested <code>ManagedBlocker</code> interface enables extension of
40 > * the kinds of synchronization accommodated.  The target parallelism
41 > * level may also be changed dynamically (<code>setParallelism</code>)
42 > * and dynamically thread construction can be limited using methods
43 > * <code>setMaximumPoolSize</code> and/or
44 > * <code>setMaintainsParallelism</code>.
45   *
46   * <p>In addition to execution and lifecycle control methods, this
47   * class provides status check methods (for example
48 < * <tt>getStealCount</tt>) that are intended to aid in developing,
48 > * <code>getStealCount</code>) that are intended to aid in developing,
49   * tuning, and monitoring fork/join applications. Also, method
50 < * <tt>toString</tt> returns indications of pool state in a convenient
51 < * form for informal monitoring.
50 > * <code>toString</code> returns indications of pool state in a
51 > * convenient form for informal monitoring.
52   *
53   * <p><b>Implementation notes</b>: This implementation restricts the
54 < * maximum parallelism to 32767. Attempts to create pools with greater
55 < * than the maximum result in IllegalArgumentExceptions.
54 > * maximum number of running threads to 32767. Attempts to create
55 > * pools with greater than the maximum result in
56 > * IllegalArgumentExceptions.
57   */
58 < public class ForkJoinPool extends AbstractExecutorService
57 <    implements ExecutorService {
58 > public class ForkJoinPool extends AbstractExecutorService {
59  
60      /*
61       * See the extended comments interspersed below for design,
# Line 87 | Line 88 | public class ForkJoinPool extends Abstra
88       * Default ForkJoinWorkerThreadFactory implementation, creates a
89       * new ForkJoinWorkerThread.
90       */
91 <    public static class  DefaultForkJoinWorkerThreadFactory
91 >    static class  DefaultForkJoinWorkerThreadFactory
92          implements ForkJoinWorkerThreadFactory {
93          public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
94              try {
# Line 99 | Line 100 | public class ForkJoinPool extends Abstra
100      }
101  
102      /**
103 <     * The default ForkJoinWorkerThreadFactory, used unless overridden
104 <     * in ForkJoinPool constructors.
103 >     * Creates a new ForkJoinWorkerThread. This factory is used unless
104 >     * overridden in ForkJoinPool constructors.
105       */
106 <    private static final DefaultForkJoinWorkerThreadFactory
106 >    public static final ForkJoinWorkerThreadFactory
107          defaultForkJoinWorkerThreadFactory =
108          new DefaultForkJoinWorkerThreadFactory();
109  
109
110      /**
111       * Permission required for callers of methods that may start or
112       * kill threads.
# Line 320 | Line 320 | public class ForkJoinPool extends Abstra
320       * @throws SecurityException if a security manager exists and
321       *         the caller is not permitted to modify threads
322       *         because it does not hold {@link
323 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
323 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
324       */
325      public ForkJoinPool() {
326          this(Runtime.getRuntime().availableProcessors(),
# Line 336 | Line 336 | public class ForkJoinPool extends Abstra
336       * @throws SecurityException if a security manager exists and
337       *         the caller is not permitted to modify threads
338       *         because it does not hold {@link
339 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
339 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
340       */
341      public ForkJoinPool(int parallelism) {
342          this(parallelism, defaultForkJoinWorkerThreadFactory);
343      }
344  
345      /**
346 <     * Creates a ForkJoinPool with a pool size equal to the number of
346 >     * Creates a ForkJoinPool with parallelism equal to the number of
347       * processors available on the system and using the given
348       * ForkJoinWorkerThreadFactory,
349       * @param factory the factory for creating new threads
# Line 351 | Line 351 | public class ForkJoinPool extends Abstra
351       * @throws SecurityException if a security manager exists and
352       *         the caller is not permitted to modify threads
353       *         because it does not hold {@link
354 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
354 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
355       */
356      public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
357          this(Runtime.getRuntime().availableProcessors(), factory);
358      }
359  
360      /**
361 <     * Creates a ForkJoinPool with the indicated target number of
362 <     * worker threads and the given factory.
361 >     * Creates a ForkJoinPool with the given parallelism and factory.
362       *
363       * @param parallelism the targeted number of worker threads
364       * @param factory the factory for creating new threads
# Line 369 | Line 368 | public class ForkJoinPool extends Abstra
368       * @throws SecurityException if a security manager exists and
369       *         the caller is not permitted to modify threads
370       *         because it does not hold {@link
371 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
371 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
372       */
373      public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
374          if (parallelism <= 0 || parallelism > MAX_THREADS)
# Line 500 | Line 499 | public class ForkJoinPool extends Abstra
499          }
500      }
501  
503    /**
504     * Sets the handler for internal worker threads that terminate due
505     * to unrecoverable errors encountered while executing tasks.
506     * Unless set, the current default or ThreadGroup handler is used
507     * as handler.
508     *
509     * @param h the new handler
510     * @return the old handler, or null if none
511     * @throws SecurityException if a security manager exists and
512     *         the caller is not permitted to modify threads
513     *         because it does not hold {@link
514     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
515     */
516    public Thread.UncaughtExceptionHandler
517        setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
518        checkPermission();
519        Thread.UncaughtExceptionHandler old = null;
520        final ReentrantLock lock = this.workerLock;
521        lock.lock();
522        try {
523            old = ueh;
524            ueh = h;
525            ForkJoinWorkerThread[] ws = workers;
526            for (int i = 0; i < ws.length; ++i) {
527                ForkJoinWorkerThread w = ws[i];
528                if (w != null)
529                    w.setUncaughtExceptionHandler(h);
530            }
531        } finally {
532            lock.unlock();
533        }
534        return old;
535    }
536
537    /**
538     * Returns the handler for internal worker threads that terminate
539     * due to unrecoverable errors encountered while executing tasks.
540     * @return the handler, or null if none
541     */
542    public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
543        Thread.UncaughtExceptionHandler h;
544        final ReentrantLock lock = this.workerLock;
545        lock.lock();
546        try {
547            h = ueh;
548        } finally {
549            lock.unlock();
550        }
551        return h;
552    }
553
502      // Execution methods
503  
504      /**
# Line 609 | Line 557 | public class ForkJoinPool extends Abstra
557          return job;
558      }
559  
612    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
613        return new AdaptedRunnable(runnable, value);
614    }
615
616    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
617        return new AdaptedCallable(callable);
618    }
619
560      /**
561       * Adaptor for Runnables. This implements RunnableFuture
562       * to be compliant with AbstractExecutorService constraints
# Line 698 | Line 638 | public class ForkJoinPool extends Abstra
638      }
639  
640      /**
641 +     * Returns the handler for internal worker threads that terminate
642 +     * due to unrecoverable errors encountered while executing tasks.
643 +     * @return the handler, or null if none
644 +     */
645 +    public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
646 +        Thread.UncaughtExceptionHandler h;
647 +        final ReentrantLock lock = this.workerLock;
648 +        lock.lock();
649 +        try {
650 +            h = ueh;
651 +        } finally {
652 +            lock.unlock();
653 +        }
654 +        return h;
655 +    }
656 +
657 +    /**
658 +     * Sets the handler for internal worker threads that terminate due
659 +     * to unrecoverable errors encountered while executing tasks.
660 +     * Unless set, the current default or ThreadGroup handler is used
661 +     * as handler.
662 +     *
663 +     * @param h the new handler
664 +     * @return the old handler, or null if none
665 +     * @throws SecurityException if a security manager exists and
666 +     *         the caller is not permitted to modify threads
667 +     *         because it does not hold {@link
668 +     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
669 +     */
670 +    public Thread.UncaughtExceptionHandler
671 +        setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
672 +        checkPermission();
673 +        Thread.UncaughtExceptionHandler old = null;
674 +        final ReentrantLock lock = this.workerLock;
675 +        lock.lock();
676 +        try {
677 +            old = ueh;
678 +            ueh = h;
679 +            ForkJoinWorkerThread[] ws = workers;
680 +            for (int i = 0; i < ws.length; ++i) {
681 +                ForkJoinWorkerThread w = ws[i];
682 +                if (w != null)
683 +                    w.setUncaughtExceptionHandler(h);
684 +            }
685 +        } finally {
686 +            lock.unlock();
687 +        }
688 +        return old;
689 +    }
690 +
691 +
692 +    /**
693       * Sets the target paralleism level of this pool.
694       * @param parallelism the target parallelism
695       * @throws IllegalArgumentException if parallelism less than or
# Line 705 | Line 697 | public class ForkJoinPool extends Abstra
697       * @throws SecurityException if a security manager exists and
698       *         the caller is not permitted to modify threads
699       *         because it does not hold {@link
700 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
700 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
701       */
702      public void setParallelism(int parallelism) {
703          checkPermission();
# Line 730 | Line 722 | public class ForkJoinPool extends Abstra
722  
723      /**
724       * Returns the targeted number of worker threads in this pool.
733     * This value does not necessarily reflect transient changes as
734     * threads are added, removed, or abruptly terminate.
725       *
726       * @return the targeted number of worker threads in this pool
727       */
# Line 742 | Line 732 | public class ForkJoinPool extends Abstra
732      /**
733       * Returns the number of worker threads that have started but not
734       * yet terminated.  This result returned by this method may differ
735 <     * from <tt>getParallelism</tt> when threads are created to
735 >     * from <code>getParallelism</code> when threads are created to
736       * maintain parallelism when others are cooperatively blocked.
737       *
738       * @return the number of worker threads
# Line 797 | Line 787 | public class ForkJoinPool extends Abstra
787      }
788  
789      /**
790 <     * Returns the approximate number of worker threads that are not
791 <     * blocked waiting to join tasks or for other managed
790 >     * Returns an estimate of the number of worker threads that are
791 >     * not blocked waiting to join tasks or for other managed
792       * synchronization.
793       *
794       * @return the number of worker threads
# Line 808 | Line 798 | public class ForkJoinPool extends Abstra
798      }
799  
800      /**
801 <     * Returns the approximate number of threads that are currently
801 >     * Returns an estimate of the number of threads that are currently
802       * stealing or executing tasks. This method may overestimate the
803       * number of active threads.
804       * @return the number of active threads.
# Line 818 | Line 808 | public class ForkJoinPool extends Abstra
808      }
809  
810      /**
811 <     * Returns the approximate number of threads that are currently
811 >     * Returns an estimate of the number of threads that are currently
812       * idle waiting for tasks. This method may underestimate the
813       * number of idle threads.
814       * @return the number of idle threads.
# Line 867 | Line 857 | public class ForkJoinPool extends Abstra
857      }
858  
859      /**
860 <     * Returns the total number of tasks currently held in queues by
861 <     * worker threads (but not including tasks submitted to the pool
862 <     * that have not begun executing). This value is only an
863 <     * approximation, obtained by iterating across all threads in the
864 <     * pool. This method may be useful for tuning task granularities.
860 >     * Returns an estimate of the total number of tasks currently held
861 >     * in queues by worker threads (but not including tasks submitted
862 >     * to the pool that have not begun executing). This value is only
863 >     * an approximation, obtained by iterating across all threads in
864 >     * the pool. This method may be useful for tuning task
865 >     * granularities.
866       * @return the number of queued tasks.
867       */
868      public long getQueuedTaskCount() {
# Line 886 | Line 877 | public class ForkJoinPool extends Abstra
877      }
878  
879      /**
880 <     * Returns the approximate number tasks submitted to this pool
880 >     * Returns an estimate of the number tasks submitted to this pool
881       * that have not yet begun executing. This method takes time
882       * proportional to the number of submissions.
883       * @return the number of queued submissions.
# Line 898 | Line 889 | public class ForkJoinPool extends Abstra
889      /**
890       * Returns true if there are any tasks submitted to this pool
891       * that have not yet begun executing.
892 <     * @return <tt>true</tt> if there are any queued submissions.
892 >     * @return <code>true</code> if there are any queued submissions.
893       */
894      public boolean hasQueuedSubmissions() {
895          return !submissionQueue.isEmpty();
# Line 961 | Line 952 | public class ForkJoinPool extends Abstra
952       * @throws SecurityException if a security manager exists and
953       *         the caller is not permitted to modify threads
954       *         because it does not hold {@link
955 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
955 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
956       */
957      public void shutdown() {
958          checkPermission();
# Line 981 | Line 972 | public class ForkJoinPool extends Abstra
972       * @throws SecurityException if a security manager exists and
973       *         the caller is not permitted to modify threads
974       *         because it does not hold {@link
975 <     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
975 >     *         java.lang.RuntimePermission}<code>("modifyThread")</code>,
976       */
977      public List<Runnable> shutdownNow() {
978          checkPermission();
# Line 990 | Line 981 | public class ForkJoinPool extends Abstra
981      }
982  
983      /**
984 <     * Returns <tt>true</tt> if all tasks have completed following shut down.
984 >     * Returns <code>true</code> if all tasks have completed following shut down.
985       *
986 <     * @return <tt>true</tt> if all tasks have completed following shut down
986 >     * @return <code>true</code> if all tasks have completed following shut down
987       */
988      public boolean isTerminated() {
989          return runStateOf(runControl) == TERMINATED;
990      }
991  
992      /**
993 <     * Returns <tt>true</tt> if the process of termination has
993 >     * Returns <code>true</code> if the process of termination has
994       * commenced but possibly not yet completed.
995       *
996 <     * @return <tt>true</tt> if terminating
996 >     * @return <code>true</code> if terminating
997       */
998      public boolean isTerminating() {
999          return runStateOf(runControl) >= TERMINATING;
1000      }
1001  
1002      /**
1003 <     * Returns <tt>true</tt> if this pool has been shut down.
1003 >     * Returns <code>true</code> if this pool has been shut down.
1004       *
1005 <     * @return <tt>true</tt> if this pool has been shut down
1005 >     * @return <code>true</code> if this pool has been shut down
1006       */
1007      public boolean isShutdown() {
1008          return runStateOf(runControl) >= SHUTDOWN;
# Line 1024 | Line 1015 | public class ForkJoinPool extends Abstra
1015       *
1016       * @param timeout the maximum time to wait
1017       * @param unit the time unit of the timeout argument
1018 <     * @return <tt>true</tt> if this executor terminated and
1019 <     *         <tt>false</tt> if the timeout elapsed before termination
1018 >     * @return <code>true</code> if this executor terminated and
1019 >     *         <code>false</code> if the timeout elapsed before termination
1020       * @throws InterruptedException if interrupted while waiting
1021       */
1022      public boolean awaitTermination(long timeout, TimeUnit unit)
# Line 1598 | Line 1589 | public class ForkJoinPool extends Abstra
1589      /**
1590       * Interface for extending managed parallelism for tasks running
1591       * in ForkJoinPools. A ManagedBlocker provides two methods.
1592 <     * Method <tt>isReleasable</tt> must return true if blocking is not
1593 <     * necessary. Method <tt>block</tt> blocks the current thread
1592 >     * Method <code>isReleasable</code> must return true if blocking is not
1593 >     * necessary. Method <code>block</code> blocks the current thread
1594       * if necessary (perhaps internally invoking isReleasable before
1595       * actually blocking.).
1596       * <p>For example, here is a ManagedBlocker based on a
# Line 1642 | Line 1633 | public class ForkJoinPool extends Abstra
1633       * is a ForkJoinWorkerThread, this method possibly arranges for a
1634       * spare thread to be activated if necessary to ensure parallelism
1635       * while the current thread is blocked.  If
1636 <     * <tt>maintainParallelism</tt> is true and the pool supports it
1637 <     * (see <tt>getMaintainsParallelism</tt>), this method attempts to
1636 >     * <code>maintainParallelism</code> is true and the pool supports
1637 >     * it ({@link #getMaintainsParallelism}), this method attempts to
1638       * maintain the pool's nominal parallelism. Otherwise if activates
1639       * a thread only if necessary to avoid complete starvation. This
1640       * option may be preferable when blockages use timeouts, or are
# Line 1689 | Line 1680 | public class ForkJoinPool extends Abstra
1680          do;while (!blocker.isReleasable() && !blocker.block());
1681      }
1682  
1683 +    // AbstractExecutorService overrides
1684 +
1685 +    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
1686 +        return new AdaptedRunnable(runnable, value);
1687 +    }
1688 +
1689 +    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
1690 +        return new AdaptedCallable(callable);
1691 +    }
1692 +
1693  
1694      // Temporary Unsafe mechanics for preliminary release
1695  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines