459 |
|
* are immediately terminated at the next quiescent point. |
460 |
|
* (Padding by two avoids hysteresis). |
461 |
|
* |
462 |
< |
* Shutdown and Termination. A call to shutdownNow atomically sets |
463 |
< |
* a runState bit and then (non-atomically) sets each worker's |
464 |
< |
* qlock status, cancels all unprocessed tasks, and wakes up all |
465 |
< |
* waiting workers (see tryTerminate). Detecting whether |
466 |
< |
* termination should commence after a non-abrupt shutdown() call |
467 |
< |
* relies on the active count bits of "ctl" maintaining consensus |
468 |
< |
* about quiescence. However, external submitters do not take part |
469 |
< |
* in this consensus. So, tryTerminate sweeps through queues to |
470 |
< |
* ensure lack of in-flight submissions and workers about to |
471 |
< |
* process them before triggering the "STOP" phase of |
462 |
> |
* Shutdown and Termination. A call to shutdownNow invokes |
463 |
> |
* tryTerminate to atomically set a runState bit. The calling |
464 |
> |
* thread, as well as every other worker thereafter terminating, |
465 |
> |
* helps terminate others by setting their (qlock) status, |
466 |
> |
* cancelling their unprocessed tasks, and waking them up, doing |
467 |
> |
* so repeatedly until stable (but with a loop bounded by the |
468 |
> |
* number of workers). Calls to non-abrupt shutdown() preface |
469 |
> |
* this by checking whether termination should commence. This |
470 |
> |
* relies primarily on the active count bits of "ctl" maintaining |
471 |
> |
* consensus about quiescence. However, external submitters do not |
472 |
> |
* take part in this consensus. So, tryTerminate sweeps through |
473 |
> |
* queues to ensure lack of in-flight submissions and workers |
474 |
> |
* about to process them before triggering the "STOP" phase of |
475 |
|
* termination. |
476 |
|
* |
477 |
|
* Joining Tasks |
1366 |
|
* conservative alternative to a pure spinlock. |
1367 |
|
*/ |
1368 |
|
private int awaitRunStateLock() { |
1369 |
< |
for (int spins = SPINS, r = 0, rs, nrs;;) { |
1369 |
> |
boolean wasInterrupted = false; |
1370 |
> |
for (int spins = SPINS, r = 0, rs, ns;;) { |
1371 |
|
if (((rs = runState) & RSLOCK) == 0) { |
1372 |
< |
if (U.compareAndSwapInt(this, RUNSTATE, rs, nrs = rs | RSLOCK)) |
1373 |
< |
return nrs; |
1372 |
> |
if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { |
1373 |
> |
if (wasInterrupted) { |
1374 |
> |
try { |
1375 |
> |
Thread.currentThread().interrupt(); |
1376 |
> |
} catch (SecurityException ignore) { |
1377 |
> |
} |
1378 |
> |
} |
1379 |
> |
return ns; |
1380 |
> |
} |
1381 |
|
} |
1382 |
|
else if (r == 0) |
1383 |
|
r = ThreadLocalRandom.nextSecondarySeed(); |
1392 |
|
try { |
1393 |
|
wait(); |
1394 |
|
} catch (InterruptedException ie) { |
1395 |
< |
try { |
1396 |
< |
Thread.currentThread().interrupt(); |
1397 |
< |
} catch (SecurityException ignore) { |
1387 |
< |
} |
1395 |
> |
if (!(Thread.currentThread() instanceof |
1396 |
> |
ForkJoinWorkerThread)) |
1397 |
> |
wasInterrupted = true; |
1398 |
|
} |
1399 |
|
} |
1400 |
|
else |
1699 |
|
if ((k = (k + 1) & m) == origin) { // continue until stable |
1700 |
|
if ((ss >= 0 || (ss == (ss = w.scanState))) && |
1701 |
|
oldSum == (oldSum = checkSum)) { |
1702 |
< |
if (ss < 0) // already inactive |
1702 |
> |
if (ss < 0 || w.qlock < 0) // already inactive |
1703 |
|
break; |
1704 |
|
int ns = ss | INACTIVE; // try to inactivate |
1705 |
|
long nc = ((SP_MASK & ns) | |
1756 |
|
return false; |
1757 |
|
else if (!Thread.interrupted()) { |
1758 |
|
long c, prevctl, parkTime, deadline; |
1749 |
– |
if ((runState & STOP) != 0) // pool terminating |
1750 |
– |
return false; |
1759 |
|
int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); |
1760 |
< |
if (ac <= 0 && tryTerminate(false, false)) |
1760 |
> |
if ((ac <= 0 && tryTerminate(false, false)) || |
1761 |
> |
(runState & STOP) != 0) // pool terminating |
1762 |
|
return false; |
1763 |
|
if (ac <= 0 && ss == (int)c) { // is last waiter |
1764 |
|
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); |
1777 |
|
U.park(false, parkTime); |
1778 |
|
U.putOrderedObject(w, QPARKER, null); |
1779 |
|
U.putObject(wt, PARKBLOCKER, null); |
1780 |
+ |
if (w.qlock < 0) // terminated while parked |
1781 |
+ |
return false; |
1782 |
|
if (w.scanState >= 0) |
1783 |
|
break; |
1784 |
|
if (parkTime != 0L && ctl == c && |
2160 |
|
// Termination |
2161 |
|
|
2162 |
|
/** |
2163 |
< |
* Possibly initiates and/or completes termination. When |
2153 |
< |
* terminating (STOP phase), runs three passes through workQueues: |
2154 |
< |
* (0) Setting termination status (which also stops external |
2155 |
< |
* submitters by locking queues), (1) cancelling all tasks; (2) |
2156 |
< |
* interrupting lagging threads (likely in external tasks, but |
2157 |
< |
* possibly also blocked in joins). Each pass repeats previous |
2158 |
< |
* steps because of potential lagging thread creation. |
2163 |
> |
* Possibly initiates and/or completes termination. |
2164 |
|
* |
2165 |
|
* @param now if true, unconditionally terminate, else only |
2166 |
|
* if no work and no active workers |
2171 |
|
int rs; |
2172 |
|
if (this == common) // cannot shut down |
2173 |
|
return false; |
2174 |
< |
if ((rs = runState) >= 0) { // else already shutdown |
2174 |
> |
if ((rs = runState) >= 0) { |
2175 |
|
if (!enable) |
2176 |
|
return false; |
2177 |
< |
rs = lockRunState(); // enable |
2177 |
> |
rs = lockRunState(); // enter SHUTDOWN phase |
2178 |
|
unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN); |
2179 |
|
} |
2180 |
+ |
|
2181 |
|
if ((rs & STOP) == 0) { |
2182 |
< |
if (!now) { |
2183 |
< |
for (int oldSum = 0, checkSum = 0;;) { |
2184 |
< |
WorkQueue[] ws; WorkQueue w; int m, b; |
2185 |
< |
if ((int)(ctl >> AC_SHIFT) + (config & SMASK) > 0) |
2186 |
< |
return false; // not quiescent |
2182 |
> |
if (!now) { // check quiescence |
2183 |
> |
outer: for (long oldSum = 0L;;) { // repeat until stable |
2184 |
> |
WorkQueue[] ws; WorkQueue w; int m, b; long c; |
2185 |
> |
long checkSum = ctl; |
2186 |
> |
if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0) |
2187 |
> |
return false; // still active workers |
2188 |
|
if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) |
2189 |
< |
break; // scan for submissions |
2189 |
> |
break; // check queues |
2190 |
|
for (int i = 0; i <= m; ++i) { |
2191 |
|
if ((w = ws[i]) != null) { |
2192 |
< |
if ((i & 1) == 0) |
2193 |
< |
w.qlock = -1; // disable external queue |
2194 |
< |
else if (w.scanState >= 0) |
2195 |
< |
return false; // still active |
2196 |
< |
if ((b = w.base) != w.top ) |
2197 |
< |
return false; |
2192 |
> |
if ((b = w.base) != w.top || w.scanState >= 0 || |
2193 |
> |
w.currentSteal != null) { |
2194 |
> |
if ((runState & STOP) != 0) |
2195 |
> |
break outer; // already stopping |
2196 |
> |
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); |
2197 |
> |
return false; // ensure recheck |
2198 |
> |
} |
2199 |
|
checkSum += b; |
2200 |
+ |
if ((i & 1) == 0) |
2201 |
+ |
w.qlock = -1; // try to disable external |
2202 |
|
} |
2203 |
|
} |
2204 |
|
if (oldSum == (oldSum = checkSum)) |
2205 |
< |
break; // continue until stable |
2196 |
< |
checkSum = 0; |
2205 |
> |
break; |
2206 |
|
} |
2207 |
|
} |
2208 |
< |
rs = lockRunState(); // enter STOP phase |
2209 |
< |
int ns = rs & STOP; |
2210 |
< |
if (ns != 0 || !now || |
2211 |
< |
(int)(ctl >> AC_SHIFT) + (config & SMASK) <= 0) |
2203 |
< |
ns = STOP; // recheck under lock |
2204 |
< |
unlockRunState(rs, (rs & ~RSLOCK) | ns); |
2205 |
< |
if (ns == 0) |
2206 |
< |
return false; |
2208 |
> |
if ((runState & STOP) == 0) { |
2209 |
> |
rs = lockRunState(); // enter STOP phase |
2210 |
> |
unlockRunState(rs, (rs & ~RSLOCK) | STOP); |
2211 |
> |
} |
2212 |
|
} |
2213 |
< |
for (int pass = 0; pass < 3; ++pass) { // clobber other workers |
2214 |
< |
WorkQueue[] ws; int n; |
2215 |
< |
if ((ws = workQueues) != null && (n = ws.length) > 0) { |
2216 |
< |
WorkQueue w; Thread wt; |
2217 |
< |
for (int i = 0; i < n; ++i) { |
2218 |
< |
if ((w = ws[i]) != null) { |
2219 |
< |
w.qlock = -1; |
2220 |
< |
if (pass > 0) { |
2221 |
< |
w.cancelAll(); // clear queue |
2222 |
< |
if (pass > 1 && (wt = w.owner) != null) { |
2223 |
< |
if (!wt.isInterrupted()) { |
2224 |
< |
try { |
2225 |
< |
wt.interrupt(); |
2226 |
< |
} catch (Throwable ignore) { |
2227 |
< |
} |
2213 |
> |
|
2214 |
> |
int pass = 0; // 3 passes to help terminate |
2215 |
> |
for (long oldSum = 0L;;) { // or until done or stable |
2216 |
> |
WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m; |
2217 |
> |
long checkSum = ctl; |
2218 |
> |
if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 || |
2219 |
> |
(ws = workQueues) == null || (m = ws.length - 1) <= 0) { |
2220 |
> |
if ((runState & TERMINATED) == 0) { |
2221 |
> |
rs = lockRunState(); // done |
2222 |
> |
unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED); |
2223 |
> |
synchronized (this) { notifyAll(); } // for awaitTermination |
2224 |
> |
} |
2225 |
> |
break; |
2226 |
> |
} |
2227 |
> |
for (int i = 0; i <= m; ++i) { |
2228 |
> |
if ((w = ws[i]) != null) { |
2229 |
> |
checkSum += w.base; |
2230 |
> |
w.qlock = -1; // try to disable |
2231 |
> |
if (pass > 0) { |
2232 |
> |
w.cancelAll(); // clear queue |
2233 |
> |
if (pass > 1 && (wt = w.owner) != null) { |
2234 |
> |
if (!wt.isInterrupted()) { |
2235 |
> |
try { // unblock join |
2236 |
> |
wt.interrupt(); |
2237 |
> |
} catch (Throwable ignore) { |
2238 |
|
} |
2224 |
– |
U.unpark(wt); // wake up |
2239 |
|
} |
2240 |
+ |
if (w.scanState < 0) |
2241 |
+ |
U.unpark(wt); // wake up |
2242 |
|
} |
2243 |
|
} |
2244 |
|
} |
2245 |
|
} |
2246 |
< |
} |
2247 |
< |
if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) { |
2248 |
< |
rs = lockRunState(); // done -- no more workers |
2249 |
< |
unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED); |
2250 |
< |
synchronized (this) { // release awaitTermination |
2251 |
< |
notifyAll(); |
2246 |
> |
if (checkSum != oldSum) { // unstable |
2247 |
> |
oldSum = checkSum; |
2248 |
> |
pass = 0; |
2249 |
> |
} |
2250 |
> |
else if (pass > 3 && pass > m) // can't further help |
2251 |
> |
break; |
2252 |
> |
else if (++pass > 1) { // try to dequeue |
2253 |
> |
long c; int j = 0, sp; // bound attempts |
2254 |
> |
while (j++ <= m && (sp = (int)(c = ctl)) != 0) |
2255 |
> |
tryRelease(c, ws[sp & m], AC_UNIT); |
2256 |
|
} |
2257 |
|
} |
2258 |
|
return true; |
2278 |
|
for (;;) { |
2279 |
|
WorkQueue[] ws; WorkQueue q; int rs, m, k; |
2280 |
|
boolean move = false; |
2281 |
< |
if ((rs = runState) < 0) |
2281 |
> |
if ((rs = runState) < 0) { |
2282 |
> |
tryTerminate(false, false); // help terminate |
2283 |
|
throw new RejectedExecutionException(); |
2284 |
+ |
} |
2285 |
|
else if ((rs & STARTED) == 0 || // initialize workQueues array |
2286 |
|
((ws = workQueues) == null || (m = ws.length - 1) < 0)) { |
2287 |
|
int ns = 0; |
2326 |
|
q = new WorkQueue(this, null); |
2327 |
|
q.hint = r; |
2328 |
|
q.config = k | SHARED_QUEUE; |
2329 |
+ |
q.scanState = INACTIVE; |
2330 |
|
rs = lockRunState(); // publish index |
2331 |
|
if (rs > 0 && (ws = workQueues) != null && |
2332 |
|
k < ws.length && ws[k] == null) |