15 |
|
import java.util.Collection; |
16 |
|
import java.util.Collections; |
17 |
|
import java.util.List; |
18 |
< |
import java.util.concurrent.atomic.AtomicLong; |
18 |
> |
import java.util.concurrent.locks.ReentrantLock; |
19 |
|
import java.util.concurrent.locks.LockSupport; |
20 |
|
|
21 |
|
/** |
290 |
|
* subfields. |
291 |
|
* |
292 |
|
* Field "runState" holds state bits (STARTED, STOP, etc). After |
293 |
< |
* starting, the field is updated under a lock (only during |
294 |
< |
* shutdown); opportunistically using the "stealCounter" object as |
295 |
< |
* a monitor. However stealCounter is itself lazily initialized, |
296 |
< |
* so tryInitialize bootstraps this using a private static lock. |
293 |
> |
* starting, the field is updated (only during shutdown) under the |
294 |
> |
* auxState lock. |
295 |
> |
* |
296 |
> |
* Field "auxState" is a ReentrantLock subclass that also |
297 |
> |
* opportunistically holds some other bookkeeping fields accessed |
298 |
> |
* only when locked. It is mainly used to lock (infrequent) |
299 |
> |
* updates to runState and workQueues. The auxState instance is |
300 |
> |
* itself lazily constructed (see tryInitialize), requiring a |
301 |
> |
* double-check-style boostrapping use of field runState, and |
302 |
> |
* locking a private static. |
303 |
|
* |
304 |
|
* Field "workQueues" holds references to WorkQueues. It is |
305 |
< |
* updated only under the lock, but is otherwise concurrently |
306 |
< |
* readable, and accessed directly. We also ensure that reads of |
307 |
< |
* the array reference itself never become too stale (for example, |
308 |
< |
* re-reading before each scan). To simplify index-based |
309 |
< |
* operations, the array size is always a power of two, and all |
310 |
< |
* readers must tolerate null slots. Worker queues are at odd |
311 |
< |
* indices. Shared (submission) queues are at even indices, up to |
312 |
< |
* a maximum of 64 slots, to limit growth even if array needs to |
313 |
< |
* expand to add more workers. Grouping them together in this way |
314 |
< |
* simplifies and speeds up task scanning. |
305 |
> |
* updated (only during worker creation and termination) under the |
306 |
> |
* lock, but is otherwise concurrently readable, and accessed |
307 |
> |
* directly. We also ensure that reads of the array reference |
308 |
> |
* itself never become too stale (for example, re-reading before |
309 |
> |
* each scan). To simplify index-based operations, the array size |
310 |
> |
* is always a power of two, and all readers must tolerate null |
311 |
> |
* slots. Worker queues are at odd indices. Shared (submission) |
312 |
> |
* queues are at even indices, up to a maximum of 64 slots, to |
313 |
> |
* limit growth even if array needs to expand to add more |
314 |
> |
* workers. Grouping them together in this way simplifies and |
315 |
> |
* speeds up task scanning. |
316 |
|
* |
317 |
|
* All worker thread creation is on-demand, triggered by task |
318 |
|
* submissions, replacement of terminated workers, and/or |
705 |
|
public final boolean exec() { return true; } |
706 |
|
} |
707 |
|
|
708 |
+ |
/** |
709 |
+ |
* Additional fields and lock created upon initialization. |
710 |
+ |
*/ |
711 |
+ |
static final class AuxState extends ReentrantLock { |
712 |
+ |
private static final long serialVersionUID = -6001602636862214147L; |
713 |
+ |
volatile long stealCount; // cumulative steal count |
714 |
+ |
long indexSeed; // index bits for registerWorker |
715 |
+ |
AuxState() {} |
716 |
+ |
} |
717 |
+ |
|
718 |
|
// Constants shared across ForkJoinPool and WorkQueue |
719 |
|
|
720 |
|
// Bounds |
885 |
|
if ((a = array) != null && b != s && (al = a.length) > 0) { |
886 |
|
int index = (al - 1) & --s; |
887 |
|
long offset = ((long)index << ASHIFT) + ABASE; |
888 |
< |
ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, offset); |
889 |
< |
if (t != null && U.compareAndSwapObject(a, offset, t, null)) { |
888 |
> |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
889 |
> |
U.getObject(a, offset); |
890 |
> |
if (t != null && |
891 |
> |
U.compareAndSwapObject(a, offset, t, null)) { |
892 |
|
top = s; |
893 |
|
U.storeFence(); |
894 |
|
return t; |
1037 |
|
if ((a = array) != null && (al = a.length) > 0) { |
1038 |
|
int index = (al - 1) & s; |
1039 |
|
long offset = ((long)index << ASHIFT) + ABASE; |
1040 |
< |
ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, offset); |
1040 |
> |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1041 |
> |
U.getObject(a, offset); |
1042 |
|
if (t == task && |
1043 |
|
U.compareAndSwapInt(this, QLOCK, 0, 1)) { |
1044 |
|
if (U.compareAndSwapObject(a, offset, task, null)) { |
1139 |
|
} |
1140 |
|
|
1141 |
|
/** |
1142 |
< |
* Adds steal count to pool stealCounter if it exists, and resets. |
1142 |
> |
* Adds steal count to pool steal count if it exists, and resets. |
1143 |
|
*/ |
1144 |
|
final void transferStealCount(ForkJoinPool p) { |
1145 |
< |
AtomicLong sc; |
1146 |
< |
if (p != null && (sc = p.stealCounter) != null) { |
1145 |
> |
AuxState aux; |
1146 |
> |
if (p != null && (aux = p.auxState) != null) { |
1147 |
|
int s = nsteals; |
1148 |
|
nsteals = 0; // if negative, correct for overflow |
1149 |
< |
sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s)); |
1149 |
> |
long inc = (long)(s < 0 ? Integer.MAX_VALUE : s); |
1150 |
> |
aux.lock(); |
1151 |
> |
try { |
1152 |
> |
aux.stealCount += inc; |
1153 |
> |
} finally { |
1154 |
> |
aux.unlock(); |
1155 |
> |
} |
1156 |
|
} |
1157 |
|
} |
1158 |
|
|
1467 |
|
volatile long ctl; // main pool control |
1468 |
|
volatile int runState; |
1469 |
|
final int config; // parallelism, mode |
1470 |
< |
int indexSeed; // to generate worker index |
1445 |
< |
volatile AtomicLong stealCounter; // also used as sync monitor |
1470 |
> |
AuxState auxState; // lock, steal counts |
1471 |
|
volatile WorkQueue[] workQueues; // main registry |
1472 |
|
final String workerNamePrefix; // to create worker name string |
1473 |
|
final ForkJoinWorkerThreadFactory factory; |
1485 |
|
int n = (p > 1) ? p - 1 : 1; |
1486 |
|
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; |
1487 |
|
n |= n >>> 8; n |= n >>> 16; n = ((n + 1) << 1) & SMASK; |
1488 |
< |
AtomicLong sc = new AtomicLong(); |
1488 |
> |
AuxState aux = new AuxState(); |
1489 |
|
WorkQueue[] ws = new WorkQueue[n]; |
1490 |
|
synchronized (modifyThreadPermission) { // double-check |
1491 |
|
if (((rs = runState) & STARTED) == 0) { |
1492 |
+ |
rs |= STARTED; |
1493 |
|
workQueues = ws; |
1494 |
< |
runState = rs | STARTED; |
1495 |
< |
stealCounter = sc; |
1494 |
> |
auxState = aux; |
1495 |
> |
runState = rs; |
1496 |
|
} |
1497 |
|
} |
1498 |
|
} |
1559 |
|
*/ |
1560 |
|
final WorkQueue registerWorker(ForkJoinWorkerThread wt) { |
1561 |
|
UncaughtExceptionHandler handler; |
1562 |
< |
AtomicLong lock = stealCounter; |
1562 |
> |
AuxState aux; |
1563 |
|
wt.setDaemon(true); // configure thread |
1564 |
|
if ((handler = ueh) != null) |
1565 |
|
wt.setUncaughtExceptionHandler(handler); |
1566 |
|
WorkQueue w = new WorkQueue(this, wt); |
1567 |
|
int i = 0; // assign a pool index |
1568 |
|
int mode = config & MODE_MASK; |
1569 |
< |
if (lock != null) { |
1570 |
< |
synchronized (lock) { |
1569 |
> |
int rs = runState; |
1570 |
> |
if ((aux = auxState) != null) { |
1571 |
> |
aux.lock(); |
1572 |
> |
try { |
1573 |
> |
int s = (int)(aux.indexSeed += SEED_INCREMENT), n, m; |
1574 |
|
WorkQueue[] ws = workQueues; |
1546 |
– |
int s = indexSeed += SEED_INCREMENT, n, m; |
1575 |
|
if (ws != null && (n = ws.length) > 0) { |
1576 |
|
i = (m = n - 1) & ((s << 1) | 1); // odd-numbered indices |
1577 |
|
if (ws[i] != null) { // collision |
1590 |
|
w.scanState = i | (s & 0x7fff0000); // random seq bits |
1591 |
|
ws[i] = w; |
1592 |
|
} |
1593 |
+ |
} finally { |
1594 |
+ |
aux.unlock(); |
1595 |
|
} |
1596 |
|
} |
1597 |
|
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); |
1609 |
|
*/ |
1610 |
|
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { |
1611 |
|
WorkQueue w = null; |
1612 |
+ |
int rs = runState; |
1613 |
|
if (wt != null && (w = wt.workQueue) != null) { |
1614 |
< |
AtomicLong lock; WorkQueue[] ws; // remove index from array |
1614 |
> |
AuxState aux; WorkQueue[] ws; // remove index from array |
1615 |
|
int idx = w.config & SMASK; |
1616 |
< |
if ((lock = stealCounter) != null) { |
1617 |
< |
synchronized (lock) { |
1616 |
> |
int ns = w.nsteals; |
1617 |
> |
if ((aux = auxState) != null) { |
1618 |
> |
aux.lock(); |
1619 |
> |
try { |
1620 |
|
if ((ws = workQueues) != null && ws.length > idx && |
1621 |
|
ws[idx] == w) |
1622 |
|
ws[idx] = null; |
1623 |
+ |
aux.stealCount += ns; |
1624 |
+ |
} finally { |
1625 |
+ |
aux.unlock(); |
1626 |
|
} |
1627 |
|
} |
1628 |
|
} |
1635 |
|
} |
1636 |
|
if (w != null) { |
1637 |
|
w.qlock = -1; // ensure set |
1602 |
– |
w.transferStealCount(this); |
1638 |
|
w.cancelAll(); // cancel remaining tasks |
1639 |
|
} |
1640 |
|
for (;;) { // possibly replace |
1735 |
|
(v = ws[m & sp]) != null) { |
1736 |
|
long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + AC_UNIT)); |
1737 |
|
int ns = sp & ~UNSIGNALLED; |
1738 |
< |
if ((w == v || w.scanState < 0) && |
1738 |
> |
if (w.scanState < 0 && |
1739 |
|
v.scanState == sp && |
1740 |
|
U.compareAndSwapLong(this, CTL, c, nc)) { |
1741 |
|
v.scanState = ns; |
1810 |
|
long deadline = (((scale <= 0) ? 1 : scale) * IDLE_TIMEOUT_MS + |
1811 |
|
System.currentTimeMillis()); |
1812 |
|
if (w != null && w.scanState < 0) { |
1813 |
< |
int ss; AtomicLong lock; |
1813 |
> |
int ss; AuxState aux; |
1814 |
|
if (runState < 0 && tryTerminate(false, false)) |
1815 |
|
stat = w.qlock = -1; // help terminate |
1816 |
|
else if ((stat = w.qlock) >= 0 && w.scanState < 0) { |
1820 |
|
w.parker = null; |
1821 |
|
if ((stat = w.qlock) >= 0 && (ss = w.scanState) < 0 && |
1822 |
|
!Thread.interrupted() && (int)c == ss && |
1823 |
< |
(lock = stealCounter) != null && ctl == c && |
1823 |
> |
(aux = auxState) != null && ctl == c && |
1824 |
|
deadline - System.currentTimeMillis() <= TIMEOUT_SLOP_MS) { |
1825 |
< |
synchronized (lock) { // pre-deregister |
1825 |
> |
aux.lock(); |
1826 |
> |
try { // pre-deregister |
1827 |
|
WorkQueue[] ws; |
1828 |
|
int cfg = w.config, idx = cfg & SMASK; |
1829 |
|
long nc = ((UC_MASK & (c - TC_UNIT)) | |
1835 |
|
w.config = cfg | UNREGISTERED; |
1836 |
|
stat = w.qlock = -1; |
1837 |
|
} |
1838 |
+ |
} finally { |
1839 |
+ |
aux.unlock(); |
1840 |
|
} |
1841 |
|
} |
1842 |
|
} |
1949 |
|
long offset = ((long)index << ASHIFT) + ABASE; |
1950 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1951 |
|
U.getObjectVolatile(a, offset); |
1952 |
< |
if (t == null || b++ != q.base) |
1953 |
< |
break; // busy or empty |
1952 |
> |
if (t == null) |
1953 |
> |
break; // empty or busy |
1954 |
> |
else if (b++ != q.base) |
1955 |
> |
break; // busy |
1956 |
|
else if (ss < 0) { |
1957 |
|
tryReactivate(w, ws, r); |
1958 |
|
break; // retry upon rescan |
2097 |
|
if ((a = v.array) != null && (al = a.length) > 0) { |
2098 |
|
int index = (al - 1) & b; |
2099 |
|
long offset = ((long)index << ASHIFT) + ABASE; |
2100 |
< |
t = (ForkJoinTask<?>)U.getObjectVolatile(a, offset); |
2100 |
> |
t = (ForkJoinTask<?>) |
2101 |
> |
U.getObjectVolatile(a, offset); |
2102 |
|
if (t != null && b++ == v.base) { |
2103 |
|
if (j.currentJoin != subtask || |
2104 |
|
v.currentSteal != subtask || |
2384 |
|
* @return true if now terminating or terminated |
2385 |
|
*/ |
2386 |
|
private boolean tryTerminate(boolean now, boolean enable) { |
2387 |
< |
AtomicLong lock; int rs; |
2387 |
> |
AuxState aux; int rs; |
2388 |
|
if ((rs = runState) >= 0 && (!enable || this == common)) |
2389 |
|
return false; |
2390 |
< |
while ((lock = stealCounter) == null) |
2390 |
> |
while ((aux = auxState) == null) |
2391 |
|
tryInitialize(false); |
2392 |
< |
synchronized (lock) { |
2393 |
< |
rs = runState = runState | SHUTDOWN; |
2394 |
< |
} |
2354 |
< |
|
2392 |
> |
aux.lock(); |
2393 |
> |
rs = runState = runState | SHUTDOWN; |
2394 |
> |
aux.unlock(); |
2395 |
|
if ((rs & STOP) == 0) { |
2396 |
|
if (!now) { // check quiescence |
2397 |
|
for (long oldSum = 0L;;) { // repeat until stable |
2419 |
|
break; |
2420 |
|
} |
2421 |
|
} |
2422 |
< |
synchronized (lock) { |
2423 |
< |
rs = runState = runState | STOP; |
2424 |
< |
} |
2422 |
> |
aux.lock(); |
2423 |
> |
rs = runState = runState | STOP; |
2424 |
> |
aux.unlock(); |
2425 |
|
} |
2426 |
|
|
2427 |
|
int pass = 0; // 3 passes to help terminate |
2430 |
|
long checkSum = ctl; |
2431 |
|
if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 || |
2432 |
|
(ws = workQueues) == null || (m = ws.length - 1) < 0) { |
2433 |
< |
synchronized (lock) { |
2434 |
< |
rs = runState = runState | TERMINATED; |
2435 |
< |
} |
2433 |
> |
aux.lock(); |
2434 |
> |
runState |= TERMINATED; |
2435 |
> |
aux.unlock(); |
2436 |
|
synchronized (this) { |
2437 |
|
notifyAll(); // for awaitTermination |
2438 |
|
} |
2481 |
|
* @param index the index of the new queue |
2482 |
|
*/ |
2483 |
|
private void tryCreateExternalQueue(int index) { |
2484 |
< |
AtomicLong lock; |
2485 |
< |
if ((lock = stealCounter) != null && index >= 0) { |
2484 |
> |
AuxState aux; |
2485 |
> |
if ((aux = auxState) != null && index >= 0) { |
2486 |
|
WorkQueue q = new WorkQueue(this, null); |
2487 |
|
q.config = index; |
2488 |
|
q.scanState = ~UNSIGNALLED; |
2489 |
|
q.qlock = 1; // lock queue |
2490 |
|
boolean installed = false; |
2491 |
< |
synchronized (lock) { // lock pool to install |
2491 |
> |
aux.lock(); |
2492 |
> |
try { // lock pool to install |
2493 |
|
WorkQueue[] ws; |
2494 |
|
if ((ws = workQueues) != null && index < ws.length && |
2495 |
|
ws[index] == null) { |
2496 |
|
ws[index] = q; // else throw away |
2497 |
|
installed = true; |
2498 |
|
} |
2499 |
+ |
} finally { |
2500 |
+ |
aux.unlock(); |
2501 |
|
} |
2502 |
|
if (installed) { |
2503 |
|
try { |
2966 |
|
* @return the number of steals |
2967 |
|
*/ |
2968 |
|
public long getStealCount() { |
2969 |
< |
AtomicLong sc = stealCounter; |
2970 |
< |
long count = (sc == null) ? 0L : sc.get(); |
2969 |
> |
AuxState sc = auxState; |
2970 |
> |
long count = (sc == null) ? 0L : sc.stealCount; |
2971 |
|
WorkQueue[] ws; WorkQueue w; |
2972 |
|
if ((ws = workQueues) != null) { |
2973 |
|
for (int i = 1; i < ws.length; i += 2) { |
3098 |
|
public String toString() { |
3099 |
|
// Use a single pass through workQueues to collect counts |
3100 |
|
long qt = 0L, qs = 0L; int rc = 0; |
3101 |
< |
AtomicLong sc = stealCounter; |
3102 |
< |
long st = (sc == null) ? 0L : sc.get(); |
3101 |
> |
AuxState sc = auxState; |
3102 |
> |
long st = (sc == null) ? 0L : sc.stealCount; |
3103 |
|
long c = ctl; |
3104 |
|
WorkQueue[] ws; WorkQueue w; |
3105 |
|
if ((ws = workQueues) != null) { |