633 |
|
return t; |
634 |
|
} |
635 |
|
} |
636 |
< |
j = -n; // reset on contention |
636 |
> |
j = -n; // reset on contention |
637 |
|
} |
638 |
|
k = j >= 0? k + ((n >>> 1) | 1) : r; |
639 |
|
} |
875 |
|
} |
876 |
|
|
877 |
|
/** |
878 |
< |
* Returns a stolen task, if available, unless joinMe is done |
878 |
> |
* Returns a popped or stolen task, if available, unless joinMe is done |
879 |
|
* |
880 |
|
* This method is intrinsically nonmodular. To maintain the |
881 |
|
* property that tasks are never stolen if the awaited task is |
887 |
|
* without adjusting index. The scan loop is otherwise the same as |
888 |
|
* in scan. |
889 |
|
* |
890 |
– |
* The outer loop cannot be allowed to run forever, because it |
891 |
– |
* could lead to a form of deadlock if all threads are executing |
892 |
– |
* this method. However, we must also be patient before giving up, |
893 |
– |
* to cope with GC stalls, transient high loads, etc. The loop |
894 |
– |
* terminates (causing caller to possibly block this thread and |
895 |
– |
* create a replacement) only after #workers clean sweeps during |
896 |
– |
* which all running threads are active. |
890 |
|
*/ |
891 |
|
final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) { |
892 |
< |
int sweeps = 0; |
893 |
< |
int r = seed; |
894 |
< |
ForkJoinPool p = pool; |
895 |
< |
p.releaseWaiters(); // help other threads progress |
896 |
< |
while (joinMe.status >= 0) { |
892 |
> |
ForkJoinTask<?> popped; // prefer local tasks |
893 |
> |
if (base != sp && (popped = popWhileJoining(joinMe)) != null) |
894 |
> |
return popped; |
895 |
> |
if (joinMe.status >= 0) { |
896 |
> |
ForkJoinPool p = pool; |
897 |
|
ForkJoinWorkerThread[] ws = p.workers; |
898 |
|
int n = ws.length; |
899 |
+ |
int r = seed; |
900 |
|
int k = r; |
901 |
< |
for (int j = -n; j < n; ++j) { |
901 |
> |
for (int j = -n; j < n && joinMe.status >= 0; ++j) { |
902 |
|
ForkJoinWorkerThread v = ws[k & (n - 1)]; |
903 |
|
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift |
904 |
|
if (v != null) { |
907 |
|
if (b != v.sp && (q = v.queue) != null) { |
908 |
|
int i = (q.length - 1) & b; |
909 |
|
ForkJoinTask<?> t = q[i]; |
910 |
< |
if (t != null) { |
911 |
< |
if (joinMe.status < 0) |
912 |
< |
return null; |
919 |
< |
if (UNSAFE.compareAndSwapObject |
920 |
< |
(q, (i << qShift) + qBase, t, null)) { |
921 |
< |
if (joinMe.status < 0) { |
922 |
< |
writeSlot(q, i, t); // back out |
923 |
< |
return null; |
924 |
< |
} |
910 |
> |
if (t != null && UNSAFE.compareAndSwapObject |
911 |
> |
(q, (i << qShift) + qBase, t, null)) { |
912 |
> |
if (joinMe.status >= 0) { |
913 |
|
v.base = b + 1; |
914 |
|
seed = r; |
915 |
|
++stealCount; |
916 |
|
return t; |
917 |
|
} |
918 |
+ |
UNSAFE.putObjectVolatile(q, (i<<qShift)+qBase, t); |
919 |
+ |
break; // back out |
920 |
|
} |
921 |
< |
sweeps = 0; // ensure rescan on contention |
921 |
> |
j = -n; |
922 |
|
} |
923 |
|
} |
924 |
|
k = j >= 0? k + ((n >>> 1) | 1) : r; |
935 |
– |
if ((j & 7) == 0 && joinMe.status < 0) // periodically recheck |
936 |
– |
return null; |
925 |
|
} |
926 |
< |
if ((sweeps = p.inactiveCount() == 0 ? sweeps + 1 : 0) > n) |
927 |
< |
return null; |
926 |
> |
} |
927 |
> |
return null; |
928 |
> |
} |
929 |
> |
|
930 |
> |
/** |
931 |
> |
* Version of popTask with join checks surrounding extraction. |
932 |
> |
* Uses the same backout strategy as scanWhileJoining. Note that |
933 |
> |
* we ignore locallyFifo flag for local tasks here since helping |
934 |
> |
* joins only make sense in LIFO mode. |
935 |
> |
* |
936 |
> |
* @return a popped task, if available, unless joinMe is done |
937 |
> |
*/ |
938 |
> |
private ForkJoinTask<?> popWhileJoining(ForkJoinTask<?> joinMe) { |
939 |
> |
int s; |
940 |
> |
ForkJoinTask<?>[] q; |
941 |
> |
while ((s = sp) != base && (q = queue) != null && joinMe.status >= 0) { |
942 |
> |
int i = (q.length - 1) & --s; |
943 |
> |
ForkJoinTask<?> t = q[i]; |
944 |
> |
if (t != null && UNSAFE.compareAndSwapObject |
945 |
> |
(q, (i << qShift) + qBase, t, null)) { |
946 |
> |
if (joinMe.status >= 0) { |
947 |
> |
sp = s; |
948 |
> |
return t; |
949 |
> |
} |
950 |
> |
UNSAFE.putObjectVolatile(q, (i << qShift) + qBase, t); |
951 |
> |
break; // back out |
952 |
> |
} |
953 |
|
} |
954 |
|
return null; |
955 |
|
} |