635 |
|
return t; |
636 |
|
} |
637 |
|
} |
638 |
< |
j = -n; // reset on contention |
638 |
> |
j = -n; // reset on contention |
639 |
|
} |
640 |
|
k = j >= 0? k + ((n >>> 1) | 1) : r; |
641 |
|
} |
877 |
|
} |
878 |
|
|
879 |
|
/** |
880 |
< |
* Returns a stolen task, if available, unless joinMe is done |
880 |
> |
* Returns a popped or stolen task, if available, unless joinMe is done |
881 |
|
* |
882 |
|
* This method is intrinsically nonmodular. To maintain the |
883 |
|
* property that tasks are never stolen if the awaited task is |
889 |
|
* without adjusting index. The scan loop is otherwise the same as |
890 |
|
* in scan. |
891 |
|
* |
892 |
– |
* The outer loop cannot be allowed to run forever, because it |
893 |
– |
* could lead to a form of deadlock if all threads are executing |
894 |
– |
* this method. However, we must also be patient before giving up, |
895 |
– |
* to cope with GC stalls, transient high loads, etc. The loop |
896 |
– |
* terminates (causing caller to possibly block this thread and |
897 |
– |
* create a replacement) only after #workers clean sweeps during |
898 |
– |
* which all running threads are active. |
892 |
|
*/ |
893 |
|
final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) { |
894 |
< |
int sweeps = 0; |
895 |
< |
int r = seed; |
896 |
< |
ForkJoinPool p = pool; |
897 |
< |
p.releaseWaiters(); // help other threads progress |
898 |
< |
while (joinMe.status >= 0) { |
894 |
> |
ForkJoinTask<?> popped; // prefer local tasks |
895 |
> |
if (base != sp && (popped = popWhileJoining(joinMe)) != null) |
896 |
> |
return popped; |
897 |
> |
if (joinMe.status >= 0) { |
898 |
> |
ForkJoinPool p = pool; |
899 |
|
ForkJoinWorkerThread[] ws = p.workers; |
900 |
|
int n = ws.length; |
901 |
+ |
int r = seed; |
902 |
|
int k = r; |
903 |
< |
for (int j = -n; j < n; ++j) { |
903 |
> |
for (int j = -n; j < n && joinMe.status >= 0; ++j) { |
904 |
|
ForkJoinWorkerThread v = ws[k & (n - 1)]; |
905 |
|
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift |
906 |
|
if (v != null) { |
909 |
|
if (b != v.sp && (q = v.queue) != null) { |
910 |
|
int i = (q.length - 1) & b; |
911 |
|
ForkJoinTask<?> t = q[i]; |
912 |
< |
if (t != null) { |
913 |
< |
if (joinMe.status < 0) |
914 |
< |
return null; |
921 |
< |
if (UNSAFE.compareAndSwapObject |
922 |
< |
(q, (i << qShift) + qBase, t, null)) { |
923 |
< |
if (joinMe.status < 0) { |
924 |
< |
writeSlot(q, i, t); // back out |
925 |
< |
return null; |
926 |
< |
} |
912 |
> |
if (t != null && UNSAFE.compareAndSwapObject |
913 |
> |
(q, (i << qShift) + qBase, t, null)) { |
914 |
> |
if (joinMe.status >= 0) { |
915 |
|
v.base = b + 1; |
916 |
|
seed = r; |
917 |
|
++stealCount; |
918 |
|
return t; |
919 |
|
} |
920 |
+ |
UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t); |
921 |
+ |
break; // back out |
922 |
|
} |
923 |
< |
sweeps = 0; // ensure rescan on contention |
923 |
> |
j = -n; |
924 |
|
} |
925 |
|
} |
926 |
|
k = j >= 0? k + ((n >>> 1) | 1) : r; |
937 |
– |
if ((j & 7) == 0 && joinMe.status < 0) // periodically recheck |
938 |
– |
return null; |
927 |
|
} |
928 |
< |
if ((sweeps = p.inactiveCount() == 0 ? sweeps + 1 : 0) > n) |
929 |
< |
return null; |
928 |
> |
} |
929 |
> |
return null; |
930 |
> |
} |
931 |
> |
|
932 |
> |
/** |
933 |
> |
* Version of popTask with join checks surrounding extraction. |
934 |
> |
* Uses the same backout strategy as scanWhileJoining. Note that |
935 |
> |
* we ignore locallyFifo flag for local tasks here since helping |
936 |
> |
* joins only make sense in LIFO mode. |
937 |
> |
* |
938 |
> |
* @return a popped task, if available, unless joinMe is done |
939 |
> |
*/ |
940 |
> |
private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) { |
941 |
> |
int s; |
942 |
> |
ForkJoinTask<?>[] q; |
943 |
> |
while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) { |
944 |
> |
int i = (q.length - 1) & --s; |
945 |
> |
ForkJoinTask<?> t = q[i]; |
946 |
> |
if (t != null && UNSAFE.compareAndSwapObject |
947 |
> |
(q, (i << qShift) + qBase, t, null)) { |
948 |
> |
if (joinMe.status >= 0) { |
949 |
> |
sp = s; |
950 |
> |
return t; |
951 |
> |
} |
952 |
> |
UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t); |
953 |
> |
break; // back out |
954 |
> |
} |
955 |
|
} |
956 |
|
return null; |
957 |
|
} |