195 |
|
* WorkQueues are also used in a similar way for tasks submitted |
196 |
|
* to the pool. We cannot mix these tasks in the same queues used |
197 |
|
* for work-stealing (this would contaminate lifo/fifo |
198 |
< |
* processing). Instead, we loosely associate (via hashing) |
199 |
< |
* submission queues with submitting threads, and randomly scan |
200 |
< |
* these queues as well when looking for work. In essence, |
201 |
< |
* submitters act like workers except that they never take tasks, |
202 |
< |
* and they are multiplexed on to a finite number of shared work |
203 |
< |
* queues. However, classes are set up so that future extensions |
204 |
< |
* could allow submitters to optionally help perform tasks as |
205 |
< |
* well. Pool submissions from internal workers are also allowed, |
206 |
< |
* but use randomized rather than thread-hashed queue indices to |
207 |
< |
* avoid imbalance. Insertion of tasks in shared mode requires a |
208 |
< |
* lock (mainly to protect in the case of resizing) but we use |
209 |
< |
* only a simple spinlock (using bits in field runState), because |
210 |
< |
* submitters encountering a busy queue try or create others so |
211 |
< |
* never block. |
198 |
> |
* processing). Instead, we loosely associate submission queues |
199 |
> |
* with submitting threads, using a form of hashing. The |
200 |
> |
* ThreadLocal Submitter class contains a value initially used as |
201 |
> |
* a hash code for choosing existing queues, but may be randomly |
202 |
> |
* repositioned upon contention with other submitters. In |
203 |
> |
* essence, submitters act like workers except that they never |
204 |
> |
* take tasks, and they are multiplexed on to a finite number of |
205 |
> |
* shared work queues. However, classes are set up so that future |
206 |
> |
* extensions could allow submitters to optionally help perform |
207 |
> |
* tasks as well. Pool submissions from internal workers are also |
208 |
> |
* allowed, but use randomized rather than thread-hashed queue |
209 |
> |
* indices to avoid imbalance. Insertion of tasks in shared mode |
210 |
> |
* requires a lock (mainly to protect in the case of resizing) but |
211 |
> |
* we use only a simple spinlock (using bits in field runState), |
212 |
> |
* because submitters encountering a busy queue try or create |
213 |
> |
* others, and so never block. |
214 |
|
* |
215 |
|
* Management. |
216 |
|
* ========== |
1087 |
|
} |
1088 |
|
|
1089 |
|
/** |
1090 |
< |
* Computes a hash code for the given thread. This method is |
1091 |
< |
* expected to provide higher-quality hash codes than those using |
1092 |
< |
* method hashCode(). |
1093 |
< |
*/ |
1094 |
< |
static final int hashThread(Thread t) { |
1095 |
< |
long id = (t == null) ? 0L : t.getId(); // Use MurmurHash of thread id |
1096 |
< |
int h = (int)id ^ (int)(id >>> 32); |
1097 |
< |
h ^= h >>> 16; |
1098 |
< |
h *= 0x85ebca6b; |
1099 |
< |
h ^= h >>> 13; |
1100 |
< |
h *= 0xc2b2ae35; |
1101 |
< |
return h ^ (h >>> 16); |
1090 |
> |
<<<<<<< ForkJoinPool.java |
1091 |
> |
* Per-thread records for (typically non-FJ) threads that submit |
1092 |
> |
* to pools. Currently holds only pseudo-random seed / index that |
1093 |
> |
* is used to choose submission queues in method doSubmit. In the |
1094 |
> |
* future, this may incorporate a means to implement different |
1095 |
> |
* task rejection and resubmission policies. |
1096 |
> |
*/ |
1097 |
> |
static final class Submitter { |
1098 |
> |
int seed; // seed for random submission queue selection |
1099 |
> |
|
1100 |
> |
// Heuristic padding to ameliorate unfortunate memory placements |
1101 |
> |
int p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; |
1102 |
> |
|
1103 |
> |
Submitter() { |
1104 |
> |
// Use identityHashCode, forced negative, for seed |
1105 |
> |
seed = System.identityHashCode(Thread.currentThread()) | (1 << 31); |
1106 |
> |
} |
1107 |
> |
|
1108 |
> |
/** |
1109 |
> |
* Computes next value for random probes. Like method |
1110 |
> |
* WorkQueue.nextSeed, this is manually inlined in several |
1111 |
> |
* usages to avoid writes inside busy loops. |
1112 |
> |
*/ |
1113 |
> |
final int nextSeed() { |
1114 |
> |
int r = seed; |
1115 |
> |
r ^= r << 13; |
1116 |
> |
r ^= r >>> 17; |
1117 |
> |
return seed = r ^= r << 5; |
1118 |
> |
} |
1119 |
> |
} |
1120 |
> |
|
1121 |
> |
/** ThreadLocal class for Submitters */ |
1122 |
> |
static final class ThreadSubmitter extends ThreadLocal<Submitter> { |
1123 |
> |
public Submitter initialValue() { return new Submitter(); } |
1124 |
|
} |
1125 |
|
|
1126 |
|
/** |
1127 |
+ |
* Per-thread submission bookkeeping. Shared across all pools |
1128 |
+ |
* to reduce ThreadLocal pollution and because random motion |
1129 |
+ |
* to avoid contention in one pool is likely to hold for others. |
1130 |
+ |
*/ |
1131 |
+ |
static final ThreadSubmitter submitters = new ThreadSubmitter(); |
1132 |
+ |
|
1133 |
+ |
/** |
1134 |
|
* Top-level runloop for workers |
1135 |
|
*/ |
1136 |
|
final void runWorker(ForkJoinWorkerThread wt) { |
1137 |
+ |
// Initialize queue array and seed in this thread |
1138 |
|
WorkQueue w = wt.workQueue; |
1139 |
< |
w.growArray(false); // Initialize queue array and seed in this thread |
1140 |
< |
w.seed = hashThread(Thread.currentThread()) | (1 << 31); // force < 0 |
1139 |
> |
w.growArray(false); |
1140 |
> |
// Same initial hash as Submitters |
1141 |
> |
w.seed = System.identityHashCode(Thread.currentThread()) | (1 << 31); |
1142 |
|
|
1143 |
|
do {} while (w.runTask(scan(w))); |
1144 |
|
} |
1251 |
|
U.throwException(ex); |
1252 |
|
} |
1253 |
|
|
1254 |
+ |
/** |
1255 |
+ |
* Tries to add and register a new queue at the given index. |
1256 |
+ |
* |
1257 |
+ |
* @param idx the workQueues array index to register the queue |
1258 |
+ |
* @return the queue, or null if could not add because could |
1259 |
+ |
* not acquire lock or idx is unusable |
1260 |
+ |
*/ |
1261 |
+ |
private WorkQueue tryAddSharedQueue(int idx) { |
1262 |
+ |
WorkQueue q = null; |
1263 |
+ |
ReentrantLock lock = this.lock; |
1264 |
+ |
if (idx >= 0 && (idx & 1) == 0 && !lock.isLocked()) { |
1265 |
+ |
// create queue outside of lock but only if apparently free |
1266 |
+ |
WorkQueue nq = new WorkQueue(null, SHARED_QUEUE); |
1267 |
+ |
if (lock.tryLock()) { |
1268 |
+ |
try { |
1269 |
+ |
WorkQueue[] ws = workQueues; |
1270 |
+ |
if (ws != null && idx < ws.length) { |
1271 |
+ |
if ((q = ws[idx]) == null) { |
1272 |
+ |
int rs; // update runState seq |
1273 |
+ |
ws[idx] = q = nq; |
1274 |
+ |
runState = (((rs = runState) & SHUTDOWN) | |
1275 |
+ |
((rs + RS_SEQ) & ~SHUTDOWN)); |
1276 |
+ |
} |
1277 |
+ |
} |
1278 |
+ |
} finally { |
1279 |
+ |
lock.unlock(); |
1280 |
+ |
} |
1281 |
+ |
} |
1282 |
+ |
} |
1283 |
+ |
return q; |
1284 |
+ |
} |
1285 |
|
|
1286 |
|
// Maintaining ctl counts |
1287 |
|
|
1384 |
|
// Submissions |
1385 |
|
|
1386 |
|
/** |
1387 |
< |
* Unless shutting down, adds the given task to some submission |
1388 |
< |
* queue; using a randomly chosen queue index if the caller is a |
1389 |
< |
* ForkJoinWorkerThread, else one based on caller thread's hash |
1390 |
< |
* code. If no queue exists at the index, one is created. If the |
1327 |
< |
* queue is busy, another is chosen by sweeping through the queues |
1328 |
< |
* array. |
1387 |
> |
* Unless shutting down, adds the given task to a submission queue |
1388 |
> |
* at submitter's current queue index. If no queue exists at the |
1389 |
> |
* index, one is created unless pool lock is busy. If the queue |
1390 |
> |
* and/or lock are busy, another index is randomly chosen. |
1391 |
|
*/ |
1392 |
|
private void doSubmit(ForkJoinTask<?> task) { |
1393 |
|
if (task == null) |
1394 |
|
throw new NullPointerException(); |
1395 |
< |
Thread t = Thread.currentThread(); |
1396 |
< |
int r = ((t instanceof ForkJoinWorkerThread) ? |
1397 |
< |
((ForkJoinWorkerThread)t).workQueue.nextSeed() : hashThread(t)); |
1336 |
< |
for (;;) { |
1395 |
> |
Submitter s = submitters.get(); |
1396 |
> |
for (int r = s.seed;;) { |
1397 |
> |
WorkQueue q; int k; |
1398 |
|
int rs = runState, m = rs & SMASK; |
1338 |
– |
int j = r &= (m & ~1); // even numbered queues |
1399 |
|
WorkQueue[] ws = workQueues; |
1400 |
< |
if (rs < 0 || ws == null) |
1401 |
< |
throw new RejectedExecutionException(); // shutting down |
1402 |
< |
if (ws.length > m) { // consistency check |
1403 |
< |
for (WorkQueue q;;) { // circular sweep |
1404 |
< |
if (((q = ws[j]) != null || |
1405 |
< |
(q = tryAddSharedQueue(j)) != null) && |
1406 |
< |
q.trySharedPush(task)) { |
1407 |
< |
signalWork(); |
1348 |
< |
return; |
1349 |
< |
} |
1350 |
< |
if ((j = (j + 2) & m) == r) { |
1351 |
< |
Thread.yield(); // all queues busy |
1352 |
< |
break; |
1353 |
< |
} |
1354 |
< |
} |
1400 |
> |
if (rs < 0 || ws == null) // shutting down |
1401 |
> |
throw new RejectedExecutionException(); |
1402 |
> |
if (ws.length > m && // k must be at index |
1403 |
> |
((q = ws[k = (r << 1) & m]) != null || |
1404 |
> |
(q = tryAddSharedQueue(k)) != null) && |
1405 |
> |
q.trySharedPush(task)) { |
1406 |
> |
signalWork(); |
1407 |
> |
return; |
1408 |
|
} |
1409 |
+ |
r ^= r << 13; // xorshift seed to new position |
1410 |
+ |
r ^= r >>> 17; |
1411 |
+ |
if (((s.seed = r ^= r << 5) & m) == 0) |
1412 |
+ |
Thread.yield(); // occasionally yield if busy |
1413 |
|
} |
1414 |
|
} |
1415 |
|
|
1359 |
– |
/** |
1360 |
– |
* Tries to add and register a new queue at the given index. |
1361 |
– |
* |
1362 |
– |
* @param idx the workQueues array index to register the queue |
1363 |
– |
* @return the queue, or null if could not add because could |
1364 |
– |
* not acquire lock or idx is unusable |
1365 |
– |
*/ |
1366 |
– |
private WorkQueue tryAddSharedQueue(int idx) { |
1367 |
– |
WorkQueue q = null; |
1368 |
– |
ReentrantLock lock = this.lock; |
1369 |
– |
if (idx >= 0 && (idx & 1) == 0 && !lock.isLocked()) { |
1370 |
– |
// create queue outside of lock but only if apparently free |
1371 |
– |
WorkQueue nq = new WorkQueue(null, SHARED_QUEUE); |
1372 |
– |
if (lock.tryLock()) { |
1373 |
– |
try { |
1374 |
– |
WorkQueue[] ws = workQueues; |
1375 |
– |
if (ws != null && idx < ws.length) { |
1376 |
– |
if ((q = ws[idx]) == null) { |
1377 |
– |
int rs; // update runState seq |
1378 |
– |
ws[idx] = q = nq; |
1379 |
– |
runState = (((rs = runState) & SHUTDOWN) | |
1380 |
– |
((rs + RS_SEQ) & ~SHUTDOWN)); |
1381 |
– |
} |
1382 |
– |
} |
1383 |
– |
} finally { |
1384 |
– |
lock.unlock(); |
1385 |
– |
} |
1386 |
– |
} |
1387 |
– |
} |
1388 |
– |
return q; |
1389 |
– |
} |
1416 |
|
|
1417 |
|
// Scanning for tasks |
1418 |
|
|