196 |
|
* for work-stealing (this would contaminate lifo/fifo |
197 |
|
* processing). Instead, we randomly associate submission queues |
198 |
|
* with submitting threads, using a form of hashing. The |
199 |
< |
* ThreadLocal Submitter class contains a value initially used as |
200 |
< |
* a hash code for choosing existing queues, but may be randomly |
201 |
< |
* repositioned upon contention with other submitters. In |
202 |
< |
* essence, submitters act like workers except that they are |
203 |
< |
* restricted to executing local tasks that they submitted (or in |
204 |
< |
* the case of CountedCompleters, others with the same root task). |
205 |
< |
* However, because most shared/external queue operations are more |
206 |
< |
* expensive than internal, and because, at steady state, external |
207 |
< |
* submitters will compete for CPU with workers, ForkJoinTask.join |
208 |
< |
* and related methods disable them from repeatedly helping to |
209 |
< |
* process tasks if all workers are active. Insertion of tasks in |
210 |
< |
* shared mode requires a lock (mainly to protect in the case of |
199 |
> |
* ThreadLocalRandom probe value serves as a hash code for |
200 |
> |
* choosing existing queues, and may be randomly repositioned upon |
201 |
> |
* contention with other submitters. In essence, submitters act |
202 |
> |
* like workers except that they are restricted to executing local |
203 |
> |
* tasks that they submitted (or in the case of CountedCompleters, |
204 |
> |
* others with the same root task). However, because most |
205 |
> |
* shared/external queue operations are more expensive than |
206 |
> |
* internal, and because, at steady state, external submitters |
207 |
> |
* will compete for CPU with workers, ForkJoinTask.join and |
208 |
> |
* related methods disable them from repeatedly helping to process |
209 |
> |
* tasks if all workers are active. Insertion of tasks in shared |
210 |
> |
* mode requires a lock (mainly to protect in the case of |
211 |
|
* resizing) but we use only a simple spinlock (using bits in |
212 |
|
* field qlock), because submitters encountering a busy queue move |
213 |
|
* on to try or create other queues -- they block only when |
535 |
|
} |
536 |
|
|
537 |
|
/** |
538 |
– |
* Per-thread records for threads that submit to pools. Currently |
539 |
– |
* holds only pseudo-random seed / index that is used to choose |
540 |
– |
* submission queues in method externalPush. In the future, this may |
541 |
– |
* also incorporate a means to implement different task rejection |
542 |
– |
* and resubmission policies. |
543 |
– |
* |
544 |
– |
* Seeds for submitters and workers/workQueues work in basically |
545 |
– |
* the same way but are initialized and updated using slightly |
546 |
– |
* different mechanics. Both are initialized using the same |
547 |
– |
* approach as in class ThreadLocal, where successive values are |
548 |
– |
* unlikely to collide with previous values. Seeds are then |
549 |
– |
* randomly modified upon collisions using xorshifts, which |
550 |
– |
* requires a non-zero seed. |
551 |
– |
*/ |
552 |
– |
static final class Submitter { |
553 |
– |
int seed; |
554 |
– |
Submitter(int s) { seed = s; } |
555 |
– |
} |
556 |
– |
|
557 |
– |
/** |
538 |
|
* Class for artificial tasks that are used to replace the target |
539 |
|
* of local joins if they are removed from an interior queue slot |
540 |
|
* in WorkQueue.tryRemoveAndExec. We don't need the proxy to |
1034 |
|
defaultForkJoinWorkerThreadFactory; |
1035 |
|
|
1036 |
|
/** |
1057 |
– |
* Per-thread submission bookkeeping. Shared across all pools |
1058 |
– |
* to reduce ThreadLocal pollution and because random motion |
1059 |
– |
* to avoid contention in one pool is likely to hold for others. |
1060 |
– |
* Lazily initialized on first submission (but null-checked |
1061 |
– |
* in other contexts to avoid unnecessary initialization). |
1062 |
– |
*/ |
1063 |
– |
static final ThreadLocal<Submitter> submitters; |
1064 |
– |
|
1065 |
– |
/** |
1037 |
|
* Permission required for callers of methods that may start or |
1038 |
|
* kill threads. |
1039 |
|
*/ |
1225 |
|
* a more conservative alternative to a pure spinlock. |
1226 |
|
*/ |
1227 |
|
private int acquirePlock() { |
1228 |
< |
int spins = PL_SPINS, r = 0, ps, nps; |
1228 |
> |
int spins = PL_SPINS, ps, nps; |
1229 |
|
for (;;) { |
1230 |
|
if (((ps = plock) & PL_LOCK) == 0 && |
1231 |
|
U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK)) |
1232 |
|
return nps; |
1262 |
– |
else if (r == 0) { // randomize spins if possible |
1263 |
– |
Thread t = Thread.currentThread(); WorkQueue w; Submitter z; |
1264 |
– |
if ((t instanceof ForkJoinWorkerThread) && |
1265 |
– |
(w = ((ForkJoinWorkerThread)t).workQueue) != null) |
1266 |
– |
r = w.seed; |
1267 |
– |
else if ((z = submitters.get()) != null) |
1268 |
– |
r = z.seed; |
1269 |
– |
else |
1270 |
– |
r = 1; |
1271 |
– |
} |
1233 |
|
else if (spins >= 0) { |
1234 |
< |
r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift |
1274 |
< |
if (r >= 0) |
1234 |
> |
if (ThreadLocalRandom.nextSecondarySeed() >= 0) |
1235 |
|
--spins; |
1236 |
|
} |
1237 |
|
else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) { |
1424 |
|
* @param task the task. Caller must ensure non-null. |
1425 |
|
*/ |
1426 |
|
final void externalPush(ForkJoinTask<?> task) { |
1427 |
< |
WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask<?>[] a; |
1428 |
< |
if ((z = submitters.get()) != null && plock > 0 && |
1427 |
> |
WorkQueue[] ws; WorkQueue q; int z, m; ForkJoinTask<?>[] a; |
1428 |
> |
if ((z = ThreadLocalRandom.getProbe()) != 0 && plock > 0 && |
1429 |
|
(ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && |
1430 |
< |
(q = ws[m & z.seed & SQMASK]) != null && |
1430 |
> |
(q = ws[m & z & SQMASK]) != null && |
1431 |
|
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock |
1432 |
|
int b = q.base, s = q.top, n, an; |
1433 |
|
if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) { |
1462 |
|
* reinitialize if workQueues exists, while still advancing plock. |
1463 |
|
*/ |
1464 |
|
private void fullExternalPush(ForkJoinTask<?> task) { |
1465 |
< |
int r = 0; // random index seed |
1466 |
< |
for (Submitter z = submitters.get();;) { |
1465 |
> |
int r; |
1466 |
> |
if ((r = ThreadLocalRandom.getProbe()) == 0) { |
1467 |
> |
ThreadLocalRandom.localInit(); |
1468 |
> |
r = ThreadLocalRandom.getProbe(); |
1469 |
> |
} |
1470 |
> |
for (;;) { |
1471 |
|
WorkQueue[] ws; WorkQueue q; int ps, m, k; |
1472 |
< |
if (z == null) { |
1473 |
< |
if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed, |
1510 |
< |
r += SEED_INCREMENT) && r != 0) |
1511 |
< |
submitters.set(z = new Submitter(r)); |
1512 |
< |
} |
1513 |
< |
else if (r == 0) { // move to a different index |
1514 |
< |
r = z.seed; |
1515 |
< |
r ^= r << 13; // same xorshift as WorkQueues |
1516 |
< |
r ^= r >>> 17; |
1517 |
< |
z.seed = r ^ (r << 5); |
1518 |
< |
} |
1519 |
< |
else if ((ps = plock) < 0) |
1472 |
> |
boolean move = false; |
1473 |
> |
if ((ps = plock) < 0) |
1474 |
|
throw new RejectedExecutionException(); |
1475 |
|
else if (ps == 0 || (ws = workQueues) == null || |
1476 |
|
(m = ws.length - 1) < 0) { // initialize workQueues |
1510 |
|
return; |
1511 |
|
} |
1512 |
|
} |
1513 |
< |
r = 0; // move on failure |
1513 |
> |
move = true; // move on failure |
1514 |
|
} |
1515 |
|
else if (((ps = plock) & PL_LOCK) == 0) { // create new queue |
1516 |
|
q = new WorkQueue(this, null, SHARED_QUEUE, r); |
1524 |
|
releasePlock(nps); |
1525 |
|
} |
1526 |
|
else |
1527 |
< |
r = 0; // try elsewhere while lock held |
1527 |
> |
move = true; // move if busy |
1528 |
> |
if (move) |
1529 |
> |
r = ThreadLocalRandom.advanceProbe(r); |
1530 |
|
} |
1531 |
|
} |
1532 |
|
|
2285 |
|
* least one task. |
2286 |
|
*/ |
2287 |
|
static WorkQueue commonSubmitterQueue() { |
2288 |
< |
ForkJoinPool p; WorkQueue[] ws; int m; Submitter z; |
2289 |
< |
return ((z = submitters.get()) != null && |
2288 |
> |
ForkJoinPool p; WorkQueue[] ws; int m, z; |
2289 |
> |
return ((z = ThreadLocalRandom.getProbe()) != 0 && |
2290 |
|
(p = common) != null && |
2291 |
|
(ws = p.workQueues) != null && |
2292 |
|
(m = ws.length - 1) >= 0) ? |
2293 |
< |
ws[m & z.seed & SQMASK] : null; |
2293 |
> |
ws[m & z & SQMASK] : null; |
2294 |
|
} |
2295 |
|
|
2296 |
|
/** |
2297 |
|
* Tries to pop the given task from submitter's queue in common pool. |
2298 |
|
*/ |
2299 |
|
static boolean tryExternalUnpush(ForkJoinTask<?> t) { |
2300 |
< |
ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z; |
2301 |
< |
ForkJoinTask<?>[] a; int m, s; |
2300 |
> |
ForkJoinPool p; WorkQueue[] ws; WorkQueue q; |
2301 |
> |
ForkJoinTask<?>[] a; int m, s, z; |
2302 |
|
if (t != null && |
2303 |
< |
(z = submitters.get()) != null && |
2303 |
> |
(z = ThreadLocalRandom.getProbe()) != 0 && |
2304 |
|
(p = common) != null && |
2305 |
|
(ws = p.workQueues) != null && |
2306 |
|
(m = ws.length - 1) >= 0 && |
2307 |
< |
(q = ws[m & z.seed & SQMASK]) != null && |
2307 |
> |
(q = ws[m & z & SQMASK]) != null && |
2308 |
|
(s = q.top) != q.base && |
2309 |
|
(a = q.array) != null) { |
2310 |
|
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; |
2374 |
|
*/ |
2375 |
|
static void externalHelpJoin(ForkJoinTask<?> t) { |
2376 |
|
// Some hard-to-avoid overlap with tryExternalUnpush |
2377 |
< |
ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z; |
2378 |
< |
ForkJoinTask<?>[] a; int m, s, n; |
2377 |
> |
ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; |
2378 |
> |
ForkJoinTask<?>[] a; int m, s, n, z; |
2379 |
|
if (t != null && |
2380 |
< |
(z = submitters.get()) != null && |
2380 |
> |
(z = ThreadLocalRandom.getProbe()) != 0 && |
2381 |
|
(p = common) != null && |
2382 |
|
(ws = p.workQueues) != null && |
2383 |
|
(m = ws.length - 1) >= 0 && |
2384 |
< |
(q = ws[m & z.seed & SQMASK]) != null && |
2384 |
> |
(q = ws[m & z & SQMASK]) != null && |
2385 |
|
(a = q.array) != null) { |
2386 |
|
int am = a.length - 1; |
2387 |
|
if ((s = q.top) != q.base) { |
3314 |
|
if ((s & (s-1)) != 0) |
3315 |
|
throw new Error("data type scale not a power of two"); |
3316 |
|
|
3361 |
– |
submitters = new ThreadLocal<Submitter>(); |
3317 |
|
ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory = |
3318 |
|
new DefaultForkJoinWorkerThreadFactory(); |
3319 |
|
modifyThreadPermission = new RuntimePermission("modifyThread"); |