20 |
|
import java.util.concurrent.RunnableFuture; |
21 |
|
import java.util.concurrent.ThreadLocalRandom; |
22 |
|
import java.util.concurrent.TimeUnit; |
23 |
+ |
import java.util.concurrent.atomic.AtomicLong; |
24 |
|
import java.security.AccessControlContext; |
25 |
|
import java.security.ProtectionDomain; |
26 |
|
import java.security.Permissions; |
280 |
|
* often maintaining atomicity without blocking or locking. |
281 |
|
* Nearly all essentially atomic control state is held in two |
282 |
|
* volatile variables that are by far most often read (not |
283 |
< |
* written) as status and consistency checks. |
283 |
> |
* written) as status and consistency checks. (Also, field |
284 |
> |
* "config" holds unchanging configuration state.) |
285 |
|
* |
286 |
|
* Field "ctl" contains 64 bits holding information needed to |
287 |
|
* atomically decide to add, inactivate, enqueue (on an event |
289 |
|
* packing, we restrict maximum parallelism to (1<<15)-1 (which is |
290 |
|
* far in excess of normal operating range) to allow ids, counts, |
291 |
|
* and their negations (used for thresholding) to fit into 16bit |
292 |
< |
* subfields. Field "runState" holds lockable state bits |
293 |
< |
* (STARTED, STOP, etc) also protecting updates to the workQueues |
294 |
< |
* array. When used as a lock, it is normally held only for a few |
295 |
< |
* instructions (the only exceptions are one-time array |
296 |
< |
* initialization and uncommon resizing), so is nearly always |
297 |
< |
* available after at most a brief spin. But to be extra-cautious, |
298 |
< |
* we use a monitor-based backup strategy to block when needed |
299 |
< |
* (see awaitRunStateLock). Usages of "runState" vs "ctl" |
300 |
< |
* interact in only one case: deciding to add a worker thread (see |
301 |
< |
* tryAddWorker), in which case the ctl CAS is performed while the |
302 |
< |
* lock is held. Field "config" holds unchanging configuration |
303 |
< |
* state. |
292 |
> |
* subfields. |
293 |
> |
* |
294 |
> |
* Field "runState" holds lockable state bits (STARTED, STOP, etc) |
295 |
> |
* also protecting updates to the workQueues array. When used as |
296 |
> |
* a lock, it is normally held only for a few instructions (the |
297 |
> |
* only exceptions are one-time array initialization and uncommon |
298 |
> |
* resizing), so is nearly always available after at most a brief |
299 |
> |
* spin. But to be extra-cautious, after spinning, method |
300 |
> |
* awaitRunStateLock (called only if an initial CAS fails), uses a |
301 |
> |
* wait/notify mechanics on a builtin monitor to block when |
302 |
> |
* (rarely) needed. This would be a terrible idea for a highly |
303 |
> |
* contended lock, but most pools run without the lock ever |
304 |
> |
* contending after the spin limit, so this works fine as a more |
305 |
> |
* conservative alternative. Because we don't otherwise have an |
306 |
> |
* internal Object to use as a monitor, the "stealCounter" (an |
307 |
> |
* AtomicLong) is used when available (it too must be lazily |
308 |
> |
* initialized; see externalSubmit). |
309 |
> |
|
310 |
> |
* Usages of "runState" vs "ctl" interact in only one case: |
311 |
> |
* deciding to add a worker thread (see tryAddWorker), in which |
312 |
> |
* case the ctl CAS is performed while the lock is held. |
313 |
|
* |
314 |
|
* Recording WorkQueues. WorkQueues are recorded in the |
315 |
|
* "workQueues" array. The array is created upon first use (see |
1031 |
|
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC |
1032 |
|
execLocalTasks(); |
1033 |
|
ForkJoinWorkerThread thread = owner; |
1034 |
< |
++nsteals; |
1034 |
> |
if (++nsteals < 0) // collect on overflow |
1035 |
> |
transferStealCount(pool); |
1036 |
|
scanState |= SCANNING; |
1037 |
|
if (thread != null) |
1038 |
|
thread.afterTopLevelExec(); |
1040 |
|
} |
1041 |
|
|
1042 |
|
/** |
1043 |
+ |
* Adds steal count to pool stealCounter if it exists, and resets. |
1044 |
+ |
*/ |
1045 |
+ |
final void transferStealCount(ForkJoinPool p) { |
1046 |
+ |
AtomicLong sc; |
1047 |
+ |
if (p != null && (sc = p.stealCounter) != null) { |
1048 |
+ |
int s = nsteals; |
1049 |
+ |
nsteals = 0; // if negative, correct for overflow |
1050 |
+ |
sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s)); |
1051 |
+ |
} |
1052 |
+ |
} |
1053 |
+ |
|
1054 |
+ |
/** |
1055 |
|
* If present, removes from queue and executes the given task, |
1056 |
|
* or any other cancelled task. Used only by awaitJoin. |
1057 |
|
* |
1366 |
|
private static final int SHUTDOWN = 1 << 31; |
1367 |
|
|
1368 |
|
// Instance fields |
1345 |
– |
volatile long stealCount; // collects worker counts |
1369 |
|
volatile long ctl; // main pool control |
1370 |
|
volatile int runState; // lockable status |
1371 |
|
final int config; // parallelism, mode |
1374 |
|
final ForkJoinWorkerThreadFactory factory; |
1375 |
|
final UncaughtExceptionHandler ueh; // per-worker UEH |
1376 |
|
final String workerNamePrefix; // to create worker name string |
1377 |
+ |
volatile AtomicLong stealCounter; // also used as sync monitor |
1378 |
|
|
1379 |
|
/** |
1380 |
|
* Acquires the runState lock; returns current (locked) runState. |
1387 |
|
} |
1388 |
|
|
1389 |
|
/** |
1390 |
< |
* Spins and/or blocks until runstate lock is available. This |
1391 |
< |
* method is called only if an initial CAS fails. This acts as a |
1368 |
< |
* spinlock for normal cases, but falls back to builtin monitor to |
1369 |
< |
* block when (rarely) needed. This would be a terrible idea for a |
1370 |
< |
* highly contended lock, but most pools run without the lock ever |
1371 |
< |
* contending after the spin limit, so this works fine as a more |
1372 |
< |
* conservative alternative to a pure spinlock. |
1390 |
> |
* Spins and/or blocks until runstate lock is available. See |
1391 |
> |
* above for explanation. |
1392 |
|
*/ |
1393 |
|
private int awaitRunStateLock() { |
1394 |
+ |
Object lock; |
1395 |
|
boolean wasInterrupted = false; |
1396 |
|
for (int spins = SPINS, r = 0, rs, ns;;) { |
1397 |
|
if (((rs = runState) & RSLOCK) == 0) { |
1412 |
|
if (r >= 0) |
1413 |
|
--spins; |
1414 |
|
} |
1415 |
+ |
else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) |
1416 |
+ |
Thread.yield(); // initialization race |
1417 |
|
else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) { |
1418 |
< |
synchronized (this) { |
1418 |
> |
synchronized (lock) { |
1419 |
|
if ((runState & RSIGNAL) != 0) { |
1420 |
|
try { |
1421 |
< |
wait(); |
1421 |
> |
lock.wait(); |
1422 |
|
} catch (InterruptedException ie) { |
1423 |
|
if (!(Thread.currentThread() instanceof |
1424 |
|
ForkJoinWorkerThread)) |
1426 |
|
} |
1427 |
|
} |
1428 |
|
else |
1429 |
< |
notifyAll(); |
1429 |
> |
lock.notifyAll(); |
1430 |
|
} |
1431 |
|
} |
1432 |
|
} |
1440 |
|
*/ |
1441 |
|
private void unlockRunState(int oldRunState, int newRunState) { |
1442 |
|
if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { |
1443 |
+ |
Object lock = stealCounter; |
1444 |
|
runState = newRunState; // clears RSIGNAL bit |
1445 |
< |
synchronized (this) { notifyAll(); } |
1445 |
> |
if (lock != null) |
1446 |
> |
synchronized (lock) { lock.notifyAll(); } |
1447 |
|
} |
1448 |
|
} |
1449 |
|
|
1571 |
|
(SP_MASK & c)))); |
1572 |
|
if (w != null) { |
1573 |
|
w.qlock = -1; // ensure set |
1574 |
+ |
w.transferStealCount(this); |
1575 |
|
w.cancelAll(); // cancel remaining tasks |
1551 |
– |
U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals |
1576 |
|
} |
1577 |
|
for (;;) { // possibly replace |
1578 |
|
WorkQueue[] ws; int m, sp; |
1770 |
|
else if (spins > 0) { |
1771 |
|
r ^= r << 6; r ^= r >>> 21; r ^= r << 7; |
1772 |
|
if (r >= 0 && --spins == 0) { // randomize spins |
1773 |
< |
WorkQueue v; WorkQueue[] ws; int s, j; |
1773 |
> |
WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; |
1774 |
|
if (pred != 0 && (ws = workQueues) != null && |
1775 |
|
(j = pred & SMASK) < ws.length && |
1776 |
|
(v = ws[j]) != null && // see if pred parking |
1777 |
|
(v.parker == null || v.scanState >= 0)) |
1778 |
|
spins = SPINS; // continue spinning |
1755 |
– |
else if ((s = w.nsteals) != 0) { |
1756 |
– |
w.nsteals = 0; // collect steals |
1757 |
– |
U.getAndAddLong(this, STEALCOUNT, s); |
1758 |
– |
} |
1779 |
|
} |
1780 |
|
} |
1781 |
|
else if (w.qlock < 0) // recheck after spins |
2090 |
|
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { |
2091 |
|
U.putOrderedObject(w, QCURRENTSTEAL, t); |
2092 |
|
t.doExec(); |
2093 |
< |
++w.nsteals; |
2093 |
> |
if (++w.nsteals < 0) |
2094 |
> |
w.transferStealCount(this); |
2095 |
|
} |
2096 |
|
} |
2097 |
|
else if (active) { // decrement active count without queuing |
2306 |
|
tryTerminate(false, false); // help terminate |
2307 |
|
throw new RejectedExecutionException(); |
2308 |
|
} |
2309 |
< |
else if ((rs & STARTED) == 0 || // initialize workQueues array |
2309 |
> |
else if ((rs & STARTED) == 0 || // initialize |
2310 |
|
((ws = workQueues) == null || (m = ws.length - 1) < 0)) { |
2311 |
|
int ns = 0; |
2312 |
|
rs = lockRunState(); |
2313 |
|
try { |
2314 |
< |
if ((rs & STARTED) == 0) { // find power of two table size |
2314 |
> |
if ((rs & STARTED) == 0) { |
2315 |
> |
U.compareAndSwapObject(this, STEALCOUNTER, null, |
2316 |
> |
new AtomicLong()); |
2317 |
> |
// create workQueues array with size a power of two |
2318 |
|
int p = config & SMASK; // ensure at least 2 slots |
2319 |
|
int n = (p > 1) ? p - 1 : 1; |
2320 |
|
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; |
2831 |
|
* @return the number of steals |
2832 |
|
*/ |
2833 |
|
public long getStealCount() { |
2834 |
< |
long count = stealCount; |
2834 |
> |
AtomicLong sc = stealCounter; |
2835 |
> |
long count = (sc == null) ? 0L : sc.get(); |
2836 |
|
WorkQueue[] ws; WorkQueue w; |
2837 |
|
if ((ws = workQueues) != null) { |
2838 |
|
for (int i = 1; i < ws.length; i += 2) { |
2962 |
|
public String toString() { |
2963 |
|
// Use a single pass through workQueues to collect counts |
2964 |
|
long qt = 0L, qs = 0L; int rc = 0; |
2965 |
< |
long st = stealCount; |
2965 |
> |
AtomicLong sc = stealCounter; |
2966 |
> |
long st = (sc == null) ? 0L : sc.get(); |
2967 |
|
long c = ctl; |
2968 |
|
WorkQueue[] ws; WorkQueue w; |
2969 |
|
if ((ws = workQueues) != null) { |
3314 |
|
private static final int ASHIFT; |
3315 |
|
private static final long CTL; |
3316 |
|
private static final long RUNSTATE; |
3317 |
< |
private static final long STEALCOUNT; |
3317 |
> |
private static final long STEALCOUNTER; |
3318 |
|
private static final long PARKBLOCKER; |
3319 |
|
private static final long QBASE; // these must be same as in WorkQueue |
3320 |
|
private static final long QTOP; |
3333 |
|
(k.getDeclaredField("ctl")); |
3334 |
|
RUNSTATE = U.objectFieldOffset |
3335 |
|
(k.getDeclaredField("runState")); |
3336 |
< |
STEALCOUNT = U.objectFieldOffset |
3337 |
< |
(k.getDeclaredField("stealCount")); |
3336 |
> |
STEALCOUNTER = U.objectFieldOffset |
3337 |
> |
(k.getDeclaredField("stealCounter")); |
3338 |
|
Class<?> tk = Thread.class; |
3339 |
|
PARKBLOCKER = U.objectFieldOffset |
3340 |
|
(tk.getDeclaredField("parkBlocker")); |