 635 |
 636 |        volatile int eventCount;   // encoded inactivation count; < 0 if inactive
 637 |        int nextWait;              // encoded record of next event waiter
 638 -        int hint;                  // stability hash or steal index hint
 638 |        int nsteals;               // number of steals
 639 <        int poolIndex;             // index of this queue in pool
 640 <        final int mode;            // 0: lifo, > 0: fifo, < 0: shared
 639 >        int hint;                  // steal index hint
 640 >        short poolIndex;           // index of this queue in pool
 641 >        final short mode;          // 0: lifo, > 0: fifo, < 0: shared
 642 |        volatile int qlock;        // 1: locked, -1: terminate; else 0
 643 |        volatile int base;         // index of next slot for poll
 644 |        int top;                   // index of next slot for push
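
The substance of this hunk is a footprint change: poolIndex and mode shrink from int to short. Every value they hold is bounded by the pool's size cap (MAX_CAP = 0x7fff in this class), so 16 signed bits suffice; the cost is an explicit narrowing cast at each write, which is why casts like (short)r appear in later hunks. A minimal sketch (not from the patch) of the consequence at use sites, assuming only that cap:

    // Sketch: short fields hold the same values and widen back to int
    // wherever they are read in arithmetic.
    class FieldWidthSketch {
        static final int MAX_CAP = 0x7fff;          // pool size bound, as in this class
        short poolIndex;                            // was int; any valid index fits
        void set(int r) { poolIndex = (short) r; }  // writers need a narrowing cast
        int  get()      { return poolIndex; }       // reads widen back to int
        public static void main(String[] args) {
            FieldWidthSketch q = new FieldWidthSketch();
            q.set(MAX_CAP);
            System.out.println(q.get());            // 32767: nothing lost
        }
    }
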
 653 |                  int seed) {
 654 |            this.pool = pool;
 655 |            this.owner = owner;
 656 <            this.mode = mode;
 656 >            this.mode = (short)mode;
 657 |            this.hint = seed; // store initial seed for runWorker
 658 |            // Place indices in the center of array (that is not yet allocated)
 659 |            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
1071 |    /**
1072 |     * Common pool parallelism. To allow simpler use and management
1073 |     * when common pool threads are disabled, we allow the underlying
1074 <     * common.config field to be zero, but in that case still report
1074 >     * common.parallelism field to be zero, but in that case still report
1075 |     * parallelism as 1 to reflect resulting caller-runs mechanics.
1076 |     */
1077 |    static final int commonParallelism;
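
The comment edit just tracks the field rename, but the rule it documents is load-bearing for the rest of the patch: the stored parallelism may now legitimately be zero. A sketch of the reporting rule it describes (the same expression appears in the static initializer near line 3217 below):

    // Sketch: with common pool threads disabled the underlying field is 0,
    // yet submitters run tasks themselves, so parallelism is reported as 1.
    class CallerRunsSketch {
        public static void main(String[] args) {
            int par = 0;                             // threads disabled
            System.out.println((par > 0) ? par : 1); // prints 1
        }
    }
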
1216 |    volatile long ctl;                   // main pool control
1217 |    volatile int plock;                  // shutdown status and seqLock
1218 |    volatile int indexSeed;              // worker/submitter index seed
1219 <    final int config;                    // mode and parallelism level
1219 >    final short parallelism;             // parallelism level
1220 >    final short mode;                    // LIFO/FIFO
1221 |    WorkQueue[] workQueues;              // main registry
1222 |    final ForkJoinWorkerThreadFactory factory;
1223 |    final UncaughtExceptionHandler ueh;  // per-worker UEH
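
This is the central change: the packed config int becomes two short fields, so readers no longer mask or shift. A sketch of the retired encoding, reconstructed from the removed lines elsewhere in this diff (the constructor near line 2407 and getAsyncMode near line 2629); the FIFO_QUEUE value of 1 is an assumption consistent with the mode comments above:

    // Sketch: parallelism in the low 16 bits, queue mode in the high 16.
    class ConfigSketch {
        static final int SMASK = 0xffff;        // low-half mask, as in the pool
        static final int FIFO_QUEUE = 1;        // assumed nonzero mode constant
        final int config;
        ConfigSketch(int parallelism, boolean asyncMode) {
            config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
        }
        int parallelism() { return config & SMASK; }
        int mode()        { return config >>> 16;  }
        public static void main(String[] args) {
            ConfigSketch c = new ConfigSketch(8, true);
            System.out.println(c.parallelism() + " " + c.mode()); // 8 1
        }
    }

Two shorts occupy the same four bytes the int did, so the split costs no space while removing a decode step from every read.
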
1318 |        do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
1319 |                                          s += SEED_INCREMENT) ||
1320 |                     s == 0); // skip 0
1321 <        WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
1321 >        WorkQueue w = new WorkQueue(this, wt, mode, s);
1322 |        if (((ps = plock) & PL_LOCK) != 0 ||
1323 |            !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1324 |            ps = acquirePlock();
1338 |                        }
1339 |                    }
1340 |                }
1341 <                w.eventCount = w.poolIndex = r; // volatile write orders
1341 >                w.poolIndex = (short)r;
1342 >                w.eventCount = r; // volatile write orders
1343 |                ws[r] = w;
1344 |            }
1345 |        } finally {
1346 |            if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
1347 |                releasePlock(nps);
1348 |        }
1349 <        wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
1349 >        wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
1350 |        return w;
1351 |    }
1352 |
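Two details here beyond the rename. First, the old fused store w.eventCount = w.poolIndex = r no longer typechecks with a short poolIndex, so it splits into two statements; the volatile eventCount write stays last so it still orders the poolIndex store, as the retained comment notes. Second, thread names now use poolIndex >>> 1. A sketch of the naming effect, assuming the pool's convention that worker queues occupy odd workQueues slots while submission queues use even ones:

    // Sketch: halving the slot index turns sparse odd worker slots into
    // dense worker numbers (worker-0, worker-1, ... not worker-1, worker-3, ...).
    class WorkerNameSketch {
        public static void main(String[] args) {
            for (int slot : new int[] {1, 3, 5, 7})
                System.out.println("worker-" + (slot >>> 1)); // 0, 1, 2, 3
        }
    }
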
1482 |                throw new RejectedExecutionException();
1483 |            else if (ps == 0 || (ws = workQueues) == null ||
1484 |                     (m = ws.length - 1) < 0) { // initialize workQueues
1485 <                int p = config & SMASK;         // find power of two table size
1485 >                int p = parallelism;            // find power of two table size
1486 |                int n = (p > 1) ? p - 1 : 1;    // ensure at least 2 slots
1487 |                n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
1488 |                n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
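
Only line 1485 changes, but the sizing idiom around it is worth a gloss: the shift-or cascade smears the highest set bit of n into all lower positions, so n becomes one less than a power of two, and (n + 1) << 1 is twice the next power of two at or above p. A self-contained sketch of the same idiom:

    // Sketch: round up to a power of two, then double.
    class TableSizeSketch {
        static int tableSizeFor(int p) {
            int n = (p > 1) ? p - 1 : 1;
            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
            n |= n >>> 8; n |= n >>> 16;  // smear top bit: n = 2^k - 1
            return (n + 1) << 1;          // twice the enclosing power of two
        }
        public static void main(String[] args) {
            System.out.println(tableSizeFor(6)); // 16
            System.out.println(tableSizeFor(8)); // 16
        }
    }
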
1521 |                move = true;                   // move on failure
1522 |            }
1523 |            else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
1524 <                (q = new WorkQueue(this, null, SHARED_QUEUE, r)).poolIndex = k;
1524 >                q = new WorkQueue(this, null, SHARED_QUEUE, r);
1525 >                q.poolIndex = (short)k;
1526 |                if (((ps = plock) & PL_LOCK) != 0 ||
1527 |                    !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
1528 |                    ps = acquirePlock();
1545 |     * Increments active count; mainly called upon return from blocking.
1546 |     */
1547 |    final void incrementActiveCount() {
1548 <        U.getAndAddLong(this, CTL, AC_UNIT);
1548 >        long c;
1549 >        do {} while (!U.compareAndSwapLong
1550 >                     (this, CTL, c = ctl, ((c & ~AC_MASK) |
1551 >                                           ((c & AC_MASK) + AC_UNIT))));
1552 |    }
1553 |
1554 |    /**
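The one-line getAndAddLong on ctl becomes a CAS loop that recomputes only the active-count (AC) field: the increment is applied to the masked field and spliced back, so it stays confined to those 16 bits by construction. A runnable sketch of the pattern using AtomicLong in place of Unsafe; the AC_* constants mirror this class's ctl layout:

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch: read-modify-write confined to one 16-bit field of a packed long.
    class PackedCasSketch {
        static final int  AC_SHIFT = 48;                  // active count: top 16 bits
        static final long AC_UNIT  = 0x0001L << AC_SHIFT;
        static final long AC_MASK  = 0xffffL << AC_SHIFT;
        final AtomicLong ctl = new AtomicLong();

        void incrementActiveCount() {
            long c;
            do {} while (!ctl.compareAndSet(c = ctl.get(),
                                            (c & ~AC_MASK) |
                                            ((c & AC_MASK) + AC_UNIT)));
        }

        public static void main(String[] args) {
            PackedCasSketch p = new PackedCasSketch();
            p.incrementActiveCount();
            System.out.println((int)(p.ctl.get() >> AC_SHIFT)); // 1
        }
    }

The same loop shape recurs at lines 1961-1965, 2035-2038, and 2052-2055 below, plus a subtraction variant at line 2046.
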
1679 |                 !Thread.interrupted()) {
1680 |            int e = (int)c;
1681 |            int u = (int)(c >>> 32);
1682 <            int d = (u >> UAC_SHIFT) + (config & SMASK); // 0 if quiescent
1682 >            int d = (u >> UAC_SHIFT) + parallelism; // active count
1683 |
1684 <            if (e < 0 || (d == 0 && tryTerminate(false, false)))
1684 >            if (e < 0 || (d <= 0 && tryTerminate(false, false)))
1685 |                stat = w.qlock = -1;           // pool is terminating
1686 |            else if ((ns = w.nsteals) != 0) {  // collect steals and retry
1687 |                w.nsteals = 0;
1688 |                U.getAndAddLong(this, STEALCOUNT, (long)ns);
1689 |            }
1690 |            else {
1691 <                long pc = ((d != 0 || ec != (e | INT_SIGN)) ? 0L :
1691 >                long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
1692 |                           ((long)(w.nextWait & E_MASK)) | // ctl to restore
1693 |                           ((long)(u + UAC_UNIT)) << 32);
1694 |                if (pc != 0L) {                // timed wait if last waiter
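
Note the recurring relaxation in this hunk and several below: tests change from exact equality (d == 0, or == -(config & SMASK)) to inequalities (d <= 0, <= 0, > 0). The counts are stored offset by -parallelism and can transiently undershoot (see the "suppress momentarily negative values" comment near line 2661), and parallelism itself may now be zero, so an inequality is robust where equality could be skipped over. A worked sketch with hypothetical values:

    // Sketch: d = activeCount + parallelism.
    class QuiescenceSketch {
        public static void main(String[] args) {
            int parallelism = 4, ac = -5; // ac transiently undershoots -parallelism
            int d = ac + parallelism;     // -1
            System.out.println(d == 0);   // false: the old equality test misses it
            System.out.println(d <= 0);   // true:  the new inequality catches it
        }
    }
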
1879 |     */
1880 |    final boolean tryCompensate(long c) {
1881 |        WorkQueue[] ws = workQueues;
1882 <        int pc = config & SMASK, e = (int)c, m, tc;
1882 >        int pc = parallelism, e = (int)c, m, tc;
1883 |        if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
1884 |            WorkQueue w = ws[e & m];
1885 |            if (e != 0 && w != null) {
1958 |                        task.notifyAll();
1959 |                    }
1960 |                }
1961 <                U.getAndAddLong(this, CTL, AC_UNIT); // reactivate
1961 >                long c;                              // reactivate
1962 >                do {} while (!U.compareAndSwapLong
1963 >                             (this, CTL, c = ctl,
1964 >                              ((c & ~AC_MASK) |
1965 >                               ((c & AC_MASK) + AC_UNIT))));
1966 |            }
1967 |        }
1968 |    }
2032 |            if ((q = findNonEmptyStealQueue()) != null) {
2033 |                if (!active) {       // re-establish active count
2034 |                    active = true;
2035 <                    U.getAndAddLong(this, CTL, AC_UNIT);
2035 >                    do {} while (!U.compareAndSwapLong
2036 >                                 (this, CTL, c = ctl,
2037 >                                  ((c & ~AC_MASK) |
2038 >                                   ((c & AC_MASK) + AC_UNIT))));
2039 |                }
2040 |                if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2041 |                    (w.currentSteal = t).doExec();
2043 |                }
2044 |            }
2045 |            else if (active) {       // decrement active count without queuing
2046 <                long nc = (c = ctl) - AC_UNIT;
2047 <                if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0)
2048 <                    return;          // bypass decrement-then-increment
2046 >                long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
2047 >                if ((int)(nc >> AC_SHIFT) + parallelism == 0)
2048 >                    break;           // bypass decrement-then-increment
2049 |                if (U.compareAndSwapLong(this, CTL, c, nc))
2050 |                    active = false;
2051 |            }
2052 <            else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 &&
2053 <                     U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
2054 <                return;
2052 >            else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 &&
2053 >                     U.compareAndSwapLong
2054 >                     (this, CTL, c, ((c & ~AC_MASK) |
2055 >                                     ((c & AC_MASK) + AC_UNIT))))
2056 >                break;
2057 |        }
2058 |    }
2059 |
2123 |    static int getSurplusQueuedTaskCount() {
2124 |        Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
2125 |        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
2126 <            int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
2126 >            int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism;
2127 |            int n = (q = wt.workQueue).top - q.base;
2128 |            int a = (int)(pool.ctl >> AC_SHIFT) + p;
2129 |            return n - (a > (p >>>= 1) ? 0 :
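
The visible prefix computes p (parallelism), n (the calling worker's local queue length), and a (the current active-worker count); the elided remainder subtracts from n a slack allowance that grows as workers go idle, so callers keep forking while idle workers could still use the tasks. A hedged sketch of that shape only; the threshold schedule below is hypothetical, not the elided code:

    // Sketch: surplus = queued tasks minus a slack that depends on activity.
    class SurplusSketch {
        static int surplus(int n, int a, int p) {
            return n - ((a > (p >>> 1)) ? 0 : 8); // hypothetical two-step slack
        }
        public static void main(String[] args) {
            System.out.println(surplus(6, 5, 8)); // 6: most workers active
            System.out.println(surplus(6, 2, 8)); // -2: many idle, keep forking
        }
    }
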
2167 |        }
2168 |        for (long c;;) {
2169 |            if (((c = ctl) & STOP_BIT) != 0) {  // already terminating
2170 <                if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
2170 >                if ((short)(c >>> TC_SHIFT) + parallelism <= 0) {
2171 |                    synchronized (this) {
2172 |                        notifyAll();            // signal when 0 workers
2173 |                    }
2176 |            }
2177 |            if (!now) {                         // check if idle & no tasks
2178 |                WorkQueue[] ws; WorkQueue w;
2179 <                if ((int)(c >> AC_SHIFT) != -(config & SMASK))
2179 >                if ((int)(c >> AC_SHIFT) + parallelism > 0)
2180 |                    return false;
2181 |                if ((ws = workQueues) != null) {
2182 |                    for (int i = 0; i < ws.length; ++i) {
2183 <                        if ((w = ws[i]) != null) {
2184 <                            if (!w.isEmpty())
2185 <                                return false;
2186 <                            if ((i & 1) != 0 && w.eventCount >= 0)
2187 <                                return false;   // unqueued inactive worker
2183 >                        if ((w = ws[i]) != null &&
2184 >                            (!w.isEmpty() ||
2185 >                             ((i & 1) != 0 && w.eventCount >= 0))) {
2186 >                            signalWork(ws, w);
2187 >                            return false;
2188 |                        }
2189 |                    }
2190 |                }
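
Beyond the rename, this hunk changes behavior: the two straggler checks merge into one condition, and instead of only vetoing termination the new code wakes a worker via signalWork(ws, w) so the discovered work actually gets run. The merged condition, restated as a standalone sketch (nonEmpty stands for !w.isEmpty(), and eventCount >= 0 marks an active worker):

    // Sketch: a slot vetoes termination if it holds tasks, or if it is a
    // worker slot (odd index) whose worker is still active.
    class TerminationVetoSketch {
        static boolean vetoes(boolean nonEmpty, int slot, int eventCount) {
            return nonEmpty || ((slot & 1) != 0 && eventCount >= 0);
        }
        public static void main(String[] args) {
            System.out.println(vetoes(false, 3, 17)); // true: active worker slot
            System.out.println(vetoes(false, 2, 17)); // false: even (submission) slot
        }
    }
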
2373 |        this(checkParallelism(parallelism),
2374 |             checkFactory(factory),
2375 |             handler,
2376 <             asyncMode,
2376 >             (asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
2377 |             "ForkJoinPool-" + nextPoolId() + "-worker-");
2378 |        checkPermission();
2379 |    }
2399 |    private ForkJoinPool(int parallelism,
2400 |                         ForkJoinWorkerThreadFactory factory,
2401 |                         UncaughtExceptionHandler handler,
2402 <                         boolean asyncMode,
2402 >                         int mode,
2403 |                         String workerNamePrefix) {
2404 |        this.workerNamePrefix = workerNamePrefix;
2405 |        this.factory = factory;
2406 |        this.ueh = handler;
2407 <        this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
2407 >        this.mode = (short)mode;
2408 >        this.parallelism = (short)parallelism;
2409 |        long np = (long)(-parallelism); // offset ctl counts
2410 |        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
2411 |    }
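
The ctl seeding on lines 2409-2410 is unchanged, but it explains the arithmetic in the renamed tests: both 16-bit counts start at -parallelism, so field + parallelism recovers the true count and a stored 0 would mean "fully populated". A runnable sketch with the same field layout (AC in bits 48-63, TC in bits 32-47):

    // Sketch of the ctl offset encoding used throughout this file.
    class CtlSketch {
        static final int  AC_SHIFT = 48, TC_SHIFT = 32;
        static final long AC_MASK = 0xffffL << AC_SHIFT;
        static final long TC_MASK = 0xffffL << TC_SHIFT;

        static long initialCtl(int parallelism) {
            long np = (long)(-parallelism);   // offset ctl counts
            return ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
        }

        public static void main(String[] args) {
            long c = initialCtl(4);
            System.out.println((int)(c >> AC_SHIFT) + 4);    // 0: no active deficit
            System.out.println((short)(c >>> TC_SHIFT) + 4); // 0: no workers yet
        }
    }
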
2593 |     * @return the targeted parallelism level of this pool
2594 |     */
2595 |    public int getParallelism() {
2596 <        int par = (config & SMASK);
2597 <        return (par > 0) ? par : 1;
2596 >        int par;
2597 >        return ((par = parallelism) > 0) ? par : 1;
2598 |    }
2599 |
2600 |    /**
2616 |     * @return the number of worker threads
2617 |     */
2618 |    public int getPoolSize() {
2619 <        return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
2619 >        return parallelism + (short)(ctl >>> TC_SHIFT);
2620 |    }
2621 |
2622 |    /**
2626 |     * @return {@code true} if this pool uses async mode
2627 |     */
2628 |    public boolean getAsyncMode() {
2629 <        return (config >>> 16) == FIFO_QUEUE;
2629 >        return mode == FIFO_QUEUE;
2630 |    }
2631 |
2632 |    /**
2657 |     * @return the number of active threads
2658 |     */
2659 |    public int getActiveThreadCount() {
2660 <        int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
2660 >        int r = parallelism + (int)(ctl >> AC_SHIFT);
2661 |        return (r <= 0) ? 0 : r; // suppress momentarily negative values
2662 |    }
2663 |
2673 |     * @return {@code true} if all threads are currently idle
2674 |     */
2675 |    public boolean isQuiescent() {
2676 <        return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0;
2676 >        return parallelism + (int)(ctl >> AC_SHIFT) <= 0;
2677 |    }
2678 |
2679 |    /**
2836 |                }
2837 |            }
2838 |        }
2839 <        int pc = (config & SMASK);
2839 >        int pc = parallelism;
2840 |        int tc = pc + (short)(c >>> TC_SHIFT);
2841 |        int ac = pc + (int)(c >> AC_SHIFT);
2842 |        if (ac < 0) // ignore transient negative
2909 |    public boolean isTerminated() {
2910 |        long c = ctl;
2911 |        return ((c & STOP_BIT) != 0L &&
2912 <                (short)(c >>> TC_SHIFT) == -(config & SMASK));
2912 >                (short)(c >>> TC_SHIFT) + parallelism <= 0);
2913 |    }
2914 |
2915 |    /**
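The rewritten test relies on Java's narrowing cast to extract the signed total-count field: (short)(c >>> TC_SHIFT) re-extends the field's sign bit, so adding parallelism yields the live worker count, and <= 0 again tolerates parallelism == 0 (isTerminating below simply flips the inequality). A sketch:

    // Sketch: sign-preserving extraction of a 16-bit field from a long.
    class TcFieldSketch {
        public static void main(String[] args) {
            long c = 0xfffcL << 32;          // TC field storing -4 (offset form)
            short tc = (short)(c >>> 32);    // -4: the cast re-extends the sign
            System.out.println(tc + 4 <= 0); // true: no live workers at parallelism 4
        }
    }
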
2928 |    public boolean isTerminating() {
2929 |        long c = ctl;
2930 |        return ((c & STOP_BIT) != 0L &&
2931 <                (short)(c >>> TC_SHIFT) != -(config & SMASK));
2931 >                (short)(c >>> TC_SHIFT) + parallelism > 0);
2932 |    }
2933 |
2934 |    /**
3213 |        common = java.security.AccessController.doPrivileged
3214 |            (new java.security.PrivilegedAction<ForkJoinPool>() {
3215 |                public ForkJoinPool run() { return makeCommonPool(); }});
3216 <        int par = common.config;        // report 1 even if threads disabled
3216 >        int par = common.parallelism;   // report 1 even if threads disabled
3217 |        commonParallelism = par > 0 ? par : 1;
3218 |    }
3219 |
3249 |            parallelism = 0;
3250 |        if (parallelism > MAX_CAP)
3251 |            parallelism = MAX_CAP;
3252 <        return new ForkJoinPool(parallelism, factory, handler, false,
3252 >        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
3253 |                                "ForkJoinPool.commonPool-worker-");
3254 |    }
3255 |