162 |
|
private static final int MAX_HELP_DEPTH = 8; |
163 |
|
|
164 |
|
/** |
165 |
< |
* The wakeup interval (in nanoseconds) for the first worker |
165 |
> |
* The wakeup interval (in nanoseconds) for the oldest worker |
166 |
|
* suspended as spare. On each wakeup not signalled by a |
167 |
|
* resumption, it may ask the pool to reduce the number of spares. |
168 |
|
*/ |
169 |
< |
private static final long TRIM_RATE_NANOS = 200L * 1000L * 1000L; |
169 |
> |
private static final long TRIM_RATE_NANOS = |
170 |
> |
5L * 1000L * 1000L * 1000L; // 5sec |
171 |
|
|
172 |
|
/** |
173 |
|
* Capacity of work-stealing queue array upon initialization. |
223 |
|
* Run state of this worker. In addition to the usual run levels, |
224 |
|
* tracks if this worker is suspended as a spare, and if it was |
225 |
|
* killed (trimmed) while suspended. However, "active" status is |
226 |
< |
* maintained separately. |
226 |
> |
* maintained separately and modified only in conjunction with |
227 |
> |
* CASes of the pool's runState (which are currently sadly manually |
228 |
> |
* inlined for performance.) |
229 |
|
*/ |
230 |
|
private volatile int runState; |
231 |
|
|
381 |
|
*/ |
382 |
|
protected void onTermination(Throwable exception) { |
383 |
|
try { |
384 |
+ |
ForkJoinPool p = pool; |
385 |
+ |
if (active) { |
386 |
+ |
int a; // inline p.tryDecrementActiveCount |
387 |
+ |
active = false; |
388 |
+ |
do {} while(!UNSAFE.compareAndSwapInt |
389 |
+ |
(p, poolRunStateOffset, a = p.runState, a - 1)); |
390 |
+ |
} |
391 |
|
cancelTasks(); |
382 |
– |
while (active) // force inactive |
383 |
– |
active = !pool.tryDecrementActiveCount(); |
392 |
|
setTerminated(); |
393 |
< |
pool.workerTerminated(this); |
393 |
> |
p.workerTerminated(this); |
394 |
|
} catch (Throwable ex) { // Shouldn't ever happen |
395 |
|
if (exception == null) // but if so, at least rethrown |
396 |
|
exception = ex; |
459 |
|
private boolean tryExecSubmission() { |
460 |
|
ForkJoinPool p = pool; |
461 |
|
while (p.hasQueuedSubmissions()) { |
462 |
< |
ForkJoinTask<?> t; |
463 |
< |
if (active || (active = p.tryIncrementActiveCount())) { |
462 |
> |
ForkJoinTask<?> t; int a; |
463 |
> |
if (active || // ugly/hacky: inline p.tryIncrementActiveCount |
464 |
> |
(active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, |
465 |
> |
a = p.runState, a + 1))) { |
466 |
|
if ((t = p.pollSubmission()) != null) { |
467 |
|
currentSteal = t; |
468 |
|
t.quietlyExec(); |
715 |
|
ForkJoinWorkerThread v = ws[k & mask]; |
716 |
|
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // inline xorshift |
717 |
|
if (v != null && v.base != v.sp) { |
718 |
< |
ForkJoinTask<?>[] q; int b; |
719 |
< |
if ((canSteal || // ensure active status |
720 |
< |
(canSteal = active = p.tryIncrementActiveCount())) && |
718 |
> |
ForkJoinTask<?>[] q; int b, a; |
719 |
> |
if ((canSteal || // Ugly/hacky: inline |
720 |
> |
(canSteal = active = // p.tryIncrementActiveCount |
721 |
> |
UNSAFE.compareAndSwapInt(p, poolRunStateOffset, |
722 |
> |
a = p.runState, a + 1))) && |
723 |
|
(q = v.queue) != null && (b = v.base) != v.sp) { |
724 |
|
int i = (q.length - 1) & b; |
725 |
|
long u = (i << qShift) + qBase; // raw offset |
759 |
|
final boolean isTrimmed() { return (runState & TRIMMED) != 0; } |
760 |
|
|
761 |
|
/** |
762 |
< |
* Sets state to TERMINATING, also, unless "quiet", unparking if |
763 |
< |
* not already terminated |
752 |
< |
* |
753 |
< |
* @param quiet don't unpark (used for faster status updates on |
754 |
< |
* pool termination) |
762 |
> |
* Sets state to TERMINATING. Does NOT unpark or interrupt |
763 |
> |
* to wake up if currently blocked. |
764 |
|
*/ |
765 |
< |
final void shutdown(boolean quiet) { |
765 |
> |
final void shutdown() { |
766 |
|
for (;;) { |
767 |
|
int s = runState; |
768 |
|
if ((s & (TERMINATING|TERMINATED)) != 0) |
777 |
|
s | TERMINATING)) |
778 |
|
break; |
779 |
|
} |
771 |
– |
if (!quiet && (runState & TERMINATED) != 0) |
772 |
– |
LockSupport.unpark(this); |
780 |
|
} |
781 |
|
|
782 |
|
/** |
790 |
|
} |
791 |
|
|
792 |
|
/** |
793 |
< |
* If suspended, tries to set status to unsuspended and unparks. |
793 |
> |
* If suspended, tries to set status to unsuspended. |
794 |
|
* |
795 |
|
* @return true if successful |
796 |
|
*/ |
807 |
|
/** |
808 |
|
* Sets suspended status and blocks as spare until resumed |
809 |
|
* or shutdown. |
803 |
– |
* @returns true if still running on exit |
810 |
|
*/ |
811 |
< |
final boolean suspendAsSpare() { |
806 |
< |
lastEventCount = 0; // reset upon resume |
811 |
> |
final void suspendAsSpare() { |
812 |
|
for (;;) { // set suspended unless terminating |
813 |
|
int s = runState; |
814 |
|
if ((s & TERMINATING) != 0) { // must kill |
815 |
|
if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, |
816 |
|
s | (TRIMMED | TERMINATING))) |
817 |
< |
return false; |
817 |
> |
return; |
818 |
|
} |
819 |
|
else if (UNSAFE.compareAndSwapInt(this, runStateOffset, s, |
820 |
|
s | SUSPENDED)) |
822 |
|
} |
823 |
|
ForkJoinPool p = pool; |
824 |
|
p.pushSpare(this); |
825 |
+ |
lastEventCount = 0; // reset upon resume |
826 |
|
while ((runState & SUSPENDED) != 0) { |
827 |
< |
if (!p.tryAccumulateStealCount(this)) |
828 |
< |
continue; |
829 |
< |
interrupted(); // clear/ignore interrupts |
830 |
< |
if ((runState & SUSPENDED) == 0) |
825 |
< |
break; |
826 |
< |
if (nextSpare != 0) // untimed |
827 |
< |
LockSupport.park(this); |
828 |
< |
else { |
829 |
< |
long startTime = System.nanoTime(); |
830 |
< |
LockSupport.parkNanos(this, TRIM_RATE_NANOS); |
827 |
> |
if (p.tryAccumulateStealCount(this)) { |
828 |
> |
boolean untimed = nextSpare != 0; |
829 |
> |
long startTime = untimed? 0 : System.nanoTime(); |
830 |
> |
interrupted(); // clear/ignore interrupts |
831 |
|
if ((runState & SUSPENDED) == 0) |
832 |
|
break; |
833 |
< |
long now = System.nanoTime(); |
834 |
< |
if (now - startTime >= TRIM_RATE_NANOS) |
835 |
< |
pool.tryTrimSpare(now); |
833 |
> |
if (untimed) // untimed |
834 |
> |
LockSupport.park(this); |
835 |
> |
else { |
836 |
> |
LockSupport.parkNanos(this, TRIM_RATE_NANOS); |
837 |
> |
if ((runState & SUSPENDED) == 0) |
838 |
> |
break; |
839 |
> |
if (System.nanoTime() - startTime >= TRIM_RATE_NANOS) |
840 |
> |
p.tryShutdownSpare(); |
841 |
> |
} |
842 |
|
} |
843 |
|
} |
838 |
– |
return runState == 0; |
844 |
|
} |
845 |
|
|
846 |
|
// Misc support methods for ForkJoinPool |
905 |
|
* @return a task, if available |
906 |
|
*/ |
907 |
|
final ForkJoinTask<?> pollLocalTask() { |
908 |
+ |
ForkJoinPool p = pool; |
909 |
|
while (sp != base) { |
910 |
< |
if (active || (active = pool.tryIncrementActiveCount())) |
910 |
> |
int a; // inline p.tryIncrementActiveCount |
911 |
> |
if (active || |
912 |
> |
(active = UNSAFE.compareAndSwapInt(p, poolRunStateOffset, |
913 |
> |
a = p.runState, a + 1))) |
914 |
|
return locallyFifo? locallyDeqTask() : popTask(); |
915 |
|
} |
916 |
|
return null; |
1123 |
|
} |
1124 |
|
else { |
1125 |
|
ForkJoinPool p = pool; |
1126 |
+ |
int a; // to inline CASes |
1127 |
|
if (active) { |
1128 |
< |
if (!p.tryDecrementActiveCount()) |
1128 |
> |
if (!UNSAFE.compareAndSwapInt |
1129 |
> |
(p, poolRunStateOffset, a = p.runState, a - 1)) |
1130 |
|
continue; // retry later |
1131 |
|
active = false; // inactivate |
1132 |
|
} |
1133 |
|
if (p.isQuiescent()) { |
1134 |
|
active = true; // re-activate |
1135 |
< |
do {} while (!p.tryIncrementActiveCount()); |
1135 |
> |
do {} while(!UNSAFE.compareAndSwapInt |
1136 |
> |
(p, poolRunStateOffset, a = p.runState, a+1)); |
1137 |
|
return; |
1138 |
|
} |
1139 |
|
} |
1140 |
|
} |
1141 |
|
} |
1142 |
|
|
1131 |
– |
|
1143 |
|
// Unsafe mechanics |
1144 |
|
|
1145 |
|
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe(); |
1153 |
|
objectFieldOffset("currentSteal", ForkJoinWorkerThread.class); |
1154 |
|
private static final long qBase = |
1155 |
|
UNSAFE.arrayBaseOffset(ForkJoinTask[].class); |
1156 |
+ |
private static final long poolRunStateOffset = // to inline CAS |
1157 |
+ |
objectFieldOffset("runState", ForkJoinPool.class); |
1158 |
|
|
1159 |
|
private static final int qShift; |
1160 |
|
|
1175 |
|
throw error; |
1176 |
|
} |
1177 |
|
} |
1165 |
– |
|
1178 |
|
} |