289 |
|
* and their negations (used for thresholding) to fit into 16bit |
290 |
|
* subfields. |
291 |
|
* |
292 |
< |
* Field "runState" holds CASed state bits (STARTED, STOP, etc). |
293 |
< |
* |
294 |
< |
* Field "workQueues" holds references to WorkQueues. We need a |
295 |
< |
* lock to create, resize, and install workers in this array. We |
296 |
< |
* opportunistically use the "stealCounter" object as a monitor |
297 |
< |
* protecting access; although it too is lazily initialized (see |
298 |
< |
* tryInitialize) and only exists when runstate is STARTED. The |
299 |
< |
* workQueues array is otherwise concurrently readable, and |
300 |
< |
* accessed directly. We also ensure that reads of the array |
301 |
< |
* reference itself never become too stale (for example, |
292 |
> |
* Field "runState" holds state bits (STARTED, STOP, etc). After |
293 |
> |
* starting, the field is updated under a lock (only during |
294 |
> |
* shutdown); opportunistically using the "stealCounter" object as |
295 |
> |
* a monitor. However stealCounter is itself lazily initialized, |
296 |
> |
* so tryInitialize bootstraps this using a private static lock. |
297 |
> |
* |
298 |
> |
* Field "workQueues" holds references to WorkQueues. It is |
299 |
> |
* updated only under the lock, but is otherwise concurrently |
300 |
> |
* readable, and accessed directly. We also ensure that reads of |
301 |
> |
* the array reference itself never become too stale (for example, |
302 |
|
* re-reading before each scan). To simplify index-based |
303 |
|
* operations, the array size is always a power of two, and all |
304 |
|
* readers must tolerate null slots. Worker queues are at odd |
1158 |
|
} |
1159 |
|
} |
1160 |
|
else if (base == b) // replace with proxy |
1161 |
< |
removed = |
1162 |
< |
U.compareAndSwapObject(a, offset, t, |
1163 |
< |
new EmptyTask()); |
1161 |
> |
removed = U.compareAndSwapObject(a, offset, t, |
1162 |
> |
new EmptyTask()); |
1163 |
|
if (removed) { |
1164 |
|
ForkJoinTask<?> ps = currentSteal; |
1165 |
|
(currentSteal = task).doExec(); |
1449 |
|
final UncaughtExceptionHandler ueh; // per-worker UEH |
1450 |
|
|
1451 |
|
/** |
1452 |
< |
* Instantiates fields upon first submission, and/or throws |
1453 |
< |
* exception if terminating. Called only by externalPush. |
1452 |
> |
* Instantiates fields upon first submission, or upon shutdown if |
1453 |
> |
* no submissions. If checkTermination true, also responds to |
1454 |
> |
* termination by external calls submitting tasks. |
1455 |
|
*/ |
1456 |
< |
private void tryInitialize() { |
1456 |
> |
private void tryInitialize(boolean checkTermination) { |
1457 |
|
if ((runState & STARTED) == 0) { // bootstrap by locking static field |
1458 |
|
int rs; // create workQueues array with size a power of two |
1459 |
|
int p = config & SMASK; // ensure at least 2 slots |
1460 |
|
int n = (p > 1) ? p - 1 : 1; |
1461 |
|
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; |
1462 |
< |
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; |
1463 |
< |
synchronized (modifyThreadPermission) { |
1464 |
< |
if ((runState & STARTED) == 0) { |
1465 |
< |
stealCounter = new AtomicLong(); |
1466 |
< |
workQueues = new WorkQueue[n]; |
1467 |
< |
do {} while (!U.compareAndSwapInt(this, RUNSTATE, |
1468 |
< |
rs = runState, |
1469 |
< |
rs | STARTED)); |
1462 |
> |
n |= n >>> 8; n |= n >>> 16; n = ((n + 1) << 1) & SMASK; |
1463 |
> |
AtomicLong sc = new AtomicLong(); |
1464 |
> |
WorkQueue[] ws = new WorkQueue[n]; |
1465 |
> |
synchronized(modifyThreadPermission) { // double-check |
1466 |
> |
if (((rs = runState) & STARTED) == 0) { |
1467 |
> |
workQueues = ws; |
1468 |
> |
runState = rs | STARTED; |
1469 |
> |
stealCounter = sc; |
1470 |
|
} |
1471 |
|
} |
1472 |
|
} |
1473 |
< |
if (runState < 0) { |
1473 |
> |
if (checkTermination && runState < 0) { |
1474 |
|
tryTerminate(false, false); // help terminate |
1475 |
|
throw new RejectedExecutionException(); |
1476 |
|
} |
1533 |
|
*/ |
1534 |
|
final WorkQueue registerWorker(ForkJoinWorkerThread wt) { |
1535 |
|
UncaughtExceptionHandler handler; |
1536 |
< |
Object lock = stealCounter; |
1536 |
> |
AtomicLong lock = stealCounter; |
1537 |
|
wt.setDaemon(true); // configure thread |
1538 |
|
if ((handler = ueh) != null) |
1539 |
|
wt.setUncaughtExceptionHandler(handler); |
1580 |
|
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { |
1581 |
|
WorkQueue w = null; |
1582 |
|
if (wt != null && (w = wt.workQueue) != null) { |
1583 |
< |
Object lock; WorkQueue[] ws; // remove index from array |
1583 |
> |
AtomicLong lock; WorkQueue[] ws; // remove index from array |
1584 |
|
int idx = w.config & SMASK; |
1585 |
|
if ((lock = stealCounter) != null) { |
1586 |
|
synchronized (lock) { |
1775 |
|
long deadline = (((scale <= 0) ? 1 : scale) * IDLE_TIMEOUT_MS + |
1776 |
|
System.currentTimeMillis()); |
1777 |
|
if (w != null && w.scanState < 0) { |
1778 |
< |
int ss; Object lock; |
1778 |
> |
int ss; AtomicLong lock; |
1779 |
|
if (runState < 0 && tryTerminate(false, false)) |
1780 |
|
stat = w.qlock = -1; // help terminate |
1781 |
|
else if ((stat = w.qlock) >= 0 && w.scanState < 0) { |
1914 |
|
if (t == null || b++ != q.base) |
1915 |
|
break; // busy or empty |
1916 |
|
else if (ss < 0) { |
1917 |
< |
if ((ss = w.scanState) >= 0) { |
1918 |
< |
tryReactivate(w, ws, r); |
1919 |
< |
break; // retry upon rescan |
1920 |
< |
} |
1921 |
< |
r |= (1 << 31); // ensure full scan |
1917 |
> |
tryReactivate(w, ws, r); |
1918 |
> |
break; // retry upon rescan |
1919 |
|
} |
1920 |
|
else if (!U.compareAndSwapObject(a, offset, t, null)) |
1921 |
|
break; // contended |
2343 |
|
* @return true if now terminating or terminated |
2344 |
|
*/ |
2345 |
|
private boolean tryTerminate(boolean now, boolean enable) { |
2346 |
< |
int rs; |
2347 |
< |
for (;;) { |
2348 |
< |
if ((rs = runState) < 0) // already shut down |
2349 |
< |
break; |
2350 |
< |
if (!enable || this == common) |
2351 |
< |
return false; |
2352 |
< |
if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | SHUTDOWN)) |
2356 |
< |
break; |
2346 |
> |
AtomicLong lock; int rs; |
2347 |
> |
if ((rs = runState) >= 0 && (!enable || this == common)) |
2348 |
> |
return false; |
2349 |
> |
while ((lock = stealCounter) == null) |
2350 |
> |
tryInitialize(false); |
2351 |
> |
synchronized(lock) { |
2352 |
> |
rs = runState = runState | SHUTDOWN; |
2353 |
|
} |
2354 |
|
|
2355 |
|
if ((rs & STOP) == 0) { |
2379 |
|
break; |
2380 |
|
} |
2381 |
|
} |
2382 |
< |
while (((rs = runState) & STOP) == 0) // enter STOP phase |
2383 |
< |
U.compareAndSwapInt(this, RUNSTATE, rs, rs | STOP); |
2382 |
> |
synchronized(lock) { |
2383 |
> |
rs = runState = runState | STOP; |
2384 |
> |
} |
2385 |
|
} |
2386 |
|
|
2387 |
|
int pass = 0; // 3 passes to help terminate |
2390 |
|
long checkSum = ctl; |
2391 |
|
if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 || |
2392 |
|
(ws = workQueues) == null || (m = ws.length - 1) < 0) { |
2393 |
< |
while (((rs = runState) & TERMINATED) == 0) { |
2394 |
< |
if (U.compareAndSwapInt(this, RUNSTATE, rs, |
2395 |
< |
rs | TERMINATED)) { |
2396 |
< |
synchronized (this) { |
2397 |
< |
notifyAll(); // for awaitTermination |
2401 |
< |
} |
2402 |
< |
break; |
2403 |
< |
} |
2393 |
> |
synchronized(lock) { |
2394 |
> |
rs = runState = runState | TERMINATED; |
2395 |
> |
} |
2396 |
> |
synchronized (this) { |
2397 |
> |
notifyAll(); // for awaitTermination |
2398 |
|
} |
2399 |
|
break; |
2400 |
|
} |
2441 |
|
* @param index the index of the new queue |
2442 |
|
*/ |
2443 |
|
private void tryCreateExternalQueue(int index) { |
2444 |
< |
Object lock; |
2444 |
> |
AtomicLong lock; |
2445 |
|
if ((lock = stealCounter) != null && index >= 0) { |
2446 |
|
WorkQueue q = new WorkQueue(this, null); |
2447 |
|
q.config = index; |
2486 |
|
int rs = runState; |
2487 |
|
WorkQueue[] ws = workQueues; |
2488 |
|
if (rs <= 0 || ws == null || (wl = ws.length) <= 0) |
2489 |
< |
tryInitialize(); |
2489 |
> |
tryInitialize(true); |
2490 |
|
else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null) |
2491 |
|
tryCreateExternalQueue(k); |
2492 |
|
else if ((stat = q.sharedPush(task)) < 0) |
3412 |
|
// Unsafe mechanics |
3413 |
|
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); |
3414 |
|
private static final long CTL; |
3421 |
– |
private static final long RUNSTATE; |
3415 |
|
private static final int ABASE; |
3416 |
|
private static final int ASHIFT; |
3417 |
|
|
3419 |
|
try { |
3420 |
|
CTL = U.objectFieldOffset |
3421 |
|
(ForkJoinPool.class.getDeclaredField("ctl")); |
3429 |
– |
RUNSTATE = U.objectFieldOffset |
3430 |
– |
(ForkJoinPool.class.getDeclaredField("runState")); |
3431 |
– |
|
3422 |
|
ABASE = U.arrayBaseOffset(ForkJoinTask[].class); |
3423 |
|
int scale = U.arrayIndexScale(ForkJoinTask[].class); |
3424 |
|
if ((scale & (scale - 1)) != 0) |