496 |
|
*/ |
497 |
|
private volatile long eventWaiters; |
498 |
|
|
499 |
< |
private static final int EVENT_COUNT_SHIFT = 32; |
500 |
< |
private static final long WAITER_ID_MASK = (1L << 16) - 1L; |
499 |
> |
private static final int EVENT_COUNT_SHIFT = 32; |
500 |
> |
private static final int WAITER_ID_MASK = (1 << 16) - 1; |
501 |
|
|
502 |
|
/** |
503 |
|
* A counter for events that may wake up worker threads: |
586 |
|
// are usually manually inlined by callers |
587 |
|
|
588 |
|
/** |
589 |
< |
* Increments running count part of workerCounts |
589 |
> |
* Increments running count part of workerCounts. |
590 |
|
*/ |
591 |
|
final void incrementRunningCount() { |
592 |
|
int c; |
596 |
|
} |
597 |
|
|
598 |
|
/** |
599 |
< |
* Tries to decrement running count unless already zero |
599 |
> |
* Tries to increment running count part of workerCounts. |
600 |
> |
*/ |
601 |
> |
final boolean tryIncrementRunningCount() { |
602 |
> |
int c; |
603 |
> |
return UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
604 |
> |
c = workerCounts, |
605 |
> |
c + ONE_RUNNING); |
606 |
> |
} |
607 |
> |
|
608 |
> |
/** |
609 |
> |
* Tries to decrement running count unless already zero. |
610 |
|
*/ |
611 |
|
final boolean tryDecrementRunningCount() { |
612 |
|
int wc = workerCounts; |
679 |
|
for (k = 0; k < n && ws[k] != null; ++k) |
680 |
|
; |
681 |
|
if (k == n) |
682 |
< |
ws = Arrays.copyOf(ws, n << 1); |
682 |
> |
ws = workers = Arrays.copyOf(ws, n << 1); |
683 |
|
} |
684 |
|
ws[k] = w; |
685 |
< |
workers = ws; // volatile array write ensures slot visibility |
685 |
> |
int c = eventCount; // advance event count to ensure visibility |
686 |
> |
UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1); |
687 |
|
} finally { |
688 |
|
lock.unlock(); |
689 |
|
} |
737 |
|
int ec = eventCount; |
738 |
|
boolean releasedOne = false; |
739 |
|
ForkJoinWorkerThread w; int id; |
740 |
< |
while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 && |
740 |
> |
while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 && |
741 |
|
(int)(h >>> EVENT_COUNT_SHIFT) != ec && |
742 |
|
id < n && (w = ws[id]) != null) { |
743 |
|
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, |
775 |
|
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1)); |
776 |
|
long h; |
777 |
|
while ((runState < SHUTDOWN || !tryTerminate(false)) && |
778 |
< |
(((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 || |
778 |
> |
(((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 || |
779 |
|
(int)(h >>> EVENT_COUNT_SHIFT) == ec) && |
780 |
|
eventCount == ec) { |
781 |
|
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, |
804 |
|
(workerCounts & RUNNING_COUNT_MASK) <= 1); |
805 |
|
long startTime = untimed ? 0 : System.nanoTime(); |
806 |
|
Thread.interrupted(); // clear/ignore interrupt |
807 |
< |
if (eventCount != ec || w.isTerminating()) |
807 |
> |
if (w.isTerminating() || eventCount != ec) |
808 |
|
break; // recheck after clear |
809 |
|
if (untimed) |
810 |
|
LockSupport.park(w); |
842 |
|
if ((sw = spareWaiters) != 0 && |
843 |
|
(id = (sw & SPARE_ID_MASK) - 1) >= 0 && |
844 |
|
id < n && (w = ws[id]) != null && |
845 |
< |
(workerCounts & RUNNING_COUNT_MASK) < parallelism && |
845 |
> |
(runState >= TERMINATING || |
846 |
> |
(workerCounts & RUNNING_COUNT_MASK) < parallelism) && |
847 |
|
spareWaiters == sw && |
848 |
|
UNSAFE.compareAndSwapInt(this, spareWaitersOffset, |
849 |
|
sw, w.nextSpare)) { |
897 |
|
break; |
898 |
|
} |
899 |
|
w.start(recordWorker(w), ueh); |
900 |
< |
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) { |
889 |
< |
int c; // advance event count |
890 |
< |
UNSAFE.compareAndSwapInt(this, eventCountOffset, |
891 |
< |
c = eventCount, c+1); |
900 |
> |
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) |
901 |
|
break; // add at most one unless total below target |
893 |
– |
} |
902 |
|
} |
903 |
|
} |
904 |
|
if (eventWaiters != 0L) |
934 |
|
} |
935 |
|
else if ((h = eventWaiters) != 0L) { |
936 |
|
long nh; |
937 |
< |
int id = ((int)(h & WAITER_ID_MASK)) - 1; |
937 |
> |
int id = (((int)h) & WAITER_ID_MASK) - 1; |
938 |
|
if (id >= 0 && id < n && (w = ws[id]) != null && |
939 |
|
(nh = w.nextWaiter) != 0L && // keep at least one worker |
940 |
|
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) |
982 |
|
int pc = parallelism; |
983 |
|
while (w.runState == 0) { |
984 |
|
int rs = runState; |
985 |
< |
if (rs >= TERMINATING) { // propagate shutdown |
985 |
> |
if (rs >= TERMINATING) { // propagate shutdown |
986 |
|
w.shutdown(); |
987 |
|
break; |
988 |
|
} |
989 |
|
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && |
990 |
< |
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1)) |
990 |
> |
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) { |
991 |
|
inactivate = active = w.active = false; |
992 |
< |
int wc = workerCounts; |
992 |
> |
if (rs == SHUTDOWN) { // all inactive and shut down |
993 |
> |
tryTerminate(false); |
994 |
> |
continue; |
995 |
> |
} |
996 |
> |
} |
997 |
> |
int wc = workerCounts; // try to suspend as spare |
998 |
|
if ((wc & RUNNING_COUNT_MASK) > pc) { |
999 |
|
if (!(inactivate |= active) && // must inactivate to suspend |
1000 |
< |
workerCounts == wc && // try to suspend as spare |
1000 |
> |
workerCounts == wc && |
1001 |
|
UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1002 |
|
wc, wc - ONE_RUNNING)) |
1003 |
|
w.suspendAsSpare(); |
1004 |
|
} |
1005 |
|
else if ((wc >>> TOTAL_COUNT_SHIFT) < pc) |
1006 |
|
helpMaintainParallelism(); // not enough workers |
1007 |
< |
else if (!ran) { |
1007 |
> |
else if (ran) |
1008 |
> |
break; |
1009 |
> |
else { |
1010 |
|
long h = eventWaiters; |
1011 |
|
int ec = eventCount; |
1012 |
|
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec) |
1018 |
|
else if (!(inactivate |= active)) |
1019 |
|
eventSync(w, wec); // must inactivate before sync |
1020 |
|
} |
1006 |
– |
else |
1007 |
– |
break; |
1021 |
|
} |
1022 |
|
} |
1023 |
|
|
1034 |
|
boolean timed, long nanos) { |
1035 |
|
long startTime = timed? System.nanoTime() : 0L; |
1036 |
|
int retries = 2 + (parallelism >> 2); // #helpJoins before blocking |
1037 |
+ |
boolean running = true; // false when count decremented |
1038 |
|
while (joinMe.status >= 0) { |
1025 |
– |
int wc; |
1026 |
– |
long nt = 0L; |
1039 |
|
if (runState >= TERMINATING) { |
1040 |
|
joinMe.cancelIgnoringExceptions(); |
1041 |
|
break; |
1042 |
|
} |
1043 |
< |
worker.helpJoinTask(joinMe); |
1043 |
> |
running = worker.helpJoinTask(joinMe, running); |
1044 |
|
if (joinMe.status < 0) |
1045 |
|
break; |
1046 |
< |
else if (retries > 0) |
1046 |
> |
if (retries > 0) { |
1047 |
|
--retries; |
1048 |
< |
else if (timed && |
1049 |
< |
(nt = nanos - (System.nanoTime() - startTime)) <= 0L) |
1050 |
< |
break; |
1051 |
< |
else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 && |
1052 |
< |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1053 |
< |
wc, wc - ONE_RUNNING)) { |
1054 |
< |
int stat, c; long h; |
1055 |
< |
while ((stat = joinMe.status) >= 0 && |
1056 |
< |
(h = eventWaiters) != 0L && // help release others |
1057 |
< |
(int)(h >>> EVENT_COUNT_SHIFT) != eventCount) |
1048 |
> |
continue; |
1049 |
> |
} |
1050 |
> |
int wc = workerCounts; |
1051 |
> |
if ((wc & RUNNING_COUNT_MASK) != 0) { |
1052 |
> |
if (running) { |
1053 |
> |
if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1054 |
> |
wc, wc - ONE_RUNNING)) |
1055 |
> |
continue; |
1056 |
> |
running = false; |
1057 |
> |
} |
1058 |
> |
long h = eventWaiters; |
1059 |
> |
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) |
1060 |
|
releaseEventWaiters(); |
1061 |
< |
if (stat >= 0) { |
1062 |
< |
if ((workerCounts & RUNNING_COUNT_MASK) != 0) { |
1063 |
< |
long ms; int ns; |
1064 |
< |
if (!timed) { |
1061 |
> |
if ((workerCounts & RUNNING_COUNT_MASK) != 0) { |
1062 |
> |
long ms; int ns; |
1063 |
> |
if (!timed) { |
1064 |
> |
ms = JOIN_TIMEOUT_MILLIS; |
1065 |
> |
ns = 0; |
1066 |
> |
} |
1067 |
> |
else { // at most JOIN_TIMEOUT_MILLIS per wait |
1068 |
> |
long nt = nanos - (System.nanoTime() - startTime); |
1069 |
> |
if (nt <= 0L) |
1070 |
> |
break; |
1071 |
> |
ms = nt / 1000000; |
1072 |
> |
if (ms > JOIN_TIMEOUT_MILLIS) { |
1073 |
|
ms = JOIN_TIMEOUT_MILLIS; |
1074 |
|
ns = 0; |
1075 |
|
} |
1076 |
< |
else { // at most JOIN_TIMEOUT_MILLIS per wait |
1077 |
< |
ms = nt / 1000000; |
1056 |
< |
if (ms > JOIN_TIMEOUT_MILLIS) { |
1057 |
< |
ms = JOIN_TIMEOUT_MILLIS; |
1058 |
< |
ns = 0; |
1059 |
< |
} |
1060 |
< |
else |
1061 |
< |
ns = (int) (nt % 1000000); |
1062 |
< |
} |
1063 |
< |
stat = joinMe.internalAwaitDone(ms, ns); |
1076 |
> |
else |
1077 |
> |
ns = (int) (nt % 1000000); |
1078 |
|
} |
1079 |
< |
if (stat >= 0) // timeout or no running workers |
1066 |
< |
helpMaintainParallelism(); |
1079 |
> |
joinMe.internalAwaitDone(ms, ns); |
1080 |
|
} |
1081 |
< |
do {} while (!UNSAFE.compareAndSwapInt |
1082 |
< |
(this, workerCountsOffset, |
1070 |
< |
c = workerCounts, c + ONE_RUNNING)); |
1071 |
< |
if (stat < 0) |
1072 |
< |
break; // else restart |
1081 |
> |
if (joinMe.status < 0) |
1082 |
> |
break; |
1083 |
|
} |
1084 |
+ |
helpMaintainParallelism(); |
1085 |
+ |
} |
1086 |
+ |
if (!running) { |
1087 |
+ |
int c; |
1088 |
+ |
do {} while (!UNSAFE.compareAndSwapInt |
1089 |
+ |
(this, workerCountsOffset, |
1090 |
+ |
c = workerCounts, c + ONE_RUNNING)); |
1091 |
|
} |
1092 |
|
} |
1093 |
|
|
1098 |
|
throws InterruptedException { |
1099 |
|
while (!blocker.isReleasable()) { |
1100 |
|
int wc = workerCounts; |
1101 |
< |
if ((wc & RUNNING_COUNT_MASK) != 0 && |
1102 |
< |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1103 |
< |
wc, wc - ONE_RUNNING)) { |
1101 |
> |
if ((wc & RUNNING_COUNT_MASK) == 0) |
1102 |
> |
helpMaintainParallelism(); |
1103 |
> |
else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1104 |
> |
wc, wc - ONE_RUNNING)) { |
1105 |
|
try { |
1106 |
|
while (!blocker.isReleasable()) { |
1107 |
|
long h = eventWaiters; |
1151 |
|
return true; |
1152 |
|
} |
1153 |
|
|
1136 |
– |
|
1154 |
|
/** |
1155 |
|
* Actions on transition to TERMINATING |
1156 |
|
* |