97 |
|
* technique for implementing efficient futures" SIGPLAN Notices, |
98 |
|
* 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs |
99 |
|
* in that: (1) We only maintain dependency links across workers |
100 |
< |
* upon steals, rather than maintain per-task bookkeeping. This |
101 |
< |
* may require a linear scan of workers array to locate stealers, |
102 |
< |
* but usually doesn't because stealers leave hints (that may |
103 |
< |
* become stale/wrong) of where to locate them. This
104 |
< |
* isolates cost to when it is needed, rather than adding to |
105 |
< |
* per-task overhead. (2) It is "shallow", ignoring nesting and |
106 |
< |
* potentially cyclic mutual steals. (3) It is intentionally |
107 |
< |
* racy: field currentJoin is updated only while actively joining, |
108 |
< |
* which means that we could miss links in the chain during |
109 |
< |
* long-lived tasks, GC stalls etc. (4) We bound the number of |
110 |
< |
* attempts to find work (see MAX_HELP_DEPTH) and fall back to |
111 |
< |
* suspending the worker and if necessary replacing it with a |
112 |
< |
* spare (see ForkJoinPool.tryAwaitJoin). |
100 |
> |
* upon steals, rather than use per-task bookkeeping. This may |
101 |
> |
* require a linear scan of workers array to locate stealers, but |
102 |
> |
* usually doesn't because stealers leave hints (that may become |
103 |
> |
* stale/wrong) of where to locate them. This isolates cost to |
104 |
> |
* when it is needed, rather than adding to per-task overhead. |
105 |
> |
* (2) It is "shallow", ignoring nesting and potentially cyclic |
106 |
> |
* mutual steals. (3) It is intentionally racy: field currentJoin |
107 |
> |
* is updated only while actively joining, which means that we |
108 |
> |
* miss links in the chain during long-lived tasks, GC stalls etc |
109 |
> |
* (which is OK since blocking in such cases is usually a good |
110 |
> |
* idea). (4) We bound the number of attempts to find work (see |
111 |
> |
* MAX_HELP_DEPTH) and fall back to suspending the worker and if |
112 |
> |
* necessary replacing it with a spare (see |
113 |
> |
* ForkJoinPool.tryAwaitJoin). |
114 |
|
* |
115 |
|
* Efficient implementation of these algorithms currently relies |
116 |
|
* on an uncomfortable amount of "Unsafe" mechanics. To maintain |
257 |
|
*/ |
258 |
|
private int seed; |
259 |
|
|
259 |
-
|
260 |
|
/** |
261 |
|
* Activity status. When true, this worker is considered active. |
262 |
|
* Accessed directly by pool. Must be false upon construction. |
265 |
|
|
266 |
|
/** |
267 |
|
* True if use local fifo, not default lifo, for local polling. |
268 |
< |
* Shadows value from ForkJoinPool, which resets it if changed |
269 |
< |
* pool-wide. |
268 |
> |
* Shadows value from ForkJoinPool. |
269 |
|
*/ |
270 |
|
private final boolean locallyFifo; |
271 |
|
|
320 |
|
*/ |
321 |
|
final void start(int poolIndex, UncaughtExceptionHandler ueh) { |
322 |
|
this.poolIndex = poolIndex; |
323 |
+ |
setDaemon(true); |
324 |
|
if (ueh != null) |
325 |
|
setUncaughtExceptionHandler(ueh); |
326 |
-
setDaemon(true); |
326 |
|
start(); |
327 |
|
} |
328 |
|
|
760 |
|
} |
761 |
|
|
762 |
|
/** |
763 |
< |
* Instrumented version of park used by ForkJoinPool.awaitEvent |
763 |
> |
* Instrumented version of park used by ForkJoinPool.eventSync |
764 |
|
*/ |
765 |
|
final void doPark() { |
766 |
|
++parkCount; |
801 |
|
s | SUSPENDED)) |
802 |
|
break; |
803 |
|
} |
804 |
+ |
int pc = pool.parallelism; |
805 |
+ |
pool.accumulateStealCount(this); |
806 |
|
boolean timed; |
807 |
|
long nanos; |
808 |
|
long startTime; |
809 |
< |
if (poolIndex < pool.parallelism) { |
809 |
> |
if (poolIndex < pc) { // untimed wait for core threads |
810 |
|
timed = false; |
811 |
|
nanos = 0L; |
812 |
|
startTime = 0L; |
813 |
|
} |
814 |
< |
else { |
814 |
> |
else { // timed wait for added threads |
815 |
|
timed = true; |
816 |
|
nanos = SPARE_KEEPALIVE_NANOS; |
817 |
|
startTime = System.nanoTime(); |
818 |
|
} |
818 |
-
pool.accumulateStealCount(this); |
819 |
|
lastEventCount = 0; // reset upon resume |
820 |
|
interrupted(); // clear/ignore interrupts |
821 |
|
while ((runState & SUSPENDED) != 0) { |
905 |
|
* @return a task, if available |
906 |
|
*/ |
907 |
|
final ForkJoinTask<?> pollTask() { |
908 |
< |
ForkJoinTask<?> t; |
909 |
< |
return (t = pollLocalTask()) != null ? t : scan(); |
908 |
> |
ForkJoinTask<?> t = pollLocalTask(); |
909 |
> |
if (t == null) { |
910 |
> |
t = scan(); |
911 |
> |
currentSteal = null; // cannot retain/track |
912 |
> |
} |
913 |
> |
return t; |
914 |
|
} |
915 |
|
|
916 |
|
/** |
924 |
|
* @param joinMe the task to join |
925 |
|
* @return task status on exit |
926 |
|
*/ |
927 |
< |
final int joinTask(ForkJoinTask<?> joinMe) { |
927 |
> |
final int joinTask(ForkJoinTask<?> joinMe) { |
928 |
|
int stat; |
929 |
|
ForkJoinTask<?> prevJoin = currentJoin; |
930 |
< |
currentJoin = joinMe; |
930 |
> |
// Only written by this thread; only need ordered store |
931 |
> |
UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); |
932 |
|
if ((stat = joinMe.status) >= 0 && |
933 |
|
(sp == base || (stat = localHelpJoinTask(joinMe)) >= 0)) { |
934 |
< |
ForkJoinPool p = pool; |
935 |
< |
int helpRetries = 2; // initial patience values |
931 |
< |
int awaitRetries = -1; // -1 is sentinel for replace-check only |
932 |
< |
do { |
933 |
< |
helpJoinTask(joinMe, helpRetries); |
934 |
> |
for (int retries = 0; ; ++retries) { |
935 |
> |
helpJoinTask(joinMe, retries); |
936 |
|
if ((stat = joinMe.status) < 0) |
937 |
|
break; |
938 |
< |
boolean busy = p.tryAwaitJoin(joinMe, awaitRetries); |
938 |
> |
pool.tryAwaitJoin(joinMe, retries); |
939 |
|
if ((stat = joinMe.status) < 0) |
940 |
|
break; |
939 |
-
if (awaitRetries == -1) |
940 |
-
awaitRetries = 0; |
941 |
-
else if (busy) |
942 |
-
++awaitRetries; |
943 |
-
if (helpRetries < p.parallelism) |
944 |
-
helpRetries <<= 1; |
941 |
|
Thread.yield(); // tame unbounded loop |
942 |
< |
} while (joinMe.status >= 0); |
942 |
> |
} |
943 |
|
} |
944 |
< |
currentJoin = prevJoin; |
944 |
> |
UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); |
945 |
|
return stat; |
946 |
|
} |
947 |
|
|
993 |
|
* caller anyway) to slightly simplify control paths. |
994 |
|
* |
995 |
|
* @param joinMe the task to join |
996 |
+ |
* @param rescans the number of times to recheck for work |
997 |
|
*/ |
998 |
< |
final void helpJoinTask(ForkJoinTask<?> joinMe, int retries) { |
998 |
> |
private void helpJoinTask(ForkJoinTask<?> joinMe, int rescans) { |
999 |
|
ForkJoinWorkerThread[] ws = pool.workers; |
1000 |
|
int n; |
1001 |
|
if (ws == null || (n = ws.length) <= 1) |
1002 |
|
return; // need at least 2 workers |
1003 |
< |
|
1007 |
< |
restart:while (joinMe.status >= 0 && --retries >= 0) { |
1003 |
> |
restart:while (rescans-- >= 0 && joinMe.status >= 0) { |
1004 |
|
ForkJoinTask<?> task = joinMe; // base of chain |
1005 |
|
ForkJoinWorkerThread thread = this; // thread with stolen task |
1006 |
|
for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) { |
1025 |
|
ForkJoinTask<?> t = q[i]; |
1026 |
|
if (task.status < 0) // stale |
1027 |
|
continue restart; |
1028 |
< |
if (v.base == b) { // recheck after reading t |
1029 |
< |
if (t == null) // producer stalled |
1030 |
< |
continue restart; // retry via restart |
1035 |
< |
if (UNSAFE.compareAndSwapObject(q, u, t, null)) { |
1028 |
> |
if (t != null) { |
1029 |
> |
if (v.base == b && |
1030 |
> |
UNSAFE.compareAndSwapObject(q, u, t, null)) { |
1031 |
|
if (joinMe.status < 0) { |
1032 |
|
UNSAFE.putObjectVolatile(q, u, t); |
1033 |
|
return; // back out on cancel |
1040 |
|
currentSteal = prevSteal; |
1041 |
|
} |
1042 |
|
} |
1043 |
+ |
else if (v.base == b) // producer stalled |
1044 |
+ |
continue restart; // retry via restart |
1045 |
|
if (joinMe.status < 0) |
1046 |
|
return; |
1047 |
|
} |
1048 |
|
// Try to descend to find v's stealer |
1049 |
|
ForkJoinTask<?> next = v.currentJoin; |
1050 |
< |
if (next == null || task.status < 0) |
1050 |
> |
if (next == null || next == task || task.status < 0) |
1051 |
|
continue restart; // no descendent or stale |
1052 |
|
if (joinMe.status < 0) |
1053 |
|
return; |
1138 |
|
private static final sun.misc.Unsafe UNSAFE = getUnsafe(); |
1139 |
|
private static final long runStateOffset = |
1140 |
|
objectFieldOffset("runState", ForkJoinWorkerThread.class); |
1141 |
+ |
private static final long currentJoinOffset = |
1142 |
+ |
objectFieldOffset("currentJoin", ForkJoinWorkerThread.class); |
1143 |
|
private static final long qBase = |
1144 |
|
UNSAFE.arrayBaseOffset(ForkJoinTask[].class); |
1145 |
|
private static final int qShift; |