113 |
|
* - the class name of a {@link ForkJoinWorkerThreadFactory} |
114 |
|
* <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler} |
115 |
|
* - the class name of a {@link UncaughtExceptionHandler} |
116 |
- |
* <!-- |
116 |
|
* <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares} |
117 |
|
* - the maximum number of allowed extra threads to maintain target |
118 |
|
* parallelism (default 256). |
120 |
- |
* --> |
119 |
|
* </ul> |
120 |
|
* If a {@link SecurityManager} is present and no factory is |
121 |
|
* specified, then the default pool uses a factory supplying |
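Review note: the `threadFactory` property above names a class implementing `ForkJoinWorkerThreadFactory`. A minimal sketch of such a factory (the class name and thread-name prefix here are hypothetical, not part of the JDK):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

// Sketch: a factory of the kind the "threadFactory" property can name.
public class NamedWorkerFactory
        implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        // ForkJoinWorkerThread's constructor is protected, so an
        // anonymous subclass is the usual way to instantiate one
        ForkJoinWorkerThread t = new ForkJoinWorkerThread(pool) { };
        t.setName("named-worker-" + t.getPoolIndex());
        return t;
    }
}
```

The property route requires a public no-arg constructor; the same factory can also be passed directly to a `ForkJoinPool` constructor.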
214 |
|
* but otherwise may err on the side of possibly making the queue |
215 |
|
* appear nonempty when a push, pop, or poll has not fully |
216 |
|
* committed. (Method isEmpty() checks the case of a partially |
217 |
< |
* completed removal of the last element.) Note that this means |
218 |
< |
* that the poll operation, considered individually, is not |
219 |
< |
* wait-free. One thief cannot successfully continue until another |
220 |
< |
* in-progress one (or, if previously empty, a push) completes. |
221 |
< |
* However, in the aggregate, we ensure at least probabilistic |
217 |
> |
* completed removal of the last element.) Because of this, the |
218 |
> |
* poll operation, considered individually, is not wait-free. One |
219 |
> |
* thief cannot successfully continue until another in-progress |
220 |
> |
* one (or, if previously empty, a push) completes. However, in |
221 |
> |
* the aggregate, we ensure at least probabilistic |
222 |
|
* non-blockingness. If an attempted steal fails, a thief always |
223 |
|
* chooses a different random victim target to try next. So, in |
224 |
|
* order for one thief to progress, it suffices for any |
260 |
|
* resizing) but we use only a simple spinlock (using field |
261 |
|
* qlock), because submitters encountering a busy queue move on to |
262 |
|
* try or create other queues -- they block only when creating and |
263 |
< |
* registering new queues. |
263 |
> |
* registering new queues. Additionally, "qlock" saturates to an |
264 |
> |
* unlockable value (-1) at shutdown. Unlocking still can be and |
265 |
> |
* is performed by cheaper ordered writes of "qlock" in successful |
266 |
> |
* cases, but uses CAS in unsuccessful cases. |
267 |
|
* |
268 |
|
* Management |
269 |
|
* ========== |
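Review note: the saturating "qlock" discipline described in this hunk can be sketched with an `AtomicInteger` standing in for the field (types and names here are illustrative, not the pool's):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the qlock states: 0 = unlocked, 1 = locked,
// -1 = saturated at shutdown (never unlockable again).
final class SaturatingLock {
    final AtomicInteger qlock = new AtomicInteger();

    boolean tryLock() { return qlock.compareAndSet(0, 1); }

    void unlock() {
        // in the pool, the successful case is a cheaper ordered write;
        // here a CAS covers both: it fails only if qlock saturated to -1
        qlock.compareAndSet(1, 0);
    }

    void shutdown() { qlock.set(-1); } // saturate: tryLock now always fails
}
```

Once saturated, neither `tryLock` nor `unlock` can move the value off -1, which is what lets shutdown permanently fence out late submitters.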
417 |
|
* cannot find a task to steal, it deactivates and enqueues. Very |
418 |
|
* often, the lack of tasks is transient due to GC or OS |
419 |
|
* scheduling. To reduce false-alarm deactivation, scanners |
420 |
< |
* compute checksums of queue states during sweeps. They give up |
421 |
< |
* and try to deactivate only after the sum is stable across |
422 |
< |
* scans. Further, to avoid missed signals, they repeat this |
423 |
< |
* scanning process after successful enqueuing until again stable. |
424 |
< |
* In this state, the worker cannot take/run a task it sees until |
425 |
< |
* it is released from the queue, so the worker itself eventually |
426 |
< |
* tries to release itself or any successor (see tryRelease). |
427 |
< |
* Otherwise, upon an empty scan, a deactivated worker uses an |
428 |
< |
* adaptive local spin construction (see awaitWork) before |
429 |
< |
* blocking (via park). Note the unusual conventions about |
420 |
> |
* compute checksums of queue states during sweeps. (The |
421 |
> |
* stability checks used here and elsewhere are probabilistic |
422 |
> |
* variants of snapshot techniques -- see Herlihy & Shavit.) |
423 |
> |
* Workers give up and try to deactivate only after the sum is |
424 |
> |
* stable across scans. Further, to avoid missed signals, they |
425 |
> |
* repeat this scanning process after successful enqueuing until |
426 |
> |
* again stable. In this state, the worker cannot take/run a task |
427 |
> |
* it sees until it is released from the queue, so the worker |
428 |
> |
* itself eventually tries to release itself or any successor (see |
429 |
> |
* tryRelease). Otherwise, upon an empty scan, a deactivated |
430 |
> |
* worker uses an adaptive local spin construction (see awaitWork) |
431 |
> |
* before blocking (via park). Note the unusual conventions about |
432 |
|
* Thread.interrupts surrounding parking and other blocking: |
433 |
|
* Because interrupts are used solely to alert threads to check |
434 |
|
* termination, which is checked anyway upon blocking, we clear |
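Review note: the checksum-stability idiom added in this hunk can be sketched as follows (names are illustrative): sum a per-queue indicator on each sweep, and report quiescence only when two consecutive sweeps yield the same sum. Any concurrent push or poll moves a base index and so perturbs the sum with high probability.

```java
import java.util.function.IntUnaryOperator;

// Sketch of the probabilistic-snapshot stability check.
final class StabilityScan {
    static boolean sweepUntilStable(int nQueues, IntUnaryOperator baseOf,
                                    int maxSweeps) {
        long oldSum = 0L;
        for (int sweep = 0; sweep < maxSweeps; ++sweep) {
            long checkSum = 0L;
            for (int i = 0; i < nQueues; ++i)
                checkSum += baseOf.applyAsInt(i);  // read queue i's "base"
            if (sweep > 0 && oldSum == checkSum)
                return true;                       // stable across two sweeps
            oldSum = checkSum;
        }
        return false;                              // kept changing: not quiescent
    }
}
```

Unlike a true atomic snapshot, a changed-and-changed-back sum can be missed; the pool tolerates this because deactivation is rechecked and reversible.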
448 |
|
* noticeably long, and the time between signalling a thread and |
449 |
|
* it actually making progress can be very noticeably long, so it |
450 |
|
* is worth offloading these delays from critical paths as much as |
451 |
< |
* possible. Also, because enqueued workers are often rescanning |
451 |
> |
* possible. Also, because inactive workers are often rescanning |
452 |
|
* or spinning rather than blocking, we set and clear the "parker" |
453 |
|
* field of WorkQueues to reduce unnecessary calls to unpark. |
454 |
|
* (This requires a secondary recheck to avoid missed signals.) |
460 |
|
* number of threads decreases, eventually removing all workers. |
461 |
|
* Also, when more than two spare threads exist, excess threads |
462 |
|
* are immediately terminated at the next quiescent point. |
463 |
< |
* (Padding by two avoids hysteresis). |
463 |
> |
* (Padding by two avoids hysteresis.) |
464 |
|
* |
465 |
|
* Shutdown and Termination. A call to shutdownNow invokes |
466 |
|
* tryTerminate to atomically set a runState bit. The calling |
471 |
|
* number of workers). Calls to non-abrupt shutdown() preface |
472 |
|
* this by checking whether termination should commence. This |
473 |
|
* relies primarily on the active count bits of "ctl" maintaining |
474 |
< |
* consensus about quiescence. However, external submitters do not |
475 |
< |
* take part in this consensus. So, tryTerminate sweeps through |
476 |
< |
* queues to ensure lack of in-flight submissions and workers |
474 |
> |
* consensus -- tryTerminate is called from awaitWork whenever |
475 |
> |
* quiescent. However, external submitters do not take part in |
476 |
> |
* this consensus. So, tryTerminate sweeps through queues (until |
477 |
> |
* stable) to ensure lack of in-flight submissions and workers |
478 |
|
* about to process them before triggering the "STOP" phase of |
479 |
< |
* termination. |
479 |
> |
* termination. (Note: there is an intrinsic conflict if |
480 |
> |
* helpQuiescePool is called when shutdown is enabled. Both wait |
481 |
> |
* for quiescence, but tryTerminate is biased to not trigger until |
482 |
> |
* helpQuiescePool completes.) |
483 |
> |
* |
484 |
|
* |
485 |
|
* Joining Tasks |
486 |
|
* ============= |
565 |
|
* continuation tasks) blocks on a join and there still remain |
566 |
|
* enough threads to ensure liveness. |
567 |
|
* |
568 |
< |
* Bounds. The compensation mechanism may be bounded. Bounds for |
569 |
< |
* the commonPool (see commonMaxSpares) better enable JVMs to cope |
568 |
> |
* The compensation mechanism may be bounded. Bounds for the |
569 |
> |
* commonPool (see commonMaxSpares) better enable JVMs to cope |
570 |
|
* with programming errors and abuse before running out of |
571 |
|
* resources to do so. In other cases, users may supply factories |
572 |
|
* that limit thread construction. The effects of bounding in this |
720 |
|
// Masks and units for WorkQueue.scanState and ctl sp subfield |
721 |
|
static final int SCANNING = 1; // false when running tasks |
722 |
|
static final int INACTIVE = 1 << 31; // must be negative |
723 |
< |
static final int SS_SHIFT = 16; // shift for version count |
716 |
< |
static final int SS_SEQ = 1 << SS_SHIFT; // version number |
717 |
< |
static final int SS_MASK = 0x7fffffff; // mask on update |
723 |
> |
static final int SS_SEQ = 1 << 16; // version count |
724 |
|
|
725 |
|
// Mode bits for ForkJoinPool.config and WorkQueue.config |
726 |
< |
static final int MODE_MASK = SMASK << 16; |
726 |
> |
static final int MODE_MASK = 0xffff << 16; // top half of int |
727 |
|
static final int LIFO_QUEUE = 0; |
728 |
|
static final int FIFO_QUEUE = 1 << 16; |
729 |
|
static final int SHARED_QUEUE = 1 << 31; // must be negative |
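Review note: the mode bits above partition an int `config` word, built in the pool as `(parallelism & SMASK) | mode`. A small sketch of how the two halves decode (helper names are hypothetical):

```java
// Sketch: bottom 16 bits carry parallelism, top half carries the mode.
final class ConfigBits {
    static final int SMASK      = 0xffff;        // bottom half
    static final int MODE_MASK  = 0xffff << 16;  // top half of int
    static final int LIFO_QUEUE = 0;
    static final int FIFO_QUEUE = 1 << 16;

    static int parallelismOf(int config) { return config & SMASK; }
    static boolean isFifo(int config)    { return (config & FIFO_QUEUE) != 0; }
}
```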
830 |
|
if ((p = pool) != null) |
831 |
|
p.signalWork(p.workQueues, this); |
832 |
|
} |
833 |
< |
else if (n == m) |
833 |
> |
else if (n >= m) |
834 |
|
growArray(); |
835 |
|
} |
836 |
|
} |
1166 |
|
s != Thread.State.TIMED_WAITING); |
1167 |
|
} |
1168 |
|
|
1169 |
< |
// Unsafe mechanics |
1169 |
> |
// Unsafe mechanics. Note that some are (and must be) the same as in FJP |
1170 |
|
private static final sun.misc.Unsafe U; |
1171 |
|
private static final int ABASE; |
1172 |
|
private static final int ASHIFT; |
1284 |
|
* spins. If/when MWAIT-like intrinsics become available, they |
1285 |
|
* may allow quieter spinning. The value of SPINS must be a power |
1286 |
|
* of two, at least 4. The current value causes spinning for a |
1287 |
< |
* small fraction of context-switch times that is worthwhile given |
1288 |
< |
* the typical likelihoods that blocking is not necessary. |
1287 |
> |
* small fraction of typical context-switch times, well worthwhile |
1288 |
> |
* given the typical likelihoods that blocking is not necessary. |
1289 |
|
*/ |
1290 |
|
private static final int SPINS = 1 << 11; |
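Review note: a simplified sketch of the spin-then-park idiom behind `SPINS` (names are local to this sketch, not awaitWork's exact logic). The power-of-two bound lets a random int be masked cheaply, giving a randomized back-off before the thread finally blocks.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Sketch: spin with randomized decrement, then fall back to parking.
final class SpinThenPark {
    static final int SPINS = 1 << 11;  // power of two, at least 4

    static void await(BooleanSupplier released) {
        int spins = SPINS;
        while (!released.getAsBoolean()) {
            if (spins > 0) {
                // decrement only occasionally so contending threads
                // spin for slightly different lengths of time
                if ((ThreadLocalRandom.current().nextInt() & (SPINS - 1)) == 0)
                    --spins;
            } else {
                LockSupport.parkNanos(100_000L);  // give up and block briefly
            }
        }
    }
}
```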
1291 |
|
|
1583 |
|
*/ |
1584 |
|
final void signalWork(WorkQueue[] ws, WorkQueue q) { |
1585 |
|
long c; int sp, i; WorkQueue v; Thread p; |
1586 |
< |
while ((c = ctl) < 0L) { |
1586 |
> |
while ((c = ctl) < 0L) { // too few active |
1587 |
|
if ((sp = (int)c) == 0) { // no idle workers |
1588 |
|
if ((c & ADD_WORKER) != 0L) // too few workers |
1589 |
|
tryAddWorker(c); |
1621 |
|
*/ |
1622 |
|
private boolean tryRelease(long c, WorkQueue v, long inc) { |
1623 |
|
int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p; |
1624 |
< |
if (v != null && v.scanState == sp) { // v is at top of stack |
1624 |
> |
if (v != null && v.scanState == sp) { // v is at top of stack |
1625 |
|
long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred); |
1626 |
|
if (U.compareAndSwapLong(this, CTL, c, nc)) { |
1627 |
|
v.scanState = vs; |
1783 |
|
U.park(false, parkTime); |
1784 |
|
U.putOrderedObject(w, QPARKER, null); |
1785 |
|
U.putObject(wt, PARKBLOCKER, null); |
1780 |
- |
if (w.qlock < 0) // terminated while parked |
1781 |
- |
return false; |
1786 |
|
if (w.scanState >= 0) |
1787 |
|
break; |
1788 |
|
if (parkTime != 0L && ctl == c && |
2031 |
|
* caller if, by the time it tries to use the queue, it is empty. |
2032 |
|
*/ |
2033 |
|
private WorkQueue findNonEmptyStealQueue() { |
2034 |
< |
int r = ThreadLocalRandom.nextSecondarySeed(), oldSum = 0, checkSum; |
2035 |
< |
do { |
2036 |
< |
checkSum = 0; |
2037 |
< |
WorkQueue[] ws; WorkQueue q; int m, k, b; |
2038 |
< |
if ((ws = workQueues) != null && (m = ws.length - 1) > 0) { |
2039 |
< |
for (int i = 0; i <= m; ++i) { |
2040 |
< |
if ((k = (i + r + m) & m) <= m && k >= 0 && |
2041 |
< |
(q = ws[k]) != null) { |
2042 |
< |
if ((b = q.base) - q.top < 0) |
2043 |
< |
return q; |
2044 |
< |
checkSum += b; |
2045 |
< |
} |
2034 |
> |
WorkQueue[] ws; int m; // one-shot version of scan loop |
2035 |
> |
int r = ThreadLocalRandom.nextSecondarySeed(); |
2036 |
> |
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { |
2037 |
> |
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { |
2038 |
> |
WorkQueue q; int b; |
2039 |
> |
if ((q = ws[k]) != null) { |
2040 |
> |
if ((b = q.base) - q.top < 0) |
2041 |
> |
return q; |
2042 |
> |
checkSum += b; |
2043 |
> |
} |
2044 |
> |
if ((k = (k + 1) & m) == origin) { |
2045 |
> |
if (oldSum == (oldSum = checkSum)) |
2046 |
> |
break; |
2047 |
> |
checkSum = 0; |
2048 |
|
} |
2049 |
|
} |
2050 |
< |
} while (oldSum != (oldSum = checkSum)); |
2050 |
> |
} |
2051 |
|
return null; |
2052 |
|
} |
2053 |
|
|
2058 |
|
* find tasks either. |
2059 |
|
*/ |
2060 |
|
final void helpQuiescePool(WorkQueue w) { |
2061 |
+ |
ForkJoinTask<?> ps = w.currentSteal; // save context |
2062 |
|
for (boolean active = true;;) { |
2063 |
|
long c; WorkQueue q; ForkJoinTask<?> t; int b; |
2064 |
< |
while ((t = w.nextLocalTask()) != null) |
2058 |
< |
t.doExec(); |
2064 |
> |
w.execLocalTasks(); // run locals before each scan |
2065 |
|
if ((q = findNonEmptyStealQueue()) != null) { |
2066 |
|
if (!active) { // re-establish active count |
2067 |
|
active = true; |
2068 |
|
U.getAndAddLong(this, CTL, AC_UNIT); |
2069 |
|
} |
2070 |
|
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { |
2065 |
- |
ForkJoinTask<?> ps = w.currentSteal; |
2071 |
|
U.putOrderedObject(w, QCURRENTSTEAL, t); |
2072 |
|
t.doExec(); |
2068 |
- |
U.putOrderedObject(w, QCURRENTSTEAL, ps); |
2073 |
|
++w.nsteals; |
2074 |
|
} |
2075 |
|
} |
2084 |
|
U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) |
2085 |
|
break; |
2086 |
|
} |
2087 |
+ |
U.putOrderedObject(w, QCURRENTSTEAL, ps); |
2088 |
|
} |
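Review note: `helpQuiescePool` is the engine behind the public `ForkJoinTask.helpQuiesce()`, which lets a worker run and steal subtasks until the pool is quiescent instead of blocking on individual joins. A usage sketch (the task below is illustrative, not from the JDK):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Sketch: fork several subtasks, then help until the pool quiesces.
public class QuiesceDemo extends RecursiveAction {
    protected void compute() {
        for (int i = 0; i < 8; ++i) {
            new RecursiveAction() {
                protected void compute() { /* leaf work would go here */ }
            }.fork();
        }
        helpQuiesce();  // run/steal the forked subtasks until all complete
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        pool.invoke(new QuiesceDemo());
        pool.shutdown();
    }
}
```

This is why the hunk above saves and restores `currentSteal` around the whole loop: the helping worker runs many stolen tasks before returning to its caller's context.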
2089 |
|
|
2090 |
|
/** |
2185 |
|
|
2186 |
|
if ((rs & STOP) == 0) { |
2187 |
|
if (!now) { // check quiescence |
2188 |
< |
outer: for (long oldSum = 0L;;) { // repeat until stable |
2188 |
> |
for (long oldSum = 0L;;) { // repeat until stable |
2189 |
|
WorkQueue[] ws; WorkQueue w; int m, b; long c; |
2190 |
|
long checkSum = ctl; |
2191 |
|
if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0) |
2196 |
|
if ((w = ws[i]) != null) { |
2197 |
|
if ((b = w.base) != w.top || w.scanState >= 0 || |
2198 |
|
w.currentSteal != null) { |
2194 |
- |
if ((runState & STOP) != 0) |
2195 |
- |
break outer; // already stopping |
2199 |
|
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); |
2200 |
< |
return false; // ensure recheck |
2200 |
> |
return false; // arrange for recheck |
2201 |
|
} |
2202 |
|
checkSum += b; |
2203 |
|
if ((i & 1) == 0) |
3287 |
|
private static final int ABASE; |
3288 |
|
private static final int ASHIFT; |
3289 |
|
private static final long CTL; |
3287 |
- |
private static final long PARKBLOCKER; |
3288 |
- |
private static final long STEALCOUNT; |
3290 |
|
private static final long RUNSTATE; |
3291 |
< |
private static final long QBASE; |
3291 |
> |
private static final long STEALCOUNT; |
3292 |
> |
private static final long PARKBLOCKER; |
3293 |
> |
private static final long QBASE; // these must be same as in WorkQueue |
3294 |
|
private static final long QTOP; |
3295 |
|
private static final long QLOCK; |
3296 |
|
private static final long QSCANSTATE; |
3305 |
|
Class<?> k = ForkJoinPool.class; |
3306 |
|
CTL = U.objectFieldOffset |
3307 |
|
(k.getDeclaredField("ctl")); |
3305 |
- |
STEALCOUNT = U.objectFieldOffset |
3306 |
- |
(k.getDeclaredField("stealCount")); |
3308 |
|
RUNSTATE = U.objectFieldOffset |
3309 |
|
(k.getDeclaredField("runState")); |
3310 |
+ |
STEALCOUNT = U.objectFieldOffset |
3311 |
+ |
(k.getDeclaredField("stealCount")); |
3312 |
|
Class<?> tk = Thread.class; |
3313 |
|
PARKBLOCKER = U.objectFieldOffset |
3314 |
|
(tk.getDeclaredField("parkBlocker")); |