ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.204 by jsr166, Tue Jul 8 00:06:29 2014 UTC vs.
Revision 1.205 by dl, Tue Jul 8 14:17:09 2014 UTC

# Line 206 | Line 206 | public class ForkJoinPool extends Abstra
206       * indices guarantee that top == base means the queue is empty,
207       * but otherwise may err on the side of possibly making the queue
208       * appear nonempty when a push, pop, or poll have not fully
209 <     * committed. Note that this means that the poll operation,
210 <     * considered individually, is not wait-free. One thief cannot
211 <     * successfully continue until another in-progress one (or, if
212 <     * previously empty, a push) completes.  However, in the
213 <     * aggregate, we ensure at least probabilistic non-blockingness.
214 <     * If an attempted steal fails, a thief always chooses a different
215 <     * random victim target to try next. So, in order for one thief to
216 <     * progress, it suffices for any in-progress poll or new push on
217 <     * any empty queue to complete. (This is why we normally use
218 <     * method pollAt and its variants that try once at the apparent
219 <     * base index, else consider alternative actions, rather than
220 <     * method poll, which retries.)
209 >     * committed. (Method isEmpty() checks the case of a partially
210 >     * completed removal of the last element.)  Note that this means
211 >     * that the poll operation, considered individually, is not
212 >     * wait-free. One thief cannot successfully continue until another
213 >     * in-progress one (or, if previously empty, a push) completes.
214 >     * However, in the aggregate, we ensure at least probabilistic
215 >     * non-blockingness.  If an attempted steal fails, a thief always
216 >     * chooses a different random victim target to try next. So, in
217 >     * order for one thief to progress, it suffices for any
218 >     * in-progress poll or new push on any empty queue to
219 >     * complete. (This is why we normally use method pollAt and its
220 >     * variants that try once at the apparent base index, else
221 >     * consider alternative actions, rather than method poll, which
222 >     * retries.)
223       *
224       * This approach also enables support of a user mode in which
225       * local task processing is in FIFO, not LIFO order, simply by
# Line 425 | Line 427 | public class ForkJoinPool extends Abstra
427       * Signalling and activation.  Workers are created or activated
428       * only when there appears to be at least one task they might be
429       * able to find and execute.  Upon push (either by a worker or an
430 <     * external submission) to a previously empty queue, workers are
431 <     * signalled if idle, or created if fewer exist than the given
432 <     * parallelism level.  These primary signals are buttressed by
433 <     * others whenever other threads remove a task from a queue and
434 <     * notice that there are other tasks there as well.  On most
435 <     * platforms, signalling (unpark) overhead time is noticeably
436 <     * long, and the time between signalling a thread and it actually
437 <     * making progress can be very noticeably long, so it is worth
438 <     * offloading these delays from critical paths as much as
430 >     * external submission) to a previously (possibly) empty queue,
431 >     * workers are signalled if idle, or created if fewer exist than
432 >     * the given parallelism level.  These primary signals are
433 >     * buttressed by others whenever other threads remove a task from
434 >     * a queue and notice that there are other tasks there as well.
435 >     * On most platforms, signalling (unpark) overhead time is
436 >     * noticeably long, and the time between signalling a thread and
437 >     * it actually making progress can be very noticeably long, so it
438 >     * is worth offloading these delays from critical paths as much as
439       * possible. Also, because enqueued workers are often rescanning
440       * or spinning rather than blocking, we set and clear the "parker"
441       * field of WorkQueues to reduce unnecessary calls to unpark.
# Line 449 | Line 451 | public class ForkJoinPool extends Abstra
451       * Shutdown and Termination. A call to shutdownNow atomically sets
452       * a runState bit and then (non-atomically) sets each worker's
453       * qlock status, cancels all unprocessed tasks, and wakes up all
454 <     * waiting workers.  Detecting whether termination should commence
455 <     * after a non-abrupt shutdown() call relies on the active count
456 <     * bits of "ctl" maintaining consensus about quiescence.
454 >     * waiting workers (see tryTerminate).  Detecting whether
455 >     * termination should commence after a non-abrupt shutdown() call
456 >     * relies on the active count bits of "ctl" maintaining consensus
457 >     * about quiescence. However, external submitters do not take part
458 >     * in this consensus.  So, tryTerminate sweeps through submission
459 >     * queues to ensure lack of in-flight submissions before
460 >     * triggering the "STOP" phase of termination.
461       *
462       * Joining Tasks
463       * =============
# Line 534 | Line 540 | public class ForkJoinPool extends Abstra
540       * threads) in the most common case in which it is rarely
541       * beneficial: when a worker with an empty queue (thus no
542       * continuation tasks) blocks on a join and there still remain
543 <     * enough threads to ensure liveness.
543 >     * enough threads to ensure liveness. Also, whenever more than two
544 >     * spare threads are generated, they are killed (see awaitWork) at
545 >     * the next quiescent point (padding by two avoids hysteresis).
546       *
547       * Bounds. The compensation mechanism is bounded (see MAX_SPARES),
548       * to better enable JVMs to cope with programming errors and abuse
# Line 546 | Line 554 | public class ForkJoinPool extends Abstra
554       * the JVM and OS. So the number of simultaneously live threads
555       * may transiently exceed bounds.
556       *
557 +     *
558       * Common Pool
559       * ===========
560       *
# Line 793 | Line 802 | public class ForkJoinPool extends Abstra
802           * @throws RejectedExecutionException if array cannot be resized
803           */
804          final void push(ForkJoinTask<?> task) {
805 <            int s; ForkJoinTask<?>[] a; ForkJoinPool p;
806 <            int n = base - (s = top);     // negative of incoming size
805 >            ForkJoinTask<?>[] a; ForkJoinPool p;
806 >            int b = base, s = top, n;
807              if ((a = array) != null) {    // ignore if queue removed
808                  int m = a.length - 1;     // fenced write for task visibility
809                  U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
810                  U.putOrderedInt(this, QTOP, s + 1);
811 <                if (n == 0) {
811 >                if ((n = s - b) <= 1) {
812                      if ((p = pool) != null)
813                          p.signalWork(p.workQueues, this);
814                  }
815 <                else if (n + m == 0)
815 >                else if (n == m)
816                      growArray();
817              }
818          }
# Line 1231 | Line 1240 | public class ForkJoinPool extends Abstra
1240      private static final long IDLE_TIMEOUT      = 2000L * 1000L * 1000L; // 2sec
1241  
1242      /**
1234     * Timeout value when there are more threads than parallelism level
1235     */
1236    private static final long FAST_IDLE_TIMEOUT = 100L * 1000L * 1000L; // 100ms
1237
1238    /**
1243       * Tolerance for idle timeouts, to cope with timer undershoots
1244       */
1245      private static final long TIMEOUT_SLOP      = 20L * 1000L * 1000L;  // 20ms
# Line 1304 | Line 1308 | public class ForkJoinPool extends Abstra
1308      private static final long TC_MASK    = 0xffffL << TC_SHIFT;
1309      private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1310  
1311 <    // runState bits (arbitrary powers of two)
1311 >    // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
1312      private static final int  RSLOCK     = 1;
1313      private static final int  RSIGNAL    = 1 << 1;
1314      private static final int  STARTED    = 1 << 2;
1315 <    private static final int  SHUTDOWN   = 1 << 3;
1316 <    private static final int  STOP       = 1 << 4;
1317 <    private static final int  TERMINATED = 1 << 5;
1315 >    private static final int  STOP       = 1 << 29;
1316 >    private static final int  TERMINATED = 1 << 30;
1317 >    private static final int  SHUTDOWN   = 1 << 31;
1318  
1319      // Instance fields
1320      volatile long stealCount;            // collects worker counts
# Line 1515 | Line 1519 | public class ForkJoinPool extends Abstra
1519              U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals
1520          }
1521          if (!tryTerminate(false, false) && w != null && w.array != null) {
1522 <            WorkQueue[] ws = workQueues; int m, sp;
1523 <            if (ws != null && (m = ws.length - 1) >= 0 && (c = ctl) < 0L) {
1524 <                if ((sp = (int)c) != 0)               // wake up replacement
1525 <                    tryRelease(c, ws[sp & m], AC_UNIT);
1526 <                else if (ex != null && (c & ADD_WORKER) != 0L)
1522 >            WorkQueue[] ws; int m, sp;
1523 >            while ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1524 >                if ((sp = (int)(c = ctl)) != 0) {     // wake up replacement
1525 >                    if (tryRelease(c, ws[sp & m], AC_UNIT))
1526 >                        break;
1527 >                }
1528 >                else if (ex != null && (c & ADD_WORKER) != 0L) {
1529                      tryAddWorker(c);                  // create replacement
1530 +                    break;
1531 +                }
1532 +                else
1533 +                    break;
1534              }
1535          }
1536          if (ex == null)                               // help clean on way out
# Line 1560 | Line 1570 | public class ForkJoinPool extends Abstra
1570                      U.unpark(p);
1571                  break;
1572              }
1573 <            if (q != null && q.isEmpty())              // no more work
1573 >            if (q != null && q.base == q.top)          // no more work
1574                  break;
1575          }
1576      }
# Line 1724 | Line 1734 | public class ForkJoinPool extends Abstra
1734                  if (ac <= 0 && tryTerminate(false, false))
1735                      return false;
1736                  if (ac <= 0 && ss == (int)c) {        // is last waiter
1727                    int t = (short)(c >>> TC_SHIFT);  // use timed wait
1737                      prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
1738 <                    parkTime = (t > 0 ? FAST_IDLE_TIMEOUT:
1739 <                                (1 - t) * IDLE_TIMEOUT);
1738 >                    int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
1739 >                    if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
1740 >                        return false;
1741 >                    parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
1742                      deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1743                  }
1744                  else
# Line 1867 | Line 1878 | public class ForkJoinPool extends Abstra
1878                              if (U.compareAndSwapObject(a, i, t, null)) {
1879                                  v.base = b + 1;
1880                                  ForkJoinTask<?> ps = w.currentSteal;
1881 <                                U.putOrderedObject(w, QCURRENTSTEAL, t);
1882 <                                t.doExec();
1881 >                                int top = w.top;
1882 >                                do {
1883 >                                    U.putOrderedObject(w, QCURRENTSTEAL, t);
1884 >                                    t.doExec();        // clear local tasks too
1885 >                                } while (task.status >= 0 &&
1886 >                                         w.top != top &&
1887 >                                         (t = w.pop()) != null);
1888                                  U.putOrderedObject(w, QCURRENTSTEAL, ps);
1889 <                                if (!w.isEmpty())
1889 >                                if (w.base != w.top)
1890                                      return;            // can't further help
1891                              }
1892                          }
# Line 2131 | Line 2147 | public class ForkJoinPool extends Abstra
2147          int rs;
2148          if (this == common)                       // cannot shut down
2149              return false;
2150 <        if (((rs = runState) & SHUTDOWN) == 0) {  // enable
2150 >        if ((rs = runState) >= 0) {               // else already shutdown
2151              if (!enable)
2152                  return false;
2153 <            rs = lockRunState();
2153 >            rs = lockRunState();                  // enable
2154              unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
2155          }
2156          if ((rs & STOP) == 0) {
2157              if (!now && (int)(ctl >> AC_SHIFT) + (config & SMASK) > 0)
2158                  return false;
2159 <            WorkQueue[] ws; WorkQueue w;          // validate no submissions
2159 >            WorkQueue[] ws; WorkQueue w;          // check external submissions
2160              if ((ws = workQueues) != null) {
2161                  for (int i = 0; i < ws.length; ++i) {
2162                      if ((w = ws[i]) != null &&
2163 <                        (!w.isEmpty() || ((i & 1) != 0 && w.scanState >= 0))) {
2163 >                        (w.base != w.top ||
2164 >                         ((i & 1) != 0 && w.scanState >= 0))) {
2165                          signalWork(ws, w);
2166                          return false;
2167                      }
2168                  }
2169              }
2170 <            rs = lockRunState();                 // enter STOP phase
2170 >            rs = lockRunState();                  // enter STOP phase
2171              unlockRunState(rs, (rs & ~RSLOCK) | STOP);
2172          }
2173          for (int pass = 0; pass < 3; ++pass) {    // clobber other workers
# Line 2206 | Line 2223 | public class ForkJoinPool extends Abstra
2223          for (;;) {
2224              WorkQueue[] ws; WorkQueue q; int rs, m, k;
2225              boolean move = false;
2226 <            if (((rs = runState) & SHUTDOWN) != 0)
2226 >            if ((rs = runState) < 0)
2227                  throw new RejectedExecutionException();
2228              else if ((rs & STARTED) == 0 ||     // initialize workQueues array
2229                       ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
# Line 2275 | Line 2292 | public class ForkJoinPool extends Abstra
2292      final void externalPush(ForkJoinTask<?> task) {
2293          WorkQueue[] ws; WorkQueue q; int m;
2294          int r = ThreadLocalRandom.getProbe();
2295 +        int rs = runState;
2296          if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
2297 <            (q = ws[m & r & SQMASK]) != null && r != 0 &&
2297 >            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
2298              U.compareAndSwapInt(q, QLOCK, 0, 1)) {
2299              ForkJoinTask<?>[] a; int am, n, s;
2300              if ((a = q.array) != null &&
# Line 2285 | Line 2303 | public class ForkJoinPool extends Abstra
2303                  U.putOrderedObject(a, j, task);
2304                  U.putOrderedInt(q, QTOP, s + 1);
2305                  U.putOrderedInt(q, QLOCK, 0);
2306 <                if (n == 0)
2306 >                if (n <= 1)
2307                      signalWork(ws, q);
2308                  return;
2309              }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines