496 |
|
*/ |
497 |
|
private volatile long eventWaiters; |
498 |
|
|
499 |
< |
private static final int EVENT_COUNT_SHIFT = 32; |
500 |
< |
private static final long WAITER_ID_MASK = (1L << 16) - 1L; |
499 |
> |
private static final int EVENT_COUNT_SHIFT = 32; |
500 |
> |
private static final int WAITER_ID_MASK = (1 << 16) - 1; |
501 |
|
|
502 |
|
/** |
503 |
|
* A counter for events that may wake up worker threads: |
586 |
|
// are usually manually inlined by callers |
587 |
|
|
588 |
|
/** |
589 |
< |
* Increments running count part of workerCounts |
589 |
> |
* Increments running count part of workerCounts. |
590 |
|
*/ |
591 |
|
final void incrementRunningCount() { |
592 |
|
int c; |
596 |
|
} |
597 |
|
|
598 |
|
/** |
599 |
< |
* Tries to decrement running count unless already zero |
599 |
> |
* Tries to increment running count part of workerCounts. |
600 |
> |
*/ |
601 |
> |
final boolean tryIncrementRunningCount() { |
602 |
> |
int c; |
603 |
> |
return UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
604 |
> |
c = workerCounts, |
605 |
> |
c + ONE_RUNNING); |
606 |
> |
} |
607 |
> |
|
608 |
> |
/** |
609 |
> |
* Tries to decrement running count unless already zero. |
610 |
|
*/ |
611 |
|
final boolean tryDecrementRunningCount() { |
612 |
|
int wc = workerCounts; |
679 |
|
for (k = 0; k < n && ws[k] != null; ++k) |
680 |
|
; |
681 |
|
if (k == n) |
682 |
< |
ws = Arrays.copyOf(ws, n << 1); |
682 |
> |
ws = workers = Arrays.copyOf(ws, n << 1); |
683 |
|
} |
684 |
|
ws[k] = w; |
685 |
< |
workers = ws; // volatile array write ensures slot visibility |
685 |
> |
int c = eventCount; // advance event count to ensure visibility |
686 |
> |
UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1); |
687 |
|
} finally { |
688 |
|
lock.unlock(); |
689 |
|
} |
728 |
|
* Releases workers blocked on a count not equal to current count. |
729 |
|
* Normally called after precheck that eventWaiters isn't zero to |
730 |
|
* avoid wasted array checks. Gives up upon a change in count or |
731 |
< |
* upon releasing two workers, letting others take over. |
731 |
> |
* upon releasing four workers, letting others take over. |
732 |
|
*/ |
733 |
|
private void releaseEventWaiters() { |
734 |
|
ForkJoinWorkerThread[] ws = workers; |
735 |
|
int n = ws.length; |
736 |
|
long h = eventWaiters; |
737 |
|
int ec = eventCount; |
738 |
< |
boolean releasedOne = false; |
738 |
> |
int releases = 4; |
739 |
|
ForkJoinWorkerThread w; int id; |
740 |
< |
while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 && |
740 |
> |
while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 && |
741 |
|
(int)(h >>> EVENT_COUNT_SHIFT) != ec && |
742 |
|
id < n && (w = ws[id]) != null) { |
743 |
|
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, |
744 |
|
h, w.nextWaiter)) { |
745 |
|
LockSupport.unpark(w); |
746 |
< |
if (releasedOne) // exit on second release |
746 |
> |
if (--releases == 0) |
747 |
|
break; |
737 |
– |
releasedOne = true; |
748 |
|
} |
749 |
|
if (eventCount != ec) |
750 |
|
break; |
774 |
|
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1)); |
775 |
|
long h; |
776 |
|
while ((runState < SHUTDOWN || !tryTerminate(false)) && |
777 |
< |
(((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 || |
777 |
> |
(((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 || |
778 |
|
(int)(h >>> EVENT_COUNT_SHIFT) == ec) && |
779 |
|
eventCount == ec) { |
780 |
|
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, |
803 |
|
(workerCounts & RUNNING_COUNT_MASK) <= 1); |
804 |
|
long startTime = untimed ? 0 : System.nanoTime(); |
805 |
|
Thread.interrupted(); // clear/ignore interrupt |
806 |
< |
if (eventCount != ec || w.isTerminating()) |
806 |
> |
if (w.isTerminating() || eventCount != ec) |
807 |
|
break; // recheck after clear |
808 |
|
if (untimed) |
809 |
|
LockSupport.park(w); |
841 |
|
if ((sw = spareWaiters) != 0 && |
842 |
|
(id = (sw & SPARE_ID_MASK) - 1) >= 0 && |
843 |
|
id < n && (w = ws[id]) != null && |
844 |
< |
(workerCounts & RUNNING_COUNT_MASK) < parallelism && |
844 |
> |
(runState >= TERMINATING || |
845 |
> |
(workerCounts & RUNNING_COUNT_MASK) < parallelism) && |
846 |
|
spareWaiters == sw && |
847 |
|
UNSAFE.compareAndSwapInt(this, spareWaitersOffset, |
848 |
|
sw, w.nextSpare)) { |
896 |
|
break; |
897 |
|
} |
898 |
|
w.start(recordWorker(w), ueh); |
899 |
< |
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) { |
889 |
< |
int c; // advance event count |
890 |
< |
UNSAFE.compareAndSwapInt(this, eventCountOffset, |
891 |
< |
c = eventCount, c+1); |
899 |
> |
if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) |
900 |
|
break; // add at most one unless total below target |
893 |
– |
} |
901 |
|
} |
902 |
|
} |
903 |
|
if (eventWaiters != 0L) |
933 |
|
} |
934 |
|
else if ((h = eventWaiters) != 0L) { |
935 |
|
long nh; |
936 |
< |
int id = ((int)(h & WAITER_ID_MASK)) - 1; |
936 |
> |
int id = (((int)h) & WAITER_ID_MASK) - 1; |
937 |
|
if (id >= 0 && id < n && (w = ws[id]) != null && |
938 |
|
(nh = w.nextWaiter) != 0L && // keep at least one worker |
939 |
|
UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) |
981 |
|
int pc = parallelism; |
982 |
|
while (w.runState == 0) { |
983 |
|
int rs = runState; |
984 |
< |
if (rs >= TERMINATING) { // propagate shutdown |
984 |
> |
if (rs >= TERMINATING) { // propagate shutdown |
985 |
|
w.shutdown(); |
986 |
|
break; |
987 |
|
} |
988 |
|
if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) && |
989 |
< |
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1)) |
989 |
> |
UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) { |
990 |
|
inactivate = active = w.active = false; |
991 |
< |
int wc = workerCounts; |
991 |
> |
if (rs == SHUTDOWN) { // all inactive and shut down |
992 |
> |
tryTerminate(false); |
993 |
> |
continue; |
994 |
> |
} |
995 |
> |
} |
996 |
> |
int wc = workerCounts; // try to suspend as spare |
997 |
|
if ((wc & RUNNING_COUNT_MASK) > pc) { |
998 |
|
if (!(inactivate |= active) && // must inactivate to suspend |
999 |
< |
workerCounts == wc && // try to suspend as spare |
999 |
> |
workerCounts == wc && |
1000 |
|
UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1001 |
|
wc, wc - ONE_RUNNING)) |
1002 |
|
w.suspendAsSpare(); |
1003 |
|
} |
1004 |
|
else if ((wc >>> TOTAL_COUNT_SHIFT) < pc) |
1005 |
|
helpMaintainParallelism(); // not enough workers |
1006 |
< |
else if (!ran) { |
1006 |
> |
else if (ran) |
1007 |
> |
break; |
1008 |
> |
else { |
1009 |
|
long h = eventWaiters; |
1010 |
|
int ec = eventCount; |
1011 |
|
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec) |
1017 |
|
else if (!(inactivate |= active)) |
1018 |
|
eventSync(w, wec); // must inactivate before sync |
1019 |
|
} |
1006 |
– |
else |
1007 |
– |
break; |
1020 |
|
} |
1021 |
|
} |
1022 |
|
|
1033 |
|
boolean timed, long nanos) { |
1034 |
|
long startTime = timed? System.nanoTime() : 0L; |
1035 |
|
int retries = 2 + (parallelism >> 2); // #helpJoins before blocking |
1036 |
+ |
boolean running = true; // false when count decremented |
1037 |
|
while (joinMe.status >= 0) { |
1025 |
– |
int wc; |
1026 |
– |
long nt = 0L; |
1038 |
|
if (runState >= TERMINATING) { |
1039 |
|
joinMe.cancelIgnoringExceptions(); |
1040 |
|
break; |
1041 |
|
} |
1042 |
< |
worker.helpJoinTask(joinMe); |
1042 |
> |
running = worker.helpJoinTask(joinMe, running); |
1043 |
|
if (joinMe.status < 0) |
1044 |
|
break; |
1045 |
< |
else if (retries > 0) |
1045 |
> |
if (retries > 0) { |
1046 |
|
--retries; |
1047 |
< |
else if (timed && |
1048 |
< |
(nt = nanos - (System.nanoTime() - startTime)) <= 0L) |
1049 |
< |
break; |
1050 |
< |
else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 && |
1051 |
< |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1052 |
< |
wc, wc - ONE_RUNNING)) { |
1053 |
< |
int stat, c; long h; |
1054 |
< |
while ((stat = joinMe.status) >= 0 && |
1055 |
< |
(h = eventWaiters) != 0L && // help release others |
1056 |
< |
(int)(h >>> EVENT_COUNT_SHIFT) != eventCount) |
1047 |
> |
continue; |
1048 |
> |
} |
1049 |
> |
int wc = workerCounts; |
1050 |
> |
if ((wc & RUNNING_COUNT_MASK) != 0) { |
1051 |
> |
if (running) { |
1052 |
> |
if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1053 |
> |
wc, wc - ONE_RUNNING)) |
1054 |
> |
continue; |
1055 |
> |
running = false; |
1056 |
> |
} |
1057 |
> |
long h = eventWaiters; |
1058 |
> |
if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) |
1059 |
|
releaseEventWaiters(); |
1060 |
< |
if (stat >= 0) { |
1061 |
< |
if ((workerCounts & RUNNING_COUNT_MASK) != 0) { |
1062 |
< |
long ms; int ns; |
1063 |
< |
if (!timed) { |
1060 |
> |
if ((workerCounts & RUNNING_COUNT_MASK) != 0) { |
1061 |
> |
long ms; int ns; |
1062 |
> |
if (!timed) { |
1063 |
> |
ms = JOIN_TIMEOUT_MILLIS; |
1064 |
> |
ns = 0; |
1065 |
> |
} |
1066 |
> |
else { // at most JOIN_TIMEOUT_MILLIS per wait |
1067 |
> |
long nt = nanos - (System.nanoTime() - startTime); |
1068 |
> |
if (nt <= 0L) |
1069 |
> |
break; |
1070 |
> |
ms = nt / 1000000; |
1071 |
> |
if (ms > JOIN_TIMEOUT_MILLIS) { |
1072 |
|
ms = JOIN_TIMEOUT_MILLIS; |
1073 |
|
ns = 0; |
1074 |
|
} |
1075 |
< |
else { // at most JOIN_TIMEOUT_MILLIS per wait |
1076 |
< |
ms = nt / 1000000; |
1056 |
< |
if (ms > JOIN_TIMEOUT_MILLIS) { |
1057 |
< |
ms = JOIN_TIMEOUT_MILLIS; |
1058 |
< |
ns = 0; |
1059 |
< |
} |
1060 |
< |
else |
1061 |
< |
ns = (int) (nt % 1000000); |
1062 |
< |
} |
1063 |
< |
stat = joinMe.internalAwaitDone(ms, ns); |
1075 |
> |
else |
1076 |
> |
ns = (int) (nt % 1000000); |
1077 |
|
} |
1078 |
< |
if (stat >= 0) // timeout or no running workers |
1066 |
< |
helpMaintainParallelism(); |
1078 |
> |
joinMe.internalAwaitDone(ms, ns); |
1079 |
|
} |
1080 |
< |
do {} while (!UNSAFE.compareAndSwapInt |
1081 |
< |
(this, workerCountsOffset, |
1070 |
< |
c = workerCounts, c + ONE_RUNNING)); |
1071 |
< |
if (stat < 0) |
1072 |
< |
break; // else restart |
1080 |
> |
if (joinMe.status < 0) |
1081 |
> |
break; |
1082 |
|
} |
1083 |
+ |
helpMaintainParallelism(); |
1084 |
+ |
} |
1085 |
+ |
if (!running) { |
1086 |
+ |
int c; |
1087 |
+ |
do {} while (!UNSAFE.compareAndSwapInt |
1088 |
+ |
(this, workerCountsOffset, |
1089 |
+ |
c = workerCounts, c + ONE_RUNNING)); |
1090 |
|
} |
1091 |
|
} |
1092 |
|
|
1097 |
|
throws InterruptedException { |
1098 |
|
while (!blocker.isReleasable()) { |
1099 |
|
int wc = workerCounts; |
1100 |
< |
if ((wc & RUNNING_COUNT_MASK) != 0 && |
1101 |
< |
UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1102 |
< |
wc, wc - ONE_RUNNING)) { |
1100 |
> |
if ((wc & RUNNING_COUNT_MASK) == 0) |
1101 |
> |
helpMaintainParallelism(); |
1102 |
> |
else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, |
1103 |
> |
wc, wc - ONE_RUNNING)) { |
1104 |
|
try { |
1105 |
|
while (!blocker.isReleasable()) { |
1106 |
|
long h = eventWaiters; |
1145 |
|
// Finish now if all threads terminated; else in some subsequent call |
1146 |
|
if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) { |
1147 |
|
advanceRunLevel(TERMINATED); |
1148 |
< |
termination.arrive(); |
1148 |
> |
termination.forceTermination(); |
1149 |
|
} |
1150 |
|
return true; |
1151 |
|
} |
1152 |
|
|
1136 |
– |
|
1153 |
|
/** |
1154 |
|
* Actions on transition to TERMINATING |
1155 |
|
* |
1773 |
|
* commenced but not yet completed. This method may be useful for |
1774 |
|
* debugging. A return of {@code true} reported a sufficient |
1775 |
|
* period after shutdown may indicate that submitted tasks have |
1776 |
< |
* ignored or suppressed interruption, causing this executor not |
1777 |
< |
* to properly terminate. |
1776 |
> |
* ignored or suppressed interruption, or are waiting for IO, |
1777 |
> |
* causing this executor not to properly terminate. (See the |
1778 |
> |
* advisory notes for class {@link ForkJoinTask} stating that |
1779 |
> |
* tasks should not normally entail blocking operations. But if |
1780 |
> |
* they do, they must abort them on interrupt.) |
1781 |
|
* |
1782 |
|
* @return {@code true} if terminating but not yet terminated |
1783 |
|
*/ |
1815 |
|
public boolean awaitTermination(long timeout, TimeUnit unit) |
1816 |
|
throws InterruptedException { |
1817 |
|
try { |
1818 |
< |
return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0; |
1818 |
> |
termination.awaitAdvanceInterruptibly(0, timeout, unit); |
1819 |
|
} catch (TimeoutException ex) { |
1820 |
|
return false; |
1821 |
|
} |
1822 |
+ |
return true; |
1823 |
|
} |
1824 |
|
|
1825 |
|
/** |