ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.123 by dl, Mon Feb 20 18:20:06 2012 UTC vs.
Revision 1.135 by dl, Sun Oct 28 22:36:01 2012 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.ArrayList;
10   import java.util.Arrays;
11   import java.util.Collection;
# Line 41 | Line 42 | import java.util.concurrent.locks.Condit
42   * ForkJoinPool}s may also be appropriate for use with event-style
43   * tasks that are never joined.
44   *
45 < * <p>A {@code ForkJoinPool} is constructed with a given target
46 < * parallelism level; by default, equal to the number of available
47 < * processors. The pool attempts to maintain enough active (or
48 < * available) threads by dynamically adding, suspending, or resuming
49 < * internal worker threads, even if some tasks are stalled waiting to
50 < * join others. However, no such adjustments are guaranteed in the
51 < * face of blocked IO or other unmanaged synchronization. The nested
52 < * {@link ManagedBlocker} interface enables extension of the kinds of
45 > * <p>A static {@link #commonPool} is available and appropriate for
46 > * most applications. The common pool is constructed upon first
47 > * access, or upon usage by any ForkJoinTask that is not explictly
48 > * submitted to a specified pool. Using the common pool normally
49 > * reduces resource usage (its threads are slowly reclaimed during
50 > * periods of non-use, and reinstated upon subsequent use).  The
51 > * common pool is by default constructed with default parameters, but
52 > * these may be controlled by setting any or all of the three
53 > * properties {@code
54 > * java.util.concurrent.ForkJoinPool.common.{parallelism,
55 > * threadFactory, exceptionHandler}}.
56 > *
57 > * <p>For applications that require separate or custom pools, a {@code
58 > * ForkJoinPool} may be constructed with a given target parallelism
59 > * level; by default, equal to the number of available processors. The
60 > * pool attempts to maintain enough active (or available) threads by
61 > * dynamically adding, suspending, or resuming internal worker
62 > * threads, even if some tasks are stalled waiting to join
63 > * others. However, no such adjustments are guaranteed in the face of
64 > * blocked IO or other unmanaged synchronization. The nested {@link
65 > * ManagedBlocker} interface enables extension of the kinds of
66   * synchronization accommodated.
67   *
68   * <p>In addition to execution and lifecycle control methods, this
# Line 93 | Line 107 | import java.util.concurrent.locks.Condit
107   *  </tr>
108   * </table>
109   *
96 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
97 * used for all parallel task execution in a program or subsystem.
98 * Otherwise, use would not usually outweigh the construction and
99 * bookkeeping overhead of creating a large set of threads. For
100 * example, a common pool could be used for the {@code SortTasks}
101 * illustrated in {@link RecursiveAction}. Because {@code
102 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
103 * daemon} mode, there is typically no need to explicitly {@link
104 * #shutdown} such a pool upon program exit.
105 *
106 *  <pre> {@code
107 * static final ForkJoinPool mainPool = new ForkJoinPool();
108 * ...
109 * public void sort(long[] array) {
110 *   mainPool.invoke(new SortTask(array, 0, array.length));
111 * }}</pre>
112 *
110   * <p><b>Implementation notes</b>: This implementation restricts the
111   * maximum number of running threads to 32767. Attempts to create
112   * pools with greater than the maximum number result in
# Line 320 | Line 317 | public class ForkJoinPool extends Abstra
317       *
318       * Trimming workers. To release resources after periods of lack of
319       * use, a worker starting to wait when the pool is quiescent will
320 <     * time out and terminate if the pool has remained quiescent for
321 <     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
322 <     * terminating all workers after long periods of non-use.
320 >     * time out and terminate if the pool has remained quiescent for a
321 >     * given period -- a short period if there are more threads than
322 >     * parallelism, longer as the number of threads decreases. This
323 >     * will slowly propagate, eventually terminating all workers after
324 >     * periods of non-use.
325       *
326       * Shutdown and Termination. A call to shutdownNow atomically sets
327       * a runState bit and then (non-atomically) sets each worker's
# Line 413 | Line 412 | public class ForkJoinPool extends Abstra
412       * unblocked threads to the point that we know they are available)
413       * leading to more situations requiring more threads, and so
414       * on. This aspect of control can be seen as an (analytically
415 <     * intractible) game with an opponent that may choose the worst
415 >     * intractable) game with an opponent that may choose the worst
416       * (for us) active thread to stall at any time.  We take several
417       * precautions to bound losses (and thus bound gains), mainly in
418       * methods tryCompensate and awaitJoin: (1) We only try
# Line 629 | Line 628 | public class ForkJoinPool extends Abstra
628          final ForkJoinPool pool;   // the containing pool (may be null)
629          final ForkJoinWorkerThread owner; // owning thread or null if shared
630          volatile Thread parker;    // == owner during call to park; else null
631 <        ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
631 >        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
632          ForkJoinTask<?> currentSteal; // current non-local task being executed
633          // Heuristic padding to ameliorate unfortunate memory placements
634          Object p00, p01, p02, p03, p04, p05, p06, p07;
# Line 721 | Line 720 | public class ForkJoinPool extends Abstra
720           * version of this method because it is never needed.)
721           */
722          final ForkJoinTask<?> pop() {
723 <            ForkJoinTask<?> t; int m;
724 <            ForkJoinTask<?>[] a = array;
726 <            if (a != null && (m = a.length - 1) >= 0) {
723 >            ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
724 >            if ((a = array) != null && (m = a.length - 1) >= 0) {
725                  for (int s; (s = top - 1) - base >= 0;) {
726 <                    int j = ((m & s) << ASHIFT) + ABASE;
727 <                    if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) == null)
726 >                    long j = ((m & s) << ASHIFT) + ABASE;
727 >                    if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
728                          break;
729                      if (U.compareAndSwapObject(a, j, t, null)) {
730                          top = s;
# Line 814 | Line 812 | public class ForkJoinPool extends Abstra
812          }
813  
814          /**
815 +         * Version of tryUnpush for shared queues; called by non-FJ
816 +         * submitters. Conservatively fails to unpush if all workers
817 +         * are active unless there are multiple tasks in queue.
818 +         */
819 +        final boolean trySharedUnpush(ForkJoinTask<?> task, ForkJoinPool p) {
820 +            boolean success = false;
821 +            if (task != null && top != base && runState == 0 &&
822 +                U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
823 +                try {
824 +                    ForkJoinTask<?>[] a; int n, s;
825 +                    if ((a = array) != null && (n = (s = top) - base) > 0 &&
826 +                        (n > 1 || p == null || (int)(p.ctl >> AC_SHIFT) < 0)) {
827 +                        int j = (((a.length - 1) & --s) << ASHIFT) + ABASE;
828 +                        if (U.getObjectVolatile(a, j) == task &&
829 +                            U.compareAndSwapObject(a, j, task, null)) {
830 +                            top = s;
831 +                            success = true;
832 +                        }
833 +                    }
834 +                } finally {
835 +                    runState = 0;                         // unlock
836 +                }
837 +            }
838 +            return success;
839 +        }
840 +
841 +        /**
842           * Polls the given task only if it is at the current base.
843           */
844          final boolean pollFor(ForkJoinTask<?> task) {
# Line 830 | Line 855 | public class ForkJoinPool extends Abstra
855          }
856  
857          /**
833         * If present, removes from queue and executes the given task, or
834         * any other cancelled task. Returns (true) immediately on any CAS
835         * or consistency check failure so caller can retry.
836         *
837         * @return false if no progress can be made
838         */
839        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
840            boolean removed = false, empty = true, progress = true;
841            ForkJoinTask<?>[] a; int m, s, b, n;
842            if ((a = array) != null && (m = a.length - 1) >= 0 &&
843                (n = (s = top) - (b = base)) > 0) {
844                for (ForkJoinTask<?> t;;) {           // traverse from s to b
845                    int j = ((--s & m) << ASHIFT) + ABASE;
846                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
847                    if (t == null)                    // inconsistent length
848                        break;
849                    else if (t == task) {
850                        if (s + 1 == top) {           // pop
851                            if (!U.compareAndSwapObject(a, j, task, null))
852                                break;
853                            top = s;
854                            removed = true;
855                        }
856                        else if (base == b)           // replace with proxy
857                            removed = U.compareAndSwapObject(a, j, task,
858                                                             new EmptyTask());
859                        break;
860                    }
861                    else if (t.status >= 0)
862                        empty = false;
863                    else if (s + 1 == top) {          // pop and throw away
864                        if (U.compareAndSwapObject(a, j, t, null))
865                            top = s;
866                        break;
867                    }
868                    if (--n == 0) {
869                        if (!empty && base == b)
870                            progress = false;
871                        break;
872                    }
873                }
874            }
875            if (removed)
876                task.doExec();
877            return progress;
878        }
879
880        /**
858           * Initializes or doubles the capacity of array. Call either
859           * by owner or with lock held -- it is OK for base, but not
860           * top, to move while resizings are in progress.
# Line 939 | Line 916 | public class ForkJoinPool extends Abstra
916          // Execution methods
917  
918          /**
919 <         * Removes and runs tasks until empty, using local mode
920 <         * ordering. Normally called only after checking for apparent
921 <         * non-emptiness.
922 <         */
923 <        final void runLocalTasks() {
924 <            // hoist checks from repeated pop/poll
925 <            ForkJoinTask<?>[] a; int m;
926 <            if ((a = array) != null && (m = a.length - 1) >= 0) {
927 <                if (mode == 0) {
928 <                    for (int s; (s = top - 1) - base >= 0;) {
929 <                        int j = ((m & s) << ASHIFT) + ABASE;
930 <                        ForkJoinTask<?> t =
931 <                            (ForkJoinTask<?>)U.getObjectVolatile(a, j);
955 <                        if (t != null) {
956 <                            if (U.compareAndSwapObject(a, j, t, null)) {
957 <                                top = s;
958 <                                t.doExec();
959 <                            }
960 <                        }
961 <                        else
962 <                            break;
963 <                    }
919 >         * Pops and runs tasks until empty.
920 >         */
921 >        private void popAndExecAll() {
922 >            // A bit faster than repeated pop calls
923 >            ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t;
924 >            while ((a = array) != null && (m = a.length - 1) >= 0 &&
925 >                   (s = top - 1) - base >= 0 &&
926 >                   (t = ((ForkJoinTask<?>)
927 >                         U.getObject(a, j = ((m & s) << ASHIFT) + ABASE)))
928 >                   != null) {
929 >                if (U.compareAndSwapObject(a, j, t, null)) {
930 >                    top = s;
931 >                    t.doExec();
932                  }
933 <                else {
934 <                    for (int b; (b = base) - top < 0;) {
935 <                        int j = ((m & b) << ASHIFT) + ABASE;
936 <                        ForkJoinTask<?> t =
937 <                            (ForkJoinTask<?>)U.getObjectVolatile(a, j);
938 <                        if (t != null) {
939 <                            if (base == b &&
940 <                                U.compareAndSwapObject(a, j, t, null)) {
941 <                                base = b + 1;
942 <                                t.doExec();
943 <                            }
944 <                        } else if (base == b) {
945 <                            if (b + 1 == top)
933 >            }
934 >        }
935 >
936 >        /**
937 >         * Polls and runs tasks until empty.
938 >         */
939 >        private void pollAndExecAll() {
940 >            for (ForkJoinTask<?> t; (t = poll()) != null;)
941 >                t.doExec();
942 >        }
943 >
944 >        /**
945 >         * If present, removes from queue and executes the given task, or
946 >         * any other cancelled task. Returns (true) immediately on any CAS
947 >         * or consistency check failure so caller can retry.
948 >         *
949 >         * @return 0 if no progress can be made, else positive
950 >         * (this unusual convention simplifies use with tryHelpStealer.)
951 >         */
952 >        final int tryRemoveAndExec(ForkJoinTask<?> task) {
953 >            int stat = 1;
954 >            boolean removed = false, empty = true;
955 >            ForkJoinTask<?>[] a; int m, s, b, n;
956 >            if ((a = array) != null && (m = a.length - 1) >= 0 &&
957 >                (n = (s = top) - (b = base)) > 0) {
958 >                for (ForkJoinTask<?> t;;) {           // traverse from s to b
959 >                    int j = ((--s & m) << ASHIFT) + ABASE;
960 >                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
961 >                    if (t == null)                    // inconsistent length
962 >                        break;
963 >                    else if (t == task) {
964 >                        if (s + 1 == top) {           // pop
965 >                            if (!U.compareAndSwapObject(a, j, task, null))
966                                  break;
967 <                            Thread.yield(); // wait for lagging update
967 >                            top = s;
968 >                            removed = true;
969                          }
970 +                        else if (base == b)           // replace with proxy
971 +                            removed = U.compareAndSwapObject(a, j, task,
972 +                                                             new EmptyTask());
973 +                        break;
974 +                    }
975 +                    else if (t.status >= 0)
976 +                        empty = false;
977 +                    else if (s + 1 == top) {          // pop and throw away
978 +                        if (U.compareAndSwapObject(a, j, t, null))
979 +                            top = s;
980 +                        break;
981 +                    }
982 +                    if (--n == 0) {
983 +                        if (!empty && base == b)
984 +                            stat = 0;
985 +                        break;
986                      }
987                  }
988              }
989 +            if (removed)
990 +                task.doExec();
991 +            return stat;
992          }
993  
994          /**
995           * Executes a top-level task and any local tasks remaining
996           * after execution.
989         *
990         * @return true unless terminating
997           */
998 <        final boolean runTask(ForkJoinTask<?> t) {
993 <            boolean alive = true;
998 >        final void runTask(ForkJoinTask<?> t) {
999              if (t != null) {
1000                  currentSteal = t;
1001                  t.doExec();
1002 <                if (top != base)        // conservative guard
1003 <                    runLocalTasks();
1002 >                if (top != base) {       // process remaining local tasks
1003 >                    if (mode == 0)
1004 >                        popAndExecAll();
1005 >                    else
1006 >                        pollAndExecAll();
1007 >                }
1008                  ++nsteals;
1009                  currentSteal = null;
1010              }
1002            else if (runState < 0)      // terminating
1003                alive = false;
1004            return alive;
1011          }
1012  
1013          /**
# Line 1124 | Line 1130 | public class ForkJoinPool extends Abstra
1130      private static final RuntimePermission modifyThreadPermission;
1131  
1132      /**
1133 <     * Per-thread submission bookeeping. Shared across all pools
1133 >     * Per-thread submission bookkeeping. Shared across all pools
1134       * to reduce ThreadLocal pollution and because random motion
1135       * to avoid contention in one pool is likely to hold for others.
1136       */
1137      private static final ThreadSubmitter submitters;
1138  
1139 +    /** Common default pool */
1140 +    static volatile ForkJoinPool commonPool;
1141 +
1142 +    // commonPool construction parameters
1143 +    private static final String propPrefix =
1144 +        "java.util.concurrent.ForkJoinPool.common.";
1145 +    private static final Thread.UncaughtExceptionHandler commonPoolUEH;
1146 +    private static final ForkJoinWorkerThreadFactory commonPoolFactory;
1147 +    static final int commonPoolParallelism;
1148 +
1149 +    /** Static initialization lock */
1150 +    private static final Mutex initializationLock;
1151 +
1152      // static constants
1153  
1154      /**
1155 <     * The wakeup interval (in nanoseconds) for a worker waiting for a
1156 <     * task when the pool is quiescent to instead try to shrink the
1157 <     * number of workers.  The exact value does not matter too
1139 <     * much. It must be short enough to release resources during
1140 <     * sustained periods of idleness, but not so short that threads
1141 <     * are continually re-created.
1155 >     * Initial timeout value (in nanoseconds) for the tread triggering
1156 >     * quiescence to park waiting for new work. On timeout, the thread
1157 >     * will instead try to shrink the number of workers.
1158       */
1159 <    private static final long SHRINK_RATE =
1144 <        4L * 1000L * 1000L * 1000L; // 4 seconds
1159 >    private static final long IDLE_TIMEOUT      = 1000L * 1000L * 1000L; // 1sec
1160  
1161      /**
1162 <     * The timeout value for attempted shrinkage, includes
1148 <     * some slop to cope with system timer imprecision.
1162 >     * Timeout value when there are more threads than parallelism level
1163       */
1164 <    private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10);
1164 >    private static final long FAST_IDLE_TIMEOUT =  100L * 1000L * 1000L;
1165  
1166      /**
1167       * The maximum stolen->joining link depth allowed in method
# Line 1160 | Line 1174 | public class ForkJoinPool extends Abstra
1174       * traversal parameters at the expense of sometimes blocking when
1175       * we could be helping.
1176       */
1177 <    private static final int MAX_HELP = 32;
1177 >    private static final int MAX_HELP = 64;
1178  
1179      /**
1180       * Secondary time-based bound (in nanosecs) for helping attempts
# Line 1170 | Line 1184 | public class ForkJoinPool extends Abstra
1184       * value should roughly approximate the time required to create
1185       * and/or activate a worker thread.
1186       */
1187 <    private static final long COMPENSATION_DELAY = 100L * 1000L; // 0.1 millisec
1187 >    private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec
1188  
1189      /**
1190       * Increment for seed generators. See class ThreadLocal for
# Line 1280 | Line 1294 | public class ForkJoinPool extends Abstra
1294      final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
1295      final AtomicLong stealCount;               // collect counts when terminated
1296      final AtomicInteger nextWorkerNumber;      // to create worker name string
1297 <    final String workerNamePrefix;             // to create worker name string
1297 >    String workerNamePrefix;                   // to create worker name string
1298  
1299      //  Creating, registering, and deregistering workers
1300  
# Line 1327 | Line 1341 | public class ForkJoinPool extends Abstra
1341          try {
1342              WorkQueue[] ws = workQueues;
1343              if (w != null && ws != null) {          // skip on shutdown/failure
1344 <                int rs, n;
1331 <                while ((n = ws.length) <            // ensure can hold total
1332 <                       (parallelism + (short)(ctl >>> TC_SHIFT) << 1))
1333 <                    workQueues = ws = Arrays.copyOf(ws, n << 1);
1334 <                int m = n - 1;
1344 >                int rs, n = ws.length, m = n - 1;
1345                  int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence
1346                  w.seed = (s == 0) ? 1 : s;          // ensure non-zero seed
1347                  int r = (s << 1) | 1;               // use odd-numbered indices
1348 <                while (ws[r &= m] != null)          // step by approx half size
1349 <                    r += ((n >>> 1) & SQMASK) + 2;
1348 >                if (ws[r &= m] != null) {           // collision
1349 >                    int probes = 0;                 // step by approx half size
1350 >                    int step = (n <= 4) ? 2 : ((n >>> 1) & SQMASK) + 2;
1351 >                    while (ws[r = (r + step) & m] != null) {
1352 >                        if (++probes >= n) {
1353 >                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1354 >                            m = n - 1;
1355 >                            probes = 0;
1356 >                        }
1357 >                    }
1358 >                }
1359                  w.eventCount = w.poolIndex = r;     // establish before recording
1360                  ws[r] = w;                          // also update seq
1361                  runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN);
# Line 1390 | Line 1409 | public class ForkJoinPool extends Abstra
1409              U.throwException(ex);
1410      }
1411  
1393
1412      // Submissions
1413  
1414      /**
# Line 1399 | Line 1417 | public class ForkJoinPool extends Abstra
1417       * range). If no queue exists at the index, one is created.  If
1418       * the queue is busy, another index is randomly chosen. The
1419       * submitMask bounds the effective number of queues to the
1420 <     * (nearest poswer of two for) parallelism level.
1420 >     * (nearest power of two for) parallelism level.
1421       *
1422       * @param task the task. Caller must ensure non-null.
1423       */
# Line 1438 | Line 1456 | public class ForkJoinPool extends Abstra
1456          }
1457      }
1458  
1459 +    /**
1460 +     * Submits the given (non-null) task to the common pool, if possible.
1461 +     */
1462 +    static void submitToCommonPool(ForkJoinTask<?> task) {
1463 +        ForkJoinPool p;
1464 +        if ((p = commonPool) == null)
1465 +            p = ensureCommonPool();
1466 +        p.doSubmit(task);
1467 +    }
1468 +
1469 +    /**
1470 +     * Returns true if the given task was submitted to common pool
1471 +     * and has not yet commenced execution, and is available for
1472 +     * removal according to execution policies; if so removing the
1473 +     * submission from the pool.
1474 +     *
1475 +     * @param task the task
1476 +     * @return true if successful
1477 +     */
1478 +    static boolean tryUnsubmitFromCommonPool(ForkJoinTask<?> task) {
1479 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1480 +        int k = submitters.get().seed & SQMASK;
1481 +        return ((p = commonPool) != null &&
1482 +                (ws = p.workQueues) != null &&
1483 +                ws.length > (k &= p.submitMask) &&
1484 +                (q = ws[k]) != null &&
1485 +                q.trySharedUnpush(task, p));
1486 +    }
1487 +
1488      // Maintaining ctl counts
1489  
1490      /**
# Line 1449 | Line 1496 | public class ForkJoinPool extends Abstra
1496      }
1497  
1498      /**
1499 <     * Tries to activate or create a worker if too few are active.
1499 >     * Tries to create one or activate one or more workers if too few are active.
1500       */
1501      final void signalWork() {
1502          long c; int u;
# Line 1483 | Line 1530 | public class ForkJoinPool extends Abstra
1530          }
1531      }
1532  
1486
1533      // Scanning for tasks
1534  
1535      /**
# Line 1491 | Line 1537 | public class ForkJoinPool extends Abstra
1537       */
1538      final void runWorker(WorkQueue w) {
1539          w.growArray(false);         // initialize queue array in this thread
1540 <        do {} while (w.runTask(scan(w)));
1540 >        do { w.runTask(scan(w)); } while (w.runState >= 0);
1541      }
1542  
1543      /**
# Line 1534 | Line 1580 | public class ForkJoinPool extends Abstra
1580       * awaiting signal,
1581       *
1582       * @param w the worker (via its WorkQueue)
1583 <     * @return a task or null of none found
1583 >     * @return a task or null if none found
1584       */
1585      private final ForkJoinTask<?> scan(WorkQueue w) {
1586          WorkQueue[] ws;                       // first update random seed
# Line 1551 | Line 1597 | public class ForkJoinPool extends Abstra
1597                      t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1598                      if (q.base == b && ec >= 0 && t != null &&
1599                          U.compareAndSwapObject(a, i, t, null)) {
1600 <                        q.base = b + 1;       // specialization of pollAt
1600 >                        if (q.top - (q.base = b + 1) > 0)
1601 >                            signalWork();    // help pushes signal
1602                          return t;
1603                      }
1604 <                    else if ((t != null || b + 1 != q.top) &&
1558 <                             (ec < 0 || j <= m)) {
1604 >                    else if (ec < 0 || j <= m) {
1605                          rs = 0;               // mark scan as imcomplete
1606                          break;                // caller can retry after release
1607                      }
# Line 1563 | Line 1609 | public class ForkJoinPool extends Abstra
1609                  if (--j < 0)
1610                      break;
1611              }
1612 +
1613              long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns;
1614              if (e < 0)                        // decode ctl on empty scan
1615                  w.runState = -1;              // pool is terminating
# Line 1588 | Line 1635 | public class ForkJoinPool extends Abstra
1635                  else {
1636                      if ((ns = w.nsteals) != 0) {
1637                          w.nsteals = 0;        // set rescans if ran task
1638 <                        w.rescans = (a > 0)? 0 : a + parallelism;
1638 >                        w.rescans = (a > 0) ? 0 : a + parallelism;
1639                          w.totalSteals += ns;
1640                      }
1641                      if (a == 1 - parallelism) // quiescent
# Line 1596 | Line 1643 | public class ForkJoinPool extends Abstra
1643                  }
1644              }
1645              else if (w.eventCount < 0) {      // already queued
1646 <                if ((nr = w.rescans) > 0) {   // continue rescanning
1647 <                    int ac = a + parallelism;
1648 <                    if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0)
1649 <                        Thread.yield();       // yield before block
1603 <                }
1604 <                else {
1646 >                int ac = a + parallelism;
1647 >                if ((nr = w.rescans) > 0)     // continue rescanning
1648 >                    w.rescans = (ac < nr) ? ac : nr - 1;
1649 >                else if (((w.seed >>> 16) & ac) == 0) { // randomize park
1650                      Thread.interrupted();     // clear status
1651                      Thread wt = Thread.currentThread();
1652                      U.putObject(wt, PARKBLOCKER, this);
# Line 1619 | Line 1664 | public class ForkJoinPool extends Abstra
1664      /**
1665       * If inactivating worker w has caused the pool to become
1666       * quiescent, checks for pool termination, and, so long as this is
1667 <     * not the only worker, waits for event for up to SHRINK_RATE
1668 <     * nanosecs.  On timeout, if ctl has not changed, terminates the
1667 >     * not the only worker, waits for event for up to a given
1668 >     * duration.  On timeout, if ctl has not changed, terminates the
1669       * worker, which will in turn wake up another worker to possibly
1670       * repeat this process.
1671       *
# Line 1630 | Line 1675 | public class ForkJoinPool extends Abstra
1675       */
1676      private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1677          if (w.eventCount < 0 && !tryTerminate(false, false) &&
1678 <            (int)prevCtl != 0 && ctl == currentCtl) {
1678 >            (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) {
1679 >            int dc = -(short)(currentCtl >>> TC_SHIFT);
1680 >            long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1681 >            long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop
1682              Thread wt = Thread.currentThread();
1635            Thread.yield();            // yield before block
1683              while (ctl == currentCtl) {
1637                long startTime = System.nanoTime();
1684                  Thread.interrupted();  // timed variant of version in scan()
1685                  U.putObject(wt, PARKBLOCKER, this);
1686                  w.parker = wt;
1687                  if (ctl == currentCtl)
1688 <                    U.park(false, SHRINK_RATE);
1688 >                    U.park(false, parkTime);
1689                  w.parker = null;
1690                  U.putObject(wt, PARKBLOCKER, null);
1691                  if (ctl != currentCtl)
1692                      break;
1693 <                if (System.nanoTime() - startTime >= SHRINK_TIMEOUT &&
1693 >                if (deadline - System.nanoTime() <= 0L &&
1694                      U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1695                      w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1696                      w.runState = -1;   // shrink
# Line 1665 | Line 1711 | public class ForkJoinPool extends Abstra
1711       * leaves hints in workers to speed up subsequent calls. The
1712       * implementation is very branchy to cope with potential
1713       * inconsistencies or loops encountering chains that are stale,
1714 <     * unknown, or so long that they are likely cyclic.  All of these
1669 <     * cases are dealt with by just retrying by caller.
1714 >     * unknown, or so long that they are likely cyclic.
1715       *
1716       * @param joiner the joining worker
1717       * @param task the task to join
1718 <     * @return true if found or ran a task (and so is immediately retryable)
1718 >     * @return 0 if no progress can be made, negative if task
1719 >     * known complete, else positive
1720       */
1721 <    private boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1722 <        WorkQueue[] ws;
1723 <        int m, depth = MAX_HELP;                // remaining chain depth
1724 <        boolean progress = false;
1725 <        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 &&
1726 <            task.status >= 0) {
1727 <            ForkJoinTask<?> subtask = task;     // current target
1728 <            outer: for (WorkQueue j = joiner;;) {
1729 <                WorkQueue stealer = null;       // find stealer of subtask
1730 <                WorkQueue v = ws[j.stealHint & m]; // try hint
1731 <                if (v != null && v.currentSteal == subtask)
1732 <                    stealer = v;
1733 <                else {                          // scan
1734 <                    for (int i = 1; i <= m; i += 2) {
1735 <                        if ((v = ws[i]) != null && v.currentSteal == subtask &&
1736 <                            v != joiner) {
1737 <                            stealer = v;
1738 <                            j.stealHint = i;    // save hint
1739 <                            break;
1721 >    private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1722 >        int stat = 0, steps = 0;                    // bound to avoid cycles
1723 >        if (joiner != null && task != null) {       // hoist null checks
1724 >            restart: for (;;) {
1725 >                ForkJoinTask<?> subtask = task;     // current target
1726 >                for (WorkQueue j = joiner, v;;) {   // v is stealer of subtask
1727 >                    WorkQueue[] ws; int m, s, h;
1728 >                    if ((s = task.status) < 0) {
1729 >                        stat = s;
1730 >                        break restart;
1731 >                    }
1732 >                    if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
1733 >                        break restart;              // shutting down
1734 >                    if ((v = ws[h = (j.stealHint | 1) & m]) == null ||
1735 >                        v.currentSteal != subtask) {
1736 >                        for (int origin = h;;) {    // find stealer
1737 >                            if (((h = (h + 2) & m) & 15) == 1 &&
1738 >                                (subtask.status < 0 || j.currentJoin != subtask))
1739 >                                continue restart;   // occasional staleness check
1740 >                            if ((v = ws[h]) != null &&
1741 >                                v.currentSteal == subtask) {
1742 >                                j.stealHint = h;    // save hint
1743 >                                break;
1744 >                            }
1745 >                            if (h == origin)
1746 >                                break restart;      // cannot find stealer
1747                          }
1748                      }
1749 <                    if (stealer == null)
1750 <                        break;
1751 <                }
1752 <
1753 <                for (WorkQueue q = stealer;;) { // try to help stealer
1754 <                    ForkJoinTask[] a; ForkJoinTask<?> t; int b;
1755 <                    if (task.status < 0)
1756 <                        break outer;
1757 <                    if ((b = q.base) - q.top < 0 && (a = q.array) != null) {
1758 <                        progress = true;
1759 <                        int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1760 <                        t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1761 <                        if (subtask.status < 0) // must recheck before taking
1762 <                            break outer;
1763 <                        if (t != null &&
1764 <                            q.base == b &&
1765 <                            U.compareAndSwapObject(a, i, t, null)) {
1766 <                            q.base = b + 1;
1767 <                            joiner.runSubtask(t);
1749 >                    for (;;) { // help stealer or descend to its stealer
1750 >                        ForkJoinTask[] a;  int b;
1751 >                        if (subtask.status < 0)     // surround probes with
1752 >                            continue restart;       //   consistency checks
1753 >                        if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
1754 >                            int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1755 >                            ForkJoinTask<?> t =
1756 >                                (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1757 >                            if (subtask.status < 0 || j.currentJoin != subtask ||
1758 >                                v.currentSteal != subtask)
1759 >                                continue restart;   // stale
1760 >                            stat = 1;               // apparent progress
1761 >                            if (t != null && v.base == b &&
1762 >                                U.compareAndSwapObject(a, i, t, null)) {
1763 >                                v.base = b + 1;     // help stealer
1764 >                                joiner.runSubtask(t);
1765 >                            }
1766 >                            else if (v.base == b && ++steps == MAX_HELP)
1767 >                                break restart;      // v apparently stalled
1768 >                        }
1769 >                        else {                      // empty -- try to descend
1770 >                            ForkJoinTask<?> next = v.currentJoin;
1771 >                            if (subtask.status < 0 || j.currentJoin != subtask ||
1772 >                                v.currentSteal != subtask)
1773 >                                continue restart;   // stale
1774 >                            else if (next == null || ++steps == MAX_HELP)
1775 >                                break restart;      // dead-end or maybe cyclic
1776 >                            else {
1777 >                                subtask = next;
1778 >                                j = v;
1779 >                                break;
1780 >                            }
1781                          }
1716                        else if (q.base == b)
1717                            break outer;        // possibly stalled
1718                    }
1719                    else {                      // descend
1720                        ForkJoinTask<?> next = stealer.currentJoin;
1721                        if (--depth <= 0 || subtask.status < 0 ||
1722                            next == null || next == subtask)
1723                            break outer;        // stale, dead-end, or cyclic
1724                        subtask = next;
1725                        j = stealer;
1726                        break;
1782                      }
1783                  }
1784              }
1785          }
1786 <        return progress;
1786 >        return stat;
1787      }
1788  
1789      /**
# Line 1757 | Line 1812 | public class ForkJoinPool extends Abstra
1812       * adds a new thread if no idle workers are available and either
1813       * pool would become completely starved or: (at least half
1814       * starved, and fewer than 50% spares exist, and there is at least
1815 <     * one task apparently available). Even though the availablity
1815 >     * one task apparently available). Even though the availability
1816       * check requires a full scan, it is worthwhile in reducing false
1817       * alarms.
1818       *
1819 <     * @param task if nonnull, a task being waited for
1820 <     * @param blocker if nonnull, a blocker being waited for
1819 >     * @param task if non-null, a task being waited for
1820 >     * @param blocker if non-null, a blocker being waited for
1821       * @return true if the caller can block, else should recheck and retry
1822       */
1823      final boolean tryCompensate(ForkJoinTask<?> task, ManagedBlocker blocker) {
# Line 1821 | Line 1876 | public class ForkJoinPool extends Abstra
1876      }
1877  
1878      /**
1879 <     * Helps and/or blocks until the given task is done
1879 >     * Helps and/or blocks until the given task is done.
1880       *
1881       * @param joiner the joining worker
1882       * @param task the task
1883       * @return task status on exit
1884       */
1885      final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
1886 <        ForkJoinTask<?> prevJoin = joiner.currentJoin;
1887 <        joiner.currentJoin = task;
1888 <        long startTime = 0L;
1889 <        for (int k = 0, s; ; ++k) {
1890 <            if ((joiner.isEmpty() ?                  // try to help
1891 <                 !tryHelpStealer(joiner, task) :
1892 <                 !joiner.tryRemoveAndExec(task))) {
1893 <                if (k == 0) {
1894 <                    startTime = System.nanoTime();
1895 <                    tryPollForAndExec(joiner, task); // check uncommon case
1896 <                }
1897 <                else if ((k & (MAX_HELP - 1)) == 0 &&
1898 <                         System.nanoTime() - startTime >= COMPENSATION_DELAY &&
1899 <                         tryCompensate(task, null)) {
1900 <                    if (task.trySetSignal() && task.status >= 0) {
1901 <                        synchronized (task) {
1902 <                            if (task.status >= 0) {
1903 <                                try {                // see ForkJoinTask
1904 <                                    task.wait();     //  for explanation
1905 <                                } catch (InterruptedException ie) {
1886 >        int s;
1887 >        if ((s = task.status) >= 0) {
1888 >            ForkJoinTask<?> prevJoin = joiner.currentJoin;
1889 >            joiner.currentJoin = task;
1890 >            long startTime = 0L;
1891 >            for (int k = 0;;) {
1892 >                if ((s = (joiner.isEmpty() ?           // try to help
1893 >                          tryHelpStealer(joiner, task) :
1894 >                          joiner.tryRemoveAndExec(task))) == 0 &&
1895 >                    (s = task.status) >= 0) {
1896 >                    if (k == 0) {
1897 >                        startTime = System.nanoTime();
1898 >                        tryPollForAndExec(joiner, task); // check uncommon case
1899 >                    }
1900 >                    else if ((k & (MAX_HELP - 1)) == 0 &&
1901 >                             System.nanoTime() - startTime >=
1902 >                             COMPENSATION_DELAY &&
1903 >                             tryCompensate(task, null)) {
1904 >                        if (task.trySetSignal()) {
1905 >                            synchronized (task) {
1906 >                                if (task.status >= 0) {
1907 >                                    try {                // see ForkJoinTask
1908 >                                        task.wait();     //  for explanation
1909 >                                    } catch (InterruptedException ie) {
1910 >                                    }
1911                                  }
1912 +                                else
1913 +                                    task.notifyAll();
1914                              }
1853                            else
1854                                task.notifyAll();
1915                          }
1916 +                        long c;                          // re-activate
1917 +                        do {} while (!U.compareAndSwapLong
1918 +                                     (this, CTL, c = ctl, c + AC_UNIT));
1919                      }
1857                    long c;                          // re-activate
1858                    do {} while (!U.compareAndSwapLong
1859                                 (this, CTL, c = ctl, c + AC_UNIT));
1920                  }
1921 +                if (s < 0 || (s = task.status) < 0) {
1922 +                    joiner.currentJoin = prevJoin;
1923 +                    break;
1924 +                }
1925 +                else if ((k++ & (MAX_HELP - 1)) == MAX_HELP >>> 1)
1926 +                    Thread.yield();                     // for politeness
1927              }
1862            if ((s = task.status) < 0) {
1863                joiner.currentJoin = prevJoin;
1864                return s;
1865            }
1866            else if ((k & (MAX_HELP - 1)) == MAX_HELP >>> 1)
1867                Thread.yield();                     // for politeness
1928          }
1929 +        return s;
1930      }
1931  
1932      /**
# Line 1882 | Line 1943 | public class ForkJoinPool extends Abstra
1943          while ((s = task.status) >= 0 &&
1944                 (joiner.isEmpty() ?
1945                  tryHelpStealer(joiner, task) :
1946 <                joiner.tryRemoveAndExec(task)))
1946 >                joiner.tryRemoveAndExec(task)) != 0)
1947              ;
1948          return s;
1949      }
# Line 1895 | Line 1956 | public class ForkJoinPool extends Abstra
1956       */
1957      private WorkQueue findNonEmptyStealQueue(WorkQueue w) {
1958          // Similar to loop in scan(), but ignoring submissions
1959 <        int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1959 >        int r;
1960 >        if (w == null) // allow external callers
1961 >            r = ThreadLocalRandom.current().nextInt();
1962 >        else {
1963 >            r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1964 >        }
1965          int step = (r >>> 16) | 1;
1966          for (WorkQueue[] ws;;) {
1967              int rs = runState, m;
# Line 1922 | Line 1988 | public class ForkJoinPool extends Abstra
1988       */
1989      final void helpQuiescePool(WorkQueue w) {
1990          for (boolean active = true;;) {
1991 <            if (w.base - w.top < 0)
1992 <                w.runLocalTasks();  // exhaust local queue
1991 >            ForkJoinTask<?> localTask; // exhaust local queue
1992 >            while ((localTask = w.nextLocalTask()) != null)
1993 >                localTask.doExec();
1994              WorkQueue q = findNonEmptyStealQueue(w);
1995              if (q != null) {
1996                  ForkJoinTask<?> t; int b;
# Line 1955 | Line 2022 | public class ForkJoinPool extends Abstra
2022      }
2023  
2024      /**
2025 +     * Restricted version of helpQuiescePool for non-FJ callers
2026 +     */
2027 +    static void externalHelpQuiescePool() {
2028 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q;
2029 +        ForkJoinTask<?> t; int b;
2030 +        int k = submitters.get().seed & SQMASK;
2031 +        if ((p = commonPool) != null &&
2032 +            (ws = p.workQueues) != null &&
2033 +            ws.length > (k &= p.submitMask) &&
2034 +            (w = ws[k]) != null &&
2035 +            (q = p.findNonEmptyStealQueue(w)) != null &&
2036 +            (b = q.base) - q.top < 0 &&
2037 +            (t = q.pollAt(b)) != null)
2038 +            t.doExec();
2039 +    }
2040 +
2041 +    /**
2042       * Gets and removes a local or stolen task for the given worker.
2043       *
2044       * @return a task, if available
# Line 1987 | Line 2071 | public class ForkJoinPool extends Abstra
2071                  8);
2072      }
2073  
2074 +    /**
2075 +     * Returns approximate submission queue length for the given caller
2076 +     */
2077 +    static int getEstimatedSubmitterQueueLength() {
2078 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2079 +        int k = submitters.get().seed & SQMASK;
2080 +        return ((p = commonPool) != null &&
2081 +                p.runState >= 0 &&
2082 +                (ws = p.workQueues) != null &&
2083 +                ws.length > (k &= p.submitMask) &&
2084 +                (q = ws[k]) != null) ?
2085 +            q.queueSize() : 0;
2086 +    }
2087 +
2088      //  Termination
2089  
2090      /**
# Line 2168 | Line 2266 | public class ForkJoinPool extends Abstra
2266          lock.unlock();
2267      }
2268  
2269 +    /**
2270 +     * Returns the common pool instance
2271 +     *
2272 +     * @return the common pool instance
2273 +     */
2274 +    public static ForkJoinPool commonPool() {
2275 +        ForkJoinPool p;
2276 +        return (p = commonPool) != null? p : ensureCommonPool();
2277 +    }
2278 +
2279 +    private static ForkJoinPool ensureCommonPool() {
2280 +        ForkJoinPool p;
2281 +        if ((p = commonPool) == null) {
2282 +            final Mutex lock = initializationLock;
2283 +            lock.lock();
2284 +            try {
2285 +                if ((p = commonPool) == null) {
2286 +                    p = commonPool = new ForkJoinPool(commonPoolParallelism,
2287 +                                                      commonPoolFactory,
2288 +                                                      commonPoolUEH, false);
2289 +                    // use a more informative name string for workers
2290 +                    p.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2291 +                }
2292 +            } finally {
2293 +                lock.unlock();
2294 +            }
2295 +        }
2296 +        return p;
2297 +    }
2298 +
2299      // Execution methods
2300  
2301      /**
# Line 2341 | Line 2469 | public class ForkJoinPool extends Abstra
2469      }
2470  
2471      /**
2472 +     * Returns the targeted parallelism level of the common pool.
2473 +     *
2474 +     * @return the targeted parallelism level of the common pool
2475 +     */
2476 +    public static int getCommonPoolParallelism() {
2477 +        return commonPoolParallelism;
2478 +    }
2479 +
2480 +    /**
2481       * Returns the number of worker threads that have started but not
2482       * yet terminated.  The result returned by this method may differ
2483       * from {@link #getParallelism} when threads are created to
# Line 2592 | Line 2729 | public class ForkJoinPool extends Abstra
2729      }
2730  
2731      /**
2732 <     * Initiates an orderly shutdown in which previously submitted
2733 <     * tasks are executed, but no new tasks will be accepted.
2734 <     * Invocation has no additional effect if already shut down.
2735 <     * Tasks that are in the process of being submitted concurrently
2736 <     * during the course of this method may or may not be rejected.
2732 >     * Possibly initiates an orderly shutdown in which previously
2733 >     * submitted tasks are executed, but no new tasks will be
2734 >     * accepted. Invocation has no effect on execution state if this
2735 >     * is the {@link #commonPool}, and no additional effect if
2736 >     * already shut down.  Tasks that are in the process of being
2737 >     * submitted concurrently during the course of this method may or
2738 >     * may not be rejected.
2739       *
2740       * @throws SecurityException if a security manager exists and
2741       *         the caller is not permitted to modify threads
# Line 2605 | Line 2744 | public class ForkJoinPool extends Abstra
2744       */
2745      public void shutdown() {
2746          checkPermission();
2747 <        tryTerminate(false, true);
2747 >        if (this != commonPool)
2748 >            tryTerminate(false, true);
2749      }
2750  
2751      /**
2752 <     * Attempts to cancel and/or stop all tasks, and reject all
2753 <     * subsequently submitted tasks.  Tasks that are in the process of
2754 <     * being submitted or executed concurrently during the course of
2755 <     * this method may or may not be rejected. This method cancels
2756 <     * both existing and unexecuted tasks, in order to permit
2757 <     * termination in the presence of task dependencies. So the method
2758 <     * always returns an empty list (unlike the case for some other
2759 <     * Executors).
2752 >     * Possibly attempts to cancel and/or stop all tasks, and reject
2753 >     * all subsequently submitted tasks.  Invocation has no effect on
2754 >     * execution state if this is the {@link #commonPool}, and no
2755 >     * additional effect if already shut down. Otherwise, tasks that
2756 >     * are in the process of being submitted or executed concurrently
2757 >     * during the course of this method may or may not be
2758 >     * rejected. This method cancels both existing and unexecuted
2759 >     * tasks, in order to permit termination in the presence of task
2760 >     * dependencies. So the method always returns an empty list
2761 >     * (unlike the case for some other Executors).
2762       *
2763       * @return an empty list
2764       * @throws SecurityException if a security manager exists and
# Line 2626 | Line 2768 | public class ForkJoinPool extends Abstra
2768       */
2769      public List<Runnable> shutdownNow() {
2770          checkPermission();
2771 <        tryTerminate(true, true);
2771 >        if (this != commonPool)
2772 >            tryTerminate(true, true);
2773          return Collections.emptyList();
2774      }
2775  
# Line 2835 | Line 2978 | public class ForkJoinPool extends Abstra
2978          defaultForkJoinWorkerThreadFactory =
2979              new DefaultForkJoinWorkerThreadFactory();
2980          submitters = new ThreadSubmitter();
2981 +        initializationLock = new Mutex();
2982          int s;
2983          try {
2984              U = getUnsafe();
# Line 2853 | Line 2997 | public class ForkJoinPool extends Abstra
2997          if ((s & (s-1)) != 0)
2998              throw new Error("data type scale not a power of two");
2999          ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
3000 +
3001 +        // Establish configuration for default pool
3002 +        try {
3003 +            String pp = System.getProperty(propPrefix + "parallelism");
3004 +            String fp = System.getProperty(propPrefix + "threadFactory");
3005 +            String up = System.getProperty(propPrefix + "exceptionHandler");
3006 +            int par;
3007 +            if ((pp == null || (par = Integer.parseInt(pp)) <= 0))
3008 +                par = Runtime.getRuntime().availableProcessors();
3009 +            commonPoolParallelism = par;
3010 +            if (fp != null)
3011 +                commonPoolFactory = (ForkJoinWorkerThreadFactory)
3012 +                    ClassLoader.getSystemClassLoader().loadClass(fp).newInstance();
3013 +            else
3014 +                commonPoolFactory = defaultForkJoinWorkerThreadFactory;
3015 +            if (up != null)
3016 +                commonPoolUEH = (Thread.UncaughtExceptionHandler)
3017 +                    ClassLoader.getSystemClassLoader().loadClass(up).newInstance();
3018 +            else
3019 +                commonPoolUEH = null;
3020 +        } catch (Exception e) {
3021 +            throw new Error(e);
3022 +        }
3023      }
3024  
3025      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines