164 |
|
private static final int MAX_HELP_DEPTH = 8; |
165 |
|
|
166 |
|
/** |
167 |
< |
* The wakeup interval (in nanoseconds) for the first worker |
167 |
> |
* The wakeup interval (in nanoseconds) for the oldest worker |
168 |
|
* suspended as spare. On each wakeup not signalled by a |
169 |
|
* resumption, it may ask the pool to reduce the number of spares. |
170 |
|
*/ |
171 |
< |
private static final long TRIM_RATE_NANOS = 200L * 1000L * 1000L; |
171 |
> |
private static final long TRIM_RATE_NANOS = |
172 |
> |
5L * 1000L * 1000L * 1000L; // 5sec |
173 |
|
|
174 |
|
/** |
175 |
|
* Capacity of work-stealing queue array upon initialization. |
225 |
|
* Run state of this worker. In addition to the usual run levels, |
226 |
|
* tracks if this worker is suspended as a spare, and if it was |
227 |
|
* killed (trimmed) while suspended. However, "active" status is |
228 |
< |
* maintained separately. |
228 |
> |
* maintained separately and modified only in conjunction with |
229 |
> |
* CASes of the pool's runState (which are currently sadly manually |
230 |
> |
* inlined for performance.) |
231 |
|
*/ |
232 |
|
private volatile int runState; |
233 |
|
|
383 |
|
*/ |
384 |
|
protected void onTermination(Throwable exception) { |
385 |
|
try { |
386 |
+ |
ForkJoinPool p = pool; |
387 |
+ |
if (active) { |
388 |
+ |
int a; // inline p.tryDecrementActiveCount |
389 |
+ |
active = false; |
390 |
+ |
do {} while(!UNSAFE.compareAndSwapInt |
391 |
+ |
(p, poolRunStateOffset, a = p.runState, a - 1)); |
392 |
+ |
} |
393 |
|
cancelTasks(); |
384 |
– |
while (active) // force inactive |
385 |
– |
active = !pool.tryDecrementActiveCount(); |
394 |
|
setTerminated(); |
395 |
< |
pool.workerTerminated(this); |
395 |
> |
p.workerTerminated(this); |
396 |
|
} catch (Throwable ex) { // Shouldn't ever happen |
397 |
|
if (exception == null) // but if so, at least rethrown |
398 |
|
exception = ex; |
461 |
|
private boolean tryExecSubmission() { |
462 |
|
ForkJoinPool p = pool; |
463 |
|
while (p.hasQueuedSubmissions()) { |
464 |
< |
ForkJoinTask<?> t; |
465 |
< |
if (active || (active = p.tryIncrementActiveCount())) { |
464 |
> |
ForkJoinTask<?> t; int a; |
465 |
> |
if (active || // ugly/hacky: inline p.tryIncrementActiveCount |
466 |
> |
(active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, |
467 |
> |
a = p.runState, a + 1))) { |
468 |
|
if ((t = p.pollSubmission()) != null) { |
469 |
|
currentSteal = t; |
470 |
|
t.quietlyExec(); |
717 |
|
ForkJoinWorkerThread v = ws[k & mask]; |
718 |
|
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift |
719 |
|
if (v != null && v.base != v.sp) { |
720 |
< |
ForkJoinTask<?>[] q; int b; |
721 |
< |
if ((canSteal || // ensure active status |
722 |
< |
(canSteal = active = p.tryIncrementActiveCount())) && |
720 |
> |
ForkJoinTask<?>[] q; int b, a; |
721 |
> |
if ((canSteal || // Ugly/hacky: inline |
722 |
> |
(canSteal = active = // p.tryIncrementActiveCount |
723 |
> |
UNSAFE.compareAndSwapInt(p, poolRunStateOffset, |
724 |
> |
a = p.runState, a + 1))) && |
725 |
|
(q = v.queue) != null && (b = v.base) != v.sp) { |
726 |
|
int i = (q.length - 1) & b; |
727 |
|
long u = (i << qShift) + qBase; // raw offset |
761 |
|
final boolean isTrimmed() { return (runState & TRIMMED) != 0; } |
762 |
|
|
763 |
|
/** |
764 |
< |
* Sets state to TERMINATING, also, unless "quiet", unparking if |
765 |
< |
* not already terminated |
754 |
< |
* |
755 |
< |
* @param quiet don't unpark (used for faster status updates on |
756 |
< |
* pool termination) |
764 |
> |
* Sets state to TERMINATING. Does NOT unpark or interrupt |
765 |
> |
* to wake up if currently blocked. |
766 |
|
*/ |
767 |
< |
final void shutdown(boolean quiet) { |
767 |
> |
final void shutdown() { |
768 |
|
for (;;) { |
769 |
|
int s = runState; |
770 |
|
if ((s & (TERMINATING|TERMINATED)) != 0) |
779 |
|
s | TERMINATING)) |
780 |
|
break; |
781 |
|
} |
773 |
– |
if (!quiet && (runState & TERMINATED) != 0) |
774 |
– |
LockSupport.unpark(this); |
782 |
|
} |
783 |
|
|
784 |
|
/** |
792 |
|
} |
793 |
|
|
794 |
|
/** |
795 |
< |
* If suspended, tries to set status to unsuspended and unparks. |
795 |
> |
* If suspended, tries to set status to unsuspended. |
796 |
|
* |
797 |
|
* @return true if successful |
798 |
|
*/ |
809 |
|
/** |
810 |
|
* Sets suspended status and blocks as spare until resumed |
811 |
|
* or shutdown. |
805 |
– |
* @returns true if still running on exit |
812 |
|
*/ |
813 |
< |
final boolean suspendAsSpare() { |
808 |
< |
lastEventCount = 0; // reset upon resume |
813 |
> |
final void suspendAsSpare() { |
814 |
|
for (;;) { // set suspended unless terminating |
815 |
|
int s = runState; |
816 |
|
if ((s & TERMINATING) != 0) { // must kill |
817 |
|
if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, |
818 |
|
s | (TRIMMED | TERMINATING))) |
819 |
< |
return false; |
819 |
> |
return; |
820 |
|
} |
821 |
|
else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, |
822 |
|
s | SUSPENDED)) |
824 |
|
} |
825 |
|
ForkJoinPool p = pool; |
826 |
|
p.pushSpare(this); |
827 |
+ |
lastEventCount = 0; // reset upon resume |
828 |
|
while ((runState & SUSPENDED) != 0) { |
829 |
< |
if (!p.tryAccumulateStealCount(this)) |
830 |
< |
continue; |
831 |
< |
interrupted(); // clear/ignore interrupts |
832 |
< |
if ((runState & SUSPENDED) == 0) |
827 |
< |
break; |
828 |
< |
if (nextSpare != 0) // untimed |
829 |
< |
LockSupport.park(this); |
830 |
< |
else { |
831 |
< |
long startTime = System.nanoTime(); |
832 |
< |
LockSupport.parkNanos(this, TRIM_RATE_NANOS); |
829 |
> |
if (p.tryAccumulateStealCount(this)) { |
830 |
> |
boolean untimed = nextSpare != 0; |
831 |
> |
long startTime = untimed? 0 : System.nanoTime(); |
832 |
> |
interrupted(); // clear/ignore interrupts |
833 |
|
if ((runState & SUSPENDED) == 0) |
834 |
|
break; |
835 |
< |
long now = System.nanoTime(); |
836 |
< |
if (now - startTime >= TRIM_RATE_NANOS) |
837 |
< |
pool.tryTrimSpare(now); |
835 |
> |
if (untimed) // untimed |
836 |
> |
LockSupport.park(this); |
837 |
> |
else { |
838 |
> |
LockSupport.parkNanos(this, TRIM_RATE_NANOS); |
839 |
> |
if ((runState & SUSPENDED) == 0) |
840 |
> |
break; |
841 |
> |
if (System.nanoTime() - startTime >= TRIM_RATE_NANOS) |
842 |
> |
p.tryShutdownSpare(); |
843 |
> |
} |
844 |
|
} |
845 |
|
} |
840 |
– |
return runState == 0; |
846 |
|
} |
847 |
|
|
848 |
|
// Misc support methods for ForkJoinPool |
907 |
|
* @return a task, if available |
908 |
|
*/ |
909 |
|
final ForkJoinTask<?> pollLocalTask() { |
910 |
+ |
ForkJoinPool p = pool; |
911 |
|
while (sp != base) { |
912 |
< |
if (active || (active = pool.tryIncrementActiveCount())) |
912 |
> |
int a; // inline p.tryIncrementActiveCount |
913 |
> |
if (active || |
914 |
> |
(active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, |
915 |
> |
a = p.runState, a + 1))) |
916 |
|
return locallyFifo? locallyDeqTask() : popTask(); |
917 |
|
} |
918 |
|
return null; |
1125 |
|
} |
1126 |
|
else { |
1127 |
|
ForkJoinPool p = pool; |
1128 |
+ |
int a; // to inline CASes |
1129 |
|
if (active) { |
1130 |
< |
if (!p.tryDecrementActiveCount()) |
1130 |
> |
if (!UNSAFE.compareAndSwapInt |
1131 |
> |
(p, poolRunStateOffset, a = p.runState, a - 1)) |
1132 |
|
continue; // retry later |
1133 |
|
active = false; // inactivate |
1134 |
|
} |
1135 |
|
if (p.isQuiescent()) { |
1136 |
|
active = true; // re-activate |
1137 |
< |
do {} while (!p.tryIncrementActiveCount()); |
1137 |
> |
do {} while(!UNSAFE.compareAndSwapInt |
1138 |
> |
(p, poolRunStateOffset, a = p.runState, a+1)); |
1139 |
|
return; |
1140 |
|
} |
1141 |
|
} |
1155 |
|
objectFieldOffset("currentSteal", ForkJoinWorkerThread.class); |
1156 |
|
private static final long qBase = |
1157 |
|
UNSAFE.arrayBaseOffset(ForkJoinTask[].class); |
1158 |
+ |
private static final long poolRunStateOffset = // to inline CAS |
1159 |
+ |
objectFieldOffset("runState", ForkJoinPool.class); |
1160 |
|
|
1161 |
|
private static final int qShift; |
1162 |
|
|