ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166e/ForkJoinPool.java (file contents):
Revision 1.6 by jsr166, Sun Oct 21 06:40:20 2012 UTC vs.
Revision 1.7 by dl, Sun Oct 28 22:35:45 2012 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166e;
8 +
9   import java.util.ArrayList;
10   import java.util.Arrays;
11   import java.util.Collection;
# Line 17 | Line 18 | import java.util.concurrent.ExecutorServ
18   import java.util.concurrent.Future;
19   import java.util.concurrent.RejectedExecutionException;
20   import java.util.concurrent.RunnableFuture;
21 + import java.util.concurrent.ThreadLocalRandom;
22   import java.util.concurrent.TimeUnit;
23   import java.util.concurrent.atomic.AtomicInteger;
24   import java.util.concurrent.atomic.AtomicLong;
# Line 41 | Line 43 | import java.util.concurrent.locks.Condit
43   * ForkJoinPool}s may also be appropriate for use with event-style
44   * tasks that are never joined.
45   *
46 < * <p>A {@code ForkJoinPool} is constructed with a given target
47 < * parallelism level; by default, equal to the number of available
48 < * processors. The pool attempts to maintain enough active (or
49 < * available) threads by dynamically adding, suspending, or resuming
50 < * internal worker threads, even if some tasks are stalled waiting to
51 < * join others. However, no such adjustments are guaranteed in the
52 < * face of blocked IO or other unmanaged synchronization. The nested
53 < * {@link ManagedBlocker} interface enables extension of the kinds of
46 > * <p>A static {@link #commonPool} is available and appropriate for
47 > * most applications. The common pool is constructed upon first
48 > * access, or upon usage by any ForkJoinTask that is not explictly
49 > * submitted to a specified pool. Using the common pool normally
50 > * reduces resource usage (its threads are slowly reclaimed during
51 > * periods of non-use, and reinstated upon subsequent use).  The
52 > * common pool is by default constructed with default parameters, but
53 > * these may be controlled by setting any or all of the three
54 > * properties {@code
55 > * java.util.concurrent.ForkJoinPool.common.{parallelism,
56 > * threadFactory, exceptionHandler}}.
57 > *
58 > * <p>For applications that require separate or custom pools, a {@code
59 > * ForkJoinPool} may be constructed with a given target parallelism
60 > * level; by default, equal to the number of available processors. The
61 > * pool attempts to maintain enough active (or available) threads by
62 > * dynamically adding, suspending, or resuming internal worker
63 > * threads, even if some tasks are stalled waiting to join
64 > * others. However, no such adjustments are guaranteed in the face of
65 > * blocked IO or other unmanaged synchronization. The nested {@link
66 > * ManagedBlocker} interface enables extension of the kinds of
67   * synchronization accommodated.
68   *
69   * <p>In addition to execution and lifecycle control methods, this
# Line 93 | Line 108 | import java.util.concurrent.locks.Condit
108   *  </tr>
109   * </table>
110   *
96 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
97 * used for all parallel task execution in a program or subsystem.
98 * Otherwise, use would not usually outweigh the construction and
99 * bookkeeping overhead of creating a large set of threads. For
100 * example, a common pool could be used for the {@code SortTasks}
101 * illustrated in {@link RecursiveAction}. Because {@code
102 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
103 * daemon} mode, there is typically no need to explicitly {@link
104 * #shutdown} such a pool upon program exit.
105 *
106 *  <pre> {@code
107 * static final ForkJoinPool mainPool = new ForkJoinPool();
108 * ...
109 * public void sort(long[] array) {
110 *   mainPool.invoke(new SortTask(array, 0, array.length));
111 * }}</pre>
112 *
111   * <p><b>Implementation notes</b>: This implementation restricts the
112   * maximum number of running threads to 32767. Attempts to create
113   * pools with greater than the maximum number result in
# Line 320 | Line 318 | public class ForkJoinPool extends Abstra
318       *
319       * Trimming workers. To release resources after periods of lack of
320       * use, a worker starting to wait when the pool is quiescent will
321 <     * time out and terminate if the pool has remained quiescent for
322 <     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
323 <     * terminating all workers after long periods of non-use.
321 >     * time out and terminate if the pool has remained quiescent for a
322 >     * given period -- a short period if there are more threads than
323 >     * parallelism, longer as the number of threads decreases. This
324 >     * will slowly propagate, eventually terminating all workers after
325 >     * periods of non-use.
326       *
327       * Shutdown and Termination. A call to shutdownNow atomically sets
328       * a runState bit and then (non-atomically) sets each worker's
# Line 813 | Line 813 | public class ForkJoinPool extends Abstra
813          }
814  
815          /**
816 +         * Version of tryUnpush for shared queues; called by non-FJ
817 +         * submitters. Conservatively fails to unpush if all workers
818 +         * are active unless there are multiple tasks in queue.
819 +         */
820 +        final boolean trySharedUnpush(ForkJoinTask<?> task, ForkJoinPool p) {
821 +            boolean success = false;
822 +            if (task != null && top != base && runState == 0 &&
823 +                U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
824 +                try {
825 +                    ForkJoinTask<?>[] a; int n, s;
826 +                    if ((a = array) != null && (n = (s = top) - base) > 0 &&
827 +                        (n > 1 || p == null || (int)(p.ctl >> AC_SHIFT) < 0)) {
828 +                        int j = (((a.length - 1) & --s) << ASHIFT) + ABASE;
829 +                        if (U.getObjectVolatile(a, j) == task &&
830 +                            U.compareAndSwapObject(a, j, task, null)) {
831 +                            top = s;
832 +                            success = true;
833 +                        }
834 +                    }
835 +                } finally {
836 +                    runState = 0;                         // unlock
837 +                }
838 +            }
839 +            return success;
840 +        }
841 +
842 +        /**
843           * Polls the given task only if it is at the current base.
844           */
845          final boolean pollFor(ForkJoinTask<?> task) {
# Line 1110 | Line 1137 | public class ForkJoinPool extends Abstra
1137       */
1138      private static final ThreadSubmitter submitters;
1139  
1140 +    /** Common default pool */
1141 +    static volatile ForkJoinPool commonPool;
1142 +
1143 +    // commonPool construction parameters
1144 +    private static final String propPrefix =
1145 +        "java.util.concurrent.ForkJoinPool.common.";
1146 +    private static final Thread.UncaughtExceptionHandler commonPoolUEH;
1147 +    private static final ForkJoinWorkerThreadFactory commonPoolFactory;
1148 +    static final int commonPoolParallelism;
1149 +
1150 +    /** Static initialization lock */
1151 +    private static final Mutex initializationLock;
1152 +
1153      // static constants
1154  
1155      /**
1156 <     * The wakeup interval (in nanoseconds) for a worker waiting for a
1157 <     * task when the pool is quiescent to instead try to shrink the
1158 <     * number of workers.  The exact value does not matter too
1119 <     * much. It must be short enough to release resources during
1120 <     * sustained periods of idleness, but not so short that threads
1121 <     * are continually re-created.
1156 >     * Initial timeout value (in nanoseconds) for the tread triggering
1157 >     * quiescence to park waiting for new work. On timeout, the thread
1158 >     * will instead try to shrink the number of workers.
1159       */
1160 <    private static final long SHRINK_RATE =
1124 <        4L * 1000L * 1000L * 1000L; // 4 seconds
1160 >    private static final long IDLE_TIMEOUT      = 1000L * 1000L * 1000L; // 1sec
1161  
1162      /**
1163 <     * The timeout value for attempted shrinkage, includes
1128 <     * some slop to cope with system timer imprecision.
1163 >     * Timeout value when there are more threads than parallelism level
1164       */
1165 <    private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10);
1165 >    private static final long FAST_IDLE_TIMEOUT =  100L * 1000L * 1000L;
1166  
1167      /**
1168       * The maximum stolen->joining link depth allowed in method
# Line 1260 | Line 1295 | public class ForkJoinPool extends Abstra
1295      final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
1296      final AtomicLong stealCount;               // collect counts when terminated
1297      final AtomicInteger nextWorkerNumber;      // to create worker name string
1298 <    final String workerNamePrefix;             // to create worker name string
1298 >    String workerNamePrefix;                   // to create worker name string
1299  
1300      //  Creating, registering, and deregistering workers
1301  
# Line 1307 | Line 1342 | public class ForkJoinPool extends Abstra
1342          try {
1343              WorkQueue[] ws = workQueues;
1344              if (w != null && ws != null) {          // skip on shutdown/failure
1345 <                int rs, n = ws.length, m = n - 1;
1345 >                int rs, n =  ws.length, m = n - 1;
1346                  int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence
1347                  w.seed = (s == 0) ? 1 : s;          // ensure non-zero seed
1348                  int r = (s << 1) | 1;               // use odd-numbered indices
# Line 1375 | Line 1410 | public class ForkJoinPool extends Abstra
1410              U.throwException(ex);
1411      }
1412  
1378
1413      // Submissions
1414  
1415      /**
# Line 1423 | Line 1457 | public class ForkJoinPool extends Abstra
1457          }
1458      }
1459  
1460 +    /**
1461 +     * Submits the given (non-null) task to the common pool, if possible.
1462 +     */
1463 +    static void submitToCommonPool(ForkJoinTask<?> task) {
1464 +        ForkJoinPool p;
1465 +        if ((p = commonPool) == null)
1466 +            p = ensureCommonPool();
1467 +        p.doSubmit(task);
1468 +    }
1469 +
1470 +    /**
1471 +     * Returns true if the given task was submitted to common pool
1472 +     * and has not yet commenced execution, and is available for
1473 +     * removal according to execution policies; if so removing the
1474 +     * submission from the pool.
1475 +     *
1476 +     * @param task the task
1477 +     * @return true if successful
1478 +     */
1479 +    static boolean tryUnsubmitFromCommonPool(ForkJoinTask<?> task) {
1480 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1481 +        int k = submitters.get().seed & SQMASK;
1482 +        return ((p = commonPool) != null &&
1483 +                (ws = p.workQueues) != null &&
1484 +                ws.length > (k &= p.submitMask) &&
1485 +                (q = ws[k]) != null &&
1486 +                q.trySharedUnpush(task, p));
1487 +    }
1488 +
1489      // Maintaining ctl counts
1490  
1491      /**
# Line 1434 | Line 1497 | public class ForkJoinPool extends Abstra
1497      }
1498  
1499      /**
1500 <     * Tries to activate or create a worker if too few are active.
1500 >     * Tries to create one or activate one or more workers if too few are active.
1501       */
1502      final void signalWork() {
1503          long c; int u;
# Line 1535 | Line 1598 | public class ForkJoinPool extends Abstra
1598                      t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1599                      if (q.base == b && ec >= 0 && t != null &&
1600                          U.compareAndSwapObject(a, i, t, null)) {
1601 <                        if (q.top - (q.base = b + 1) > 1)
1601 >                        if (q.top - (q.base = b + 1) > 0)
1602                              signalWork();    // help pushes signal
1603                          return t;
1604                      }
# Line 1581 | Line 1644 | public class ForkJoinPool extends Abstra
1644                  }
1645              }
1646              else if (w.eventCount < 0) {      // already queued
1647 <                if ((nr = w.rescans) > 0) {   // continue rescanning
1648 <                    int ac = a + parallelism;
1649 <                    if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0)
1650 <                        Thread.yield();       // yield before block
1588 <                }
1589 <                else {
1647 >                int ac = a + parallelism;
1648 >                if ((nr = w.rescans) > 0)     // continue rescanning
1649 >                    w.rescans = (ac < nr) ? ac : nr - 1;
1650 >                else if (((w.seed >>> 16) & ac) == 0) { // randomize park
1651                      Thread.interrupted();     // clear status
1652                      Thread wt = Thread.currentThread();
1653                      U.putObject(wt, PARKBLOCKER, this);
# Line 1604 | Line 1665 | public class ForkJoinPool extends Abstra
1665      /**
1666       * If inactivating worker w has caused the pool to become
1667       * quiescent, checks for pool termination, and, so long as this is
1668 <     * not the only worker, waits for event for up to SHRINK_RATE
1669 <     * nanosecs.  On timeout, if ctl has not changed, terminates the
1668 >     * not the only worker, waits for event for up to a given
1669 >     * duration.  On timeout, if ctl has not changed, terminates the
1670       * worker, which will in turn wake up another worker to possibly
1671       * repeat this process.
1672       *
# Line 1616 | Line 1677 | public class ForkJoinPool extends Abstra
1677      private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1678          if (w.eventCount < 0 && !tryTerminate(false, false) &&
1679              (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) {
1680 +            int dc = -(short)(currentCtl >>> TC_SHIFT);
1681 +            long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1682 +            long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop
1683              Thread wt = Thread.currentThread();
1620            Thread.yield();            // yield before block
1684              while (ctl == currentCtl) {
1622                long startTime = System.nanoTime();
1685                  Thread.interrupted();  // timed variant of version in scan()
1686                  U.putObject(wt, PARKBLOCKER, this);
1687                  w.parker = wt;
1688                  if (ctl == currentCtl)
1689 <                    U.park(false, SHRINK_RATE);
1689 >                    U.park(false, parkTime);
1690                  w.parker = null;
1691                  U.putObject(wt, PARKBLOCKER, null);
1692                  if (ctl != currentCtl)
1693                      break;
1694 <                if (System.nanoTime() - startTime >= SHRINK_TIMEOUT &&
1694 >                if (deadline - System.nanoTime() <= 0L &&
1695                      U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1696                      w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1697                      w.runState = -1;   // shrink
# Line 1895 | Line 1957 | public class ForkJoinPool extends Abstra
1957       */
1958      private WorkQueue findNonEmptyStealQueue(WorkQueue w) {
1959          // Similar to loop in scan(), but ignoring submissions
1960 <        int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1960 >        int r;
1961 >        if (w == null) // allow external callers
1962 >            r = ThreadLocalRandom.current().nextInt();
1963 >        else {
1964 >            r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1965 >        }
1966          int step = (r >>> 16) | 1;
1967          for (WorkQueue[] ws;;) {
1968              int rs = runState, m;
# Line 1914 | Line 1981 | public class ForkJoinPool extends Abstra
1981          }
1982      }
1983  
1917
1984      /**
1985       * Runs tasks until {@code isQuiescent()}. We piggyback on
1986       * active count ctl maintenance, but rather than blocking
# Line 1957 | Line 2023 | public class ForkJoinPool extends Abstra
2023      }
2024  
2025      /**
2026 +     * Restricted version of helpQuiescePool for non-FJ callers
2027 +     */
2028 +    static void externalHelpQuiescePool() {
2029 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q;
2030 +        ForkJoinTask<?> t; int b;
2031 +        int k = submitters.get().seed & SQMASK;
2032 +        if ((p = commonPool) != null &&
2033 +            (ws = p.workQueues) != null &&
2034 +            ws.length > (k &= p.submitMask) &&
2035 +            (w = ws[k]) != null &&
2036 +            (q = p.findNonEmptyStealQueue(w)) != null &&
2037 +            (b = q.base) - q.top < 0 &&
2038 +            (t = q.pollAt(b)) != null)
2039 +            t.doExec();
2040 +    }
2041 +
2042 +    /**
2043       * Gets and removes a local or stolen task for the given worker.
2044       *
2045       * @return a task, if available
# Line 1989 | Line 2072 | public class ForkJoinPool extends Abstra
2072                  8);
2073      }
2074  
2075 +    /**
2076 +     * Returns approximate submission queue length for the given caller
2077 +     */
2078 +    static int getEstimatedSubmitterQueueLength() {
2079 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2080 +        int k = submitters.get().seed & SQMASK;
2081 +        return ((p = commonPool) != null &&
2082 +                p.runState >= 0 &&
2083 +                (ws = p.workQueues) != null &&
2084 +                ws.length > (k &= p.submitMask) &&
2085 +                (q = ws[k]) != null) ?
2086 +            q.queueSize() : 0;
2087 +    }
2088 +
2089      //  Termination
2090  
2091      /**
# Line 2170 | Line 2267 | public class ForkJoinPool extends Abstra
2267          lock.unlock();
2268      }
2269  
2270 +    /**
2271 +     * Returns the common pool instance
2272 +     *
2273 +     * @return the common pool instance
2274 +     */
2275 +    public static ForkJoinPool commonPool() {
2276 +        ForkJoinPool p;
2277 +        return (p = commonPool) != null? p : ensureCommonPool();
2278 +    }
2279 +
2280 +    private static ForkJoinPool ensureCommonPool() {
2281 +        ForkJoinPool p;
2282 +        if ((p = commonPool) == null) {
2283 +            final Mutex lock = initializationLock;
2284 +            lock.lock();
2285 +            try {
2286 +                if ((p = commonPool) == null) {
2287 +                    p = commonPool = new ForkJoinPool(commonPoolParallelism,
2288 +                                                      commonPoolFactory,
2289 +                                                      commonPoolUEH, false);
2290 +                    // use a more informative name string for workers
2291 +                    p.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2292 +                }
2293 +            } finally {
2294 +                lock.unlock();
2295 +            }
2296 +        }
2297 +        return p;
2298 +    }
2299 +
2300      // Execution methods
2301  
2302      /**
# Line 2343 | Line 2470 | public class ForkJoinPool extends Abstra
2470      }
2471  
2472      /**
2473 +     * Returns the targeted parallelism level of the common pool.
2474 +     *
2475 +     * @return the targeted parallelism level of the common pool
2476 +     */
2477 +    public static int getCommonPoolParallelism() {
2478 +        return commonPoolParallelism;
2479 +    }
2480 +
2481 +    /**
2482       * Returns the number of worker threads that have started but not
2483       * yet terminated.  The result returned by this method may differ
2484       * from {@link #getParallelism} when threads are created to
# Line 2594 | Line 2730 | public class ForkJoinPool extends Abstra
2730      }
2731  
2732      /**
2733 <     * Initiates an orderly shutdown in which previously submitted
2734 <     * tasks are executed, but no new tasks will be accepted.
2735 <     * Invocation has no additional effect if already shut down.
2736 <     * Tasks that are in the process of being submitted concurrently
2737 <     * during the course of this method may or may not be rejected.
2733 >     * Possibly initiates an orderly shutdown in which previously
2734 >     * submitted tasks are executed, but no new tasks will be
2735 >     * accepted. Invocation has no effect on execution state if this
2736 >     * is the {@link #commonPool}, and no additional effect if
2737 >     * already shut down.  Tasks that are in the process of being
2738 >     * submitted concurrently during the course of this method may or
2739 >     * may not be rejected.
2740       *
2741       * @throws SecurityException if a security manager exists and
2742       *         the caller is not permitted to modify threads
# Line 2607 | Line 2745 | public class ForkJoinPool extends Abstra
2745       */
2746      public void shutdown() {
2747          checkPermission();
2748 <        tryTerminate(false, true);
2748 >        if (this != commonPool)
2749 >            tryTerminate(false, true);
2750      }
2751  
2752      /**
2753 <     * Attempts to cancel and/or stop all tasks, and reject all
2754 <     * subsequently submitted tasks.  Tasks that are in the process of
2755 <     * being submitted or executed concurrently during the course of
2756 <     * this method may or may not be rejected. This method cancels
2757 <     * both existing and unexecuted tasks, in order to permit
2758 <     * termination in the presence of task dependencies. So the method
2759 <     * always returns an empty list (unlike the case for some other
2760 <     * Executors).
2753 >     * Possibly attempts to cancel and/or stop all tasks, and reject
2754 >     * all subsequently submitted tasks.  Invocation has no effect on
2755 >     * execution state if this is the {@link #commonPool}, and no
2756 >     * additional effect if already shut down. Otherwise, tasks that
2757 >     * are in the process of being submitted or executed concurrently
2758 >     * during the course of this method may or may not be
2759 >     * rejected. This method cancels both existing and unexecuted
2760 >     * tasks, in order to permit termination in the presence of task
2761 >     * dependencies. So the method always returns an empty list
2762 >     * (unlike the case for some other Executors).
2763       *
2764       * @return an empty list
2765       * @throws SecurityException if a security manager exists and
# Line 2628 | Line 2769 | public class ForkJoinPool extends Abstra
2769       */
2770      public List<Runnable> shutdownNow() {
2771          checkPermission();
2772 <        tryTerminate(true, true);
2772 >        if (this != commonPool)
2773 >            tryTerminate(true, true);
2774          return Collections.emptyList();
2775      }
2776  
# Line 2837 | Line 2979 | public class ForkJoinPool extends Abstra
2979          defaultForkJoinWorkerThreadFactory =
2980              new DefaultForkJoinWorkerThreadFactory();
2981          submitters = new ThreadSubmitter();
2982 +        initializationLock = new Mutex();
2983          int s;
2984          try {
2985              U = getUnsafe();
# Line 2855 | Line 2998 | public class ForkJoinPool extends Abstra
2998          if ((s & (s-1)) != 0)
2999              throw new Error("data type scale not a power of two");
3000          ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
3001 +
3002 +        // Establish configuration for default pool
3003 +        try {
3004 +            String pp = System.getProperty(propPrefix + "parallelism");
3005 +            String fp = System.getProperty(propPrefix + "threadFactory");
3006 +            String up = System.getProperty(propPrefix + "exceptionHandler");
3007 +            int par;
3008 +            if ((pp == null || (par = Integer.parseInt(pp)) <= 0))
3009 +                par = Runtime.getRuntime().availableProcessors();
3010 +            commonPoolParallelism = par;
3011 +            if (fp != null)
3012 +                commonPoolFactory = (ForkJoinWorkerThreadFactory)
3013 +                    ClassLoader.getSystemClassLoader().loadClass(fp).newInstance();
3014 +            else
3015 +                commonPoolFactory = defaultForkJoinWorkerThreadFactory;
3016 +            if (up != null)
3017 +                commonPoolUEH = (Thread.UncaughtExceptionHandler)
3018 +                    ClassLoader.getSystemClassLoader().loadClass(up).newInstance();
3019 +            else
3020 +                commonPoolUEH = null;
3021 +        } catch (Exception e) {
3022 +            throw new Error(e);
3023 +        }
3024      }
3025  
3026      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines