467 |
|
* relies on the active count bits of "ctl" maintaining consensus |
468 |
|
* about quiescence. However, external submitters do not take part |
469 |
|
* in this consensus. So, tryTerminate sweeps through queues to |
470 |
< |
* ensure lack of in-flight submissions before triggering the |
471 |
< |
* "STOP" phase of termination. |
470 |
> |
* ensure lack of in-flight submissions and workers about to |
471 |
> |
* process them before triggering the "STOP" phase of |
472 |
> |
* termination. |
473 |
|
* |
474 |
|
* Joining Tasks |
475 |
|
* ============= |
1084 |
|
U.putOrderedInt(this, QLOCK, 0); |
1085 |
|
return t; |
1086 |
|
} |
1087 |
< |
qlock = 0; |
1087 |
> |
U.compareAndSwapInt(this, QLOCK, 1, 0); |
1088 |
|
} |
1089 |
|
} |
1090 |
|
else if (U.compareAndSwapObject(a, j, t, null)) { |
1534 |
|
w.cancelAll(); // cancel remaining tasks |
1535 |
|
U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals |
1536 |
|
} |
1537 |
< |
if (!tryTerminate(false, false) && w != null && w.array != null) { |
1537 |
> |
for (;;) { // possibly replace |
1538 |
|
WorkQueue[] ws; int m, sp; |
1539 |
< |
while ((runState & STOP) == 0 && |
1540 |
< |
(ws = workQueues) != null && (m = ws.length - 1) >= 0) { |
1541 |
< |
if ((sp = (int)(c = ctl)) != 0) { // wake up replacement |
1542 |
< |
if (tryRelease(c, ws[sp & m], AC_UNIT)) |
1543 |
< |
break; |
1544 |
< |
} |
1544 |
< |
else if (ex != null && (c & ADD_WORKER) != 0L) { |
1545 |
< |
tryAddWorker(c); // create replacement |
1546 |
< |
break; |
1547 |
< |
} |
1548 |
< |
else |
1539 |
> |
if (tryTerminate(false, false) || w == null || w.array == null || |
1540 |
> |
(runState & STOP) != 0 || (ws = workQueues) == null || |
1541 |
> |
(m = ws.length - 1) < 0) // already terminating |
1542 |
> |
break; |
1543 |
> |
if ((sp = (int)(c = ctl)) != 0) { // wake up replacement |
1544 |
> |
if (tryRelease(c, ws[sp & m], AC_UNIT)) |
1545 |
|
break; |
1546 |
|
} |
1547 |
+ |
else if (ex != null && (c & ADD_WORKER) != 0L) { |
1548 |
+ |
tryAddWorker(c); // create replacement |
1549 |
+ |
break; |
1550 |
+ |
} |
1551 |
+ |
else // don't need replacement |
1552 |
+ |
break; |
1553 |
|
} |
1554 |
|
if (ex == null) // help clean on way out |
1555 |
|
ForkJoinTask.helpExpungeStaleExceptions(); |
2174 |
|
} |
2175 |
|
if ((rs & STOP) == 0) { |
2176 |
|
if (!now) { |
2177 |
< |
if ((int)(ctl >> AC_SHIFT) + (config & SMASK) > 0) |
2178 |
< |
return false; |
2179 |
< |
WorkQueue[] ws; WorkQueue w; // check external submissions |
2180 |
< |
if ((ws = workQueues) != null) { |
2181 |
< |
for (int i = 0; i < ws.length; ++i) { |
2182 |
< |
if ((w = ws[i]) != null && |
2183 |
< |
(!w.isEmpty() || |
2184 |
< |
((i & 1) != 0 && w.scanState >= 0))) { |
2185 |
< |
signalWork(ws, w); |
2186 |
< |
return false; |
2177 |
> |
for (int oldSum = 0, checkSum = 0;;) { |
2178 |
> |
WorkQueue[] ws; WorkQueue w; int m, b; |
2179 |
> |
if ((int)(ctl >> AC_SHIFT) + (config & SMASK) > 0) |
2180 |
> |
return false; // not quiescent |
2181 |
> |
if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) |
2182 |
> |
break; // scan for submissions |
2183 |
> |
for (int i = 0; i <= m; ++i) { |
2184 |
> |
if ((w = ws[i]) != null) { |
2185 |
> |
if ((i & 1) == 0) |
2186 |
> |
w.qlock = -1; // disable external queue |
2187 |
> |
else if (w.scanState >= 0) |
2188 |
> |
return false; // still active |
2189 |
> |
if ((b = w.base) != w.top ) |
2190 |
> |
return false; |
2191 |
> |
checkSum += b; |
2192 |
|
} |
2193 |
|
} |
2194 |
+ |
if (oldSum == (oldSum = checkSum)) |
2195 |
+ |
break; // continue until stable |
2196 |
+ |
checkSum = 0; |
2197 |
|
} |
2188 |
– |
if ((int)(ctl >> AC_SHIFT) + (config & SMASK) > 0) |
2189 |
– |
return false; // recheck |
2198 |
|
} |
2199 |
|
rs = lockRunState(); // enter STOP phase |
2200 |
< |
unlockRunState(rs, (rs & ~RSLOCK) | STOP); |
2200 |
> |
int ns = rs & STOP; |
2201 |
> |
if (ns != 0 || !now || |
2202 |
> |
(int)(ctl >> AC_SHIFT) + (config & SMASK) <= 0) |
2203 |
> |
ns = STOP; // recheck under lock |
2204 |
> |
unlockRunState(rs, (rs & ~RSLOCK) | ns); |
2205 |
> |
if (ns == 0) |
2206 |
> |
return false; |
2207 |
|
} |
2208 |
|
for (int pass = 0; pass < 3; ++pass) { // clobber other workers |
2209 |
|
WorkQueue[] ws; int n; |
2291 |
|
submitted = true; |
2292 |
|
} |
2293 |
|
} finally { |
2294 |
< |
q.qlock = 0; |
2294 |
> |
U.compareAndSwapInt(q, QLOCK, 1, 0); |
2295 |
|
} |
2296 |
|
if (submitted) { |
2297 |
|
signalWork(ws, q); |
2305 |
|
q.hint = r; |
2306 |
|
q.config = k | SHARED_QUEUE; |
2307 |
|
rs = lockRunState(); // publish index |
2308 |
< |
if ((ws = workQueues) != null && k < ws.length && ws[k] == null) |
2308 |
> |
if (rs > 0 && (ws = workQueues) != null && |
2309 |
> |
k < ws.length && ws[k] == null) |
2310 |
|
ws[k] = q; // else terminated |
2311 |
|
unlockRunState(rs, rs & ~RSLOCK); |
2312 |
|
} |
2343 |
|
signalWork(ws, q); |
2344 |
|
return; |
2345 |
|
} |
2346 |
< |
q.qlock = 0; |
2346 |
> |
U.compareAndSwapInt(q, QLOCK, 1, 0); |
2347 |
|
} |
2348 |
|
externalSubmit(task); |
2349 |
|
} |
2380 |
|
U.putOrderedInt(w, QLOCK, 0); |
2381 |
|
return true; |
2382 |
|
} |
2383 |
< |
w.qlock = 0; |
2383 |
> |
U.compareAndSwapInt(w, QLOCK, 1, 0); |
2384 |
|
} |
2385 |
|
} |
2386 |
|
return false; |