ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.209 by dl, Thu Jul 10 16:00:59 2014 UTC vs.
Revision 1.210 by dl, Fri Jul 11 16:10:38 2014 UTC

# Line 459 | Line 459 | public class ForkJoinPool extends Abstra
459       * are immediately terminated at the next quiescent point.
460       * (Padding by two avoids hysteresis).
461       *
462 <     * Shutdown and Termination. A call to shutdownNow atomically sets
463 <     * a runState bit and then (non-atomically) sets each worker's
464 <     * qlock status, cancels all unprocessed tasks, and wakes up all
465 <     * waiting workers (see tryTerminate).  Detecting whether
466 <     * termination should commence after a non-abrupt shutdown() call
467 <     * relies on the active count bits of "ctl" maintaining consensus
468 <     * about quiescence. However, external submitters do not take part
469 <     * in this consensus.  So, tryTerminate sweeps through queues to
470 <     * ensure lack of in-flight submissions and workers about to
471 <     * process them before triggering the "STOP" phase of
462 >     * Shutdown and Termination. A call to shutdownNow invokes
463 >     * tryTerminate to atomically set a runState bit. The calling
464 >     * thread, as well as every other worker thereafter terminating,
465 >     * helps terminate others by setting their (qlock) status,
466 >     * cancelling their unprocessed tasks, and waking them up, doing
467 >     * so repeatedly until stable (but with a loop bounded by the
468 >     * number of workers).  Calls to non-abrupt shutdown() preface
469 >     * this by checking whether termination should commence. This
470 >     * relies primarily on the active count bits of "ctl" maintaining
471 >     * consensus about quiescence. However, external submitters do not
472 >     * take part in this consensus.  So, tryTerminate sweeps through
473 >     * queues to ensure lack of in-flight submissions and workers
474 >     * about to process them before triggering the "STOP" phase of
475       * termination.
476       *
477       * Joining Tasks
# Line 1363 | Line 1366 | public class ForkJoinPool extends Abstra
1366       * conservative alternative to a pure spinlock.
1367       */
1368      private int awaitRunStateLock() {
1369 <        for (int spins = SPINS, r = 0, rs, nrs;;) {
1369 >        boolean wasInterrupted = false;
1370 >        for (int spins = SPINS, r = 0, rs, ns;;) {
1371              if (((rs = runState) & RSLOCK) == 0) {
1372 <                if (U.compareAndSwapInt(this, RUNSTATE, rs, nrs = rs | RSLOCK))
1373 <                    return nrs;
1372 >                if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
1373 >                    if (wasInterrupted) {
1374 >                        try {
1375 >                            Thread.currentThread().interrupt();
1376 >                        } catch (SecurityException ignore) {
1377 >                        }
1378 >                    }
1379 >                    return ns;
1380 >                }
1381              }
1382              else if (r == 0)
1383                  r = ThreadLocalRandom.nextSecondarySeed();
# Line 1381 | Line 1392 | public class ForkJoinPool extends Abstra
1392                          try {
1393                              wait();
1394                          } catch (InterruptedException ie) {
1395 <                            try {
1396 <                                Thread.currentThread().interrupt();
1397 <                            } catch (SecurityException ignore) {
1387 <                            }
1395 >                            if (!(Thread.currentThread() instanceof
1396 >                                  ForkJoinWorkerThread))
1397 >                                wasInterrupted = true;
1398                          }
1399                      }
1400                      else
# Line 1689 | Line 1699 | public class ForkJoinPool extends Abstra
1699                  if ((k = (k + 1) & m) == origin) {    // continue until stable
1700                      if ((ss >= 0 || (ss == (ss = w.scanState))) &&
1701                          oldSum == (oldSum = checkSum)) {
1702 <                        if (ss < 0)                   // already inactive
1702 >                        if (ss < 0 || w.qlock < 0)    // already inactive
1703                              break;
1704                          int ns = ss | INACTIVE;       // try to inactivate
1705                          long nc = ((SP_MASK & ns) |
# Line 1746 | Line 1756 | public class ForkJoinPool extends Abstra
1756                  return false;
1757              else if (!Thread.interrupted()) {
1758                  long c, prevctl, parkTime, deadline;
1749                if ((runState & STOP) != 0)           // pool terminating
1750                    return false;
1759                  int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
1760 <                if (ac <= 0 && tryTerminate(false, false))
1760 >                if ((ac <= 0 && tryTerminate(false, false)) ||
1761 >                    (runState & STOP) != 0)           // pool terminating
1762                      return false;
1763                  if (ac <= 0 && ss == (int)c) {        // is last waiter
1764                      prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
# Line 1768 | Line 1777 | public class ForkJoinPool extends Abstra
1777                      U.park(false, parkTime);
1778                  U.putOrderedObject(w, QPARKER, null);
1779                  U.putObject(wt, PARKBLOCKER, null);
1780 +                if (w.qlock < 0)                      // terminated while parked
1781 +                    return false;
1782                  if (w.scanState >= 0)
1783                      break;
1784                  if (parkTime != 0L && ctl == c &&
# Line 2149 | Line 2160 | public class ForkJoinPool extends Abstra
2160      //  Termination
2161  
2162      /**
2163 <     * Possibly initiates and/or completes termination.  When
2153 <     * terminating (STOP phase), runs three passes through workQueues:
2154 <     * (0) Setting termination status (which also stops external
2155 <     * submitters by locking queues), (1) cancelling all tasks; (2)
2156 <     * interrupting lagging threads (likely in external tasks, but
2157 <     * possibly also blocked in joins).  Each pass repeats previous
2158 <     * steps because of potential lagging thread creation.
2163 >     * Possibly initiates and/or completes termination.
2164       *
2165       * @param now if true, unconditionally terminate, else only
2166       * if no work and no active workers
# Line 2166 | Line 2171 | public class ForkJoinPool extends Abstra
2171          int rs;
2172          if (this == common)                       // cannot shut down
2173              return false;
2174 <        if ((rs = runState) >= 0) {               // else already shutdown
2174 >        if ((rs = runState) >= 0) {
2175              if (!enable)
2176                  return false;
2177 <            rs = lockRunState();                  // enable
2177 >            rs = lockRunState();                  // enter SHUTDOWN phase
2178              unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
2179          }
2180 +
2181          if ((rs & STOP) == 0) {
2182 <            if (!now) {
2183 <                for (int oldSum = 0, checkSum = 0;;) {
2184 <                    WorkQueue[] ws; WorkQueue w; int m, b;
2185 <                    if ((int)(ctl >> AC_SHIFT) + (config & SMASK) > 0)
2186 <                        return false;             // not quiescent
2182 >            if (!now) {                           // check quiescence
2183 >                outer: for (long oldSum = 0L;;) { // repeat until stable
2184 >                    WorkQueue[] ws; WorkQueue w; int m, b; long c;
2185 >                    long checkSum = ctl;
2186 >                    if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
2187 >                        return false;             // still active workers
2188                      if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
2189 <                        break;                    // scan for submissions
2189 >                        break;                    // check queues
2190                      for (int i = 0; i <= m; ++i) {
2191                          if ((w = ws[i]) != null) {
2192 <                            if ((i & 1) == 0)
2193 <                                w.qlock = -1;     // disable external queue
2194 <                            else if (w.scanState >= 0)
2195 <                                return false;     // still active
2196 <                            if ((b = w.base) != w.top )
2197 <                                return false;
2192 >                            if ((b = w.base) != w.top || w.scanState >= 0 ||
2193 >                                w.currentSteal != null) {
2194 >                                if ((runState & STOP) != 0)
2195 >                                    break outer;  // already stopping
2196 >                                tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
2197 >                                return false;     // ensure recheck
2198 >                            }
2199                              checkSum += b;
2200 +                            if ((i & 1) == 0)
2201 +                                w.qlock = -1;     // try to disable external
2202                          }
2203                      }
2204                      if (oldSum == (oldSum = checkSum))
2205 <                        break;                    // continue until stable
2196 <                    checkSum = 0;
2205 >                        break;
2206                  }
2207              }
2208 <            rs = lockRunState();                  // enter STOP phase
2209 <            int ns = rs & STOP;
2210 <            if (ns != 0 || !now ||
2211 <                (int)(ctl >> AC_SHIFT) + (config & SMASK) <= 0)
2203 <                ns = STOP;                        // recheck under lock
2204 <            unlockRunState(rs, (rs & ~RSLOCK) | ns);
2205 <            if (ns == 0)
2206 <                return false;
2208 >            if ((runState & STOP) == 0) {
2209 >                rs = lockRunState();              // enter STOP phase
2210 >                unlockRunState(rs, (rs & ~RSLOCK) | STOP);
2211 >            }
2212          }
2213 <        for (int pass = 0; pass < 3; ++pass) {    // clobber other workers
2214 <            WorkQueue[] ws; int n;
2215 <            if ((ws = workQueues) != null && (n = ws.length) > 0) {
2216 <                WorkQueue w; Thread wt;
2217 <                for (int i = 0; i < n; ++i) {
2218 <                    if ((w = ws[i]) != null) {
2219 <                        w.qlock = -1;
2220 <                        if (pass > 0) {
2221 <                            w.cancelAll();       // clear queue
2222 <                            if (pass > 1 && (wt = w.owner) != null) {
2223 <                                if (!wt.isInterrupted()) {
2224 <                                    try {
2225 <                                        wt.interrupt();
2226 <                                    } catch (Throwable ignore) {
2227 <                                    }
2213 >
2214 >        int pass = 0;                             // 3 passes to help terminate
2215 >        for (long oldSum = 0L;;) {                // or until done or stable
2216 >            WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
2217 >            long checkSum = ctl;
2218 >            if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
2219 >                (ws = workQueues) == null || (m = ws.length - 1) <= 0) {
2220 >                if ((runState & TERMINATED) == 0) {
2221 >                    rs = lockRunState();          // done
2222 >                    unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
2223 >                    synchronized (this) { notifyAll(); } // for awaitTermination
2224 >                }
2225 >                break;
2226 >            }
2227 >            for (int i = 0; i <= m; ++i) {
2228 >                if ((w = ws[i]) != null) {
2229 >                    checkSum += w.base;
2230 >                    w.qlock = -1;                 // try to disable
2231 >                    if (pass > 0) {
2232 >                        w.cancelAll();            // clear queue
2233 >                        if (pass > 1 && (wt = w.owner) != null) {
2234 >                            if (!wt.isInterrupted()) {
2235 >                                try {             // unblock join
2236 >                                    wt.interrupt();
2237 >                                } catch (Throwable ignore) {
2238                                  }
2224                                U.unpark(wt);    // wake up
2239                              }
2240 +                            if (w.scanState < 0)
2241 +                                U.unpark(wt);     // wake up
2242                          }
2243                      }
2244                  }
2245              }
2246 <        }
2247 <        if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) {
2248 <            rs = lockRunState();                  // done -- no more workers
2249 <            unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
2250 <            synchronized (this) {                 // release awaitTermination
2251 <                notifyAll();
2246 >            if (checkSum != oldSum) {             // unstable
2247 >                oldSum = checkSum;
2248 >                pass = 0;
2249 >            }
2250 >            else if (pass > 3 && pass > m)        // can't further help
2251 >                break;
2252 >            else if (++pass > 1) {                // try to dequeue
2253 >                long c; int j = 0, sp;            // bound attempts
2254 >                while (j++ <= m && (sp = (int)(c = ctl)) != 0)
2255 >                    tryRelease(c, ws[sp & m], AC_UNIT);
2256              }
2257          }
2258          return true;
# Line 2258 | Line 2278 | public class ForkJoinPool extends Abstra
2278          for (;;) {
2279              WorkQueue[] ws; WorkQueue q; int rs, m, k;
2280              boolean move = false;
2281 <            if ((rs = runState) < 0)
2281 >            if ((rs = runState) < 0) {
2282 >                tryTerminate(false, false);     // help terminate
2283                  throw new RejectedExecutionException();
2284 +            }
2285              else if ((rs & STARTED) == 0 ||     // initialize workQueues array
2286                       ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
2287                  int ns = 0;
# Line 2304 | Line 2326 | public class ForkJoinPool extends Abstra
2326                  q = new WorkQueue(this, null);
2327                  q.hint = r;
2328                  q.config = k | SHARED_QUEUE;
2329 +                q.scanState = INACTIVE;
2330                  rs = lockRunState();           // publish index
2331                  if (rs > 0 &&  (ws = workQueues) != null &&
2332                      k < ws.length && ws[k] == null)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines