ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.134 by jsr166, Sun Oct 21 06:40:20 2012 UTC vs.
Revision 1.135 by dl, Sun Oct 28 22:36:01 2012 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.ArrayList;
10   import java.util.Arrays;
11   import java.util.Collection;
# Line 41 | Line 42 | import java.util.concurrent.locks.Condit
42   * ForkJoinPool}s may also be appropriate for use with event-style
43   * tasks that are never joined.
44   *
45 < * <p>A {@code ForkJoinPool} is constructed with a given target
46 < * parallelism level; by default, equal to the number of available
47 < * processors. The pool attempts to maintain enough active (or
48 < * available) threads by dynamically adding, suspending, or resuming
49 < * internal worker threads, even if some tasks are stalled waiting to
50 < * join others. However, no such adjustments are guaranteed in the
51 < * face of blocked IO or other unmanaged synchronization. The nested
52 < * {@link ManagedBlocker} interface enables extension of the kinds of
45 > * <p>A static {@link #commonPool} is available and appropriate for
46 > * most applications. The common pool is constructed upon first
47 > * access, or upon usage by any ForkJoinTask that is not explictly
48 > * submitted to a specified pool. Using the common pool normally
49 > * reduces resource usage (its threads are slowly reclaimed during
50 > * periods of non-use, and reinstated upon subsequent use).  The
51 > * common pool is by default constructed with default parameters, but
52 > * these may be controlled by setting any or all of the three
53 > * properties {@code
54 > * java.util.concurrent.ForkJoinPool.common.{parallelism,
55 > * threadFactory, exceptionHandler}}.
56 > *
57 > * <p>For applications that require separate or custom pools, a {@code
58 > * ForkJoinPool} may be constructed with a given target parallelism
59 > * level; by default, equal to the number of available processors. The
60 > * pool attempts to maintain enough active (or available) threads by
61 > * dynamically adding, suspending, or resuming internal worker
62 > * threads, even if some tasks are stalled waiting to join
63 > * others. However, no such adjustments are guaranteed in the face of
64 > * blocked IO or other unmanaged synchronization. The nested {@link
65 > * ManagedBlocker} interface enables extension of the kinds of
66   * synchronization accommodated.
67   *
68   * <p>In addition to execution and lifecycle control methods, this
# Line 93 | Line 107 | import java.util.concurrent.locks.Condit
107   *  </tr>
108   * </table>
109   *
96 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
97 * used for all parallel task execution in a program or subsystem.
98 * Otherwise, use would not usually outweigh the construction and
99 * bookkeeping overhead of creating a large set of threads. For
100 * example, a common pool could be used for the {@code SortTasks}
101 * illustrated in {@link RecursiveAction}. Because {@code
102 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
103 * daemon} mode, there is typically no need to explicitly {@link
104 * #shutdown} such a pool upon program exit.
105 *
106 *  <pre> {@code
107 * static final ForkJoinPool mainPool = new ForkJoinPool();
108 * ...
109 * public void sort(long[] array) {
110 *   mainPool.invoke(new SortTask(array, 0, array.length));
111 * }}</pre>
112 *
110   * <p><b>Implementation notes</b>: This implementation restricts the
111   * maximum number of running threads to 32767. Attempts to create
112   * pools with greater than the maximum number result in
# Line 320 | Line 317 | public class ForkJoinPool extends Abstra
317       *
318       * Trimming workers. To release resources after periods of lack of
319       * use, a worker starting to wait when the pool is quiescent will
320 <     * time out and terminate if the pool has remained quiescent for
321 <     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
322 <     * terminating all workers after long periods of non-use.
320 >     * time out and terminate if the pool has remained quiescent for a
321 >     * given period -- a short period if there are more threads than
322 >     * parallelism, longer as the number of threads decreases. This
323 >     * will slowly propagate, eventually terminating all workers after
324 >     * periods of non-use.
325       *
326       * Shutdown and Termination. A call to shutdownNow atomically sets
327       * a runState bit and then (non-atomically) sets each worker's
# Line 813 | Line 812 | public class ForkJoinPool extends Abstra
812          }
813  
814          /**
815 +         * Version of tryUnpush for shared queues; called by non-FJ
816 +         * submitters. Conservatively fails to unpush if all workers
817 +         * are active unless there are multiple tasks in queue.
818 +         */
819 +        final boolean trySharedUnpush(ForkJoinTask<?> task, ForkJoinPool p) {
820 +            boolean success = false;
821 +            if (task != null && top != base && runState == 0 &&
822 +                U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
823 +                try {
824 +                    ForkJoinTask<?>[] a; int n, s;
825 +                    if ((a = array) != null && (n = (s = top) - base) > 0 &&
826 +                        (n > 1 || p == null || (int)(p.ctl >> AC_SHIFT) < 0)) {
827 +                        int j = (((a.length - 1) & --s) << ASHIFT) + ABASE;
828 +                        if (U.getObjectVolatile(a, j) == task &&
829 +                            U.compareAndSwapObject(a, j, task, null)) {
830 +                            top = s;
831 +                            success = true;
832 +                        }
833 +                    }
834 +                } finally {
835 +                    runState = 0;                         // unlock
836 +                }
837 +            }
838 +            return success;
839 +        }
840 +
841 +        /**
842           * Polls the given task only if it is at the current base.
843           */
844          final boolean pollFor(ForkJoinTask<?> task) {
# Line 1110 | Line 1136 | public class ForkJoinPool extends Abstra
1136       */
1137      private static final ThreadSubmitter submitters;
1138  
1139 +    /** Common default pool */
1140 +    static volatile ForkJoinPool commonPool;
1141 +
1142 +    // commonPool construction parameters
1143 +    private static final String propPrefix =
1144 +        "java.util.concurrent.ForkJoinPool.common.";
1145 +    private static final Thread.UncaughtExceptionHandler commonPoolUEH;
1146 +    private static final ForkJoinWorkerThreadFactory commonPoolFactory;
1147 +    static final int commonPoolParallelism;
1148 +
1149 +    /** Static initialization lock */
1150 +    private static final Mutex initializationLock;
1151 +
1152      // static constants
1153  
1154      /**
1155 <     * The wakeup interval (in nanoseconds) for a worker waiting for a
1156 <     * task when the pool is quiescent to instead try to shrink the
1157 <     * number of workers.  The exact value does not matter too
1119 <     * much. It must be short enough to release resources during
1120 <     * sustained periods of idleness, but not so short that threads
1121 <     * are continually re-created.
1155 >     * Initial timeout value (in nanoseconds) for the tread triggering
1156 >     * quiescence to park waiting for new work. On timeout, the thread
1157 >     * will instead try to shrink the number of workers.
1158       */
1159 <    private static final long SHRINK_RATE =
1124 <        4L * 1000L * 1000L * 1000L; // 4 seconds
1159 >    private static final long IDLE_TIMEOUT      = 1000L * 1000L * 1000L; // 1sec
1160  
1161      /**
1162 <     * The timeout value for attempted shrinkage, includes
1128 <     * some slop to cope with system timer imprecision.
1162 >     * Timeout value when there are more threads than parallelism level
1163       */
1164 <    private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10);
1164 >    private static final long FAST_IDLE_TIMEOUT =  100L * 1000L * 1000L;
1165  
1166      /**
1167       * The maximum stolen->joining link depth allowed in method
# Line 1260 | Line 1294 | public class ForkJoinPool extends Abstra
1294      final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
1295      final AtomicLong stealCount;               // collect counts when terminated
1296      final AtomicInteger nextWorkerNumber;      // to create worker name string
1297 <    final String workerNamePrefix;             // to create worker name string
1297 >    String workerNamePrefix;                   // to create worker name string
1298  
1299      //  Creating, registering, and deregistering workers
1300  
# Line 1375 | Line 1409 | public class ForkJoinPool extends Abstra
1409              U.throwException(ex);
1410      }
1411  
1378
1412      // Submissions
1413  
1414      /**
# Line 1423 | Line 1456 | public class ForkJoinPool extends Abstra
1456          }
1457      }
1458  
1459 +    /**
1460 +     * Submits the given (non-null) task to the common pool, if possible.
1461 +     */
1462 +    static void submitToCommonPool(ForkJoinTask<?> task) {
1463 +        ForkJoinPool p;
1464 +        if ((p = commonPool) == null)
1465 +            p = ensureCommonPool();
1466 +        p.doSubmit(task);
1467 +    }
1468 +
1469 +    /**
1470 +     * Returns true if the given task was submitted to common pool
1471 +     * and has not yet commenced execution, and is available for
1472 +     * removal according to execution policies; if so removing the
1473 +     * submission from the pool.
1474 +     *
1475 +     * @param task the task
1476 +     * @return true if successful
1477 +     */
1478 +    static boolean tryUnsubmitFromCommonPool(ForkJoinTask<?> task) {
1479 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1480 +        int k = submitters.get().seed & SQMASK;
1481 +        return ((p = commonPool) != null &&
1482 +                (ws = p.workQueues) != null &&
1483 +                ws.length > (k &= p.submitMask) &&
1484 +                (q = ws[k]) != null &&
1485 +                q.trySharedUnpush(task, p));
1486 +    }
1487 +
1488      // Maintaining ctl counts
1489  
1490      /**
# Line 1434 | Line 1496 | public class ForkJoinPool extends Abstra
1496      }
1497  
1498      /**
1499 <     * Tries to activate or create a worker if too few are active.
1499 >     * Tries to create one or activate one or more workers if too few are active.
1500       */
1501      final void signalWork() {
1502          long c; int u;
# Line 1535 | Line 1597 | public class ForkJoinPool extends Abstra
1597                      t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1598                      if (q.base == b && ec >= 0 && t != null &&
1599                          U.compareAndSwapObject(a, i, t, null)) {
1600 <                        if (q.top - (q.base = b + 1) > 1)
1600 >                        if (q.top - (q.base = b + 1) > 0)
1601                              signalWork();    // help pushes signal
1602                          return t;
1603                      }
# Line 1581 | Line 1643 | public class ForkJoinPool extends Abstra
1643                  }
1644              }
1645              else if (w.eventCount < 0) {      // already queued
1646 <                if ((nr = w.rescans) > 0) {   // continue rescanning
1647 <                    int ac = a + parallelism;
1648 <                    if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0)
1649 <                        Thread.yield();       // yield before block
1588 <                }
1589 <                else {
1646 >                int ac = a + parallelism;
1647 >                if ((nr = w.rescans) > 0)     // continue rescanning
1648 >                    w.rescans = (ac < nr) ? ac : nr - 1;
1649 >                else if (((w.seed >>> 16) & ac) == 0) { // randomize park
1650                      Thread.interrupted();     // clear status
1651                      Thread wt = Thread.currentThread();
1652                      U.putObject(wt, PARKBLOCKER, this);
# Line 1604 | Line 1664 | public class ForkJoinPool extends Abstra
1664      /**
1665       * If inactivating worker w has caused the pool to become
1666       * quiescent, checks for pool termination, and, so long as this is
1667 <     * not the only worker, waits for event for up to SHRINK_RATE
1668 <     * nanosecs.  On timeout, if ctl has not changed, terminates the
1667 >     * not the only worker, waits for event for up to a given
1668 >     * duration.  On timeout, if ctl has not changed, terminates the
1669       * worker, which will in turn wake up another worker to possibly
1670       * repeat this process.
1671       *
# Line 1616 | Line 1676 | public class ForkJoinPool extends Abstra
1676      private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1677          if (w.eventCount < 0 && !tryTerminate(false, false) &&
1678              (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) {
1679 +            int dc = -(short)(currentCtl >>> TC_SHIFT);
1680 +            long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1681 +            long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop
1682              Thread wt = Thread.currentThread();
1620            Thread.yield();            // yield before block
1683              while (ctl == currentCtl) {
1622                long startTime = System.nanoTime();
1684                  Thread.interrupted();  // timed variant of version in scan()
1685                  U.putObject(wt, PARKBLOCKER, this);
1686                  w.parker = wt;
1687                  if (ctl == currentCtl)
1688 <                    U.park(false, SHRINK_RATE);
1688 >                    U.park(false, parkTime);
1689                  w.parker = null;
1690                  U.putObject(wt, PARKBLOCKER, null);
1691                  if (ctl != currentCtl)
1692                      break;
1693 <                if (System.nanoTime() - startTime >= SHRINK_TIMEOUT &&
1693 >                if (deadline - System.nanoTime() <= 0L &&
1694                      U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1695                      w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1696                      w.runState = -1;   // shrink
# Line 1895 | Line 1956 | public class ForkJoinPool extends Abstra
1956       */
1957      private WorkQueue findNonEmptyStealQueue(WorkQueue w) {
1958          // Similar to loop in scan(), but ignoring submissions
1959 <        int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1959 >        int r;
1960 >        if (w == null) // allow external callers
1961 >            r = ThreadLocalRandom.current().nextInt();
1962 >        else {
1963 >            r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1964 >        }
1965          int step = (r >>> 16) | 1;
1966          for (WorkQueue[] ws;;) {
1967              int rs = runState, m;
# Line 1914 | Line 1980 | public class ForkJoinPool extends Abstra
1980          }
1981      }
1982  
1917
1983      /**
1984       * Runs tasks until {@code isQuiescent()}. We piggyback on
1985       * active count ctl maintenance, but rather than blocking
# Line 1957 | Line 2022 | public class ForkJoinPool extends Abstra
2022      }
2023  
2024      /**
2025 +     * Restricted version of helpQuiescePool for non-FJ callers
2026 +     */
2027 +    static void externalHelpQuiescePool() {
2028 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q;
2029 +        ForkJoinTask<?> t; int b;
2030 +        int k = submitters.get().seed & SQMASK;
2031 +        if ((p = commonPool) != null &&
2032 +            (ws = p.workQueues) != null &&
2033 +            ws.length > (k &= p.submitMask) &&
2034 +            (w = ws[k]) != null &&
2035 +            (q = p.findNonEmptyStealQueue(w)) != null &&
2036 +            (b = q.base) - q.top < 0 &&
2037 +            (t = q.pollAt(b)) != null)
2038 +            t.doExec();
2039 +    }
2040 +
2041 +    /**
2042       * Gets and removes a local or stolen task for the given worker.
2043       *
2044       * @return a task, if available
# Line 1989 | Line 2071 | public class ForkJoinPool extends Abstra
2071                  8);
2072      }
2073  
2074 +    /**
2075 +     * Returns approximate submission queue length for the given caller
2076 +     */
2077 +    static int getEstimatedSubmitterQueueLength() {
2078 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2079 +        int k = submitters.get().seed & SQMASK;
2080 +        return ((p = commonPool) != null &&
2081 +                p.runState >= 0 &&
2082 +                (ws = p.workQueues) != null &&
2083 +                ws.length > (k &= p.submitMask) &&
2084 +                (q = ws[k]) != null) ?
2085 +            q.queueSize() : 0;
2086 +    }
2087 +
2088      //  Termination
2089  
2090      /**
# Line 2170 | Line 2266 | public class ForkJoinPool extends Abstra
2266          lock.unlock();
2267      }
2268  
2269 +    /**
2270 +     * Returns the common pool instance
2271 +     *
2272 +     * @return the common pool instance
2273 +     */
2274 +    public static ForkJoinPool commonPool() {
2275 +        ForkJoinPool p;
2276 +        return (p = commonPool) != null? p : ensureCommonPool();
2277 +    }
2278 +
2279 +    private static ForkJoinPool ensureCommonPool() {
2280 +        ForkJoinPool p;
2281 +        if ((p = commonPool) == null) {
2282 +            final Mutex lock = initializationLock;
2283 +            lock.lock();
2284 +            try {
2285 +                if ((p = commonPool) == null) {
2286 +                    p = commonPool = new ForkJoinPool(commonPoolParallelism,
2287 +                                                      commonPoolFactory,
2288 +                                                      commonPoolUEH, false);
2289 +                    // use a more informative name string for workers
2290 +                    p.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2291 +                }
2292 +            } finally {
2293 +                lock.unlock();
2294 +            }
2295 +        }
2296 +        return p;
2297 +    }
2298 +
2299      // Execution methods
2300  
2301      /**
# Line 2343 | Line 2469 | public class ForkJoinPool extends Abstra
2469      }
2470  
2471      /**
2472 +     * Returns the targeted parallelism level of the common pool.
2473 +     *
2474 +     * @return the targeted parallelism level of the common pool
2475 +     */
2476 +    public static int getCommonPoolParallelism() {
2477 +        return commonPoolParallelism;
2478 +    }
2479 +
2480 +    /**
2481       * Returns the number of worker threads that have started but not
2482       * yet terminated.  The result returned by this method may differ
2483       * from {@link #getParallelism} when threads are created to
# Line 2594 | Line 2729 | public class ForkJoinPool extends Abstra
2729      }
2730  
2731      /**
2732 <     * Initiates an orderly shutdown in which previously submitted
2733 <     * tasks are executed, but no new tasks will be accepted.
2734 <     * Invocation has no additional effect if already shut down.
2735 <     * Tasks that are in the process of being submitted concurrently
2736 <     * during the course of this method may or may not be rejected.
2732 >     * Possibly initiates an orderly shutdown in which previously
2733 >     * submitted tasks are executed, but no new tasks will be
2734 >     * accepted. Invocation has no effect on execution state if this
2735 >     * is the {@link #commonPool}, and no additional effect if
2736 >     * already shut down.  Tasks that are in the process of being
2737 >     * submitted concurrently during the course of this method may or
2738 >     * may not be rejected.
2739       *
2740       * @throws SecurityException if a security manager exists and
2741       *         the caller is not permitted to modify threads
# Line 2607 | Line 2744 | public class ForkJoinPool extends Abstra
2744       */
2745      public void shutdown() {
2746          checkPermission();
2747 <        tryTerminate(false, true);
2747 >        if (this != commonPool)
2748 >            tryTerminate(false, true);
2749      }
2750  
2751      /**
2752 <     * Attempts to cancel and/or stop all tasks, and reject all
2753 <     * subsequently submitted tasks.  Tasks that are in the process of
2754 <     * being submitted or executed concurrently during the course of
2755 <     * this method may or may not be rejected. This method cancels
2756 <     * both existing and unexecuted tasks, in order to permit
2757 <     * termination in the presence of task dependencies. So the method
2758 <     * always returns an empty list (unlike the case for some other
2759 <     * Executors).
2752 >     * Possibly attempts to cancel and/or stop all tasks, and reject
2753 >     * all subsequently submitted tasks.  Invocation has no effect on
2754 >     * execution state if this is the {@link #commonPool}, and no
2755 >     * additional effect if already shut down. Otherwise, tasks that
2756 >     * are in the process of being submitted or executed concurrently
2757 >     * during the course of this method may or may not be
2758 >     * rejected. This method cancels both existing and unexecuted
2759 >     * tasks, in order to permit termination in the presence of task
2760 >     * dependencies. So the method always returns an empty list
2761 >     * (unlike the case for some other Executors).
2762       *
2763       * @return an empty list
2764       * @throws SecurityException if a security manager exists and
# Line 2628 | Line 2768 | public class ForkJoinPool extends Abstra
2768       */
2769      public List<Runnable> shutdownNow() {
2770          checkPermission();
2771 <        tryTerminate(true, true);
2771 >        if (this != commonPool)
2772 >            tryTerminate(true, true);
2773          return Collections.emptyList();
2774      }
2775  
# Line 2837 | Line 2978 | public class ForkJoinPool extends Abstra
2978          defaultForkJoinWorkerThreadFactory =
2979              new DefaultForkJoinWorkerThreadFactory();
2980          submitters = new ThreadSubmitter();
2981 +        initializationLock = new Mutex();
2982          int s;
2983          try {
2984              U = getUnsafe();
# Line 2855 | Line 2997 | public class ForkJoinPool extends Abstra
2997          if ((s & (s-1)) != 0)
2998              throw new Error("data type scale not a power of two");
2999          ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
3000 +
3001 +        // Establish configuration for default pool
3002 +        try {
3003 +            String pp = System.getProperty(propPrefix + "parallelism");
3004 +            String fp = System.getProperty(propPrefix + "threadFactory");
3005 +            String up = System.getProperty(propPrefix + "exceptionHandler");
3006 +            int par;
3007 +            if ((pp == null || (par = Integer.parseInt(pp)) <= 0))
3008 +                par = Runtime.getRuntime().availableProcessors();
3009 +            commonPoolParallelism = par;
3010 +            if (fp != null)
3011 +                commonPoolFactory = (ForkJoinWorkerThreadFactory)
3012 +                    ClassLoader.getSystemClassLoader().loadClass(fp).newInstance();
3013 +            else
3014 +                commonPoolFactory = defaultForkJoinWorkerThreadFactory;
3015 +            if (up != null)
3016 +                commonPoolUEH = (Thread.UncaughtExceptionHandler)
3017 +                    ClassLoader.getSystemClassLoader().loadClass(up).newInstance();
3018 +            else
3019 +                commonPoolUEH = null;
3020 +        } catch (Exception e) {
3021 +            throw new Error(e);
3022 +        }
3023      }
3024  
3025      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines