19 |
|
import java.util.concurrent.RejectedExecutionException; |
20 |
|
import java.util.concurrent.RunnableFuture; |
21 |
|
import java.util.concurrent.TimeUnit; |
22 |
– |
import java.util.concurrent.TimeoutException; |
22 |
|
import java.util.concurrent.atomic.AtomicInteger; |
23 |
|
import java.util.concurrent.locks.LockSupport; |
24 |
|
import java.util.concurrent.locks.ReentrantLock; |
101 |
|
* daemon} mode, there is typically no need to explicitly {@link |
102 |
|
* #shutdown} such a pool upon program exit. |
103 |
|
* |
104 |
< |
* <pre> |
104 |
> |
* <pre> {@code |
105 |
|
* static final ForkJoinPool mainPool = new ForkJoinPool(); |
106 |
|
* ... |
107 |
|
* public void sort(long[] array) { |
108 |
|
* mainPool.invoke(new SortTask(array, 0, array.length)); |
109 |
< |
* } |
111 |
< |
* </pre> |
109 |
> |
* }}</pre> |
110 |
|
* |
111 |
|
* <p><b>Implementation notes</b>: This implementation restricts the |
112 |
|
* maximum number of running threads to 32767. Attempts to create |
290 |
|
* "terminate" status, cancels all unprocessed tasks, and wakes up |
291 |
|
* all waiting workers. Detecting whether termination should |
292 |
|
* commence after a non-abrupt shutdown() call requires more work |
293 |
< |
* and bookkeeping. We need consensus about quiesence (i.e., that |
293 |
> |
* and bookkeeping. We need consensus about quiescence (i.e., that |
294 |
|
* there is no more work) which is reflected in active counts so |
295 |
|
* long as there are no current blockers, as well as possible |
296 |
|
* re-evaluations during independent changes in blocking or |
465 |
|
/** |
466 |
|
* Main pool control -- a long packed with: |
467 |
|
* AC: Number of active running workers minus target parallelism (16 bits) |
468 |
< |
* TC: Number of total workers minus target parallelism (16bits) |
468 |
> |
* TC: Number of total workers minus target parallelism (16 bits) |
469 |
|
* ST: true if pool is terminating (1 bit) |
470 |
|
* EC: the wait count of top waiting thread (15 bits) |
471 |
|
* ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits) |
545 |
|
volatile boolean shutdown; |
546 |
|
|
547 |
|
/** |
548 |
< |
* True if use local fifo, not default lifo, for local polling |
549 |
< |
* Read by, and replicated by ForkJoinWorkerThreads |
548 |
> |
* True if use local fifo, not default lifo, for local polling. |
549 |
> |
* Read by, and replicated by ForkJoinWorkerThreads. |
550 |
|
*/ |
551 |
|
final boolean locallyFifo; |
552 |
|
|
864 |
|
w.parked = false; |
865 |
|
if (w.eventCount != v) |
866 |
|
break; |
867 |
< |
else if (System.nanoTime() - startTime < |
867 |
> |
else if (System.nanoTime() - startTime < |
868 |
|
SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop |
869 |
|
Thread.interrupted(); // spurious wakeup |
870 |
|
else if (UNSAFE.compareAndSwapLong(this, ctlOffset, |
947 |
|
int pc = parallelism; |
948 |
|
do { |
949 |
|
ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w; |
950 |
< |
int e, ac, tc, rc, i; |
950 |
> |
int e, ac, tc, i; |
951 |
|
long c = ctl; |
952 |
|
int u = (int)(c >>> 32); |
953 |
|
if ((e = (int)c) < 0) { |
987 |
|
} |
988 |
|
|
989 |
|
/** |
990 |
< |
* Decrements blockedCount and increments active count |
990 |
> |
* Decrements blockedCount and increments active count. |
991 |
|
*/ |
992 |
|
private void postBlock() { |
993 |
|
long c; |
994 |
|
do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, // no mask |
995 |
|
c = ctl, c + AC_UNIT)); |
996 |
|
int b; |
997 |
< |
do {} while(!UNSAFE.compareAndSwapInt(this, blockedCountOffset, |
998 |
< |
b = blockedCount, b - 1)); |
997 |
> |
do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset, |
998 |
> |
b = blockedCount, b - 1)); |
999 |
|
} |
1000 |
|
|
1001 |
|
/** |
1005 |
|
* @param joinMe the task |
1006 |
|
*/ |
1007 |
|
final void tryAwaitJoin(ForkJoinTask<?> joinMe) { |
1010 |
– |
int s; |
1008 |
|
Thread.interrupted(); // clear interrupts before checking termination |
1009 |
|
if (joinMe.status >= 0) { |
1010 |
|
if (tryPreBlock()) { |
1018 |
|
|
1019 |
|
/** |
1020 |
|
* Possibly blocks the given worker waiting for joinMe to |
1021 |
< |
* complete or timeout |
1021 |
> |
* complete or timeout. |
1022 |
|
* |
1023 |
|
* @param joinMe the task |
1024 |
|
* @param millis the wait time for underlying Object.wait |
1054 |
|
} |
1055 |
|
|
1056 |
|
/** |
1057 |
< |
* If necessary, compensates for blocker, and blocks |
1057 |
> |
* If necessary, compensates for blocker, and blocks. |
1058 |
|
*/ |
1059 |
|
private void awaitBlocker(ManagedBlocker blocker) |
1060 |
|
throws InterruptedException { |
1146 |
|
ws[k] = w; |
1147 |
|
nextWorkerIndex = k + 1; |
1148 |
|
int m = g & SMASK; |
1149 |
< |
g = k > m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); |
1149 |
> |
g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); |
1150 |
|
} |
1151 |
|
} finally { |
1152 |
|
scanGuard = g; |
1321 |
|
// misc ForkJoinWorkerThread support |
1322 |
|
|
1323 |
|
/** |
1324 |
< |
* Increment or decrement quiescerCount. Needed only to prevent |
1324 |
> |
* Increments or decrements quiescerCount. Needed only to prevent |
1325 |
|
* triggering shutdown if a worker is transiently inactive while |
1326 |
|
* checking quiescence. |
1327 |
|
* |
1329 |
|
*/ |
1330 |
|
final void addQuiescerCount(int delta) { |
1331 |
|
int c; |
1332 |
< |
do {} while(!UNSAFE.compareAndSwapInt(this, quiescerCountOffset, |
1333 |
< |
c = quiescerCount, c + delta)); |
1332 |
> |
do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset, |
1333 |
> |
c = quiescerCount, c + delta)); |
1334 |
|
} |
1335 |
|
|
1336 |
|
/** |
1337 |
< |
* Directly increment or decrement active count without |
1338 |
< |
* queuing. This method is used to transiently assert inactivation |
1339 |
< |
* while checking quiescence. |
1337 |
> |
* Directly increments or decrements active count without queuing. |
1338 |
> |
* This method is used to transiently assert inactivation while |
1339 |
> |
* checking quiescence. |
1340 |
|
* |
1341 |
|
* @param delta 1 for increment, -1 for decrement |
1342 |
|
*/ |
1355 |
|
final int idlePerActive() { |
1356 |
|
// Approximate at powers of two for small values, saturate past 4 |
1357 |
|
int p = parallelism; |
1358 |
< |
int a = p + (int)(ctl >> AC_SHIFT); |
1359 |
< |
return (a > (p >>>= 1) ? 0 : |
1360 |
< |
a > (p >>>= 1) ? 1 : |
1361 |
< |
a > (p >>>= 1) ? 2 : |
1362 |
< |
a > (p >>>= 1) ? 4 : |
1363 |
< |
8); |
1358 |
> |
int a = p + (int)(ctl >> AC_SHIFT); |
1359 |
> |
return (a > (p >>>= 1) ? 0 : |
1360 |
> |
a > (p >>>= 1) ? 1 : |
1361 |
> |
a > (p >>>= 1) ? 2 : |
1362 |
> |
a > (p >>>= 1) ? 4 : |
1363 |
> |
8); |
1364 |
|
} |
1365 |
|
|
1366 |
|
// Exported methods |
1683 |
|
*/ |
1684 |
|
public int getRunningThreadCount() { |
1685 |
|
int r = parallelism + (int)(ctl >> AC_SHIFT); |
1686 |
< |
return r <= 0? 0 : r; // suppress momentarily negative values |
1686 |
> |
return (r <= 0) ? 0 : r; // suppress momentarily negative values |
1687 |
|
} |
1688 |
|
|
1689 |
|
/** |
1695 |
|
*/ |
1696 |
|
public int getActiveThreadCount() { |
1697 |
|
int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount; |
1698 |
< |
return r <= 0? 0 : r; // suppress momentarily negative values |
1698 |
> |
return (r <= 0) ? 0 : r; // suppress momentarily negative values |
1699 |
|
} |
1700 |
|
|
1701 |
|
/** |
1850 |
|
int ac = rc + blockedCount; |
1851 |
|
String level; |
1852 |
|
if ((c & STOP_BIT) != 0) |
1853 |
< |
level = (tc == 0)? "Terminated" : "Terminating"; |
1853 |
> |
level = (tc == 0) ? "Terminated" : "Terminating"; |
1854 |
|
else |
1855 |
< |
level = shutdown? "Shutting down" : "Running"; |
1855 |
> |
level = shutdown ? "Shutting down" : "Running"; |
1856 |
|
return super.toString() + |
1857 |
|
"[" + level + |
1858 |
|
", parallelism = " + pc + |
2115 |
|
modifyThreadPermission = new RuntimePermission("modifyThread"); |
2116 |
|
defaultForkJoinWorkerThreadFactory = |
2117 |
|
new DefaultForkJoinWorkerThreadFactory(); |
2121 |
– |
int s; |
2118 |
|
try { |
2119 |
|
UNSAFE = getUnsafe(); |
2120 |
< |
Class k = ForkJoinPool.class; |
2120 |
> |
Class<?> k = ForkJoinPool.class; |
2121 |
|
ctlOffset = UNSAFE.objectFieldOffset |
2122 |
|
(k.getDeclaredField("ctl")); |
2123 |
|
stealCountOffset = UNSAFE.objectFieldOffset |
2130 |
|
(k.getDeclaredField("scanGuard")); |
2131 |
|
nextWorkerNumberOffset = UNSAFE.objectFieldOffset |
2132 |
|
(k.getDeclaredField("nextWorkerNumber")); |
2137 |
– |
Class a = ForkJoinTask[].class; |
2138 |
– |
ABASE = UNSAFE.arrayBaseOffset(a); |
2139 |
– |
s = UNSAFE.arrayIndexScale(a); |
2133 |
|
} catch (Exception e) { |
2134 |
|
throw new Error(e); |
2135 |
|
} |
2136 |
+ |
Class<?> a = ForkJoinTask[].class; |
2137 |
+ |
ABASE = UNSAFE.arrayBaseOffset(a); |
2138 |
+ |
int s = UNSAFE.arrayIndexScale(a); |
2139 |
|
if ((s & (s-1)) != 0) |
2140 |
|
throw new Error("data type scale not a power of two"); |
2141 |
|
ASHIFT = 31 - Integer.numberOfLeadingZeros(s); |