289 |
|
* and their negations (used for thresholding) to fit into 16bit |
290 |
|
* subfields. |
291 |
|
* |
292 |
< |
* Field "runState" holds state bits (STARTED, STOP, etc). After |
293 |
< |
* starting, the field is updated (only during shutdown) under the |
294 |
< |
* auxState lock. |
292 |
> |
* Field "runState" holds lifetime status, atomically and |
293 |
> |
* monotonically setting STARTED, SHUTDOWN, STOP, and finally |
294 |
> |
* TERMINATED bits. |
295 |
|
* |
296 |
|
* Field "auxState" is a ReentrantLock subclass that also |
297 |
|
* opportunistically holds some other bookkeeping fields accessed |
298 |
|
* only when locked. It is mainly used to lock (infrequent) |
299 |
< |
* updates to runState and workQueues. The auxState instance is |
300 |
< |
* itself lazily constructed (see tryInitialize), requiring a |
301 |
< |
* double-check-style bootstrapping use of field runState, and |
302 |
< |
* locking a private static. |
299 |
> |
* updates to workQueues. The auxState instance is itself lazily |
300 |
> |
* constructed (see tryInitialize), requiring a double-check-style |
301 |
> |
* bootstrapping use of field runState, and locking a private |
302 |
> |
* static. |
303 |
|
* |
304 |
|
* Field "workQueues" holds references to WorkQueues. It is |
305 |
|
* updated (only during worker creation and termination) under the |
460 |
|
* thread, as well as every other worker thereafter terminating, |
461 |
|
* helps terminate others by setting their (qlock) status, |
462 |
|
* cancelling their unprocessed tasks, and waking them up, doing |
463 |
< |
* so repeatedly until stable (but with a loop bounded by the |
464 |
< |
* number of workers). Calls to non-abrupt shutdown() preface |
465 |
< |
* this by checking whether termination should commence. This |
466 |
< |
* relies primarily on the active count bits of "ctl" maintaining |
467 |
< |
* consensus -- tryTerminate is called from awaitWork whenever |
468 |
< |
* quiescent. However, external submitters do not take part in |
469 |
< |
* this consensus. So, tryTerminate sweeps through queues (until |
470 |
< |
* stable) to ensure lack of in-flight submissions and workers |
471 |
< |
* about to process them before triggering the "STOP" phase of |
472 |
< |
* termination. (Note: there is an intrinsic conflict if |
473 |
< |
* helpQuiescePool is called when shutdown is enabled. Both wait |
474 |
< |
* for quiescence, but tryTerminate is biased to not trigger until |
475 |
< |
* helpQuiescePool completes.) |
463 |
> |
* so repeatedly until stable. Calls to non-abrupt shutdown() |
464 |
> |
* preface this by checking whether termination should |
465 |
> |
* commence. This relies primarily on the active count bits of |
466 |
> |
* "ctl" maintaining consensus -- tryTerminate is called from |
467 |
> |
* awaitWork whenever quiescent. However, external submitters do |
468 |
> |
* not take part in this consensus. So, tryTerminate sweeps |
469 |
> |
* through queues (until stable) to ensure lack of in-flight |
470 |
> |
* submissions and workers about to process them before triggering |
471 |
> |
* the "STOP" phase of termination. (Note: there is an intrinsic |
472 |
> |
* conflict if helpQuiescePool is called when shutdown is |
473 |
> |
* enabled. Both wait for quiescence, but tryTerminate is biased |
474 |
> |
* to not trigger until helpQuiescePool completes.) |
475 |
|
* |
476 |
|
* Joining Tasks |
477 |
|
* ============= |
1468 |
|
* termination by external calls submitting tasks. |
1469 |
|
*/ |
1470 |
|
private void tryInitialize(boolean checkTermination) { |
1471 |
< |
if ((runState & STARTED) == 0) { // bootstrap by locking static field |
1472 |
< |
int rs; // create workQueues array with size a power of two |
1473 |
< |
int p = config & SMASK; // ensure at least 2 slots |
1474 |
< |
int n = (p > 1) ? p - 1 : 1; |
1476 |
< |
n |= n >>> 1; |
1471 |
> |
if (runState == 0) { // bootstrap by locking static field |
1472 |
> |
int p = config & SMASK; |
1473 |
> |
int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots |
1474 |
> |
n |= n >>> 1; // create workQueues array with size a power of two |
1475 |
|
n |= n >>> 2; |
1476 |
|
n |= n >>> 4; |
1477 |
|
n |= n >>> 8; |
1480 |
|
AuxState aux = new AuxState(); |
1481 |
|
WorkQueue[] ws = new WorkQueue[n]; |
1482 |
|
synchronized (modifyThreadPermission) { // double-check |
1483 |
< |
if (((rs = runState) & STARTED) == 0) { |
1486 |
< |
rs |= STARTED; |
1483 |
> |
if (runState == 0) { |
1484 |
|
workQueues = ws; |
1485 |
|
auxState = aux; |
1486 |
< |
runState = rs; |
1486 |
> |
runState = STARTED; |
1487 |
|
} |
1488 |
|
} |
1489 |
|
} |
1557 |
|
WorkQueue w = new WorkQueue(this, wt); |
1558 |
|
int i = 0; // assign a pool index |
1559 |
|
int mode = config & MODE_MASK; |
1563 |
– |
int rs = runState; |
1560 |
|
if ((aux = auxState) != null) { |
1561 |
|
aux.lock(); |
1562 |
|
try { |
1599 |
|
*/ |
1600 |
|
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { |
1601 |
|
WorkQueue w = null; |
1606 |
– |
int rs = runState; |
1602 |
|
if (wt != null && (w = wt.workQueue) != null) { |
1603 |
|
AuxState aux; WorkQueue[] ws; // remove index from array |
1604 |
|
int idx = w.config & SMASK; |
2370 |
|
* @param now if true, unconditionally terminate, else only |
2371 |
|
* if no work and no active workers |
2372 |
|
* @param enable if true, terminate when next possible |
2373 |
< |
* @return -1 : terminating or terminated, 0: retry if internal caller, else 1 |
2373 |
> |
* @return -1: terminating/terminated, 0: retry if internal caller, else 1 |
2374 |
|
*/ |
2375 |
|
private int tryTerminate(boolean now, boolean enable) { |
2376 |
< |
AuxState aux; |
2382 |
< |
while ((aux = auxState) == null) |
2383 |
< |
tryInitialize(false); // ensure initialized |
2376 |
> |
int rs; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED |
2377 |
|
|
2378 |
< |
if ((runState & SHUTDOWN) == 0) { |
2379 |
< |
if (!enable || this == common) |
2378 |
> |
while ((rs = runState) >= 0) { |
2379 |
> |
if (!enable || this == common) // cannot shutdown |
2380 |
|
return 1; |
2381 |
< |
aux.lock(); |
2382 |
< |
runState = runState | SHUTDOWN; |
2383 |
< |
aux.unlock(); |
2381 |
> |
else if (rs == 0) |
2382 |
> |
tryInitialize(false); // ensure initialized |
2383 |
> |
else |
2384 |
> |
U.compareAndSwapInt(this, RUNSTATE, rs, rs | SHUTDOWN); |
2385 |
|
} |
2386 |
|
|
2387 |
< |
if ((runState & STOP) == 0) { |
2387 |
> |
if ((rs & STOP) == 0) { // try to initiate termination |
2388 |
|
if (!now) { // check quiescence |
2389 |
|
for (long oldSum = 0L;;) { // repeat until stable |
2390 |
|
WorkQueue[] ws; WorkQueue w; int b; |
2404 |
|
break; |
2405 |
|
} |
2406 |
|
} |
2407 |
< |
aux.lock(); |
2408 |
< |
runState = runState | STOP; |
2415 |
< |
aux.unlock(); |
2407 |
> |
do {} while (!U.compareAndSwapInt(this, RUNSTATE, |
2408 |
> |
rs = runState, rs | STOP)); |
2409 |
|
} |
2410 |
|
|
2411 |
|
for (long oldSum = 0L;;) { // repeat until stable |
2433 |
|
} |
2434 |
|
|
2435 |
|
if ((short)(ctl >>> TC_SHIFT) + (config & SMASK) <= 0) { |
2436 |
< |
aux.lock(); |
2444 |
< |
runState |= TERMINATED; |
2445 |
< |
aux.unlock(); |
2436 |
> |
runState = (STARTED | SHUTDOWN | STOP | TERMINATED); // final write |
2437 |
|
synchronized (this) { |
2438 |
|
notifyAll(); // for awaitTermination |
2439 |
|
} |
2460 |
|
q.qlock = 1; // lock queue |
2461 |
|
boolean installed = false; |
2462 |
|
aux.lock(); |
2463 |
< |
try { // lock pool to install |
2463 |
> |
try { // lock pool to install |
2464 |
|
WorkQueue[] ws; |
2465 |
|
if ((ws = workQueues) != null && index < ws.length && |
2466 |
|
ws[index] == null) { |
2503 |
|
tryInitialize(true); |
2504 |
|
else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null) |
2505 |
|
tryCreateExternalQueue(k); |
2506 |
< |
else if ((stat = q.sharedPush(task)) <= 0) { |
2507 |
< |
if ((runState & STOP) != 0) |
2508 |
< |
tryTerminate(false, false); |
2509 |
< |
else if (stat == 0) |
2519 |
< |
signalWork(); |
2506 |
> |
else if ((stat = q.sharedPush(task)) < 0) |
2507 |
> |
break; |
2508 |
> |
else if (stat == 0) { |
2509 |
> |
signalWork(); |
2510 |
|
break; |
2511 |
|
} |
2512 |
|
else // move if busy |
3425 |
|
// Unsafe mechanics |
3426 |
|
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); |
3427 |
|
private static final long CTL; |
3428 |
+ |
private static final long RUNSTATE; |
3429 |
|
private static final int ABASE; |
3430 |
|
private static final int ASHIFT; |
3431 |
|
|
3433 |
|
try { |
3434 |
|
CTL = U.objectFieldOffset |
3435 |
|
(ForkJoinPool.class.getDeclaredField("ctl")); |
3436 |
+ |
RUNSTATE = U.objectFieldOffset |
3437 |
+ |
(ForkJoinPool.class.getDeclaredField("runState")); |
3438 |
|
ABASE = U.arrayBaseOffset(ForkJoinTask[].class); |
3439 |
|
int scale = U.arrayIndexScale(ForkJoinTask[].class); |
3440 |
|
if ((scale & (scale - 1)) != 0) |