105 |
|
* Dependent actions are represented by Completion objects linked |
106 |
|
* as Treiber stacks headed by field completions. There are four |
107 |
|
* kinds of Completions: single-source (UniCompletion), two-source |
108 |
< |
* (BiCompletion), shared (CoBiCompletion, used by the second |
108 |
> |
* (BiCompletion), shared (CoCompletion, used by the second |
109 |
|
* source of a BiCompletion), and Signallers that unblock waiters. |
110 |
|
* |
111 |
|
* The same patterns of methods and classes are used for each form |
137 |
|
* |
138 |
|
* Methods with two sources (for example thenCombine) must deal |
139 |
|
* with races across both while pushing actions. The second |
140 |
< |
* completion is an CoBiCompletion pointing to the first, shared |
140 |
> |
* completion is an CoCompletion pointing to the first, shared |
141 |
|
* to ensure that at most one claims and performs the action. The |
142 |
|
* multiple-arity method allOf does this pairwise to form a tree |
143 |
|
* of completions. (Method anyOf just uses a depth-one Or tree.) |
171 |
|
*/ |
172 |
|
|
173 |
|
volatile Object result; // Either the result or boxed AltResult |
174 |
< |
volatile Completion<?> completions; // Treiber stack of dependent actions |
174 |
> |
volatile Completion completions; // Treiber stack of dependent actions |
175 |
|
|
176 |
|
final boolean internalComplete(Object r) { // CAS from null to r |
177 |
|
return UNSAFE.compareAndSwapObject(this, RESULT, null, r); |
178 |
|
} |
179 |
|
|
180 |
< |
final boolean casCompletions(Completion<?> cmp, Completion<?> val) { |
180 |
> |
final boolean casCompletions(Completion cmp, Completion val) { |
181 |
|
return UNSAFE.compareAndSwapObject(this, COMPLETIONS, cmp, val); |
182 |
|
} |
183 |
|
|
317 |
|
|
318 |
|
/* ------------- Completions -------------- */ |
319 |
|
|
320 |
< |
abstract static class Completion<T> { // See above |
321 |
< |
volatile Completion<?> next; // Treiber stack link |
320 |
> |
abstract static class Completion { // See above |
321 |
> |
volatile Completion next; // Treiber stack link |
322 |
|
|
323 |
|
/** |
324 |
|
* Performs completion action if enabled, returning a |
337 |
|
* and run. It is extended along only one path at a time, |
338 |
|
* pushing others to avoid StackOverflowErrors on recursion. |
339 |
|
*/ |
340 |
< |
CompletableFuture<?> f = this; Completion<?> h; |
340 |
> |
CompletableFuture<?> f = this; Completion h; |
341 |
|
while ((h = f.completions) != null || |
342 |
|
(f != this && (h = (f = this).completions) != null)) { |
343 |
< |
CompletableFuture<?> d; Completion<?> t; |
343 |
> |
CompletableFuture<?> d; Completion t; |
344 |
|
if (f.casCompletions(h, t = h.next)) { |
345 |
|
if (t != null) { |
346 |
|
if (f != this) { // push |
362 |
|
* trigger. Fields can only be observed by other threads upon |
363 |
|
* successful push; and should be nulled out after claim. |
364 |
|
*/ |
365 |
< |
abstract static class UniCompletion<T> extends Completion<T> { |
365 |
> |
abstract static class UniCompletion<T> extends Completion { |
366 |
|
Executor async; // executor to use (null if none) |
367 |
|
CompletableFuture<T> dep; // the dependent to complete |
368 |
|
CompletableFuture<?> src; // source of value for tryAct |
392 |
|
} |
393 |
|
|
394 |
|
/** Pushes c on to completions, and triggers c if done. */ |
395 |
< |
private void unipush(UniCompletion<?> c) { |
395 |
> |
private void unipush(Completion c) { |
396 |
|
if (c != null) { |
397 |
|
CompletableFuture<?> d; |
398 |
|
while (result == null && !casCompletions(c.next = completions, c)) |
952 |
|
} |
953 |
|
} |
954 |
|
|
955 |
< |
/** A Completion delegating to a shared BiCompletion */ |
956 |
< |
static final class CoBiCompletion<T> extends Completion<T> { |
957 |
< |
BiCompletion<T> completion; |
958 |
< |
CoBiCompletion(BiCompletion<T> completion) { |
955 |
> |
/** A Completion delegating to another Completion */ |
956 |
> |
static final class CoCompletion extends Completion { |
957 |
> |
Completion completion; |
958 |
> |
CoCompletion(Completion completion) { |
959 |
|
this.completion = completion; |
960 |
|
} |
961 |
|
final CompletableFuture<?> tryAct() { |
962 |
< |
BiCompletion<T> c; |
963 |
< |
return (c = completion) == null ? null : c.tryAct(); |
962 |
> |
Completion c; CompletableFuture<?> d; |
963 |
> |
if ((c = completion) == null || (d = c.tryAct()) == null) |
964 |
> |
return null; |
965 |
> |
completion = null; // detach |
966 |
> |
return d; |
967 |
|
} |
968 |
|
} |
969 |
|
|
970 |
|
/* ------------- Two-source Anded -------------- */ |
971 |
|
|
972 |
|
/* Pushes c on to completions and o's completions unless both done. */ |
973 |
< |
private <U> void bipushAnded(CompletableFuture<?> o, BiCompletion<U> c) { |
973 |
> |
private void bipushAnded(CompletableFuture<?> o, Completion c) { |
974 |
|
if (c != null && o != null) { |
975 |
|
Object r; CompletableFuture<?> d; |
976 |
|
while ((r = result) == null && |
977 |
|
!casCompletions(c.next = completions, c)) |
978 |
|
c.next = null; |
979 |
|
if (o.result == null) { |
980 |
< |
Completion<U> q = (r != null) ? c : new CoBiCompletion<U>(c); |
980 |
> |
Completion q = (r != null) ? c : new CoCompletion(c); |
981 |
|
while (o.result == null && |
982 |
|
!o.casCompletions(q.next = o.completions, q)) |
983 |
|
q.next = null; |
1253 |
|
/* ------------- Two-source Ored -------------- */ |
1254 |
|
|
1255 |
|
/* Pushes c on to completions and o's completions unless either done. */ |
1256 |
< |
private <U> void bipushOred(CompletableFuture<?> o, BiCompletion<U> c) { |
1256 |
> |
private void bipushOred(CompletableFuture<?> o, Completion c) { |
1257 |
|
if (c != null && o != null) { |
1258 |
|
CompletableFuture<?> d; |
1259 |
|
while (o.result == null && result == null) { |
1260 |
|
if (casCompletions(c.next = completions, c)) { |
1261 |
< |
CoBiCompletion<U> q = new CoBiCompletion<U>(c); |
1261 |
> |
CoCompletion q = new CoCompletion(c); |
1262 |
|
while (result == null && o.result == null && |
1263 |
|
!o.casCompletions(q.next = o.completions, q)) |
1264 |
|
q.next = null; |
1395 |
|
* avoid starvation when blocking actions pile up in |
1396 |
|
* ForkJoinPools. |
1397 |
|
*/ |
1398 |
< |
static final class Signaller extends Completion<Void> |
1398 |
> |
static final class Signaller extends Completion |
1399 |
|
implements ForkJoinPool.ManagedBlocker { |
1400 |
|
long nanos; // wait time if timed |
1401 |
|
final long deadline; // non-zero if timed |
1530 |
|
* in case of an apparent race. |
1531 |
|
*/ |
1532 |
|
private void removeCancelledSignallers() { |
1533 |
< |
for (Completion<?> p = null, q = completions; q != null;) { |
1534 |
< |
Completion<?> s = q.next; |
1533 |
> |
for (Completion p = null, q = completions; q != null;) { |
1534 |
> |
Completion s = q.next; |
1535 |
|
if ((q instanceof Signaller) && ((Signaller)q).thread == null) { |
1536 |
|
if (p != null) { |
1537 |
|
p.next = s; |
2127 |
|
*/ |
2128 |
|
public int getNumberOfDependents() { |
2129 |
|
int count = 0; |
2130 |
< |
for (Completion<?> p = completions; p != null; p = p.next) |
2130 |
> |
for (Completion p = completions; p != null; p = p.next) |
2131 |
|
++count; |
2132 |
|
return count; |
2133 |
|
} |