ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.127 by dl, Sun Mar 4 15:52:45 2012 UTC vs.
Revision 1.135 by dl, Sun Oct 28 22:36:01 2012 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.ArrayList;
10   import java.util.Arrays;
11   import java.util.Collection;
# Line 41 | Line 42 | import java.util.concurrent.locks.Condit
42   * ForkJoinPool}s may also be appropriate for use with event-style
43   * tasks that are never joined.
44   *
45 < * <p>A {@code ForkJoinPool} is constructed with a given target
46 < * parallelism level; by default, equal to the number of available
47 < * processors. The pool attempts to maintain enough active (or
48 < * available) threads by dynamically adding, suspending, or resuming
49 < * internal worker threads, even if some tasks are stalled waiting to
50 < * join others. However, no such adjustments are guaranteed in the
51 < * face of blocked IO or other unmanaged synchronization. The nested
52 < * {@link ManagedBlocker} interface enables extension of the kinds of
45 > * <p>A static {@link #commonPool} is available and appropriate for
46 > * most applications. The common pool is constructed upon first
47 > * access, or upon usage by any ForkJoinTask that is not explictly
48 > * submitted to a specified pool. Using the common pool normally
49 > * reduces resource usage (its threads are slowly reclaimed during
50 > * periods of non-use, and reinstated upon subsequent use).  The
51 > * common pool is by default constructed with default parameters, but
52 > * these may be controlled by setting any or all of the three
53 > * properties {@code
54 > * java.util.concurrent.ForkJoinPool.common.{parallelism,
55 > * threadFactory, exceptionHandler}}.
56 > *
57 > * <p>For applications that require separate or custom pools, a {@code
58 > * ForkJoinPool} may be constructed with a given target parallelism
59 > * level; by default, equal to the number of available processors. The
60 > * pool attempts to maintain enough active (or available) threads by
61 > * dynamically adding, suspending, or resuming internal worker
62 > * threads, even if some tasks are stalled waiting to join
63 > * others. However, no such adjustments are guaranteed in the face of
64 > * blocked IO or other unmanaged synchronization. The nested {@link
65 > * ManagedBlocker} interface enables extension of the kinds of
66   * synchronization accommodated.
67   *
68   * <p>In addition to execution and lifecycle control methods, this
# Line 93 | Line 107 | import java.util.concurrent.locks.Condit
107   *  </tr>
108   * </table>
109   *
96 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
97 * used for all parallel task execution in a program or subsystem.
98 * Otherwise, use would not usually outweigh the construction and
99 * bookkeeping overhead of creating a large set of threads. For
100 * example, a common pool could be used for the {@code SortTasks}
101 * illustrated in {@link RecursiveAction}. Because {@code
102 * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
103 * daemon} mode, there is typically no need to explicitly {@link
104 * #shutdown} such a pool upon program exit.
105 *
106 *  <pre> {@code
107 * static final ForkJoinPool mainPool = new ForkJoinPool();
108 * ...
109 * public void sort(long[] array) {
110 *   mainPool.invoke(new SortTask(array, 0, array.length));
111 * }}</pre>
112 *
110   * <p><b>Implementation notes</b>: This implementation restricts the
111   * maximum number of running threads to 32767. Attempts to create
112   * pools with greater than the maximum number result in
# Line 320 | Line 317 | public class ForkJoinPool extends Abstra
317       *
318       * Trimming workers. To release resources after periods of lack of
319       * use, a worker starting to wait when the pool is quiescent will
320 <     * time out and terminate if the pool has remained quiescent for
321 <     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
322 <     * terminating all workers after long periods of non-use.
320 >     * time out and terminate if the pool has remained quiescent for a
321 >     * given period -- a short period if there are more threads than
322 >     * parallelism, longer as the number of threads decreases. This
323 >     * will slowly propagate, eventually terminating all workers after
324 >     * periods of non-use.
325       *
326       * Shutdown and Termination. A call to shutdownNow atomically sets
327       * a runState bit and then (non-atomically) sets each worker's
# Line 629 | Line 628 | public class ForkJoinPool extends Abstra
628          final ForkJoinPool pool;   // the containing pool (may be null)
629          final ForkJoinWorkerThread owner; // owning thread or null if shared
630          volatile Thread parker;    // == owner during call to park; else null
631 <        ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
631 >        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
632          ForkJoinTask<?> currentSteal; // current non-local task being executed
633          // Heuristic padding to ameliorate unfortunate memory placements
634          Object p00, p01, p02, p03, p04, p05, p06, p07;
# Line 813 | Line 812 | public class ForkJoinPool extends Abstra
812          }
813  
814          /**
815 +         * Version of tryUnpush for shared queues; called by non-FJ
816 +         * submitters. Conservatively fails to unpush if all workers
817 +         * are active unless there are multiple tasks in queue.
818 +         */
819 +        final boolean trySharedUnpush(ForkJoinTask<?> task, ForkJoinPool p) {
820 +            boolean success = false;
821 +            if (task != null && top != base && runState == 0 &&
822 +                U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
823 +                try {
824 +                    ForkJoinTask<?>[] a; int n, s;
825 +                    if ((a = array) != null && (n = (s = top) - base) > 0 &&
826 +                        (n > 1 || p == null || (int)(p.ctl >> AC_SHIFT) < 0)) {
827 +                        int j = (((a.length - 1) & --s) << ASHIFT) + ABASE;
828 +                        if (U.getObjectVolatile(a, j) == task &&
829 +                            U.compareAndSwapObject(a, j, task, null)) {
830 +                            top = s;
831 +                            success = true;
832 +                        }
833 +                    }
834 +                } finally {
835 +                    runState = 0;                         // unlock
836 +                }
837 +            }
838 +            return success;
839 +        }
840 +
841 +        /**
842           * Polls the given task only if it is at the current base.
843           */
844          final boolean pollFor(ForkJoinTask<?> task) {
# Line 920 | Line 946 | public class ForkJoinPool extends Abstra
946           * any other cancelled task. Returns (true) immediately on any CAS
947           * or consistency check failure so caller can retry.
948           *
949 <         * @return false if no progress can be made
949 >         * @return 0 if no progress can be made, else positive
950 >         * (this unusual convention simplifies use with tryHelpStealer.)
951           */
952 <        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
953 <            boolean removed = false, empty = true, progress = true;
952 >        final int tryRemoveAndExec(ForkJoinTask<?> task) {
953 >            int stat = 1;
954 >            boolean removed = false, empty = true;
955              ForkJoinTask<?>[] a; int m, s, b, n;
956              if ((a = array) != null && (m = a.length - 1) >= 0 &&
957                  (n = (s = top) - (b = base)) > 0) {
# Line 953 | Line 981 | public class ForkJoinPool extends Abstra
981                      }
982                      if (--n == 0) {
983                          if (!empty && base == b)
984 <                            progress = false;
984 >                            stat = 0;
985                          break;
986                      }
987                  }
988              }
989              if (removed)
990                  task.doExec();
991 <            return progress;
991 >            return stat;
992          }
993  
994          /**
# Line 1102 | Line 1130 | public class ForkJoinPool extends Abstra
1130      private static final RuntimePermission modifyThreadPermission;
1131  
1132      /**
1133 <     * Per-thread submission bookeeping. Shared across all pools
1133 >     * Per-thread submission bookkeeping. Shared across all pools
1134       * to reduce ThreadLocal pollution and because random motion
1135       * to avoid contention in one pool is likely to hold for others.
1136       */
1137      private static final ThreadSubmitter submitters;
1138  
1139 +    /** Common default pool */
1140 +    static volatile ForkJoinPool commonPool;
1141 +
1142 +    // commonPool construction parameters
1143 +    private static final String propPrefix =
1144 +        "java.util.concurrent.ForkJoinPool.common.";
1145 +    private static final Thread.UncaughtExceptionHandler commonPoolUEH;
1146 +    private static final ForkJoinWorkerThreadFactory commonPoolFactory;
1147 +    static final int commonPoolParallelism;
1148 +
1149 +    /** Static initialization lock */
1150 +    private static final Mutex initializationLock;
1151 +
1152      // static constants
1153  
1154      /**
1155 <     * The wakeup interval (in nanoseconds) for a worker waiting for a
1156 <     * task when the pool is quiescent to instead try to shrink the
1157 <     * number of workers.  The exact value does not matter too
1117 <     * much. It must be short enough to release resources during
1118 <     * sustained periods of idleness, but not so short that threads
1119 <     * are continually re-created.
1155 >     * Initial timeout value (in nanoseconds) for the tread triggering
1156 >     * quiescence to park waiting for new work. On timeout, the thread
1157 >     * will instead try to shrink the number of workers.
1158       */
1159 <    private static final long SHRINK_RATE =
1122 <        4L * 1000L * 1000L * 1000L; // 4 seconds
1159 >    private static final long IDLE_TIMEOUT      = 1000L * 1000L * 1000L; // 1sec
1160  
1161      /**
1162 <     * The timeout value for attempted shrinkage, includes
1126 <     * some slop to cope with system timer imprecision.
1162 >     * Timeout value when there are more threads than parallelism level
1163       */
1164 <    private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10);
1164 >    private static final long FAST_IDLE_TIMEOUT =  100L * 1000L * 1000L;
1165  
1166      /**
1167       * The maximum stolen->joining link depth allowed in method
# Line 1138 | Line 1174 | public class ForkJoinPool extends Abstra
1174       * traversal parameters at the expense of sometimes blocking when
1175       * we could be helping.
1176       */
1177 <    private static final int MAX_HELP = 32;
1177 >    private static final int MAX_HELP = 64;
1178  
1179      /**
1180       * Secondary time-based bound (in nanosecs) for helping attempts
# Line 1148 | Line 1184 | public class ForkJoinPool extends Abstra
1184       * value should roughly approximate the time required to create
1185       * and/or activate a worker thread.
1186       */
1187 <    private static final long COMPENSATION_DELAY = 100L * 1000L; // 0.1 millisec
1187 >    private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec
1188  
1189      /**
1190       * Increment for seed generators. See class ThreadLocal for
# Line 1258 | Line 1294 | public class ForkJoinPool extends Abstra
1294      final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
1295      final AtomicLong stealCount;               // collect counts when terminated
1296      final AtomicInteger nextWorkerNumber;      // to create worker name string
1297 <    final String workerNamePrefix;             // to create worker name string
1297 >    String workerNamePrefix;                   // to create worker name string
1298  
1299      //  Creating, registering, and deregistering workers
1300  
# Line 1299 | Line 1335 | public class ForkJoinPool extends Abstra
1335       *
1336       * @param w the worker's queue
1337       */
1302
1338      final void registerWorker(WorkQueue w) {
1339          Mutex lock = this.lock;
1340          lock.lock();
1341          try {
1342              WorkQueue[] ws = workQueues;
1343              if (w != null && ws != null) {          // skip on shutdown/failure
1344 <                int rs, n =  ws.length, m = n - 1;
1344 >                int rs, n = ws.length, m = n - 1;
1345                  int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence
1346                  w.seed = (s == 0) ? 1 : s;          // ensure non-zero seed
1347                  int r = (s << 1) | 1;               // use odd-numbered indices
# Line 1374 | Line 1409 | public class ForkJoinPool extends Abstra
1409              U.throwException(ex);
1410      }
1411  
1377
1412      // Submissions
1413  
1414      /**
# Line 1422 | Line 1456 | public class ForkJoinPool extends Abstra
1456          }
1457      }
1458  
1459 +    /**
1460 +     * Submits the given (non-null) task to the common pool, if possible.
1461 +     */
1462 +    static void submitToCommonPool(ForkJoinTask<?> task) {
1463 +        ForkJoinPool p;
1464 +        if ((p = commonPool) == null)
1465 +            p = ensureCommonPool();
1466 +        p.doSubmit(task);
1467 +    }
1468 +
1469 +    /**
1470 +     * Returns true if the given task was submitted to common pool
1471 +     * and has not yet commenced execution, and is available for
1472 +     * removal according to execution policies; if so removing the
1473 +     * submission from the pool.
1474 +     *
1475 +     * @param task the task
1476 +     * @return true if successful
1477 +     */
1478 +    static boolean tryUnsubmitFromCommonPool(ForkJoinTask<?> task) {
1479 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1480 +        int k = submitters.get().seed & SQMASK;
1481 +        return ((p = commonPool) != null &&
1482 +                (ws = p.workQueues) != null &&
1483 +                ws.length > (k &= p.submitMask) &&
1484 +                (q = ws[k]) != null &&
1485 +                q.trySharedUnpush(task, p));
1486 +    }
1487 +
1488      // Maintaining ctl counts
1489  
1490      /**
# Line 1433 | Line 1496 | public class ForkJoinPool extends Abstra
1496      }
1497  
1498      /**
1499 <     * Tries to activate or create a worker if too few are active.
1499 >     * Tries to create one or activate one or more workers if too few are active.
1500       */
1501      final void signalWork() {
1502          long c; int u;
# Line 1517 | Line 1580 | public class ForkJoinPool extends Abstra
1580       * awaiting signal,
1581       *
1582       * @param w the worker (via its WorkQueue)
1583 <     * @return a task or null of none found
1583 >     * @return a task or null if none found
1584       */
1585      private final ForkJoinTask<?> scan(WorkQueue w) {
1586          WorkQueue[] ws;                       // first update random seed
# Line 1534 | Line 1597 | public class ForkJoinPool extends Abstra
1597                      t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1598                      if (q.base == b && ec >= 0 && t != null &&
1599                          U.compareAndSwapObject(a, i, t, null)) {
1600 <                        q.base = b + 1;       // specialization of pollAt
1600 >                        if (q.top - (q.base = b + 1) > 0)
1601 >                            signalWork();    // help pushes signal
1602                          return t;
1603                      }
1604                      else if (ec < 0 || j <= m) {
# Line 1545 | Line 1609 | public class ForkJoinPool extends Abstra
1609                  if (--j < 0)
1610                      break;
1611              }
1612 +
1613              long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns;
1614              if (e < 0)                        // decode ctl on empty scan
1615                  w.runState = -1;              // pool is terminating
# Line 1578 | Line 1643 | public class ForkJoinPool extends Abstra
1643                  }
1644              }
1645              else if (w.eventCount < 0) {      // already queued
1646 <                if ((nr = w.rescans) > 0) {   // continue rescanning
1647 <                    int ac = a + parallelism;
1648 <                    if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0)
1649 <                        Thread.yield();       // yield before block
1585 <                }
1586 <                else {
1646 >                int ac = a + parallelism;
1647 >                if ((nr = w.rescans) > 0)     // continue rescanning
1648 >                    w.rescans = (ac < nr) ? ac : nr - 1;
1649 >                else if (((w.seed >>> 16) & ac) == 0) { // randomize park
1650                      Thread.interrupted();     // clear status
1651                      Thread wt = Thread.currentThread();
1652                      U.putObject(wt, PARKBLOCKER, this);
# Line 1601 | Line 1664 | public class ForkJoinPool extends Abstra
1664      /**
1665       * If inactivating worker w has caused the pool to become
1666       * quiescent, checks for pool termination, and, so long as this is
1667 <     * not the only worker, waits for event for up to SHRINK_RATE
1668 <     * nanosecs.  On timeout, if ctl has not changed, terminates the
1667 >     * not the only worker, waits for event for up to a given
1668 >     * duration.  On timeout, if ctl has not changed, terminates the
1669       * worker, which will in turn wake up another worker to possibly
1670       * repeat this process.
1671       *
# Line 1613 | Line 1676 | public class ForkJoinPool extends Abstra
1676      private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1677          if (w.eventCount < 0 && !tryTerminate(false, false) &&
1678              (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) {
1679 +            int dc = -(short)(currentCtl >>> TC_SHIFT);
1680 +            long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1681 +            long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop
1682              Thread wt = Thread.currentThread();
1617            Thread.yield();            // yield before block
1683              while (ctl == currentCtl) {
1619                long startTime = System.nanoTime();
1684                  Thread.interrupted();  // timed variant of version in scan()
1685                  U.putObject(wt, PARKBLOCKER, this);
1686                  w.parker = wt;
1687                  if (ctl == currentCtl)
1688 <                    U.park(false, SHRINK_RATE);
1688 >                    U.park(false, parkTime);
1689                  w.parker = null;
1690                  U.putObject(wt, PARKBLOCKER, null);
1691                  if (ctl != currentCtl)
1692                      break;
1693 <                if (System.nanoTime() - startTime >= SHRINK_TIMEOUT &&
1693 >                if (deadline - System.nanoTime() <= 0L &&
1694                      U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
1695                      w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
1696                      w.runState = -1;   // shrink
# Line 1647 | Line 1711 | public class ForkJoinPool extends Abstra
1711       * leaves hints in workers to speed up subsequent calls. The
1712       * implementation is very branchy to cope with potential
1713       * inconsistencies or loops encountering chains that are stale,
1714 <     * unknown, or so long that they are likely cyclic.  All of these
1651 <     * cases are dealt with by just retrying by caller.
1714 >     * unknown, or so long that they are likely cyclic.
1715       *
1716       * @param joiner the joining worker
1717       * @param task the task to join
1718 <     * @return true if found or ran a task (and so is immediately retryable)
1718 >     * @return 0 if no progress can be made, negative if task
1719 >     * known complete, else positive
1720       */
1721 <    private boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1722 <        WorkQueue[] ws;
1723 <        int m, depth = MAX_HELP;                // remaining chain depth
1724 <        boolean progress = false;
1725 <        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 &&
1726 <            task.status >= 0) {
1727 <            ForkJoinTask<?> subtask = task;     // current target
1728 <            outer: for (WorkQueue j = joiner;;) {
1729 <                WorkQueue stealer = null;       // find stealer of subtask
1730 <                WorkQueue v = ws[j.stealHint & m]; // try hint
1731 <                if (v != null && v.currentSteal == subtask)
1732 <                    stealer = v;
1733 <                else {                          // scan
1734 <                    for (int i = 1; i <= m; i += 2) {
1735 <                        if ((v = ws[i]) != null && v.currentSteal == subtask &&
1736 <                            v != joiner) {
1737 <                            stealer = v;
1738 <                            j.stealHint = i;    // save hint
1739 <                            break;
1721 >    private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1722 >        int stat = 0, steps = 0;                    // bound to avoid cycles
1723 >        if (joiner != null && task != null) {       // hoist null checks
1724 >            restart: for (;;) {
1725 >                ForkJoinTask<?> subtask = task;     // current target
1726 >                for (WorkQueue j = joiner, v;;) {   // v is stealer of subtask
1727 >                    WorkQueue[] ws; int m, s, h;
1728 >                    if ((s = task.status) < 0) {
1729 >                        stat = s;
1730 >                        break restart;
1731 >                    }
1732 >                    if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
1733 >                        break restart;              // shutting down
1734 >                    if ((v = ws[h = (j.stealHint | 1) & m]) == null ||
1735 >                        v.currentSteal != subtask) {
1736 >                        for (int origin = h;;) {    // find stealer
1737 >                            if (((h = (h + 2) & m) & 15) == 1 &&
1738 >                                (subtask.status < 0 || j.currentJoin != subtask))
1739 >                                continue restart;   // occasional staleness check
1740 >                            if ((v = ws[h]) != null &&
1741 >                                v.currentSteal == subtask) {
1742 >                                j.stealHint = h;    // save hint
1743 >                                break;
1744 >                            }
1745 >                            if (h == origin)
1746 >                                break restart;      // cannot find stealer
1747                          }
1748                      }
1749 <                    if (stealer == null)
1750 <                        break;
1751 <                }
1752 <
1753 <                for (WorkQueue q = stealer;;) { // try to help stealer
1754 <                    ForkJoinTask[] a; ForkJoinTask<?> t; int b;
1755 <                    if (task.status < 0)
1756 <                        break outer;
1757 <                    if ((b = q.base) - q.top < 0 && (a = q.array) != null) {
1758 <                        progress = true;
1759 <                        int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1760 <                        t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1761 <                        if (subtask.status < 0) // must recheck before taking
1762 <                            break outer;
1763 <                        if (t != null &&
1764 <                            q.base == b &&
1765 <                            U.compareAndSwapObject(a, i, t, null)) {
1766 <                            q.base = b + 1;
1767 <                            joiner.runSubtask(t);
1749 >                    for (;;) { // help stealer or descend to its stealer
1750 >                        ForkJoinTask[] a;  int b;
1751 >                        if (subtask.status < 0)     // surround probes with
1752 >                            continue restart;       //   consistency checks
1753 >                        if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
1754 >                            int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1755 >                            ForkJoinTask<?> t =
1756 >                                (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1757 >                            if (subtask.status < 0 || j.currentJoin != subtask ||
1758 >                                v.currentSteal != subtask)
1759 >                                continue restart;   // stale
1760 >                            stat = 1;               // apparent progress
1761 >                            if (t != null && v.base == b &&
1762 >                                U.compareAndSwapObject(a, i, t, null)) {
1763 >                                v.base = b + 1;     // help stealer
1764 >                                joiner.runSubtask(t);
1765 >                            }
1766 >                            else if (v.base == b && ++steps == MAX_HELP)
1767 >                                break restart;      // v apparently stalled
1768 >                        }
1769 >                        else {                      // empty -- try to descend
1770 >                            ForkJoinTask<?> next = v.currentJoin;
1771 >                            if (subtask.status < 0 || j.currentJoin != subtask ||
1772 >                                v.currentSteal != subtask)
1773 >                                continue restart;   // stale
1774 >                            else if (next == null || ++steps == MAX_HELP)
1775 >                                break restart;      // dead-end or maybe cyclic
1776 >                            else {
1777 >                                subtask = next;
1778 >                                j = v;
1779 >                                break;
1780 >                            }
1781                          }
1698                        else if (q.base == b)
1699                            break outer;        // possibly stalled
1700                    }
1701                    else {                      // descend
1702                        ForkJoinTask<?> next = stealer.currentJoin;
1703                        if (--depth <= 0 || subtask.status < 0 ||
1704                            next == null || next == subtask)
1705                            break outer;        // stale, dead-end, or cyclic
1706                        subtask = next;
1707                        j = stealer;
1708                        break;
1782                      }
1783                  }
1784              }
1785          }
1786 <        return progress;
1786 >        return stat;
1787      }
1788  
1789      /**
# Line 1811 | Line 1884 | public class ForkJoinPool extends Abstra
1884       */
1885      final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
1886          int s;
1814        ForkJoinTask<?> prevJoin = joiner.currentJoin;
1887          if ((s = task.status) >= 0) {
1888 +            ForkJoinTask<?> prevJoin = joiner.currentJoin;
1889              joiner.currentJoin = task;
1890              long startTime = 0L;
1891              for (int k = 0;;) {
1892 <                if ((joiner.isEmpty() ?                  // try to help
1893 <                     !tryHelpStealer(joiner, task) :
1894 <                     !joiner.tryRemoveAndExec(task))) {
1892 >                if ((s = (joiner.isEmpty() ?           // try to help
1893 >                          tryHelpStealer(joiner, task) :
1894 >                          joiner.tryRemoveAndExec(task))) == 0 &&
1895 >                    (s = task.status) >= 0) {
1896                      if (k == 0) {
1897                          startTime = System.nanoTime();
1898                          tryPollForAndExec(joiner, task); // check uncommon case
# Line 1827 | Line 1901 | public class ForkJoinPool extends Abstra
1901                               System.nanoTime() - startTime >=
1902                               COMPENSATION_DELAY &&
1903                               tryCompensate(task, null)) {
1904 <                        if (task.trySetSignal() && task.status >= 0) {
1904 >                        if (task.trySetSignal()) {
1905                              synchronized (task) {
1906                                  if (task.status >= 0) {
1907                                      try {                // see ForkJoinTask
# Line 1844 | Line 1918 | public class ForkJoinPool extends Abstra
1918                                       (this, CTL, c = ctl, c + AC_UNIT));
1919                      }
1920                  }
1921 <                if ((s = task.status) < 0) {
1921 >                if (s < 0 || (s = task.status) < 0) {
1922                      joiner.currentJoin = prevJoin;
1923                      break;
1924                  }
# Line 1869 | Line 1943 | public class ForkJoinPool extends Abstra
1943          while ((s = task.status) >= 0 &&
1944                 (joiner.isEmpty() ?
1945                  tryHelpStealer(joiner, task) :
1946 <                joiner.tryRemoveAndExec(task)))
1946 >                joiner.tryRemoveAndExec(task)) != 0)
1947              ;
1948          return s;
1949      }
# Line 1882 | Line 1956 | public class ForkJoinPool extends Abstra
1956       */
1957      private WorkQueue findNonEmptyStealQueue(WorkQueue w) {
1958          // Similar to loop in scan(), but ignoring submissions
1959 <        int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1959 >        int r;
1960 >        if (w == null) // allow external callers
1961 >            r = ThreadLocalRandom.current().nextInt();
1962 >        else {
1963 >            r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1964 >        }
1965          int step = (r >>> 16) | 1;
1966          for (WorkQueue[] ws;;) {
1967              int rs = runState, m;
# Line 1943 | Line 2022 | public class ForkJoinPool extends Abstra
2022      }
2023  
2024      /**
2025 +     * Restricted version of helpQuiescePool for non-FJ callers
2026 +     */
2027 +    static void externalHelpQuiescePool() {
2028 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue w, q;
2029 +        ForkJoinTask<?> t; int b;
2030 +        int k = submitters.get().seed & SQMASK;
2031 +        if ((p = commonPool) != null &&
2032 +            (ws = p.workQueues) != null &&
2033 +            ws.length > (k &= p.submitMask) &&
2034 +            (w = ws[k]) != null &&
2035 +            (q = p.findNonEmptyStealQueue(w)) != null &&
2036 +            (b = q.base) - q.top < 0 &&
2037 +            (t = q.pollAt(b)) != null)
2038 +            t.doExec();
2039 +    }
2040 +
2041 +    /**
2042       * Gets and removes a local or stolen task for the given worker.
2043       *
2044       * @return a task, if available
# Line 1975 | Line 2071 | public class ForkJoinPool extends Abstra
2071                  8);
2072      }
2073  
2074 +    /**
2075 +     * Returns approximate submission queue length for the given caller
2076 +     */
2077 +    static int getEstimatedSubmitterQueueLength() {
2078 +        ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
2079 +        int k = submitters.get().seed & SQMASK;
2080 +        return ((p = commonPool) != null &&
2081 +                p.runState >= 0 &&
2082 +                (ws = p.workQueues) != null &&
2083 +                ws.length > (k &= p.submitMask) &&
2084 +                (q = ws[k]) != null) ?
2085 +            q.queueSize() : 0;
2086 +    }
2087 +
2088      //  Termination
2089  
2090      /**
# Line 2156 | Line 2266 | public class ForkJoinPool extends Abstra
2266          lock.unlock();
2267      }
2268  
2269 +    /**
2270 +     * Returns the common pool instance
2271 +     *
2272 +     * @return the common pool instance
2273 +     */
2274 +    public static ForkJoinPool commonPool() {
2275 +        ForkJoinPool p;
2276 +        return (p = commonPool) != null? p : ensureCommonPool();
2277 +    }
2278 +
2279 +    private static ForkJoinPool ensureCommonPool() {
2280 +        ForkJoinPool p;
2281 +        if ((p = commonPool) == null) {
2282 +            final Mutex lock = initializationLock;
2283 +            lock.lock();
2284 +            try {
2285 +                if ((p = commonPool) == null) {
2286 +                    p = commonPool = new ForkJoinPool(commonPoolParallelism,
2287 +                                                      commonPoolFactory,
2288 +                                                      commonPoolUEH, false);
2289 +                    // use a more informative name string for workers
2290 +                    p.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2291 +                }
2292 +            } finally {
2293 +                lock.unlock();
2294 +            }
2295 +        }
2296 +        return p;
2297 +    }
2298 +
2299      // Execution methods
2300  
2301      /**
# Line 2329 | Line 2469 | public class ForkJoinPool extends Abstra
2469      }
2470  
2471      /**
2472 +     * Returns the targeted parallelism level of the common pool.
2473 +     *
2474 +     * @return the targeted parallelism level of the common pool
2475 +     */
2476 +    public static int getCommonPoolParallelism() {
2477 +        return commonPoolParallelism;
2478 +    }
2479 +
2480 +    /**
2481       * Returns the number of worker threads that have started but not
2482       * yet terminated.  The result returned by this method may differ
2483       * from {@link #getParallelism} when threads are created to
# Line 2580 | Line 2729 | public class ForkJoinPool extends Abstra
2729      }
2730  
2731      /**
2732 <     * Initiates an orderly shutdown in which previously submitted
2733 <     * tasks are executed, but no new tasks will be accepted.
2734 <     * Invocation has no additional effect if already shut down.
2735 <     * Tasks that are in the process of being submitted concurrently
2736 <     * during the course of this method may or may not be rejected.
2732 >     * Possibly initiates an orderly shutdown in which previously
2733 >     * submitted tasks are executed, but no new tasks will be
2734 >     * accepted. Invocation has no effect on execution state if this
2735 >     * is the {@link #commonPool}, and no additional effect if
2736 >     * already shut down.  Tasks that are in the process of being
2737 >     * submitted concurrently during the course of this method may or
2738 >     * may not be rejected.
2739       *
2740       * @throws SecurityException if a security manager exists and
2741       *         the caller is not permitted to modify threads
# Line 2593 | Line 2744 | public class ForkJoinPool extends Abstra
2744       */
2745      public void shutdown() {
2746          checkPermission();
2747 <        tryTerminate(false, true);
2747 >        if (this != commonPool)
2748 >            tryTerminate(false, true);
2749      }
2750  
2751      /**
2752 <     * Attempts to cancel and/or stop all tasks, and reject all
2753 <     * subsequently submitted tasks.  Tasks that are in the process of
2754 <     * being submitted or executed concurrently during the course of
2755 <     * this method may or may not be rejected. This method cancels
2756 <     * both existing and unexecuted tasks, in order to permit
2757 <     * termination in the presence of task dependencies. So the method
2758 <     * always returns an empty list (unlike the case for some other
2759 <     * Executors).
2752 >     * Possibly attempts to cancel and/or stop all tasks, and reject
2753 >     * all subsequently submitted tasks.  Invocation has no effect on
2754 >     * execution state if this is the {@link #commonPool}, and no
2755 >     * additional effect if already shut down. Otherwise, tasks that
2756 >     * are in the process of being submitted or executed concurrently
2757 >     * during the course of this method may or may not be
2758 >     * rejected. This method cancels both existing and unexecuted
2759 >     * tasks, in order to permit termination in the presence of task
2760 >     * dependencies. So the method always returns an empty list
2761 >     * (unlike the case for some other Executors).
2762       *
2763       * @return an empty list
2764       * @throws SecurityException if a security manager exists and
# Line 2614 | Line 2768 | public class ForkJoinPool extends Abstra
2768       */
2769      public List<Runnable> shutdownNow() {
2770          checkPermission();
2771 <        tryTerminate(true, true);
2771 >        if (this != commonPool)
2772 >            tryTerminate(true, true);
2773          return Collections.emptyList();
2774      }
2775  
# Line 2823 | Line 2978 | public class ForkJoinPool extends Abstra
2978          defaultForkJoinWorkerThreadFactory =
2979              new DefaultForkJoinWorkerThreadFactory();
2980          submitters = new ThreadSubmitter();
2981 +        initializationLock = new Mutex();
2982          int s;
2983          try {
2984              U = getUnsafe();
# Line 2841 | Line 2997 | public class ForkJoinPool extends Abstra
2997          if ((s & (s-1)) != 0)
2998              throw new Error("data type scale not a power of two");
2999          ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
3000 +
3001 +        // Establish configuration for default pool
3002 +        try {
3003 +            String pp = System.getProperty(propPrefix + "parallelism");
3004 +            String fp = System.getProperty(propPrefix + "threadFactory");
3005 +            String up = System.getProperty(propPrefix + "exceptionHandler");
3006 +            int par;
3007 +            if ((pp == null || (par = Integer.parseInt(pp)) <= 0))
3008 +                par = Runtime.getRuntime().availableProcessors();
3009 +            commonPoolParallelism = par;
3010 +            if (fp != null)
3011 +                commonPoolFactory = (ForkJoinWorkerThreadFactory)
3012 +                    ClassLoader.getSystemClassLoader().loadClass(fp).newInstance();
3013 +            else
3014 +                commonPoolFactory = defaultForkJoinWorkerThreadFactory;
3015 +            if (up != null)
3016 +                commonPoolUEH = (Thread.UncaughtExceptionHandler)
3017 +                    ClassLoader.getSystemClassLoader().loadClass(up).newInstance();
3018 +            else
3019 +                commonPoolUEH = null;
3020 +        } catch (Exception e) {
3021 +            throw new Error(e);
3022 +        }
3023      }
3024  
3025      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines