ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166e/ForkJoinPool.java (file contents):
Revision 1.7 by dl, Sun Oct 28 22:35:45 2012 UTC vs.
Revision 1.8 by dl, Mon Oct 29 17:23:26 2012 UTC

# Line 44 | Line 44 | import java.util.concurrent.locks.Condit
44   * tasks that are never joined.
45   *
46   * <p>A static {@link #commonPool} is available and appropriate for
47 < * most applications. The common pool is constructed upon first
48 < * access, or upon usage by any ForkJoinTask that is not explictly
49 < * submitted to a specified pool. Using the common pool normally
50 < * reduces resource usage (its threads are slowly reclaimed during
51 < * periods of non-use, and reinstated upon subsequent use).  The
52 < * common pool is by default constructed with default parameters, but
53 < * these may be controlled by setting any or all of the three
54 < * properties {@code
47 > * most applications. The common pool is used by any ForkJoinTask that
48 > * is not explicitly submitted to a specified pool. Using the common
49 > * pool normally reduces resource usage (its threads are slowly
50 > * reclaimed during periods of non-use, and reinstated upon subsequent
51 > * use).  The common pool is by default constructed with default
52 > * parameters, but these may be controlled by setting any or all of
53 > * the three properties {@code
54   * java.util.concurrent.ForkJoinPool.common.{parallelism,
55   * threadFactory, exceptionHandler}}.
56   *
# Line 237 | Line 236 | public class ForkJoinPool extends Abstra
236       * when locked remains available to check consistency.
237       *
238       * Recording WorkQueues.  WorkQueues are recorded in the
239 <     * "workQueues" array that is created upon pool construction and
240 <     * expanded if necessary.  Updates to the array while recording
241 <     * new workers and unrecording terminated ones are protected from
242 <     * each other by a lock but the array is otherwise concurrently
243 <     * readable, and accessed directly.  To simplify index-based
244 <     * operations, the array size is always a power of two, and all
245 <     * readers must tolerate null slots. Shared (submission) queues
246 <     * are at even indices, worker queues at odd indices. Grouping
247 <     * them together in this way simplifies and speeds up task
249 <     * scanning.
239 >     * "workQueues" array that is created upon first use and expanded
240 >     * if necessary.  Updates to the array while recording new workers
241 >     * and unrecording terminated ones are protected from each other
242 >     * by a lock but the array is otherwise concurrently readable, and
243 >     * accessed directly.  To simplify index-based operations, the
244 >     * array size is always a power of two, and all readers must
245 >     * tolerate null slots. Shared (submission) queues are at even
246 >     * indices, worker queues at odd indices. Grouping them together
247 >     * in this way simplifies and speeds up task scanning.
248       *
249       * All worker thread creation is on-demand, triggered by task
250       * submissions, replacement of terminated workers, and/or
# Line 504 | Line 502 | public class ForkJoinPool extends Abstra
502      }
503  
504      /**
507     * A simple non-reentrant lock used for exclusion when managing
508     * queues and workers. We use a custom lock so that we can readily
509     * probe lock state in constructions that check among alternative
510     * actions. The lock is normally only very briefly held, and
511     * sometimes treated as a spinlock, but other usages block to
512     * reduce overall contention in those cases where locked code
513     * bodies perform allocation/resizing.
514     */
515    static final class Mutex extends AbstractQueuedSynchronizer {
516        public final boolean tryAcquire(int ignore) {
517            return compareAndSetState(0, 1);
518        }
519        public final boolean tryRelease(int ignore) {
520            setState(0);
521            return true;
522        }
523        public final void lock() { acquire(0); }
524        public final void unlock() { release(0); }
525        public final boolean isHeldExclusively() { return getState() == 1; }
526        public final Condition newCondition() { return new ConditionObject(); }
527    }
528
529    /**
505       * Class for artificial tasks that are used to replace the target
506       * of local joins if they are removed from an interior queue slot
507       * in WorkQueue.tryRemoveAndExec. We don't need the proxy to
# Line 814 | Line 789 | public class ForkJoinPool extends Abstra
789  
790          /**
791           * Version of tryUnpush for shared queues; called by non-FJ
792 <         * submitters. Conservatively fails to unpush if all workers
818 <         * are active unless there are multiple tasks in queue.
792 >         * submitters after prechecking that task probably exists.
793           */
794 <        final boolean trySharedUnpush(ForkJoinTask<?> task, ForkJoinPool p) {
794 >        final boolean trySharedUnpush(ForkJoinTask<?> t) {
795              boolean success = false;
796 <            if (task != null && top != base && runState == 0 &&
823 <                U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
796 >            if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
797                  try {
798 <                    ForkJoinTask<?>[] a; int n, s;
799 <                    if ((a = array) != null && (n = (s = top) - base) > 0 &&
800 <                        (n > 1 || p == null || (int)(p.ctl >> AC_SHIFT) < 0)) {
801 <                        int j = (((a.length - 1) & --s) << ASHIFT) + ABASE;
802 <                        if (U.getObjectVolatile(a, j) == task &&
803 <                            U.compareAndSwapObject(a, j, task, null)) {
831 <                            top = s;
832 <                            success = true;
833 <                        }
798 >                    ForkJoinTask<?>[] a; int s;
799 >                    if ((a = array) != null && (s = top) != base &&
800 >                        U.compareAndSwapObject
801 >                        (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
802 >                        top = s;
803 >                        success = true;
804                      }
805                  } finally {
806                      runState = 0;                         // unlock
# Line 1113 | Line 1083 | public class ForkJoinPool extends Abstra
1083      public static final ForkJoinWorkerThreadFactory
1084          defaultForkJoinWorkerThreadFactory;
1085  
1086 +
1087 +    /** Property prefix for constructing common pool */
1088 +    private static final String propPrefix =
1089 +        "java.util.concurrent.ForkJoinPool.common.";
1090 +
1091 +    /**
1092 +     * Common (static) pool. Non-null for public use unless a static
1093 +     * construction exception, but internal usages must null-check on
1094 +     * use.
1095 +     */
1096 +    static final ForkJoinPool commonPool;
1097 +
1098 +    /**
1099 +     * Common pool parallelism. Must equal commonPool.parallelism.
1100 +     */
1101 +    static final int commonPoolParallelism;
1102 +
1103      /**
1104       * Generator for assigning sequence numbers as pool names.
1105       */
# Line 1137 | Line 1124 | public class ForkJoinPool extends Abstra
1124       */
1125      private static final ThreadSubmitter submitters;
1126  
1140    /** Common default pool */
1141    static volatile ForkJoinPool commonPool;
1142
1143    // commonPool construction parameters
1144    private static final String propPrefix =
1145        "java.util.concurrent.ForkJoinPool.common.";
1146    private static final Thread.UncaughtExceptionHandler commonPoolUEH;
1147    private static final ForkJoinWorkerThreadFactory commonPoolFactory;
1148    static final int commonPoolParallelism;
1149
1150    /** Static initialization lock */
1151    private static final Mutex initializationLock;
1152
1127      // static constants
1128  
1129      /**
1130 <     * Initial timeout value (in nanoseconds) for the tread triggering
1130 >     * Initial timeout value (in nanoseconds) for the thread triggering
1131       * quiescence to park waiting for new work. On timeout, the thread
1132       * will instead try to shrink the number of workers.
1133       */
# Line 1282 | Line 1256 | public class ForkJoinPool extends Abstra
1256       * empirically works OK on current JVMs.
1257       */
1258  
1259 +    volatile long stealCount;                  // collects worker counts
1260      volatile long ctl;                         // main pool control
1261      final int parallelism;                     // parallelism level
1262      final int localMode;                       // per-worker scheduling mode
1263 +    volatile int nextWorkerNumber;             // to create worker name string
1264      final int submitMask;                      // submit queue index bound
1265      int nextSeed;                              // for initializing worker seeds
1266 +    volatile int mainLock;                     // spinlock for array updates
1267      volatile int runState;                     // shutdown status and seq
1268      WorkQueue[] workQueues;                    // main registry
1292    final Mutex lock;                          // for registration
1293    final Condition termination;               // for awaitTermination
1269      final ForkJoinWorkerThreadFactory factory; // factory for new workers
1270      final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
1271 <    final AtomicLong stealCount;               // collect counts when terminated
1272 <    final AtomicInteger nextWorkerNumber;      // to create worker name string
1273 <    String workerNamePrefix;                   // to create worker name string
1271 >    final String workerNamePrefix;             // to create worker name string
1272 >
1273 >    /*
1274 >     * Mechanics for main lock protecting worker array updates.  Uses
1275 >     * the same strategy as ConcurrentHashMap bins -- a spinLock for
1276 >     * normal cases, but falling back to builtin lock when (rarely)
1277 >     * needed.  See internal ConcurrentHashMap documentation for
1278 >     * explanation.
1279 >     */
1280 >
1281 >    static final int LOCK_WAITING = 2; // bit to indicate need for signal
1282 >    static final int MAX_LOCK_SPINS = 1 << 8;
1283 >
1284 >    private void tryAwaitMainLock() {
1285 >        int spins = MAX_LOCK_SPINS, r = 0, h;
1286 >        while (((h = mainLock) & 1) != 0) {
1287 >            if (r == 0)
1288 >                r = ThreadLocalRandom.current().nextInt(); // randomize spins
1289 >            else if (spins >= 0) {
1290 >                r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
1291 >                if (r >= 0)
1292 >                    --spins;
1293 >            }
1294 >            else if (U.compareAndSwapInt(this, MAINLOCK, h, h | LOCK_WAITING)) {
1295 >                synchronized (this) {
1296 >                    if ((mainLock & LOCK_WAITING) != 0) {
1297 >                        try {
1298 >                            wait();
1299 >                        } catch (InterruptedException ie) {
1300 >                            Thread.currentThread().interrupt();
1301 >                        }
1302 >                    }
1303 >                    else
1304 >                        notifyAll(); // possibly won race vs signaller
1305 >                }
1306 >                break;
1307 >            }
1308 >        }
1309 >    }
1310  
1311      //  Creating, registering, and deregistering workers
1312  
# Line 1323 | Line 1334 | public class ForkJoinPool extends Abstra
1334       * ForkJoinWorkerThread.
1335       */
1336      final String nextWorkerName() {
1337 <        return workerNamePrefix.concat
1338 <            (Integer.toString(nextWorkerNumber.addAndGet(1)));
1337 >        int n;
1338 >        do {} while(!U.compareAndSwapInt(this, NEXTWORKERNUMBER,
1339 >                                         n = nextWorkerNumber, ++n));
1340 >        return workerNamePrefix.concat(Integer.toString(n));
1341      }
1342  
1343      /**
# Line 1337 | Line 1350 | public class ForkJoinPool extends Abstra
1350       * @param w the worker's queue
1351       */
1352      final void registerWorker(WorkQueue w) {
1353 <        Mutex lock = this.lock;
1354 <        lock.lock();
1353 >        while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1))
1354 >            tryAwaitMainLock();
1355          try {
1356 <            WorkQueue[] ws = workQueues;
1357 <            if (w != null && ws != null) {          // skip on shutdown/failure
1356 >            WorkQueue[] ws;
1357 >            if ((ws = workQueues) == null)
1358 >                ws = workQueues = new WorkQueue[submitMask + 1];
1359 >            if (w != null) {
1360                  int rs, n =  ws.length, m = n - 1;
1361                  int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence
1362                  w.seed = (s == 0) ? 1 : s;          // ensure non-zero seed
# Line 1362 | Line 1377 | public class ForkJoinPool extends Abstra
1377                  runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN);
1378              }
1379          } finally {
1380 <            lock.unlock();
1380 >            if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) {
1381 >                mainLock = 0;
1382 >                synchronized (this) { notifyAll(); };
1383 >            }
1384          }
1385 +
1386      }
1387  
1388      /**
# Line 1376 | Line 1395 | public class ForkJoinPool extends Abstra
1395       * @param ex the exception causing failure, or null if none
1396       */
1397      final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1379        Mutex lock = this.lock;
1398          WorkQueue w = null;
1399          if (wt != null && (w = wt.workQueue) != null) {
1400              w.runState = -1;                // ensure runState is set
1401 <            stealCount.getAndAdd(w.totalSteals + w.nsteals);
1401 >            long steals = w.totalSteals + w.nsteals, sc;
1402 >            do {} while(!U.compareAndSwapLong(this, STEALCOUNT,
1403 >                                              sc = stealCount, sc + steals));
1404              int idx = w.poolIndex;
1405 <            lock.lock();
1406 <            try {                           // remove record from array
1405 >            while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1))
1406 >                tryAwaitMainLock();
1407 >            try {
1408                  WorkQueue[] ws = workQueues;
1409                  if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
1410                      ws[idx] = null;
1411              } finally {
1412 <                lock.unlock();
1412 >                if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) {
1413 >                    mainLock = 0;
1414 >                    synchronized (this) { notifyAll(); };
1415 >                }
1416              }
1417          }
1418  
# Line 1427 | Line 1451 | public class ForkJoinPool extends Abstra
1451          for (int r = s.seed, m = submitMask;;) {
1452              WorkQueue[] ws; WorkQueue q;
1453              int k = r & m & SQMASK;          // use only even indices
1454 <            if (runState < 0 || (ws = workQueues) == null || ws.length <= k)
1454 >            if (runState < 0)
1455                  throw new RejectedExecutionException(); // shutting down
1456 +            else if ((ws = workQueues) == null || ws.length <= k) {
1457 +                while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1))
1458 +                    tryAwaitMainLock();
1459 +                try {
1460 +                    if (workQueues == null)
1461 +                        workQueues = new WorkQueue[submitMask + 1];
1462 +                } finally {
1463 +                    if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) {
1464 +                        mainLock = 0;
1465 +                        synchronized (this) { notifyAll(); };
1466 +                    }
1467 +                }
1468 +            }
1469              else if ((q = ws[k]) == null) {  // create new queue
1470                  WorkQueue nq = new WorkQueue(this, null, SHARED_QUEUE);
1471 <                Mutex lock = this.lock;      // construct outside lock
1472 <                lock.lock();
1473 <                try {                        // recheck under lock
1471 >                while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1))
1472 >                    tryAwaitMainLock();
1473 >                try {
1474                      int rs = runState;       // to update seq
1475                      if (ws == workQueues && ws[k] == null) {
1476                          ws[k] = nq;
1477                          runState = ((rs & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN));
1478                      }
1479                  } finally {
1480 <                    lock.unlock();
1480 >                    if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) {
1481 >                        mainLock = 0;
1482 >                        synchronized (this) { notifyAll(); };
1483 >                    }
1484                  }
1485              }
1486              else if (q.trySharedPush(task)) {
# Line 1463 | Line 1503 | public class ForkJoinPool extends Abstra
1503      static void submitToCommonPool(ForkJoinTask<?> task) {
1504          ForkJoinPool p;
1505          if ((p = commonPool) == null)
1506 <            p = ensureCommonPool();
1506 >            throw new RejectedExecutionException("Common Pool Unavailable");
1507          p.doSubmit(task);
1508      }
1509  
# Line 1477 | Line 1517 | public class ForkJoinPool extends Abstra
1517       * @return true if successful
1518       */
1519      static boolean tryUnsubmitFromCommonPool(ForkJoinTask<?> task) {
1520 +        // Peek, looking for task and eligibility before
1521 +        // using trySharedUnpush to actually take it under lock
1522          ForkJoinPool p; WorkQueue[] ws; WorkQueue q;
1523 +        ForkJoinTask<?>[] a; int t, s, n;
1524          int k = submitters.get().seed & SQMASK;
1525          return ((p = commonPool) != null &&
1526                  (ws = p.workQueues) != null &&
1527                  ws.length > (k &= p.submitMask) &&
1528                  (q = ws[k]) != null &&
1529 <                q.trySharedUnpush(task, p));
1529 >                (a = q.array) != null &&
1530 >                (n = (t = q.top) - q.base) > 0 &&
1531 >                (n > 1 || (int)(p.ctl >> AC_SHIFT) < 0) &&
1532 >                (s = t - 1) >= 0 && s < a.length && a[s] == task &&
1533 >                q.trySharedUnpush(task));
1534      }
1535  
1536      // Maintaining ctl counts
# Line 1957 | Line 2004 | public class ForkJoinPool extends Abstra
2004       */
2005      private WorkQueue findNonEmptyStealQueue(WorkQueue w) {
2006          // Similar to loop in scan(), but ignoring submissions
2007 <        int r;
1961 <        if (w == null) // allow external callers
1962 <            r = ThreadLocalRandom.current().nextInt();
1963 <        else {
1964 <            r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1965 <        }
2007 >        int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
2008          int step = (r >>> 16) | 1;
2009          for (WorkQueue[] ws;;) {
2010              int rs = runState, m;
# Line 2103 | Line 2145 | public class ForkJoinPool extends Abstra
2145       * @return true if now terminating or terminated
2146       */
2147      private boolean tryTerminate(boolean now, boolean enable) {
2106        Mutex lock = this.lock;
2148          for (long c;;) {
2149              if (((c = ctl) & STOP_BIT) != 0) {      // already terminating
2150                  if ((short)(c >>> TC_SHIFT) == -parallelism) {
2151 <                    lock.lock();                    // don't need try/finally
2152 <                    termination.signalAll();        // signal when 0 workers
2153 <                    lock.unlock();
2151 >                    synchronized(this) {
2152 >                        notifyAll();                // signal when 0 workers
2153 >                    }
2154                  }
2155                  return true;
2156              }
2157              if (runState >= 0) {                    // not yet enabled
2158                  if (!enable)
2159                      return false;
2160 <                lock.lock();
2161 <                runState |= SHUTDOWN;
2162 <                lock.unlock();
2160 >                while (!U.compareAndSwapInt(this, MAINLOCK, 0, 1))
2161 >                    tryAwaitMainLock();
2162 >                try {
2163 >                    runState |= SHUTDOWN;
2164 >                } finally {
2165 >                    if (!U.compareAndSwapInt(this, MAINLOCK, 1, 0)) {
2166 >                        mainLock = 0;
2167 >                        synchronized (this) { notifyAll(); };
2168 >                    }
2169 >                }
2170              }
2171              if (!now) {                             // check if idle & no tasks
2172                  if ((int)(c >> AC_SHIFT) != -parallelism ||
# Line 2251 | Line 2299 | public class ForkJoinPool extends Abstra
2299          // Use nearest power 2 for workQueues size. See Hackers Delight sec 3.2.
2300          int n = parallelism - 1;
2301          n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2302 <        int size = (n + 1) << 1;        // #slots = 2*#workers
2255 <        this.submitMask = size - 1;     // room for max # of submit queues
2256 <        this.workQueues = new WorkQueue[size];
2257 <        this.termination = (this.lock = new Mutex()).newCondition();
2258 <        this.stealCount = new AtomicLong();
2259 <        this.nextWorkerNumber = new AtomicInteger();
2302 >        this.submitMask = ((n + 1) << 1) - 1;
2303          int pn = poolNumberGenerator.incrementAndGet();
2304          StringBuilder sb = new StringBuilder("ForkJoinPool-");
2305          sb.append(Integer.toString(pn));
2306          sb.append("-worker-");
2307          this.workerNamePrefix = sb.toString();
2265        lock.lock();
2308          this.runState = 1;              // set init flag
2267        lock.unlock();
2309      }
2310  
2311      /**
2312 <     * Returns the common pool instance
2312 >     * Constructor for common pool, suitable only for static initialization.
2313 >     * Basically the same as above, but uses smallest possible initial footprint.
2314 >     */
2315 >    ForkJoinPool(int parallelism, int submitMask,
2316 >                 ForkJoinWorkerThreadFactory factory,
2317 >                 Thread.UncaughtExceptionHandler handler) {
2318 >        this.factory = factory;
2319 >        this.ueh = handler;
2320 >        this.submitMask = submitMask;
2321 >        this.parallelism = parallelism;
2322 >        long np = (long)(-parallelism);
2323 >        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2324 >        this.localMode = LIFO_QUEUE;
2325 >        this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2326 >        this.runState = 1;
2327 >    }
2328 >
2329 >    /**
2330 >     * Returns the common pool instance.
2331       *
2332       * @return the common pool instance
2333       */
2334      public static ForkJoinPool commonPool() {
2335          ForkJoinPool p;
2336 <        return (p = commonPool) != null? p : ensureCommonPool();
2337 <    }
2279 <
2280 <    private static ForkJoinPool ensureCommonPool() {
2281 <        ForkJoinPool p;
2282 <        if ((p = commonPool) == null) {
2283 <            final Mutex lock = initializationLock;
2284 <            lock.lock();
2285 <            try {
2286 <                if ((p = commonPool) == null) {
2287 <                    p = commonPool = new ForkJoinPool(commonPoolParallelism,
2288 <                                                      commonPoolFactory,
2289 <                                                      commonPoolUEH, false);
2290 <                    // use a more informative name string for workers
2291 <                    p.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2292 <                }
2293 <            } finally {
2294 <                lock.unlock();
2295 <            }
2296 <        }
2336 >        if ((p = commonPool) == null)
2337 >            throw new Error("Common Pool Unavailable");
2338          return p;
2339      }
2340  
# Line 2559 | Line 2600 | public class ForkJoinPool extends Abstra
2600       * @return the number of steals
2601       */
2602      public long getStealCount() {
2603 <        long count = stealCount.get();
2603 >        long count = stealCount;
2604          WorkQueue[] ws; WorkQueue w;
2605          if ((ws = workQueues) != null) {
2606              for (int i = 1; i < ws.length; i += 2) {
# Line 2689 | Line 2730 | public class ForkJoinPool extends Abstra
2730      public String toString() {
2731          // Use a single pass through workQueues to collect counts
2732          long qt = 0L, qs = 0L; int rc = 0;
2733 <        long st = stealCount.get();
2733 >        long st = stealCount;
2734          long c = ctl;
2735          WorkQueue[] ws; WorkQueue w;
2736          if ((ws = workQueues) != null) {
# Line 2827 | Line 2868 | public class ForkJoinPool extends Abstra
2868      public boolean awaitTermination(long timeout, TimeUnit unit)
2869          throws InterruptedException {
2870          long nanos = unit.toNanos(timeout);
2871 <        final Mutex lock = this.lock;
2872 <        lock.lock();
2873 <        try {
2874 <            for (;;) {
2875 <                if (isTerminated())
2876 <                    return true;
2877 <                if (nanos <= 0)
2878 <                    return false;
2879 <                nanos = termination.awaitNanos(nanos);
2871 >        if (isTerminated())
2872 >            return true;
2873 >        long startTime = System.nanoTime();
2874 >        boolean terminated = false;
2875 >        synchronized(this) {
2876 >            for (long waitTime = nanos, millis = 0L;;) {
2877 >                if (terminated = isTerminated() ||
2878 >                    waitTime <= 0L ||
2879 >                    (millis = unit.toMillis(waitTime)) <= 0L)
2880 >                    break;
2881 >                wait(millis);
2882 >                waitTime = nanos - (System.nanoTime() - startTime);
2883              }
2840        } finally {
2841            lock.unlock();
2884          }
2885 +        return terminated;
2886      }
2887  
2888      /**
# Line 2971 | Line 3014 | public class ForkJoinPool extends Abstra
3014      private static final long PARKBLOCKER;
3015      private static final int ABASE;
3016      private static final int ASHIFT;
3017 +    private static final long NEXTWORKERNUMBER;
3018 +    private static final long STEALCOUNT;
3019 +    private static final long MAINLOCK;
3020  
3021      static {
3022          poolNumberGenerator = new AtomicInteger();
# Line 2979 | Line 3025 | public class ForkJoinPool extends Abstra
3025          defaultForkJoinWorkerThreadFactory =
3026              new DefaultForkJoinWorkerThreadFactory();
3027          submitters = new ThreadSubmitter();
2982        initializationLock = new Mutex();
3028          int s;
3029          try {
3030              U = getUnsafe();
# Line 2987 | Line 3032 | public class ForkJoinPool extends Abstra
3032              Class<?> ak = ForkJoinTask[].class;
3033              CTL = U.objectFieldOffset
3034                  (k.getDeclaredField("ctl"));
3035 +            NEXTWORKERNUMBER = U.objectFieldOffset
3036 +                (k.getDeclaredField("nextWorkerNumber"));
3037 +            STEALCOUNT = U.objectFieldOffset
3038 +                (k.getDeclaredField("stealCount"));
3039 +            MAINLOCK = U.objectFieldOffset
3040 +                (k.getDeclaredField("mainLock"));
3041              Class<?> tk = Thread.class;
3042              PARKBLOCKER = U.objectFieldOffset
3043                  (tk.getDeclaredField("parkBlocker"));
3044              ABASE = U.arrayBaseOffset(ak);
3045              s = U.arrayIndexScale(ak);
3046 +            ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
3047          } catch (Exception e) {
3048              throw new Error(e);
3049          }
3050          if ((s & (s-1)) != 0)
3051              throw new Error("data type scale not a power of two");
3052 <        ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
3001 <
3002 <        // Establish configuration for default pool
3003 <        try {
3052 >        try { // Establish common pool
3053              String pp = System.getProperty(propPrefix + "parallelism");
3054              String fp = System.getProperty(propPrefix + "threadFactory");
3055              String up = System.getProperty(propPrefix + "exceptionHandler");
3056 +            ForkJoinWorkerThreadFactory fac = (fp == null) ?
3057 +                defaultForkJoinWorkerThreadFactory :
3058 +                ((ForkJoinWorkerThreadFactory)ClassLoader.
3059 +                 getSystemClassLoader().loadClass(fp).newInstance());
3060 +            Thread.UncaughtExceptionHandler ueh = (up == null)? null :
3061 +                ((Thread.UncaughtExceptionHandler)ClassLoader.
3062 +                 getSystemClassLoader().loadClass(up).newInstance());
3063              int par;
3064              if ((pp == null || (par = Integer.parseInt(pp)) <= 0))
3065                  par = Runtime.getRuntime().availableProcessors();
3066 +            if (par > MAX_CAP)
3067 +                par = MAX_CAP;
3068              commonPoolParallelism = par;
3069 <            if (fp != null)
3070 <                commonPoolFactory = (ForkJoinWorkerThreadFactory)
3071 <                    ClassLoader.getSystemClassLoader().loadClass(fp).newInstance();
3072 <            else
3073 <                commonPoolFactory = defaultForkJoinWorkerThreadFactory;
3016 <            if (up != null)
3017 <                commonPoolUEH = (Thread.UncaughtExceptionHandler)
3018 <                    ClassLoader.getSystemClassLoader().loadClass(up).newInstance();
3019 <            else
3020 <                commonPoolUEH = null;
3069 >            int n = par - 1; // precompute submit mask
3070 >            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
3071 >            n |= n >>> 8; n |= n >>> 16;
3072 >            int mask = ((n + 1) << 1) - 1;
3073 >            commonPool = new ForkJoinPool(par, mask, fac, ueh);
3074          } catch (Exception e) {
3075              throw new Error(e);
3076          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines