206 |
|
* indices guarantee that top == base means the queue is empty, |
207 |
|
* but otherwise may err on the side of possibly making the queue |
208 |
|
* appear nonempty when a push, pop, or poll have not fully |
209 |
< |
* committed. Note that this means that the poll operation, |
210 |
< |
* considered individually, is not wait-free. One thief cannot |
211 |
< |
* successfully continue until another in-progress one (or, if |
212 |
< |
* previously empty, a push) completes. However, in the |
213 |
< |
* aggregate, we ensure at least probabilistic non-blockingness. |
214 |
< |
* If an attempted steal fails, a thief always chooses a different |
215 |
< |
* random victim target to try next. So, in order for one thief to |
216 |
< |
* progress, it suffices for any in-progress poll or new push on |
217 |
< |
* any empty queue to complete. (This is why we normally use |
218 |
< |
* method pollAt and its variants that try once at the apparent |
219 |
< |
* base index, else consider alternative actions, rather than |
220 |
< |
* method poll, which retries.) |
209 |
> |
* committed. (Method isEmpty() checks the case of a partially |
210 |
> |
* completed removal of the last element.) Note that this means |
211 |
> |
* that the poll operation, considered individually, is not |
212 |
> |
* wait-free. One thief cannot successfully continue until another |
213 |
> |
* in-progress one (or, if previously empty, a push) completes. |
214 |
> |
* However, in the aggregate, we ensure at least probabilistic |
215 |
> |
* non-blockingness. If an attempted steal fails, a thief always |
216 |
> |
* chooses a different random victim target to try next. So, in |
217 |
> |
* order for one thief to progress, it suffices for any |
218 |
> |
* in-progress poll or new push on any empty queue to |
219 |
> |
* complete. (This is why we normally use method pollAt and its |
220 |
> |
* variants that try once at the apparent base index, else |
221 |
> |
* consider alternative actions, rather than method poll, which |
222 |
> |
* retries.) |
223 |
|
* |
224 |
|
* This approach also enables support of a user mode in which |
225 |
|
* local task processing is in FIFO, not LIFO order, simply by |
427 |
|
* Signalling and activation. Workers are created or activated |
428 |
|
* only when there appears to be at least one task they might be |
429 |
|
* able to find and execute. Upon push (either by a worker or an |
430 |
< |
* external submission) to a previously empty queue, workers are |
431 |
< |
* signalled if idle, or created if fewer exist than the given |
432 |
< |
* parallelism level. These primary signals are buttressed by |
433 |
< |
* others whenever other threads remove a task from a queue and |
434 |
< |
* notice that there are other tasks there as well. On most |
435 |
< |
* platforms, signalling (unpark) overhead time is noticeably |
436 |
< |
* long, and the time between signalling a thread and it actually |
437 |
< |
* making progress can be very noticeably long, so it is worth |
438 |
< |
* offloading these delays from critical paths as much as |
430 |
> |
* external submission) to a previously (possibly) empty queue, |
431 |
> |
* workers are signalled if idle, or created if fewer exist than |
432 |
> |
* the given parallelism level. These primary signals are |
433 |
> |
* buttressed by others whenever other threads remove a task from |
434 |
> |
* a queue and notice that there are other tasks there as well. |
435 |
> |
* On most platforms, signalling (unpark) overhead time is |
436 |
> |
* noticeably long, and the time between signalling a thread and |
437 |
> |
* it actually making progress can be very noticeably long, so it |
438 |
> |
* is worth offloading these delays from critical paths as much as |
439 |
|
* possible. Also, because enqueued workers are often rescanning |
440 |
|
* or spinning rather than blocking, we set and clear the "parker" |
441 |
|
* field of WorkQueues to reduce unnecessary calls to unpark. |
451 |
|
* Shutdown and Termination. A call to shutdownNow atomically sets |
452 |
|
* a runState bit and then (non-atomically) sets each worker's |
453 |
|
* qlock status, cancels all unprocessed tasks, and wakes up all |
454 |
< |
* waiting workers. Detecting whether termination should commence |
455 |
< |
* after a non-abrupt shutdown() call relies on the active count |
456 |
< |
* bits of "ctl" maintaining consensus about quiescence. |
454 |
> |
* waiting workers (see tryTerminate). Detecting whether |
455 |
> |
* termination should commence after a non-abrupt shutdown() call |
456 |
> |
* relies on the active count bits of "ctl" maintaining consensus |
457 |
> |
* about quiescence. However, external submitters do not take part |
458 |
> |
* in this consensus. So, tryTerminate sweeps through submission |
459 |
> |
* queues to ensure lack of in-flight submissions before |
460 |
> |
* triggering the "STOP" phase of termination. |
461 |
|
* |
462 |
|
* Joining Tasks |
463 |
|
* ============= |
540 |
|
* threads) in the most common case in which it is rarely |
541 |
|
* beneficial: when a worker with an empty queue (thus no |
542 |
|
* continuation tasks) blocks on a join and there still remain |
543 |
< |
* enough threads to ensure liveness. |
543 |
> |
* enough threads to ensure liveness. Also, whenever more than two |
544 |
> |
* spare threads are generated, they are killed (see awaitWork) at |
545 |
> |
* the next quiescent point (padding by two avoids hysteresis). |
546 |
|
* |
547 |
|
* Bounds. The compensation mechanism is bounded (see MAX_SPARES), |
548 |
|
* to better enable JVMs to cope with programming errors and abuse |
554 |
|
* the JVM and OS. So the number of simultaneously live threads |
555 |
|
* may transiently exceed bounds. |
556 |
|
* |
557 |
+ |
* |
558 |
|
* Common Pool |
559 |
|
* =========== |
560 |
|
* |
802 |
|
* @throws RejectedExecutionException if array cannot be resized |
803 |
|
*/ |
804 |
|
final void push(ForkJoinTask<?> task) { |
805 |
< |
int s; ForkJoinTask<?>[] a; ForkJoinPool p; |
806 |
< |
int n = base - (s = top); // negative of incoming size |
805 |
> |
ForkJoinTask<?>[] a; ForkJoinPool p; |
806 |
> |
int b = base, s = top, n; |
807 |
|
if ((a = array) != null) { // ignore if queue removed |
808 |
|
int m = a.length - 1; // fenced write for task visibility |
809 |
|
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); |
810 |
|
U.putOrderedInt(this, QTOP, s + 1); |
811 |
< |
if (n == 0) { |
811 |
> |
if ((n = s - b) <= 1) { |
812 |
|
if ((p = pool) != null) |
813 |
|
p.signalWork(p.workQueues, this); |
814 |
|
} |
815 |
< |
else if (n + m == 0) |
815 |
> |
else if (n == m) |
816 |
|
growArray(); |
817 |
|
} |
818 |
|
} |
1240 |
|
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec |
1241 |
|
|
1242 |
|
/** |
1234 |
– |
* Timeout value when there are more threads than parallelism level |
1235 |
– |
*/ |
1236 |
– |
private static final long FAST_IDLE_TIMEOUT = 100L * 1000L * 1000L; // 100ms |
1237 |
– |
|
1238 |
– |
/** |
1243 |
|
* Tolerance for idle timeouts, to cope with timer undershoots |
1244 |
|
*/ |
1245 |
|
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms |
1308 |
|
private static final long TC_MASK = 0xffffL << TC_SHIFT; |
1309 |
|
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign |
1310 |
|
|
1311 |
< |
// runState bits (arbitrary powers of two) |
1311 |
> |
// runState bits: SHUTDOWN must be negative, others arbitrary powers of two |
1312 |
|
private static final int RSLOCK = 1; |
1313 |
|
private static final int RSIGNAL = 1 << 1; |
1314 |
|
private static final int STARTED = 1 << 2; |
1315 |
< |
private static final int SHUTDOWN = 1 << 3; |
1316 |
< |
private static final int STOP = 1 << 4; |
1317 |
< |
private static final int TERMINATED = 1 << 5; |
1315 |
> |
private static final int STOP = 1 << 29; |
1316 |
> |
private static final int TERMINATED = 1 << 30; |
1317 |
> |
private static final int SHUTDOWN = 1 << 31; |
1318 |
|
|
1319 |
|
// Instance fields |
1320 |
|
volatile long stealCount; // collects worker counts |
1519 |
|
U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals |
1520 |
|
} |
1521 |
|
if (!tryTerminate(false, false) && w != null && w.array != null) { |
1522 |
< |
WorkQueue[] ws = workQueues; int m, sp; |
1523 |
< |
if (ws != null && (m = ws.length - 1) >= 0 && (c = ctl) < 0L) { |
1524 |
< |
if ((sp = (int)c) != 0) // wake up replacement |
1525 |
< |
tryRelease(c, ws[sp & m], AC_UNIT); |
1526 |
< |
else if (ex != null && (c & ADD_WORKER) != 0L) |
1522 |
> |
WorkQueue[] ws; int m, sp; |
1523 |
> |
while ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { |
1524 |
> |
if ((sp = (int)(c = ctl)) != 0) { // wake up replacement |
1525 |
> |
if (tryRelease(c, ws[sp & m], AC_UNIT)) |
1526 |
> |
break; |
1527 |
> |
} |
1528 |
> |
else if (ex != null && (c & ADD_WORKER) != 0L) { |
1529 |
|
tryAddWorker(c); // create replacement |
1530 |
+ |
break; |
1531 |
+ |
} |
1532 |
+ |
else |
1533 |
+ |
break; |
1534 |
|
} |
1535 |
|
} |
1536 |
|
if (ex == null) // help clean on way out |
1570 |
|
U.unpark(p); |
1571 |
|
break; |
1572 |
|
} |
1573 |
< |
if (q != null && q.isEmpty()) // no more work |
1573 |
> |
if (q != null && q.base == q.top) // no more work |
1574 |
|
break; |
1575 |
|
} |
1576 |
|
} |
1734 |
|
if (ac <= 0 && tryTerminate(false, false)) |
1735 |
|
return false; |
1736 |
|
if (ac <= 0 && ss == (int)c) { // is last waiter |
1727 |
– |
int t = (short)(c >>> TC_SHIFT); // use timed wait |
1737 |
|
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); |
1738 |
< |
parkTime = (t > 0 ? FAST_IDLE_TIMEOUT: |
1739 |
< |
(1 - t) * IDLE_TIMEOUT); |
1738 |
> |
int t = (short)(c >>> TC_SHIFT); // shrink excess spares |
1739 |
> |
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) |
1740 |
> |
return false; |
1741 |
> |
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); |
1742 |
|
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; |
1743 |
|
} |
1744 |
|
else |
1878 |
|
if (U.compareAndSwapObject(a, i, t, null)) { |
1879 |
|
v.base = b + 1; |
1880 |
|
ForkJoinTask<?> ps = w.currentSteal; |
1881 |
< |
U.putOrderedObject(w, QCURRENTSTEAL, t); |
1882 |
< |
t.doExec(); |
1881 |
> |
int top = w.top; |
1882 |
> |
do { |
1883 |
> |
U.putOrderedObject(w, QCURRENTSTEAL, t); |
1884 |
> |
t.doExec(); // clear local tasks too |
1885 |
> |
} while (task.status >= 0 && |
1886 |
> |
w.top != top && |
1887 |
> |
(t = w.pop()) != null); |
1888 |
|
U.putOrderedObject(w, QCURRENTSTEAL, ps); |
1889 |
< |
if (!w.isEmpty()) |
1889 |
> |
if (w.base != w.top) |
1890 |
|
return; // can't further help |
1891 |
|
} |
1892 |
|
} |
2147 |
|
int rs; |
2148 |
|
if (this == common) // cannot shut down |
2149 |
|
return false; |
2150 |
< |
if (((rs = runState) & SHUTDOWN) == 0) { // enable |
2150 |
> |
if ((rs = runState) >= 0) { // else already shutdown |
2151 |
|
if (!enable) |
2152 |
|
return false; |
2153 |
< |
rs = lockRunState(); |
2153 |
> |
rs = lockRunState(); // enable |
2154 |
|
unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN); |
2155 |
|
} |
2156 |
|
if ((rs & STOP) == 0) { |
2157 |
|
if (!now && (int)(ctl >> AC_SHIFT) + (config & SMASK) > 0) |
2158 |
|
return false; |
2159 |
< |
WorkQueue[] ws; WorkQueue w; // validate no submissions |
2159 |
> |
WorkQueue[] ws; WorkQueue w; // check external submissions |
2160 |
|
if ((ws = workQueues) != null) { |
2161 |
|
for (int i = 0; i < ws.length; ++i) { |
2162 |
|
if ((w = ws[i]) != null && |
2163 |
< |
(!w.isEmpty() || ((i & 1) != 0 && w.scanState >= 0))) { |
2163 |
> |
(w.base != w.top || |
2164 |
> |
((i & 1) != 0 && w.scanState >= 0))) { |
2165 |
|
signalWork(ws, w); |
2166 |
|
return false; |
2167 |
|
} |
2168 |
|
} |
2169 |
|
} |
2170 |
< |
rs = lockRunState(); // enter STOP phase |
2170 |
> |
rs = lockRunState(); // enter STOP phase |
2171 |
|
unlockRunState(rs, (rs & ~RSLOCK) | STOP); |
2172 |
|
} |
2173 |
|
for (int pass = 0; pass < 3; ++pass) { // clobber other workers |
2223 |
|
for (;;) { |
2224 |
|
WorkQueue[] ws; WorkQueue q; int rs, m, k; |
2225 |
|
boolean move = false; |
2226 |
< |
if (((rs = runState) & SHUTDOWN) != 0) |
2226 |
> |
if ((rs = runState) < 0) |
2227 |
|
throw new RejectedExecutionException(); |
2228 |
|
else if ((rs & STARTED) == 0 || // initialize workQueues array |
2229 |
|
((ws = workQueues) == null || (m = ws.length - 1) < 0)) { |
2292 |
|
final void externalPush(ForkJoinTask<?> task) { |
2293 |
|
WorkQueue[] ws; WorkQueue q; int m; |
2294 |
|
int r = ThreadLocalRandom.getProbe(); |
2295 |
+ |
int rs = runState; |
2296 |
|
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && |
2297 |
< |
(q = ws[m & r & SQMASK]) != null && r != 0 && |
2297 |
> |
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && |
2298 |
|
U.compareAndSwapInt(q, QLOCK, 0, 1)) { |
2299 |
|
ForkJoinTask<?>[] a; int am, n, s; |
2300 |
|
if ((a = q.array) != null && |
2303 |
|
U.putOrderedObject(a, j, task); |
2304 |
|
U.putOrderedInt(q, QTOP, s + 1); |
2305 |
|
U.putOrderedInt(q, QLOCK, 0); |
2306 |
< |
if (n == 0) |
2306 |
> |
if (n <= 1) |
2307 |
|
signalWork(ws, q); |
2308 |
|
return; |
2309 |
|
} |