393 |
|
* where no action is required, we reduce write contention by |
394 |
|
* equivalently prefacing signalWork when called by an external |
395 |
|
* task producer using a memory access with full-volatile |
396 |
< |
* semantics (alternatively, a "fullFence" could be used). (4) For |
397 |
< |
* internal task producers we rely on the fact that even if no |
398 |
< |
* other workers awaken, the producer itself will eventually see |
399 |
< |
* the task and execute it. |
396 |
> |
* semantics or a "fullFence". (4) For internal task producers we |
397 |
> |
* rely on the fact that even if no other workers awaken, the |
398 |
> |
* producer itself will eventually see the task and execute it. |
399 |
|
* |
400 |
|
* Almost always, too many signals are issued. A task producer |
401 |
|
* cannot in general tell if some existing worker is in the midst |
811 |
|
} |
812 |
|
|
813 |
|
/** |
814 |
< |
* Pushes a task. Call only by owner in unshared queues. (The |
816 |
< |
* shared-queue version is embedded in method externalPush.) |
814 |
> |
* Pushes a task. Call only by owner in unshared queues. |
815 |
|
* |
816 |
|
* @param task the task. Caller must ensure non-null. |
817 |
|
* @throws RejectedExecutionException if array cannot be resized |
846 |
|
(t = top) - (b = base) > 0) { |
847 |
|
int mask = size - 1; |
848 |
|
do { // emulate poll from old array, push to new array |
849 |
< |
long offset = ((b & oldMask) << ASHIFT) + ABASE; |
849 |
> |
int index = b & oldMask; |
850 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
851 |
|
ForkJoinTask<?> x = (ForkJoinTask<?>) |
852 |
|
U.getObjectVolatile(oldA, offset); |
853 |
|
if (x != null && |
865 |
|
*/ |
866 |
|
final ForkJoinTask<?> pop() { |
867 |
|
int b = base, s = top, al, i; ForkJoinTask<?>[] a; |
868 |
< |
if ((a = array) != null && s != b && (al = a.length) > 0) { |
869 |
< |
long offset = (((al - 1) & (s - 1)) << ASHIFT) + ABASE; |
868 |
> |
if ((a = array) != null && b != s && (al = a.length) > 0) { |
869 |
> |
int index = (al - 1) & --s; |
870 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
871 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, offset); |
872 |
|
if (t != null && U.compareAndSwapObject(a, offset, t, null)) { |
873 |
< |
top = s - 1; |
873 |
> |
top = s; |
874 |
|
U.storeFence(); |
875 |
|
return t; |
876 |
|
} |
886 |
|
final ForkJoinTask<?> pollAt(int b) { |
887 |
|
ForkJoinTask<?>[] a; int al; |
888 |
|
if ((a = array) != null && (al = a.length) > 0) { |
889 |
< |
long offset = (((al - 1) & b) << ASHIFT) + ABASE; |
889 |
> |
int index = (al - 1) & b; |
890 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
891 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
892 |
|
U.getObjectVolatile(a, offset); |
893 |
|
if (t != null && b++ == base && |
907 |
|
int b = base, s = top, d, al; ForkJoinTask<?>[] a; |
908 |
|
if ((a = array) != null && (d = b - s) < 0 && |
909 |
|
(al = a.length) > 0) { |
910 |
< |
long offset = (((al - 1) & b) << ASHIFT) + ABASE; |
910 |
> |
int index = (al - 1) & b; |
911 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
912 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
913 |
|
U.getObjectVolatile(a, offset); |
914 |
|
if (b++ == base) { |
948 |
|
* Pops the given task only if it is at the current top. |
949 |
|
*/ |
950 |
|
final boolean tryUnpush(ForkJoinTask<?> task) { |
951 |
< |
int s = top - 1, al; ForkJoinTask<?>[] a; |
952 |
< |
if ((a = array) != null && (al = a.length) > 0) { |
953 |
< |
long offset = (((al - 1) & s) << ASHIFT) + ABASE; |
954 |
< |
if (U.getObject(a, offset) == task && |
955 |
< |
U.compareAndSwapObject(a, offset, task, null)) { |
951 |
> |
int b = base, s = top, al; ForkJoinTask<?>[] a; |
952 |
> |
if ((a = array) != null && b != s && (al = a.length) > 0) { |
953 |
> |
int index = (al - 1) & --s; |
954 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
955 |
> |
if (U.compareAndSwapObject(a, offset, task, null)) { |
956 |
|
top = s; |
957 |
|
U.storeFence(); |
958 |
|
return true; |
962 |
|
} |
963 |
|
|
964 |
|
/** |
965 |
+ |
* Shared version of push. Fails if already locked. |
966 |
+ |
* |
967 |
+ |
* @return status: > 0 locked, 0 was empty, < 0 was nonempty |
968 |
+ |
*/ |
969 |
+ |
final int sharedPush(ForkJoinTask<?> task) { |
970 |
+ |
int stat; |
971 |
+ |
if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { |
972 |
+ |
int b = base, s = top, al, d; ForkJoinTask<?>[] a; |
973 |
+ |
if ((a = array) != null && (al = a.length) > 0 && |
974 |
+ |
al - 1 + (d = b - s) > 0) { |
975 |
+ |
a[(al - 1) & s] = task; |
976 |
+ |
top = s + 1; // relaxed writes OK here |
977 |
+ |
U.putOrderedInt(this, QLOCK, 0); |
978 |
+ |
if (d < 0) |
979 |
+ |
stat = d; |
980 |
+ |
else { |
981 |
+ |
U.fullFence(); // sync with signallees |
982 |
+ |
stat = 0; |
983 |
+ |
} |
984 |
+ |
} |
985 |
+ |
else { |
986 |
+ |
growAndSharedPush(task); |
987 |
+ |
stat = 0; |
988 |
+ |
} |
989 |
+ |
} |
990 |
+ |
else |
991 |
+ |
stat = 1; |
992 |
+ |
return stat; |
993 |
+ |
} |
994 |
+ |
|
995 |
+ |
/* |
996 |
+ |
* Helper for sharedPush; called only when locked and resize |
997 |
+ |
* needed. |
998 |
+ |
*/ |
999 |
+ |
private void growAndSharedPush(ForkJoinTask<?> task) { |
1000 |
+ |
try { |
1001 |
+ |
growArray(); |
1002 |
+ |
int s = top, al; ForkJoinTask<?>[] a; |
1003 |
+ |
if ((a = array) != null && (al = a.length) > 0) { |
1004 |
+ |
a[(al - 1) & s] = task; |
1005 |
+ |
top = s + 1; |
1006 |
+ |
} |
1007 |
+ |
} finally { |
1008 |
+ |
qlock = 0; |
1009 |
+ |
} |
1010 |
+ |
} |
1011 |
+ |
|
1012 |
+ |
/** |
1013 |
|
* Shared version of pop |
1014 |
|
*/ |
1015 |
|
final boolean trySharedUnpush(ForkJoinTask<?> task) { |
966 |
– |
ForkJoinTask<?> t; |
1016 |
|
boolean popped = false; |
1017 |
|
int s = top - 1, al; ForkJoinTask<?>[] a; |
1018 |
|
if ((a = array) != null && (al = a.length) > 0) { |
1019 |
< |
long offset = (((al - 1) & s) << ASHIFT) + ABASE; |
1020 |
< |
if (U.getObject(a, offset) == task && |
1019 |
> |
int index = (al - 1) & s; |
1020 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1021 |
> |
ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, offset); |
1022 |
> |
if (t == task && |
1023 |
|
U.compareAndSwapInt(this, QLOCK, 0, 1)) { |
1024 |
|
if (U.compareAndSwapObject(a, offset, task, null)) { |
1025 |
|
popped = true; |
1057 |
|
for (int nexec = 0;;) { |
1058 |
|
int b = base, s = top, al; ForkJoinTask<?>[] a; |
1059 |
|
if ((a = array) != null && b != s && (al = a.length) > 0) { |
1060 |
< |
long offset = (((al - 1) & --s) << ASHIFT) + ABASE; |
1060 |
> |
int index = (al - 1) & --s; |
1061 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1062 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1063 |
|
U.getAndSetObject(a, offset, null); |
1064 |
|
if (t != null) { |
1082 |
|
for (int nexec = 0;;) { |
1083 |
|
int b = base, s = top, al; ForkJoinTask<?>[] a; |
1084 |
|
if ((a = array) != null && b != s && (al = a.length) > 0) { |
1085 |
< |
long offset = (((al - 1) & b++) << ASHIFT) + ABASE; |
1085 |
> |
int index = (al - 1) & b++; |
1086 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1087 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1088 |
|
U.getAndSetObject(a, offset, null); |
1089 |
|
if (t != null) { |
1142 |
|
while ((d = (b = base) - (s = top)) < 0 && |
1143 |
|
(a = array) != null && (al = a.length) > 0) { |
1144 |
|
for (;;) { // traverse from s to b |
1145 |
< |
int i = --s & (al - 1); |
1146 |
< |
long offset = (i << ASHIFT) + ABASE; |
1145 |
> |
int index = --s & (al - 1); |
1146 |
> |
long offset = (index << ASHIFT) + ABASE; |
1147 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1148 |
|
U.getObjectVolatile(a, offset); |
1149 |
|
if (t == null) |
1195 |
|
final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) { |
1196 |
|
int b = base, s = top, al; ForkJoinTask<?>[] a; |
1197 |
|
if ((a = array) != null && b != s && (al = a.length) > 0) { |
1198 |
< |
long offset = (((al - 1) & (s - 1)) << ASHIFT) + ABASE; |
1199 |
< |
Object o = U.getObjectVolatile(a, offset); |
1198 |
> |
int index = (al - 1) & (s - 1); |
1199 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1200 |
> |
ForkJoinTask<?> o = (ForkJoinTask<?>) |
1201 |
> |
U.getObjectVolatile(a, offset); |
1202 |
|
if (o instanceof CountedCompleter) { |
1203 |
|
CountedCompleter<?> t = (CountedCompleter<?>)o; |
1204 |
|
for (CountedCompleter<?> r = t;;) { |
1245 |
|
ForkJoinTask<?>[] a; |
1246 |
|
int b = base, s = top, al, h; |
1247 |
|
if ((a = array) != null && b != s && (al = a.length) > 0) { |
1248 |
< |
long offset = (((al - 1) & b) << ASHIFT) + ABASE; |
1249 |
< |
Object o = U.getObjectVolatile(a, offset); |
1248 |
> |
int index = (al - 1) & b; |
1249 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1250 |
> |
ForkJoinTask<?> o = (ForkJoinTask<?>) |
1251 |
> |
U.getObjectVolatile(a, offset); |
1252 |
|
if (o == null) |
1253 |
|
h = 2; // retryable |
1254 |
|
else if (!(o instanceof CountedCompleter)) |
1322 |
|
|
1323 |
|
/** |
1324 |
|
* Permission required for callers of methods that may start or |
1325 |
< |
* kill threads. |
1325 |
> |
* kill threads. Also used as a private static lock in tryInitialize. |
1326 |
|
*/ |
1327 |
|
private static final RuntimePermission modifyThreadPermission; |
1328 |
|
|
1443 |
|
volatile int runState; |
1444 |
|
final int config; // parallelism, mode |
1445 |
|
int indexSeed; // to generate worker index |
1446 |
+ |
volatile AtomicLong stealCounter; // also used as sync monitor |
1447 |
|
volatile WorkQueue[] workQueues; // main registry |
1448 |
+ |
final String workerNamePrefix; // to create worker name string |
1449 |
|
final ForkJoinWorkerThreadFactory factory; |
1450 |
|
final UncaughtExceptionHandler ueh; // per-worker UEH |
1392 |
– |
final String workerNamePrefix; // to create worker name string |
1393 |
– |
volatile AtomicLong stealCounter; // also used as sync monitor |
1451 |
|
|
1452 |
|
/** |
1453 |
|
* Instantiates fields upon first submission, and/or throws |
1454 |
|
* exception if terminating. Called only by externalPush. |
1455 |
|
*/ |
1456 |
|
private void tryInitialize() { |
1457 |
< |
int rs; |
1458 |
< |
while (((rs = runState) & STARTED) == 0) { |
1459 |
< |
AtomicLong sc = new AtomicLong(); |
1460 |
< |
if (U.compareAndSwapObject(this, STEALCOUNTER, null, sc)) { |
1461 |
< |
// create workQueues array with size a power of two |
1462 |
< |
int p = config & SMASK; // ensure at least 2 slots |
1463 |
< |
int n = (p > 1) ? p - 1 : 1; |
1464 |
< |
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; |
1465 |
< |
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; |
1466 |
< |
workQueues = new WorkQueue[n]; |
1467 |
< |
while (!U.compareAndSwapInt(this, RUNSTATE, rs, rs | STARTED)) |
1468 |
< |
rs = runState; |
1469 |
< |
synchronized (sc) { sc.notifyAll(); } |
1413 |
< |
break; |
1414 |
< |
} |
1415 |
< |
else if ((sc = stealCounter) != null) { // wait for initialization |
1416 |
< |
synchronized (sc) { |
1417 |
< |
try { |
1418 |
< |
if ((runState & STARTED) == 0) |
1419 |
< |
sc.wait(); |
1420 |
< |
} catch (InterruptedException ie) { |
1421 |
< |
Thread.currentThread().interrupt(); |
1422 |
< |
} |
1457 |
> |
if ((runState & STARTED) == 0) { // bootstrap by locking static field |
1458 |
> |
int rs; // create workQueues array with size a power of two |
1459 |
> |
int p = config & SMASK; // ensure at least 2 slots |
1460 |
> |
int n = (p > 1) ? p - 1 : 1; |
1461 |
> |
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; |
1462 |
> |
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; |
1463 |
> |
synchronized(modifyThreadPermission) { |
1464 |
> |
if ((runState & STARTED) == 0) { |
1465 |
> |
stealCounter = new AtomicLong(); |
1466 |
> |
workQueues = new WorkQueue[n]; |
1467 |
> |
do {} while (!U.compareAndSwapInt(this, RUNSTATE, |
1468 |
> |
rs = runState, |
1469 |
> |
rs | STARTED)); |
1470 |
|
} |
1471 |
|
} |
1472 |
|
} |
1473 |
< |
if ((rs = runState) < 0) { |
1473 |
> |
if (runState < 0) { |
1474 |
|
tryTerminate(false, false); // help terminate |
1475 |
|
throw new RejectedExecutionException(); |
1476 |
|
} |
1907 |
|
WorkQueue q; ForkJoinTask<?>[] a; int b, d, al; |
1908 |
|
if ((q = ws[idx]) != null && (d = (b = q.base) - q.top) < 0 && |
1909 |
|
(a = q.array) != null && (al = a.length) > 0) { |
1910 |
< |
long offset = (((al - 1) & b) << ASHIFT) + ABASE; |
1910 |
> |
int index = (al - 1) & b; |
1911 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
1912 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1913 |
|
U.getObjectVolatile(a, offset); |
1914 |
|
if (t == null || b++ != q.base) |
2058 |
|
ForkJoinTask<?> next = v.currentJoin; |
2059 |
|
ForkJoinTask<?> t = null; |
2060 |
|
if ((a = v.array) != null && (al = a.length) > 0) { |
2061 |
< |
long offset = (((al - 1) & b) << ASHIFT) + ABASE; |
2061 |
> |
int index = (al - 1) & b; |
2062 |
> |
long offset = ((long)index << ASHIFT) + ABASE; |
2063 |
|
t = (ForkJoinTask<?>)U.getObjectVolatile(a, offset); |
2064 |
|
if (t != null && b++ == v.base) { |
2065 |
|
if (j.currentJoin != subtask || |
2452 |
|
WorkQueue q = new WorkQueue(this, null); |
2453 |
|
q.config = index; |
2454 |
|
q.scanState = ~UNSIGNALLED; |
2455 |
< |
synchronized (lock) { // lock to install |
2455 |
> |
q.qlock = 1; // lock queue |
2456 |
> |
boolean installed = false; |
2457 |
> |
synchronized (lock) { // lock pool to install |
2458 |
|
WorkQueue[] ws; |
2459 |
|
if ((ws = workQueues) != null && index < ws.length && |
2460 |
|
ws[index] == null) { |
2461 |
< |
ws[index] = q; // else throw away |
2461 |
> |
ws[index] = q; // else throw away |
2462 |
> |
installed = true; |
2463 |
> |
} |
2464 |
> |
} |
2465 |
> |
if (installed) { |
2466 |
> |
try { |
2467 |
> |
q.growArray(); |
2468 |
> |
} finally { |
2469 |
> |
q.qlock = 0; |
2470 |
|
} |
2471 |
|
} |
2472 |
|
} |
2482 |
|
* @param task the task. Caller must ensure non-null. |
2483 |
|
*/ |
2484 |
|
final void externalPush(ForkJoinTask<?> task) { |
2485 |
< |
int r; // initialize caller's probe |
2485 |
> |
int r; // initialize caller's probe |
2486 |
|
if ((r = ThreadLocalRandom.getProbe()) == 0) { |
2487 |
|
ThreadLocalRandom.localInit(); |
2488 |
|
r = ThreadLocalRandom.getProbe(); |
2489 |
|
} |
2490 |
|
for (;;) { |
2491 |
< |
WorkQueue q; int wl, k; |
2491 |
> |
WorkQueue q; int wl, k, stat; |
2492 |
|
int rs = runState; |
2493 |
|
WorkQueue[] ws = workQueues; |
2494 |
|
if (rs <= 0 || ws == null || (wl = ws.length) <= 0) |
2495 |
|
tryInitialize(); |
2496 |
|
else if ((q = ws[k = (wl - 1) & r & SQMASK]) == null) |
2497 |
|
tryCreateExternalQueue(k); |
2498 |
< |
else if (U.compareAndSwapInt(q, QLOCK, 0, 1)) { |
2499 |
< |
int b = q.base, s = q.top, al, d; ForkJoinTask<?>[] a; |
2500 |
< |
if ((a = q.array) != null && (al = a.length) > 0 && |
2501 |
< |
al - 1 + (d = b - s) > 0) { |
2502 |
< |
a[(al - 1) & s] = task; // push |
2444 |
< |
q.top = s + 1; // relaxed writes OK here |
2445 |
< |
if (d != 0) |
2446 |
< |
U.putOrderedInt(q, QLOCK, 0); |
2447 |
< |
else { // sync with signallees |
2448 |
< |
U.putIntVolatile(q, QLOCK, 0); |
2449 |
< |
signalWork(); |
2450 |
< |
} |
2451 |
< |
break; |
2452 |
< |
} |
2453 |
< |
else { // grow then retry |
2454 |
< |
try { |
2455 |
< |
q.growArray(); |
2456 |
< |
} finally { |
2457 |
< |
q.qlock = 0; |
2458 |
< |
} |
2459 |
< |
} |
2498 |
> |
else if ((stat = q.sharedPush(task)) < 0) |
2499 |
> |
break; |
2500 |
> |
else if (stat == 0) { |
2501 |
> |
signalWork(); |
2502 |
> |
break; |
2503 |
|
} |
2504 |
< |
else // move if cannot lock |
2504 |
> |
else // move if busy |
2505 |
|
r = ThreadLocalRandom.advanceProbe(r); |
2506 |
|
} |
2507 |
|
} |
2546 |
|
w.trySharedUnpush(task)); |
2547 |
|
} |
2548 |
|
|
2549 |
+ |
|
2550 |
|
/** |
2551 |
|
* Performs helpComplete for an external submitter. |
2552 |
|
*/ |
3419 |
|
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); |
3420 |
|
private static final long CTL; |
3421 |
|
private static final long RUNSTATE; |
3378 |
– |
private static final long STEALCOUNTER; |
3379 |
– |
private static final long QLOCK; |
3422 |
|
private static final int ABASE; |
3423 |
|
private static final int ASHIFT; |
3424 |
|
|
3428 |
|
(ForkJoinPool.class.getDeclaredField("ctl")); |
3429 |
|
RUNSTATE = U.objectFieldOffset |
3430 |
|
(ForkJoinPool.class.getDeclaredField("runState")); |
3389 |
– |
STEALCOUNTER = U.objectFieldOffset |
3390 |
– |
(ForkJoinPool.class.getDeclaredField("stealCounter")); |
3391 |
– |
|
3392 |
– |
QLOCK = U.objectFieldOffset |
3393 |
– |
(WorkQueue.class.getDeclaredField("qlock")); |
3431 |
|
|
3432 |
|
ABASE = U.arrayBaseOffset(ForkJoinTask[].class); |
3433 |
|
int scale = U.arrayIndexScale(ForkJoinTask[].class); |
3434 |
|
if ((scale & (scale - 1)) != 0) |
3435 |
|
throw new Error("array index scale not a power of two"); |
3436 |
|
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); |
3400 |
– |
|
3437 |
|
} catch (ReflectiveOperationException e) { |
3438 |
|
throw new Error(e); |
3439 |
|
} |