ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.109
Committed: Sat May 3 12:16:03 2014 UTC (10 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.108: +47 -44 lines
Log Message:
screen explicit commonPool arguments, for compatibility

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.locks.LockSupport;
25
26 /**
27 * A {@link Future} that may be explicitly completed (setting its
28 * value and status), and may be used as a {@link CompletionStage},
29 * supporting dependent functions and actions that trigger upon its
30 * completion.
31 *
32 * <p>When two or more threads attempt to
33 * {@link #complete complete},
34 * {@link #completeExceptionally completeExceptionally}, or
35 * {@link #cancel cancel}
36 * a CompletableFuture, only one of them succeeds.
37 *
38 * <p>In addition to these and related methods for directly
39 * manipulating status and results, CompletableFuture implements
40 * interface {@link CompletionStage} with the following policies: <ul>
41 *
42 * <li>Actions supplied for dependent completions of
43 * <em>non-async</em> methods may be performed by the thread that
44 * completes the current CompletableFuture, or by any other caller of
45 * a completion method.</li>
46 *
47 * <li>All <em>async</em> methods without an explicit Executor
48 * argument are performed using the {@link ForkJoinPool#commonPool()}
49 * (unless it does not support a parallelism level of at least two, in
50 * which case, a new Thread is used). To simplify monitoring,
51 * debugging, and tracking, all generated asynchronous tasks are
52 * instances of the marker interface {@link
53 * AsynchronousCompletionTask}. </li>
54 *
55 * <li>All CompletionStage methods are implemented independently of
56 * other public methods, so the behavior of one method is not impacted
57 * by overrides of others in subclasses. </li> </ul>
58 *
59 * <p>CompletableFuture also implements {@link Future} with the following
60 * policies: <ul>
61 *
62 * <li>Since (unlike {@link FutureTask}) this class has no direct
63 * control over the computation that causes it to be completed,
64 * cancellation is treated as just another form of exceptional
65 * completion. Method {@link #cancel cancel} has the same effect as
66 * {@code completeExceptionally(new CancellationException())}. Method
67 * {@link #isCompletedExceptionally} can be used to determine if a
68 * CompletableFuture completed in any exceptional fashion.</li>
69 *
70 * <li>In case of exceptional completion with a CompletionException,
71 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
72 * {@link ExecutionException} with the same cause as held in the
73 * corresponding CompletionException. To simplify usage in most
74 * contexts, this class also defines methods {@link #join()} and
75 * {@link #getNow} that instead throw the CompletionException directly
76 * in these cases.</li> </ul>
77 *
78 * @author Doug Lea
79 * @since 1.8
80 */
81 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
82
83 /*
84 * Overview:
85 *
86 * A CompletableFuture may have dependent completion actions,
87 * collected in a linked stack. It atomically completes by CASing
88 * a result field, and then pops off and runs those actions. This
89 * applies across normal vs exceptional outcomes, sync vs async
90 * actions, binary triggers, and various forms of completions.
91 *
92 * Non-nullness of field result (set via CAS) indicates done. An
93 * AltResult is used to box null as a result, as well as to hold
94 * exceptions. Using a single field makes completion simple to
95 * detect and trigger. Encoding and decoding is straightforward
96 * but adds vertical sprawl. One minor simplification relies on
97 * the (static) NIL (to box null results) being the only AltResult
98 * with a null exception field, so we don't usually need explicit
99 * comparisons with NIL. Exception propagation mechanics
100 * surrounding decoding rely on unchecked casts of decoded results
101 * really being unchecked, and user type errors being caught at
102 * point of use, as is currently the case in Java. These are
103 * highlighted by using SuppressWarnings annotated temporaries.
104 *
105 * Dependent actions are represented by Completion objects linked
106 * as Treiber stacks headed by field completions. There are four
107 * kinds of Completions: single-source (UniCompletion), two-source
108 * (BiCompletion), shared (CoBiCompletion, used by the second
109 * source of a BiCompletion), and Signallers that unblock waiters.
110 *
111 * The same patterns of methods and classes are used for each form
112 * of Completion (apply, combine, etc), and are written in a
113 * similar style. For each form X there is, when applicable:
114 *
115 * * Method nowX (for example nowApply) that immediately executes
116 * a supplied function and sets result
117 * * Class AsyncX class (for example AsyncApply) that calls nowX
118 * from another task,
119 * * Class DelayedX (for example DelayedApply) that holds
120 * arguments and calls nowX when ready.
121 *
122 * For each public CompletionStage method M* (for example
123 * thenApply{Async}), there is a method doM (for example
124 * doThenApply) that creates and/or invokes the appropriate form.
125 * Each deals with three cases that can arise when adding a
126 * dependent completion to CompletableFuture f:
127 *
128 * * f is already complete, so the dependent action is run
129 * immediately, via a "now" method, which, if async,
130 * starts the action in a new task.
131 * * f is not complete, so a Completion action is created and
132 * pushed to f's completions. It is triggered via
133 * f.postComplete when f completes.
134 * * f is not complete, but completes while adding the completion
135 * action, so we try to trigger it upon adding (see method
136 * unipush and derivatives) to cover races.
137 *
138 * Methods with two sources (for example thenCombine) must deal
139 * with races across both while pushing actions. The second
140 * completion is an CoBiCompletion pointing to the first, shared
141 * to ensure that at most one claims and performs the action. The
142 * multiple-arity method allOf does this pairwise to form a tree
143 * of completions. (Method anyOf just uses a depth-one Or tree.)
144 *
145 * Upon setting results, method postComplete is called unless
146 * the target is guaranteed not to be observable (i.e., not yet
147 * returned or linked). Multiple threads can call postComplete,
148 * which atomically pops each dependent action, and tries to
149 * trigger it via method tryAct. Any such action must be performed
150 * only once, even if called from several threads, so Completions
151 * maintain status via CAS, and on success run one of the "now"
152 * methods. Triggering can propagate recursively, so tryAct
153 * returns a completed dependent (if one exists) for further
154 * processing by its caller.
155 *
156 * Blocking methods get() and join() rely on Signaller Completions
157 * that wake up waiting threads. The mechanics are similar to
158 * Treiber stack wait-nodes used in FutureTask, Phaser, and
159 * SynchronousQueue. See their internal documentation for
160 * algorithmic details.
161 *
162 * Without precautions, CompletableFutures would be prone to
163 * garbage accumulation as chains of completions build up, each
164 * pointing back to its sources. So we detach (null out) most
165 * Completion fields as soon as possible. To support this,
166 * internal methods check for and harmlessly ignore null arguments
167 * that may have been obtained during races with threads nulling
168 * out fields. (Some of these checked cases cannot currently
169 * happen.) Fields of Async classes can be but currently are not
170 * fully detached, because they do not in general form cycles.
171 */
172
173 volatile Object result; // Either the result or boxed AltResult
174 volatile Completion<?> completions; // Treiber stack of dependent actions
175
176 final boolean internalComplete(Object r) { // CAS from null to r
177 return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
178 }
179
180 final boolean casCompletions(Completion<?> cmp, Completion<?> val) {
181 return UNSAFE.compareAndSwapObject(this, COMPLETIONS, cmp, val);
182 }
183
184 /* ------------- Encoding and decoding outcomes -------------- */
185
186 static final class AltResult { // See above
187 final Throwable ex; // null only for NIL
188 AltResult(Throwable x) { this.ex = x; }
189 }
190
191 static final AltResult NIL = new AltResult(null);
192
193 /**
194 * Returns the encoding of the given (non-null) exception as a
195 * wrapped CompletionException unless it is one already.
196 */
197 static AltResult altThrowable(Throwable x) {
198 return new AltResult((x instanceof CompletionException) ? x :
199 new CompletionException(x));
200 }
201
202 /**
203 * Returns the encoding of the given arguments: if the exception
204 * is non-null, encodes as altThrowable. Otherwise uses the given
205 * value, boxed as NIL if null.
206 */
207 static Object encodeOutcome(Object v, Throwable x) {
208 return (x != null) ? altThrowable(x) : (v == null) ? NIL : v;
209 }
210
211 /**
212 * Decodes outcome to return result or throw unchecked exception.
213 */
214 private static <T> T reportJoin(Object r) {
215 if (r instanceof AltResult) {
216 Throwable x;
217 if ((x = ((AltResult)r).ex) == null)
218 return null;
219 if (x instanceof CancellationException)
220 throw (CancellationException)x;
221 if (x instanceof CompletionException)
222 throw (CompletionException)x;
223 throw new CompletionException(x);
224 }
225 @SuppressWarnings("unchecked") T tr = (T) r;
226 return tr;
227 }
228
229 /**
230 * Reports result using Future.get conventions.
231 */
232 private static <T> T reportGet(Object r)
233 throws InterruptedException, ExecutionException {
234 if (r == null) // by convention below, null means interrupted
235 throw new InterruptedException();
236 if (r instanceof AltResult) {
237 Throwable x, cause;
238 if ((x = ((AltResult)r).ex) == null)
239 return null;
240 if (x instanceof CancellationException)
241 throw (CancellationException)x;
242 if ((x instanceof CompletionException) &&
243 (cause = x.getCause()) != null)
244 x = cause;
245 throw new ExecutionException(x);
246 }
247 @SuppressWarnings("unchecked") T tr = (T) r;
248 return tr;
249 }
250
251 /* ------------- Async Tasks -------------- */
252
253 /**
254 * A marker interface identifying asynchronous tasks produced by
255 * {@code async} methods. This may be useful for monitoring,
256 * debugging, and tracking asynchronous activities.
257 *
258 * @since 1.8
259 */
260 public static interface AsynchronousCompletionTask {
261 }
262
263 /**
264 * Base class for tasks that can act as either FJ or plain
265 * Runnables. Abstract method compute calls an associated "now"
266 * method. Method exec calls compute if its CompletableFuture is
267 * not already done, and runs completions if done. Fields are not
268 * in general final and can be nulled out after use (but most
269 * currently are not). Classes include serialVersionUIDs even
270 * though they are currently never serialized.
271 */
272 abstract static class Async<T> extends ForkJoinTask<Void>
273 implements Runnable, AsynchronousCompletionTask {
274 CompletableFuture<T> dep; // the CompletableFuture to trigger
275 Async(CompletableFuture<T> dep) { this.dep = dep; }
276
277 abstract void compute(); // call the associated "now" method
278
279 public final boolean exec() {
280 CompletableFuture<T> d;
281 if ((d = dep) != null) {
282 if (d.result == null) // suppress if cancelled
283 compute();
284 if (d.result != null)
285 d.postComplete();
286 dep = null; // detach
287 }
288 return true;
289 }
290 public final Void getRawResult() { return null; }
291 public final void setRawResult(Void v) { }
292 public final void run() { exec(); }
293 private static final long serialVersionUID = 5232453952276885070L;
294 }
295
296 /**
297 * Default executor -- ForkJoinPool.commonPool() unless it cannot
298 * support parallelism.
299 */
300 static final Executor asyncPool =
301 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
302 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
303
304 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
305 static final class ThreadPerTaskExecutor implements Executor {
306 public void execute(Runnable r) { new Thread(r).start(); }
307 }
308
309 /**
310 * Null-checks user executor argument, and translates uses of
311 * commonPool to asyncPool in case parallelism disabled.
312 */
313 static Executor screenExecutor(Executor e) {
314 if (e == null) throw new NullPointerException();
315 return (e == ForkJoinPool.commonPool()) ? asyncPool : e;
316 }
317
318 /* ------------- Completions -------------- */
319
320 abstract static class Completion<T> { // See above
321 volatile Completion<?> next; // Treiber stack link
322
323 /**
324 * Performs completion action if enabled, returning a
325 * completed dependent CompletableFuture, if one exists.
326 */
327 abstract CompletableFuture<?> tryAct();
328 }
329
330 /**
331 * Triggers all reachable enabled dependents. Call only when
332 * known to be done.
333 */
334 final void postComplete() {
335 /*
336 * On each step, variable f holds current completions to pop
337 * and run. It is extended along only one path at a time,
338 * pushing others to avoid StackOverflowErrors on recursion.
339 */
340 CompletableFuture<?> f = this; Completion<?> h;
341 while ((h = f.completions) != null ||
342 (f != this && (h = (f = this).completions) != null)) {
343 CompletableFuture<?> d; Completion<?> t;
344 if (f.casCompletions(h, t = h.next)) {
345 if (t != null) {
346 if (f != this) { // push
347 do {} while (!casCompletions(h.next = completions, h));
348 continue;
349 }
350 h.next = null; // detach
351 }
352 f = (d = h.tryAct()) == null ? this : d;
353 }
354 }
355 }
356
357 /* ------------- One-source Completions -------------- */
358
359 /**
360 * A Completion with a source and dependent. The "dep" field acts
361 * as a claim, nulled out to disable further attempts to
362 * trigger. Fields can only be observed by other threads upon
363 * successful push; and should be nulled out after claim.
364 */
365 abstract static class UniCompletion<T> extends Completion<T> {
366 Executor async; // executor to use (null if none)
367 CompletableFuture<T> dep; // the dependent to complete
368 CompletableFuture<?> src; // source of value for tryAct
369
370 UniCompletion(Executor async, CompletableFuture<T> dep,
371 CompletableFuture<?> src) {
372 this.async = async; this.dep = dep; this.src = src;
373 }
374
375 /** Tries to claim completion action by CASing dep to null */
376 final boolean claim(CompletableFuture<T> d) {
377 return UNSAFE.compareAndSwapObject(this, DEP, d, null);
378 }
379
380 private static final sun.misc.Unsafe UNSAFE;
381 private static final long DEP;
382 static {
383 try {
384 UNSAFE = sun.misc.Unsafe.getUnsafe();
385 Class<?> k = UniCompletion.class;
386 DEP = UNSAFE.objectFieldOffset
387 (k.getDeclaredField("dep"));
388 } catch (Exception x) {
389 throw new Error(x);
390 }
391 }
392 }
393
394 /** Pushes c on to completions, and triggers c if done. */
395 private void unipush(UniCompletion<?> c) {
396 if (c != null) {
397 CompletableFuture<?> d;
398 while (result == null && !casCompletions(c.next = completions, c))
399 c.next = null; // clear on CAS failure
400 if ((d = c.tryAct()) != null) // cover races
401 d.postComplete();
402 if (result != null) // clean stack
403 postComplete();
404 }
405 }
406
407 // Immediate, async, delayed, and routing support for Function/apply
408
409 static <T,U> void nowApply(Executor e, CompletableFuture<U> d, Object r,
410 Function<? super T,? extends U> f) {
411 if (d != null && f != null) {
412 T t; U u; Throwable x;
413 if (r instanceof AltResult) {
414 t = null;
415 x = ((AltResult)r).ex;
416 }
417 else {
418 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
419 x = null;
420 }
421 if (x == null) {
422 try {
423 if (e != null) {
424 e.execute(new AsyncApply<T,U>(d, t, f));
425 return;
426 }
427 u = f.apply(t);
428 } catch (Throwable ex) {
429 x = ex;
430 u = null;
431 }
432 }
433 else
434 u = null;
435 d.internalComplete(encodeOutcome(u, x));
436 }
437 }
438
439 static final class AsyncApply<T,U> extends Async<U> {
440 T arg; Function<? super T,? extends U> fn;
441 AsyncApply(CompletableFuture<U> dep, T arg,
442 Function<? super T,? extends U> fn) {
443 super(dep); this.arg = arg; this.fn = fn;
444 }
445 final void compute() { nowApply(null, dep, arg, fn); }
446 private static final long serialVersionUID = 5232453952276885070L;
447 }
448
449 static final class DelayedApply<T,U> extends UniCompletion<U> {
450 Function<? super T,? extends U> fn;
451 DelayedApply(Executor async, CompletableFuture<U> dep,
452 CompletableFuture<?> src,
453 Function<? super T,? extends U> fn) {
454 super(async, dep, src); this.fn = fn;
455 }
456 final CompletableFuture<?> tryAct() {
457 CompletableFuture<U> d; CompletableFuture<?> a; Object r;
458 if ((d = dep) != null && (a = src) != null &&
459 (r = a.result) != null && claim(d)) {
460 nowApply(async, d, r, fn);
461 src = null; fn = null;
462 if (d.result != null) return d;
463 }
464 return null;
465 }
466 }
467
468 private <U> CompletableFuture<U> doThenApply(
469 Function<? super T,? extends U> fn, Executor e) {
470 if (fn == null) throw new NullPointerException();
471 CompletableFuture<U> d = new CompletableFuture<U>();
472 Object r = result;
473 if (r == null)
474 unipush(new DelayedApply<T,U>(e, d, this, fn));
475 else
476 nowApply(e, d, r, fn);
477 return d;
478 }
479
480 // Consumer/accept
481
482 static <T,U> void nowAccept(Executor e, CompletableFuture<U> d,
483 Object r, Consumer<? super T> f) {
484 if (d != null && f != null) {
485 T t; Throwable x;
486 if (r instanceof AltResult) {
487 t = null;
488 x = ((AltResult)r).ex;
489 }
490 else {
491 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
492 x = null;
493 }
494 if (x == null) {
495 try {
496 if (e != null) {
497 e.execute(new AsyncAccept<T,U>(d, t, f));
498 return;
499 }
500 f.accept(t);
501 } catch (Throwable ex) {
502 x = ex;
503 }
504 }
505 d.internalComplete(encodeOutcome(null, x));
506 }
507 }
508
509 static final class AsyncAccept<T,U> extends Async<U> {
510 T arg; Consumer<? super T> fn;
511 AsyncAccept(CompletableFuture<U> dep, T arg,
512 Consumer<? super T> fn) {
513 super(dep); this.arg = arg; this.fn = fn;
514 }
515 final void compute() { nowAccept(null, dep, arg, fn); }
516 private static final long serialVersionUID = 5232453952276885070L;
517 }
518
519 static final class DelayedAccept<T> extends UniCompletion<Void> {
520 Consumer<? super T> fn;
521 DelayedAccept(Executor async, CompletableFuture<Void> dep,
522 CompletableFuture<?> src, Consumer<? super T> fn) {
523 super(async, dep, src); this.fn = fn;
524 }
525 final CompletableFuture<?> tryAct() {
526 CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
527 if ((d = dep) != null && (a = src) != null &&
528 (r = a.result) != null && claim(d)) {
529 nowAccept(async, d, r, fn);
530 src = null; fn = null;
531 if (d.result != null) return d;
532 }
533 return null;
534 }
535 }
536
537 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
538 Executor e) {
539 if (fn == null) throw new NullPointerException();
540 CompletableFuture<Void> d = new CompletableFuture<Void>();
541 Object r = result;
542 if (r == null)
543 unipush(new DelayedAccept<T>(e, d, this, fn));
544 else
545 nowAccept(e, d, r, fn);
546 return d;
547 }
548
549 // Runnable/run
550
551 static <T> void nowRun(Executor e, CompletableFuture<T> d, Object r,
552 Runnable f) {
553 if (d != null && f != null) {
554 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
555 if (x == null) {
556 try {
557 if (e != null) {
558 e.execute(new AsyncRun<T>(d, f));
559 return;
560 }
561 f.run();
562 } catch (Throwable ex) {
563 x = ex;
564 }
565 }
566 d.internalComplete(encodeOutcome(null, x));
567 }
568 }
569
570 static final class AsyncRun<T> extends Async<T> {
571 Runnable fn;
572 AsyncRun(CompletableFuture<T> dep, Runnable fn) {
573 super(dep); this.fn = fn;
574 }
575 final void compute() { nowRun(null, dep, null, fn); }
576 private static final long serialVersionUID = 5232453952276885070L;
577 }
578
579 static final class DelayedRun extends UniCompletion<Void> {
580 Runnable fn;
581 DelayedRun(Executor async, CompletableFuture<Void> dep,
582 CompletableFuture<?> src, Runnable fn) {
583 super(async, dep, src); this.fn = fn;
584 }
585 final CompletableFuture<?> tryAct() {
586 CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
587 if ((d = dep) != null && (a = src) != null &&
588 (r = a.result) != null && claim(d)) {
589 nowRun(async, d, r, fn);
590 src = null; fn = null; // clear refs
591 if (d.result != null) return d;
592 }
593 return null;
594 }
595 }
596
597 private CompletableFuture<Void> doThenRun(Runnable fn, Executor e) {
598 if (fn == null) throw new NullPointerException();
599 CompletableFuture<Void> d = new CompletableFuture<Void>();
600 Object r = result;
601 if (r == null)
602 unipush(new DelayedRun(e, d, this, fn));
603 else
604 nowRun(e, d, r, fn);
605 return d;
606 }
607
608 // Supplier/get
609
610 static <T> void nowSupply(CompletableFuture<T> d, Supplier<T> f) {
611 if (d != null && f != null) {
612 T t; Throwable x;
613 try {
614 t = f.get();
615 x = null;
616 } catch (Throwable ex) {
617 x = ex;
618 t = null;
619 }
620 d.internalComplete(encodeOutcome(t, x));
621 }
622 }
623
624 static final class AsyncSupply<T> extends Async<T> {
625 Supplier<T> fn;
626 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
627 super(dep); this.fn = fn;
628 }
629 final void compute() { nowSupply(dep, fn); }
630 private static final long serialVersionUID = 5232453952276885070L;
631 }
632
633 // WhenComplete
634
635 static <T> void nowWhen(Executor e, CompletableFuture<T> d, Object r,
636 BiConsumer<? super T,? super Throwable> f) {
637 if (d != null && f != null) {
638 T t; Throwable x, dx;
639 if (r instanceof AltResult) {
640 t = null;
641 x = ((AltResult)r).ex;
642 }
643 else {
644 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
645 x = null;
646 }
647 try {
648 if (e != null) {
649 e.execute(new AsyncWhen<T>(d, r, f));
650 return;
651 }
652 f.accept(t, x);
653 dx = null;
654 } catch (Throwable ex) {
655 dx = ex;
656 }
657 d.internalComplete(encodeOutcome(t, x != null ? x : dx));
658 }
659 }
660
661 static final class AsyncWhen<T> extends Async<T> {
662 Object arg; BiConsumer<? super T,? super Throwable> fn;
663 AsyncWhen(CompletableFuture<T> dep, Object arg,
664 BiConsumer<? super T,? super Throwable> fn) {
665 super(dep); this.arg = arg; this.fn = fn;
666 }
667 final void compute() { nowWhen(null, dep, arg, fn); }
668 private static final long serialVersionUID = 5232453952276885070L;
669 }
670
671 static final class DelayedWhen<T> extends UniCompletion<T> {
672 BiConsumer<? super T, ? super Throwable> fn;
673 DelayedWhen(Executor async, CompletableFuture<T> dep,
674 CompletableFuture<?> src,
675 BiConsumer<? super T, ? super Throwable> fn) {
676 super(async, dep, src); this.fn = fn;
677 }
678 final CompletableFuture<?> tryAct() {
679 CompletableFuture<T> d; CompletableFuture<?> a; Object r;
680 if ((d = dep) != null && (a = src) != null &&
681 (r = a.result) != null && claim(d)) {
682 nowWhen(async, d, r, fn);
683 src = null; fn = null;
684 if (d.result != null) return d;
685 }
686 return null;
687 }
688 }
689
690 private CompletableFuture<T> doWhenComplete(
691 BiConsumer<? super T, ? super Throwable> fn, Executor e) {
692 if (fn == null) throw new NullPointerException();
693 CompletableFuture<T> d = new CompletableFuture<T>();
694 Object r = result;
695 if (r == null)
696 unipush(new DelayedWhen<T>(e, d, this, fn));
697 else
698 nowWhen(e, d, r, fn);
699 return d;
700 }
701
702 // Handle
703
704 static <T,U> void nowHandle(Executor e, CompletableFuture<U> d, Object r,
705 BiFunction<? super T, Throwable, ? extends U> f) {
706 if (d != null && f != null) {
707 T t; U u; Throwable x, dx;
708 if (r instanceof AltResult) {
709 t = null;
710 x = ((AltResult)r).ex;
711 }
712 else {
713 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
714 x = null;
715 }
716 try {
717 if (e != null) {
718 e.execute(new AsyncCombine<T,Throwable,U>(d, t, x, f));
719 return;
720 }
721 u = f.apply(t, x);
722 dx = null;
723 } catch (Throwable ex) {
724 dx = ex;
725 u = null;
726 }
727 d.internalComplete(encodeOutcome(u, dx));
728 }
729 }
730
731 static final class DelayedHandle<T,U> extends UniCompletion<U> {
732 BiFunction<? super T, Throwable, ? extends U> fn;
733 DelayedHandle(Executor async, CompletableFuture<U> dep,
734 CompletableFuture<?> src,
735 BiFunction<? super T, Throwable, ? extends U> fn) {
736 super(async, dep, src); this.fn = fn;
737 }
738 final CompletableFuture<?> tryAct() {
739 CompletableFuture<U> d; CompletableFuture<?> a; Object r;
740 if ((d = dep) != null && (a = src) != null &&
741 (r = a.result) != null && claim(d)) {
742 nowHandle(async, d, r, fn);
743 src = null; fn = null;
744 if (d.result != null) return d;
745 }
746 return null;
747 }
748 }
749
750 private <U> CompletableFuture<U> doHandle(
751 BiFunction<? super T, Throwable, ? extends U> fn,
752 Executor e) {
753 if (fn == null) throw new NullPointerException();
754 CompletableFuture<U> d = new CompletableFuture<U>();
755 Object r = result;
756 if (r == null)
757 unipush(new DelayedHandle<T,U>(e, d, this, fn));
758 else
759 nowHandle(e, d, r, fn);
760 return d;
761 }
762
763 // Exceptionally
764
765 static <T> void nowExceptionally(CompletableFuture<T> d, Object r,
766 Function<? super Throwable, ? extends T> f) {
767 if (d != null && f != null) {
768 T t; Throwable x, dx;
769 if ((r instanceof AltResult) && (x = ((AltResult)r).ex) != null) {
770 try {
771 t = f.apply(x);
772 dx = null;
773 } catch (Throwable ex) {
774 dx = ex;
775 t = null;
776 }
777 }
778 else {
779 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
780 dx = null;
781 }
782 d.internalComplete(encodeOutcome(t, dx));
783 }
784 }
785
786 static final class DelayedExceptionally<T> extends UniCompletion<T> {
787 Function<? super Throwable, ? extends T> fn;
788 DelayedExceptionally(CompletableFuture<T> dep, CompletableFuture<?> src,
789 Function<? super Throwable, ? extends T> fn) {
790 super(null, dep, src); this.fn = fn;
791 }
792 final CompletableFuture<?> tryAct() {
793 CompletableFuture<T> d; CompletableFuture<?> a; Object r;
794 if ((d = dep) != null && (a = src) != null &&
795 (r = a.result) != null && claim(d)) {
796 nowExceptionally(d, r, fn);
797 src = null; fn = null;
798 if (d.result != null) return d;
799 }
800 return null;
801 }
802 }
803
804 private CompletableFuture<T> doExceptionally(
805 Function<Throwable, ? extends T> fn) {
806 if (fn == null) throw new NullPointerException();
807 CompletableFuture<T> d = new CompletableFuture<T>();
808 Object r = result;
809 if (r == null)
810 unipush(new DelayedExceptionally<T>(d, this, fn));
811 else
812 nowExceptionally(d, r, fn);
813 return d;
814 }
815
816 // Identity function used by nowCompose and anyOf
817
818 static <T> void nowCopy(CompletableFuture<T> d, Object r) {
819 if (d != null && d.result == null) {
820 Throwable x;
821 d.internalComplete(((r instanceof AltResult) &&
822 (x = ((AltResult)r).ex) != null &&
823 !(x instanceof CompletionException)) ?
824 new AltResult(new CompletionException(x)): r);
825 }
826 }
827
828 static final class DelayedCopy<T> extends UniCompletion<T> {
829 DelayedCopy(CompletableFuture<T> dep, CompletableFuture<?> src) {
830 super(null, dep, src);
831 }
832 final CompletableFuture<?> tryAct() {
833 CompletableFuture<T> d; CompletableFuture<?> a; Object r;
834 if ((d = dep) != null && (a = src) != null &&
835 (r = a.result) != null && claim(d)) {
836 nowCopy(d, r);
837 src = null;
838 if (d.result != null) return d;
839 }
840 return null;
841 }
842 }
843
844 // Compose
845
846 static <T,U> void nowCompose(Executor e, CompletableFuture<U> d, Object r,
847 Function<? super T, ? extends CompletionStage<U>> f) {
848 if (d != null && f != null) {
849 T t; Throwable x;
850 if (r instanceof AltResult) {
851 t = null;
852 x = ((AltResult)r).ex;
853 }
854 else {
855 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
856 x = null;
857 }
858 if (x == null) {
859 try {
860 if (e != null)
861 e.execute(new AsyncCompose<T,U>(d, t, f));
862 else {
863 CompletableFuture<U> c =
864 f.apply(t).toCompletableFuture();
865 Object s = c.result;
866 if (s == null)
867 c.unipush(new DelayedCopy<U>(d, c));
868 else
869 nowCopy(d, s);
870 }
871 return;
872 } catch (Throwable ex) {
873 x = ex;
874 }
875 }
876 d.internalComplete(encodeOutcome(null, x));
877 }
878 }
879
880 static final class AsyncCompose<T,U> extends Async<U> {
881 T arg; Function<? super T, ? extends CompletionStage<U>> fn;
882 AsyncCompose(CompletableFuture<U> dep, T arg,
883 Function<? super T, ? extends CompletionStage<U>> fn) {
884 super(dep); this.arg = arg; this.fn = fn;
885 }
886 final void compute() { nowCompose(null, dep, arg, fn); }
887 private static final long serialVersionUID = 5232453952276885070L;
888 }
889
890 static final class DelayedCompose<T,U> extends UniCompletion<U> {
891 Function<? super T, ? extends CompletionStage<U>> fn;
892 DelayedCompose(Executor async, CompletableFuture<U> dep,
893 CompletableFuture<?> src,
894 Function<? super T, ? extends CompletionStage<U>> fn) {
895 super(async, dep, src); this.fn = fn;
896 }
897 final CompletableFuture<?> tryAct() {
898 CompletableFuture<U> d; CompletableFuture<?> a; Object r;
899 if ((d = dep) != null && (a = src) != null &&
900 (r = a.result) != null && claim(d)) {
901 nowCompose(async, d, r, fn);
902 src = null; fn = null;
903 if (d.result != null) return d;
904 }
905 return null;
906 }
907 }
908
909 private <U> CompletableFuture<U> doThenCompose(
910 Function<? super T, ? extends CompletionStage<U>> fn, Executor e) {
911 if (fn == null) throw new NullPointerException();
912 Object r = result;
913 if (r == null || e != null) {
914 CompletableFuture<U> d = new CompletableFuture<U>();
915 if (r == null)
916 unipush(new DelayedCompose<T,U>(e, d, this, fn));
917 else
918 nowCompose(e, d, r, fn);
919 return d;
920 }
921 else { // try to return function result
922 T t; Throwable x;
923 if (r instanceof AltResult) {
924 t = null;
925 x = ((AltResult)r).ex;
926 }
927 else {
928 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
929 x = null;
930 }
931 if (x == null) {
932 try {
933 return fn.apply(t).toCompletableFuture();
934 } catch (Throwable ex) {
935 x = ex;
936 }
937 }
938 CompletableFuture<U> d = new CompletableFuture<U>();
939 d.result = encodeOutcome(null, x);
940 return d;
941 }
942 }
943
944 /* ------------- Two-source Completions -------------- */
945
946 /** A Completion with two sources */
947 abstract static class BiCompletion<T> extends UniCompletion<T> {
948 CompletableFuture<?> snd; // second source for tryAct
949 BiCompletion(Executor async, CompletableFuture<T> dep,
950 CompletableFuture<?> src, CompletableFuture<?> snd) {
951 super(async, dep, src); this.snd = snd;
952 }
953 }
954
955 /** A Completion delegating to a shared BiCompletion */
956 static final class CoBiCompletion<T> extends Completion<T> {
957 BiCompletion<T> completion;
958 CoBiCompletion(BiCompletion<T> completion) {
959 this.completion = completion;
960 }
961 final CompletableFuture<?> tryAct() {
962 BiCompletion<T> c;
963 return (c = completion) == null ? null : c.tryAct();
964 }
965 }
966
967 /* ------------- Two-source Anded -------------- */
968
969 /* Pushes c on to completions and o's completions unless both done. */
970 private <U> void bipushAnded(CompletableFuture<?> o, BiCompletion<U> c) {
971 if (c != null && o != null) {
972 Object r; CompletableFuture<?> d;
973 while ((r = result) == null &&
974 !casCompletions(c.next = completions, c))
975 c.next = null;
976 if (o.result == null) {
977 Completion<U> q = (r != null) ? c : new CoBiCompletion<U>(c);
978 while (o.result == null &&
979 !o.casCompletions(q.next = o.completions, q))
980 q.next = null;
981 }
982 if ((d = c.tryAct()) != null)
983 d.postComplete();
984 if (o.result != null)
985 o.postComplete();
986 if (result != null)
987 postComplete();
988 }
989 }
990
991 // BiFunction/combine
992
993 static <T,U,V> void nowCombine(Executor e, CompletableFuture<V> d,
994 Object r, Object s,
995 BiFunction<? super T,? super U,? extends V> f) {
996 if (d != null && f != null) {
997 T t; U u; V v; Throwable x;
998 if (r instanceof AltResult) {
999 t = null;
1000 x = ((AltResult)r).ex;
1001 }
1002 else {
1003 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1004 x = null;
1005 }
1006 if (x != null)
1007 u = null;
1008 else if (s instanceof AltResult) {
1009 x = ((AltResult)s).ex;
1010 u = null;
1011 }
1012 else {
1013 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1014 }
1015 if (x == null) {
1016 try {
1017 if (e != null) {
1018 e.execute(new AsyncCombine<T,U,V>(d, t, u, f));
1019 return;
1020 }
1021 v = f.apply(t, u);
1022 } catch (Throwable ex) {
1023 x = ex;
1024 v = null;
1025 }
1026 }
1027 else
1028 v = null;
1029 d.internalComplete(encodeOutcome(v, x));
1030 }
1031 }
1032
1033 static final class AsyncCombine<T,U,V> extends Async<V> {
1034 T arg1; U arg2; BiFunction<? super T,? super U,? extends V> fn;
1035 AsyncCombine(CompletableFuture<V> dep, T arg1, U arg2,
1036 BiFunction<? super T,? super U,? extends V> fn) {
1037 super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1038 }
1039 final void compute() { nowCombine(null, dep, arg1, arg2, fn); }
1040 private static final long serialVersionUID = 5232453952276885070L;
1041 }
1042
1043 static final class DelayedCombine<T,U,V> extends BiCompletion<V> {
1044 BiFunction<? super T,? super U,? extends V> fn;
1045 DelayedCombine(Executor async, CompletableFuture<V> dep,
1046 CompletableFuture<?> src, CompletableFuture<?> snd,
1047 BiFunction<? super T,? super U,? extends V> fn) {
1048 super(async, dep, src, snd); this.fn = fn;
1049 }
1050 final CompletableFuture<?> tryAct() {
1051 CompletableFuture<V> d; CompletableFuture<?> a, b; Object r, s;
1052 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1053 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1054 nowCombine(async, d, r, s, fn);
1055 src = null; snd = null; fn = null;
1056 if (d.result != null) return d;
1057 }
1058 return null;
1059 }
1060 }
1061
1062 private <U,V> CompletableFuture<V> doThenCombine(
1063 CompletableFuture<? extends U> o,
1064 BiFunction<? super T,? super U,? extends V> fn,
1065 Executor e) {
1066 if (o == null || fn == null) throw new NullPointerException();
1067 CompletableFuture<V> d = new CompletableFuture<V>();
1068 Object r = result, s = o.result;
1069 if (r == null || s == null)
1070 bipushAnded(o, new DelayedCombine<T,U,V>(e, d, this, o, fn));
1071 else
1072 nowCombine(e, d, r, s, fn);
1073 return d;
1074 }
1075
1076 // BiConsumer/AcceptBoth
1077
1078 static <T,U,V> void nowAcceptBoth(Executor e, CompletableFuture<V> d,
1079 Object r, Object s,
1080 BiConsumer<? super T,? super U> f) {
1081 if (d != null && f != null) {
1082 T t; U u; Throwable x;
1083 if (r instanceof AltResult) {
1084 t = null;
1085 x = ((AltResult)r).ex;
1086 }
1087 else {
1088 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1089 x = null;
1090 }
1091 if (x != null)
1092 u = null;
1093 else if (s instanceof AltResult) {
1094 x = ((AltResult)s).ex;
1095 u = null;
1096 }
1097 else {
1098 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1099 }
1100 if (x == null) {
1101 try {
1102 if (e != null) {
1103 e.execute(new AsyncAcceptBoth<T,U,V>(d, t, u, f));
1104 return;
1105 }
1106 f.accept(t, u);
1107 } catch (Throwable ex) {
1108 x = ex;
1109 }
1110 }
1111 d.internalComplete(encodeOutcome(null, x));
1112 }
1113 }
1114
1115 static final class AsyncAcceptBoth<T,U,V> extends Async<V> {
1116 T arg1; U arg2; BiConsumer<? super T,? super U> fn;
1117 AsyncAcceptBoth(CompletableFuture<V> dep, T arg1, U arg2,
1118 BiConsumer<? super T,? super U> fn) {
1119 super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1120 }
1121 final void compute() { nowAcceptBoth(null, dep, arg1, arg2, fn); }
1122 private static final long serialVersionUID = 5232453952276885070L;
1123 }
1124
1125 static final class DelayedAcceptBoth<T,U> extends BiCompletion<Void> {
1126 BiConsumer<? super T,? super U> fn;
1127 DelayedAcceptBoth(Executor async, CompletableFuture<Void> dep,
1128 CompletableFuture<?> src, CompletableFuture<?> snd,
1129 BiConsumer<? super T,? super U> fn) {
1130 super(async, dep, src, snd); this.fn = fn;
1131 }
1132 final CompletableFuture<?> tryAct() {
1133 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1134 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1135 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1136 nowAcceptBoth(async, d, r, s, fn);
1137 src = null; snd = null; fn = null;
1138 if (d.result != null) return d;
1139 }
1140 return null;
1141 }
1142 }
1143
1144 private <U> CompletableFuture<Void> doThenAcceptBoth(
1145 CompletableFuture<? extends U> o,
1146 BiConsumer<? super T, ? super U> fn,
1147 Executor e) {
1148 if (o == null || fn == null) throw new NullPointerException();
1149 CompletableFuture<Void> d = new CompletableFuture<Void>();
1150 Object r = result, s = o.result;
1151 if (r == null || s == null)
1152 bipushAnded(o, new DelayedAcceptBoth<T,U>(e, d, this, o, fn));
1153 else
1154 nowAcceptBoth(e, d, r, s, fn);
1155 return d;
1156 }
1157
1158 // Runnable/both
1159
1160 static final class DelayedRunAfterBoth extends BiCompletion<Void> {
1161 Runnable fn;
1162 DelayedRunAfterBoth(Executor async, CompletableFuture<Void> dep,
1163 CompletableFuture<?> src, CompletableFuture<?> snd,
1164 Runnable fn) {
1165 super(async, dep, src, snd); this.fn = fn;
1166 }
1167 final CompletableFuture<?> tryAct() {
1168 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1169 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1170 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1171 Throwable x = (r instanceof AltResult) ?
1172 ((AltResult)r).ex : null;
1173 nowRun(async, d, (x == null) ? s : r, fn);
1174 src = null; snd = null; fn = null;
1175 if (d.result != null) return d;
1176 }
1177 return null;
1178 }
1179 }
1180
1181 private CompletableFuture<Void> doRunAfterBoth(
1182 CompletableFuture<?> o, Runnable fn, Executor e) {
1183 if (o == null || fn == null) throw new NullPointerException();
1184 CompletableFuture<Void> d = new CompletableFuture<Void>();
1185 Object r = result, s = o.result;
1186 if (r == null || s == null)
1187 bipushAnded(o, new DelayedRunAfterBoth(e, d, this, o, fn));
1188 else {
1189 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1190 nowRun(e, d, (x == null) ? s : r, fn);
1191 }
1192 return d;
1193 }
1194
1195 // allOf
1196
1197 static <T> void nowAnd(CompletableFuture<T> d, Object r, Object s) {
1198 if (d != null) {
1199 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1200 if (x == null && (s instanceof AltResult))
1201 x = ((AltResult)s).ex;
1202 d.internalComplete(encodeOutcome(null, x));
1203 }
1204 }
1205
1206 static final class DelayedAnd extends BiCompletion<Void> {
1207 DelayedAnd(CompletableFuture<Void> dep,
1208 CompletableFuture<?> src, CompletableFuture<?> snd) {
1209 super(null, dep, src, snd);
1210 }
1211 final CompletableFuture<?> tryAct() {
1212 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1213 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1214 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1215 nowAnd(d, r, s);
1216 src = null; snd = null;
1217 if (d.result != null) return d;
1218 }
1219 return null;
1220 }
1221 }
1222
1223 /** Recursively constructs a tree of And completions */
1224 private static CompletableFuture<Void> doAllOf(CompletableFuture<?>[] cfs,
1225 int lo, int hi) {
1226 CompletableFuture<Void> d = new CompletableFuture<Void>();
1227 if (lo > hi) // empty
1228 d.result = NIL;
1229 else {
1230 int mid = (lo + hi) >>> 1;
1231 CompletableFuture<?> fst = (lo == mid ? cfs[lo] :
1232 doAllOf(cfs, lo, mid));
1233 CompletableFuture<?> snd = (lo == hi ? fst : // and fst with self
1234 (hi == mid+1) ? cfs[hi] :
1235 doAllOf(cfs, mid+1, hi));
1236 Object r = fst.result, s = snd.result; // throw NPE if null elements
1237 if (r == null || s == null) {
1238 DelayedAnd a = new DelayedAnd(d, fst, snd);
1239 if (fst == snd)
1240 fst.unipush(a);
1241 else
1242 fst.bipushAnded(snd, a);
1243 }
1244 else
1245 nowAnd(d, r, s);
1246 }
1247 return d;
1248 }
1249
1250 /* ------------- Two-source Ored -------------- */
1251
1252 /* Pushes c on to completions and o's completions unless either done. */
1253 private <U> void bipushOred(CompletableFuture<?> o, BiCompletion<U> c) {
1254 if (c != null && o != null) {
1255 CompletableFuture<?> d;
1256 while (o.result == null && result == null) {
1257 if (casCompletions(c.next = completions, c)) {
1258 CoBiCompletion<U> q = new CoBiCompletion<U>(c);
1259 while (result == null && o.result == null &&
1260 !o.casCompletions(q.next = o.completions, q))
1261 q.next = null;
1262 break;
1263 }
1264 c.next = null;
1265 }
1266 if ((d = c.tryAct()) != null)
1267 d.postComplete();
1268 if (o.result != null)
1269 o.postComplete();
1270 if (result != null)
1271 postComplete();
1272 }
1273 }
1274
1275 // Function/applyEither
1276
1277 static final class DelayedApplyToEither<T,U> extends BiCompletion<U> {
1278 Function<? super T,? extends U> fn;
1279 DelayedApplyToEither(Executor async, CompletableFuture<U> dep,
1280 CompletableFuture<?> src, CompletableFuture<?> snd,
1281 Function<? super T,? extends U> fn) {
1282 super(async, dep, src, snd); this.fn = fn;
1283 }
1284 final CompletableFuture<?> tryAct() {
1285 CompletableFuture<U> d; CompletableFuture<?> a, b; Object r;
1286 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1287 ((r = a.result) != null || (r = b.result) != null) &&
1288 claim(d)) {
1289 nowApply(async, d, r, fn);
1290 src = null; snd = null; fn = null;
1291 if (d.result != null) return d;
1292 }
1293 return null;
1294 }
1295 }
1296
1297 private <U> CompletableFuture<U> doApplyToEither(
1298 CompletableFuture<? extends T> o,
1299 Function<? super T, U> fn, Executor e) {
1300 if (o == null || fn == null) throw new NullPointerException();
1301 CompletableFuture<U> d = new CompletableFuture<U>();
1302 Object r = result;
1303 if (r == null && (r = o.result) == null)
1304 bipushOred(o, new DelayedApplyToEither<T,U>(e, d, this, o, fn));
1305 else
1306 nowApply(e, d, r, fn);
1307 return d;
1308 }
1309
1310 // Consumer/acceptEither
1311
1312 static final class DelayedAcceptEither<T> extends BiCompletion<Void> {
1313 Consumer<? super T> fn;
1314 DelayedAcceptEither(Executor async, CompletableFuture<Void> dep,
1315 CompletableFuture<?> src, CompletableFuture<?> snd,
1316 Consumer<? super T> fn) {
1317 super(async, dep, src, snd); this.fn = fn;
1318 }
1319 final CompletableFuture<?> tryAct() {
1320 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1321 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1322 ((r = a.result) != null || (r = b.result) != null) &&
1323 claim(d)) {
1324 nowAccept(async, d, r, fn);
1325 src = null; snd = null; fn = null;
1326 if (d.result != null) return d;
1327 }
1328 return null;
1329 }
1330 }
1331
1332 private CompletableFuture<Void> doAcceptEither(
1333 CompletableFuture<? extends T> o,
1334 Consumer<? super T> fn, Executor e) {
1335 if (o == null || fn == null) throw new NullPointerException();
1336 CompletableFuture<Void> d = new CompletableFuture<Void>();
1337 Object r = result;
1338 if (r == null && (r = o.result) == null)
1339 bipushOred(o, new DelayedAcceptEither<T>(e, d, this, o, fn));
1340 else
1341 nowAccept(e, d, r, fn);
1342 return d;
1343 }
1344
1345 // Runnable/runEither
1346
1347 static final class DelayedRunAfterEither extends BiCompletion<Void> {
1348 Runnable fn;
1349 DelayedRunAfterEither(Executor async, CompletableFuture<Void> dep,
1350 CompletableFuture<?> src,
1351 CompletableFuture<?> snd, Runnable fn) {
1352 super(async, dep, src, snd); this.fn = fn;
1353 }
1354 final CompletableFuture<?> tryAct() {
1355 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1356 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1357 ((r = a.result) != null || (r = b.result) != null) &&
1358 claim(d)) {
1359 nowRun(async, d, r, fn);
1360 src = null; snd = null; fn = null;
1361 if (d.result != null) return d;
1362 }
1363 return null;
1364 }
1365 }
1366
1367 private CompletableFuture<Void> doRunAfterEither(
1368 CompletableFuture<?> o, Runnable fn, Executor e) {
1369 if (o == null || fn == null) throw new NullPointerException();
1370 CompletableFuture<Void> d = new CompletableFuture<Void>();
1371 Object r = result;
1372 if (r == null && (r = o.result) == null)
1373 bipushOred(o, new DelayedRunAfterEither(e, d, this, o, fn));
1374 else
1375 nowRun(e, d, r, fn);
1376 return d;
1377 }
1378
1379 /* ------------- Signallers -------------- */
1380
1381 /**
1382 * Heuristic spin value for waitingGet() before blocking on
1383 * multiprocessors
1384 */
1385 static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
1386 1 << 8 : 0);
1387
1388 /**
1389 * Completion for recording and releasing a waiting thread. See
1390 * other classes such as Phaser and SynchronousQueue for more
1391 * detailed explanation. This class implements ManagedBlocker to
1392 * avoid starvation when blocking actions pile up in
1393 * ForkJoinPools.
1394 */
1395 static final class Signaller extends Completion<Void>
1396 implements ForkJoinPool.ManagedBlocker {
1397 long nanos; // wait time if timed
1398 final long deadline; // non-zero if timed
1399 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
1400 volatile Thread thread;
1401 Signaller(boolean interruptible, long nanos, long deadline) {
1402 this.thread = Thread.currentThread();
1403 this.interruptControl = interruptible ? 1 : 0;
1404 this.nanos = nanos;
1405 this.deadline = deadline;
1406 }
1407 final CompletableFuture<?> tryAct() {
1408 Thread w = thread;
1409 if (w != null) {
1410 thread = null; // no need to CAS
1411 LockSupport.unpark(w);
1412 }
1413 return null;
1414 }
1415 public boolean isReleasable() {
1416 if (thread == null)
1417 return true;
1418 if (Thread.interrupted()) {
1419 int i = interruptControl;
1420 interruptControl = -1;
1421 if (i > 0)
1422 return true;
1423 }
1424 if (deadline != 0L &&
1425 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
1426 thread = null;
1427 return true;
1428 }
1429 return false;
1430 }
1431 public boolean block() {
1432 if (isReleasable())
1433 return true;
1434 else if (deadline == 0L)
1435 LockSupport.park(this);
1436 else if (nanos > 0L)
1437 LockSupport.parkNanos(this, nanos);
1438 return isReleasable();
1439 }
1440 }
1441
1442 /**
1443 * Returns raw result after waiting, or null if interruptible and
1444 * interrupted.
1445 */
1446 private Object waitingGet(boolean interruptible) {
1447 Signaller q = null;
1448 boolean queued = false;
1449 int spins = SPINS;
1450 Object r;
1451 while ((r = result) == null) {
1452 if (spins > 0) {
1453 if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1454 --spins;
1455 }
1456 else if (q == null)
1457 q = new Signaller(interruptible, 0L, 0L);
1458 else if (!queued)
1459 queued = casCompletions(q.next = completions, q);
1460 else if (interruptible && q.interruptControl < 0) {
1461 q.thread = null;
1462 removeCancelledSignallers();
1463 return null;
1464 }
1465 else if (q.thread != null && result == null) {
1466 try {
1467 ForkJoinPool.managedBlock(q);
1468 } catch (InterruptedException ie) {
1469 q.interruptControl = -1;
1470 }
1471 }
1472 }
1473 if (q != null) {
1474 q.thread = null;
1475 if (q.interruptControl < 0) {
1476 if (interruptible)
1477 r = null; // report interruption
1478 else
1479 Thread.currentThread().interrupt();
1480 }
1481 }
1482 postComplete();
1483 return r;
1484 }
1485
1486 /**
1487 * Returns raw result after waiting, or null if interrupted, or
1488 * throws TimeoutException on timeout.
1489 */
1490 private Object timedGet(long nanos) throws TimeoutException {
1491 if (Thread.interrupted())
1492 return null;
1493 if (nanos <= 0L)
1494 throw new TimeoutException();
1495 long d = System.nanoTime() + nanos;
1496 Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
1497 boolean queued = false;
1498 Object r;
1499 while ((r = result) == null) {
1500 if (!queued)
1501 queued = casCompletions(q.next = completions, q);
1502 else if (q.interruptControl < 0 || q.nanos <= 0L) {
1503 q.thread = null;
1504 removeCancelledSignallers();
1505 if (q.interruptControl < 0)
1506 return null;
1507 throw new TimeoutException();
1508 }
1509 else if (q.thread != null && result == null) {
1510 try {
1511 ForkJoinPool.managedBlock(q);
1512 } catch (InterruptedException ie) {
1513 q.interruptControl = -1;
1514 }
1515 }
1516 }
1517 q.thread = null;
1518 postComplete();
1519 return (q.interruptControl < 0) ? null : r;
1520 }
1521
1522 /**
1523 * Unlinks cancelled Signallers to avoid accumulating garbage.
1524 * Internal nodes are simply unspliced without CAS since it is
1525 * harmless if they are traversed anyway. To avoid effects of
1526 * unsplicing from already removed nodes, the list is retraversed
1527 * in case of an apparent race.
1528 */
1529 private void removeCancelledSignallers() {
1530 for (Completion<?> p = null, q = completions; q != null;) {
1531 Completion<?> s = q.next;
1532 if ((q instanceof Signaller) && ((Signaller)q).thread == null) {
1533 if (p != null) {
1534 p.next = s;
1535 if (!(p instanceof Signaller) ||
1536 ((Signaller)p).thread != null)
1537 break;
1538 }
1539 else if (casCompletions(q, s))
1540 break;
1541 p = null; // restart
1542 q = completions;
1543 }
1544 else {
1545 p = q;
1546 q = s;
1547 }
1548 }
1549 }
1550
1551 /* ------------- public methods -------------- */
1552
1553 /**
1554 * Creates a new incomplete CompletableFuture.
1555 */
1556 public CompletableFuture() {
1557 }
1558
1559 /**
1560 * Returns a new CompletableFuture that is asynchronously completed
1561 * by a task running in the {@link ForkJoinPool#commonPool()} with
1562 * the value obtained by calling the given Supplier.
1563 *
1564 * @param supplier a function returning the value to be used
1565 * to complete the returned CompletableFuture
1566 * @param <U> the function's return type
1567 * @return the new CompletableFuture
1568 */
1569 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1570 if (supplier == null) throw new NullPointerException();
1571 CompletableFuture<U> d = new CompletableFuture<U>();
1572 asyncPool.execute(new AsyncSupply<U>(d, supplier));
1573 return d;
1574 }
1575
1576 /**
1577 * Returns a new CompletableFuture that is asynchronously completed
1578 * by a task running in the given executor with the value obtained
1579 * by calling the given Supplier.
1580 *
1581 * @param supplier a function returning the value to be used
1582 * to complete the returned CompletableFuture
1583 * @param executor the executor to use for asynchronous execution
1584 * @param <U> the function's return type
1585 * @return the new CompletableFuture
1586 */
1587 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1588 Executor executor) {
1589 if (supplier == null) throw new NullPointerException();
1590 Executor e = screenExecutor(executor);
1591 CompletableFuture<U> d = new CompletableFuture<U>();
1592 e.execute(new AsyncSupply<U>(d, supplier));
1593 return d;
1594 }
1595
1596 /**
1597 * Returns a new CompletableFuture that is asynchronously completed
1598 * by a task running in the {@link ForkJoinPool#commonPool()} after
1599 * it runs the given action.
1600 *
1601 * @param runnable the action to run before completing the
1602 * returned CompletableFuture
1603 * @return the new CompletableFuture
1604 */
1605 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1606 if (runnable == null) throw new NullPointerException();
1607 CompletableFuture<Void> d = new CompletableFuture<Void>();
1608 asyncPool.execute(new AsyncRun<Void>(d, runnable));
1609 return d;
1610 }
1611
1612 /**
1613 * Returns a new CompletableFuture that is asynchronously completed
1614 * by a task running in the given executor after it runs the given
1615 * action.
1616 *
1617 * @param runnable the action to run before completing the
1618 * returned CompletableFuture
1619 * @param executor the executor to use for asynchronous execution
1620 * @return the new CompletableFuture
1621 */
1622 public static CompletableFuture<Void> runAsync(Runnable runnable,
1623 Executor executor) {
1624 if (runnable == null) throw new NullPointerException();
1625 Executor e = screenExecutor(executor);
1626 CompletableFuture<Void> d = new CompletableFuture<Void>();
1627 e.execute(new AsyncRun<Void>(d, runnable));
1628 return d;
1629 }
1630
1631 /**
1632 * Returns a new CompletableFuture that is already completed with
1633 * the given value.
1634 *
1635 * @param value the value
1636 * @param <U> the type of the value
1637 * @return the completed CompletableFuture
1638 */
1639 public static <U> CompletableFuture<U> completedFuture(U value) {
1640 CompletableFuture<U> d = new CompletableFuture<U>();
1641 d.result = (value == null) ? NIL : value;
1642 return d;
1643 }
1644
1645 /**
1646 * Returns {@code true} if completed in any fashion: normally,
1647 * exceptionally, or via cancellation.
1648 *
1649 * @return {@code true} if completed
1650 */
1651 public boolean isDone() {
1652 return result != null;
1653 }
1654
1655 /**
1656 * Waits if necessary for this future to complete, and then
1657 * returns its result.
1658 *
1659 * @return the result value
1660 * @throws CancellationException if this future was cancelled
1661 * @throws ExecutionException if this future completed exceptionally
1662 * @throws InterruptedException if the current thread was interrupted
1663 * while waiting
1664 */
1665 public T get() throws InterruptedException, ExecutionException {
1666 Object r;
1667 return reportGet((r = result) == null ? waitingGet(true) : r);
1668 }
1669
1670 /**
1671 * Waits if necessary for at most the given time for this future
1672 * to complete, and then returns its result, if available.
1673 *
1674 * @param timeout the maximum time to wait
1675 * @param unit the time unit of the timeout argument
1676 * @return the result value
1677 * @throws CancellationException if this future was cancelled
1678 * @throws ExecutionException if this future completed exceptionally
1679 * @throws InterruptedException if the current thread was interrupted
1680 * while waiting
1681 * @throws TimeoutException if the wait timed out
1682 */
1683 public T get(long timeout, TimeUnit unit)
1684 throws InterruptedException, ExecutionException, TimeoutException {
1685 Object r;
1686 long nanos = unit.toNanos(timeout);
1687 return reportGet((r = result) == null ? timedGet(nanos) : r);
1688 }
1689
1690 /**
1691 * Returns the result value when complete, or throws an
1692 * (unchecked) exception if completed exceptionally. To better
1693 * conform with the use of common functional forms, if a
1694 * computation involved in the completion of this
1695 * CompletableFuture threw an exception, this method throws an
1696 * (unchecked) {@link CompletionException} with the underlying
1697 * exception as its cause.
1698 *
1699 * @return the result value
1700 * @throws CancellationException if the computation was cancelled
1701 * @throws CompletionException if this future completed
1702 * exceptionally or a completion computation threw an exception
1703 */
1704 public T join() {
1705 Object r;
1706 return reportJoin((r = result) == null ? waitingGet(false) : r);
1707 }
1708
1709 /**
1710 * Returns the result value (or throws any encountered exception)
1711 * if completed, else returns the given valueIfAbsent.
1712 *
1713 * @param valueIfAbsent the value to return if not completed
1714 * @return the result value, if completed, else the given valueIfAbsent
1715 * @throws CancellationException if the computation was cancelled
1716 * @throws CompletionException if this future completed
1717 * exceptionally or a completion computation threw an exception
1718 */
1719 public T getNow(T valueIfAbsent) {
1720 Object r;
1721 return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
1722 }
1723
1724 /**
1725 * If not already completed, sets the value returned by {@link
1726 * #get()} and related methods to the given value.
1727 *
1728 * @param value the result value
1729 * @return {@code true} if this invocation caused this CompletableFuture
1730 * to transition to a completed state, else {@code false}
1731 */
1732 public boolean complete(T value) {
1733 boolean triggered = internalComplete(value == null ? NIL : value);
1734 postComplete();
1735 return triggered;
1736 }
1737
1738 /**
1739 * If not already completed, causes invocations of {@link #get()}
1740 * and related methods to throw the given exception.
1741 *
1742 * @param ex the exception
1743 * @return {@code true} if this invocation caused this CompletableFuture
1744 * to transition to a completed state, else {@code false}
1745 */
1746 public boolean completeExceptionally(Throwable ex) {
1747 if (ex == null) throw new NullPointerException();
1748 boolean triggered = internalComplete(new AltResult(ex));
1749 postComplete();
1750 return triggered;
1751 }
1752
1753 public <U> CompletableFuture<U> thenApply(
1754 Function<? super T,? extends U> fn) {
1755 return doThenApply(fn, null);
1756 }
1757
1758 public <U> CompletableFuture<U> thenApplyAsync(
1759 Function<? super T,? extends U> fn) {
1760 return doThenApply(fn, asyncPool);
1761 }
1762
1763 public <U> CompletableFuture<U> thenApplyAsync(
1764 Function<? super T,? extends U> fn, Executor executor) {
1765 return doThenApply(fn, screenExecutor(executor));
1766 }
1767
1768 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
1769 return doThenAccept(action, null);
1770 }
1771
1772 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
1773 return doThenAccept(action, asyncPool);
1774 }
1775
1776 public CompletableFuture<Void> thenAcceptAsync(
1777 Consumer<? super T> action, Executor executor) {
1778 return doThenAccept(action, screenExecutor(executor));
1779 }
1780
1781 public CompletableFuture<Void> thenRun(Runnable action) {
1782 return doThenRun(action, null);
1783 }
1784
1785 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1786 return doThenRun(action, asyncPool);
1787 }
1788
1789 public CompletableFuture<Void> thenRunAsync(
1790 Runnable action, Executor executor) {
1791 return doThenRun(action, screenExecutor(executor));
1792 }
1793
1794 public <U,V> CompletableFuture<V> thenCombine(
1795 CompletionStage<? extends U> other,
1796 BiFunction<? super T,? super U,? extends V> fn) {
1797 return doThenCombine(other.toCompletableFuture(), fn, null);
1798 }
1799
1800 public <U,V> CompletableFuture<V> thenCombineAsync(
1801 CompletionStage<? extends U> other,
1802 BiFunction<? super T,? super U,? extends V> fn) {
1803 return doThenCombine(other.toCompletableFuture(), fn, asyncPool);
1804 }
1805
1806 public <U,V> CompletableFuture<V> thenCombineAsync(
1807 CompletionStage<? extends U> other,
1808 BiFunction<? super T,? super U,? extends V> fn,
1809 Executor executor) {
1810 return doThenCombine(other.toCompletableFuture(), fn,
1811 screenExecutor(executor));
1812 }
1813
1814 public <U> CompletableFuture<Void> thenAcceptBoth(
1815 CompletionStage<? extends U> other,
1816 BiConsumer<? super T, ? super U> action) {
1817 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
1818 }
1819
1820 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1821 CompletionStage<? extends U> other,
1822 BiConsumer<? super T, ? super U> action) {
1823 return doThenAcceptBoth(other.toCompletableFuture(), action, asyncPool);
1824 }
1825
1826 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1827 CompletionStage<? extends U> other,
1828 BiConsumer<? super T, ? super U> action,
1829 Executor executor) {
1830 return doThenAcceptBoth(other.toCompletableFuture(), action,
1831 screenExecutor(executor));
1832 }
1833
1834 public CompletableFuture<Void> runAfterBoth(
1835 CompletionStage<?> other, Runnable action) {
1836 return doRunAfterBoth(other.toCompletableFuture(), action, null);
1837 }
1838
1839 public CompletableFuture<Void> runAfterBothAsync(
1840 CompletionStage<?> other, Runnable action) {
1841 return doRunAfterBoth(other.toCompletableFuture(), action, asyncPool);
1842 }
1843
1844 public CompletableFuture<Void> runAfterBothAsync(
1845 CompletionStage<?> other, Runnable action, Executor executor) {
1846 return doRunAfterBoth(other.toCompletableFuture(), action,
1847 screenExecutor(executor));
1848 }
1849
1850 public <U> CompletableFuture<U> applyToEither(
1851 CompletionStage<? extends T> other, Function<? super T, U> fn) {
1852 return doApplyToEither(other.toCompletableFuture(), fn, null);
1853 }
1854
1855 public <U> CompletableFuture<U> applyToEitherAsync(
1856 CompletionStage<? extends T> other, Function<? super T, U> fn) {
1857 return doApplyToEither(other.toCompletableFuture(), fn, asyncPool);
1858 }
1859
1860 public <U> CompletableFuture<U> applyToEitherAsync
1861 (CompletionStage<? extends T> other, Function<? super T, U> fn,
1862 Executor executor) {
1863 return doApplyToEither(other.toCompletableFuture(), fn,
1864 screenExecutor(executor));
1865 }
1866
1867 public CompletableFuture<Void> acceptEither(
1868 CompletionStage<? extends T> other, Consumer<? super T> action) {
1869 return doAcceptEither(other.toCompletableFuture(), action, null);
1870 }
1871
1872 public CompletableFuture<Void> acceptEitherAsync
1873 (CompletionStage<? extends T> other, Consumer<? super T> action) {
1874 return doAcceptEither(other.toCompletableFuture(), action, asyncPool);
1875 }
1876
1877 public CompletableFuture<Void> acceptEitherAsync(
1878 CompletionStage<? extends T> other, Consumer<? super T> action,
1879 Executor executor) {
1880 return doAcceptEither(other.toCompletableFuture(), action,
1881 screenExecutor(executor));
1882 }
1883
1884 public CompletableFuture<Void> runAfterEither(
1885 CompletionStage<?> other, Runnable action) {
1886 return doRunAfterEither(other.toCompletableFuture(), action, null);
1887 }
1888
1889 public CompletableFuture<Void> runAfterEitherAsync(
1890 CompletionStage<?> other, Runnable action) {
1891 return doRunAfterEither(other.toCompletableFuture(), action, asyncPool);
1892 }
1893
1894 public CompletableFuture<Void> runAfterEitherAsync(
1895 CompletionStage<?> other, Runnable action, Executor executor) {
1896 return doRunAfterEither(other.toCompletableFuture(), action,
1897 screenExecutor(executor));
1898 }
1899
1900 public <U> CompletableFuture<U> thenCompose
1901 (Function<? super T, ? extends CompletionStage<U>> fn) {
1902 return doThenCompose(fn, null);
1903 }
1904
1905 public <U> CompletableFuture<U> thenComposeAsync(
1906 Function<? super T, ? extends CompletionStage<U>> fn) {
1907 return doThenCompose(fn, asyncPool);
1908 }
1909
1910 public <U> CompletableFuture<U> thenComposeAsync(
1911 Function<? super T, ? extends CompletionStage<U>> fn,
1912 Executor executor) {
1913 return doThenCompose(fn, screenExecutor(executor));
1914 }
1915
1916 public CompletableFuture<T> whenComplete(
1917 BiConsumer<? super T, ? super Throwable> action) {
1918 return doWhenComplete(action, null);
1919 }
1920
1921 public CompletableFuture<T> whenCompleteAsync(
1922 BiConsumer<? super T, ? super Throwable> action) {
1923 return doWhenComplete(action, asyncPool);
1924 }
1925
1926 public CompletableFuture<T> whenCompleteAsync(
1927 BiConsumer<? super T, ? super Throwable> action, Executor executor) {
1928 return doWhenComplete(action, screenExecutor(executor));
1929 }
1930
1931 public <U> CompletableFuture<U> handle(
1932 BiFunction<? super T, Throwable, ? extends U> fn) {
1933 return doHandle(fn, null);
1934 }
1935
1936 public <U> CompletableFuture<U> handleAsync(
1937 BiFunction<? super T, Throwable, ? extends U> fn) {
1938 return doHandle(fn, asyncPool);
1939 }
1940
1941 public <U> CompletableFuture<U> handleAsync(
1942 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
1943 return doHandle(fn, screenExecutor(executor));
1944 }
1945
1946 /**
1947 * Returns this CompletableFuture.
1948 *
1949 * @return this CompletableFuture
1950 */
1951 public CompletableFuture<T> toCompletableFuture() {
1952 return this;
1953 }
1954
1955 // not in interface CompletionStage
1956
1957 /**
1958 * Returns a new CompletableFuture that is completed when this
1959 * CompletableFuture completes, with the result of the given
1960 * function of the exception triggering this CompletableFuture's
1961 * completion when it completes exceptionally; otherwise, if this
1962 * CompletableFuture completes normally, then the returned
1963 * CompletableFuture also completes normally with the same value.
1964 * Note: More flexible versions of this functionality are
1965 * available using methods {@code whenComplete} and {@code handle}.
1966 *
1967 * @param fn the function to use to compute the value of the
1968 * returned CompletableFuture if this CompletableFuture completed
1969 * exceptionally
1970 * @return the new CompletableFuture
1971 */
1972 public CompletableFuture<T> exceptionally(
1973 Function<Throwable, ? extends T> fn) {
1974 return doExceptionally(fn);
1975 }
1976
1977 /* ------------- Arbitrary-arity constructions -------------- */
1978
1979 /**
1980 * Returns a new CompletableFuture that is completed when all of
1981 * the given CompletableFutures complete. If any of the given
1982 * CompletableFutures complete exceptionally, then the returned
1983 * CompletableFuture also does so, with a CompletionException
1984 * holding this exception as its cause. Otherwise, the results,
1985 * if any, of the given CompletableFutures are not reflected in
1986 * the returned CompletableFuture, but may be obtained by
1987 * inspecting them individually. If no CompletableFutures are
1988 * provided, returns a CompletableFuture completed with the value
1989 * {@code null}.
1990 *
1991 * <p>Among the applications of this method is to await completion
1992 * of a set of independent CompletableFutures before continuing a
1993 * program, as in: {@code CompletableFuture.allOf(c1, c2,
1994 * c3).join();}.
1995 *
1996 * @param cfs the CompletableFutures
1997 * @return a new CompletableFuture that is completed when all of the
1998 * given CompletableFutures complete
1999 * @throws NullPointerException if the array or any of its elements are
2000 * {@code null}
2001 */
2002 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2003 return doAllOf(cfs, 0, cfs.length - 1);
2004 }
2005
2006 /**
2007 * Returns a new CompletableFuture that is completed when any of
2008 * the given CompletableFutures complete, with the same result.
2009 * Otherwise, if it completed exceptionally, the returned
2010 * CompletableFuture also does so, with a CompletionException
2011 * holding this exception as its cause. If no CompletableFutures
2012 * are provided, returns an incomplete CompletableFuture.
2013 *
2014 * @param cfs the CompletableFutures
2015 * @return a new CompletableFuture that is completed with the
2016 * result or exception of any of the given CompletableFutures when
2017 * one completes
2018 * @throws NullPointerException if the array or any of its elements are
2019 * {@code null}
2020 */
2021 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2022 CompletableFuture<Object> d = new CompletableFuture<Object>();
2023 for (int i = 0; i < cfs.length; ++i) {
2024 CompletableFuture<?> c = cfs[i];
2025 Object r = c.result; // throw NPE if null element
2026 if (d.result == null) {
2027 if (r == null)
2028 c.unipush(new DelayedCopy<Object>(d, c));
2029 else
2030 nowCopy(d, r);
2031 }
2032 }
2033 return d;
2034 }
2035
2036 /* ------------- Control and status methods -------------- */
2037
2038 /**
2039 * If not already completed, completes this CompletableFuture with
2040 * a {@link CancellationException}. Dependent CompletableFutures
2041 * that have not already completed will also complete
2042 * exceptionally, with a {@link CompletionException} caused by
2043 * this {@code CancellationException}.
2044 *
2045 * @param mayInterruptIfRunning this value has no effect in this
2046 * implementation because interrupts are not used to control
2047 * processing.
2048 *
2049 * @return {@code true} if this task is now cancelled
2050 */
2051 public boolean cancel(boolean mayInterruptIfRunning) {
2052 boolean cancelled = (result == null) &&
2053 internalComplete(new AltResult(new CancellationException()));
2054 postComplete();
2055 return cancelled || isCancelled();
2056 }
2057
2058 /**
2059 * Returns {@code true} if this CompletableFuture was cancelled
2060 * before it completed normally.
2061 *
2062 * @return {@code true} if this CompletableFuture was cancelled
2063 * before it completed normally
2064 */
2065 public boolean isCancelled() {
2066 Object r;
2067 return ((r = result) instanceof AltResult) &&
2068 (((AltResult)r).ex instanceof CancellationException);
2069 }
2070
2071 /**
2072 * Returns {@code true} if this CompletableFuture completed
2073 * exceptionally, in any way. Possible causes include
2074 * cancellation, explicit invocation of {@code
2075 * completeExceptionally}, and abrupt termination of a
2076 * CompletionStage action.
2077 *
2078 * @return {@code true} if this CompletableFuture completed
2079 * exceptionally
2080 */
2081 public boolean isCompletedExceptionally() {
2082 Object r;
2083 return ((r = result) instanceof AltResult) && r != NIL;
2084 }
2085
2086 /**
2087 * Forcibly sets or resets the value subsequently returned by
2088 * method {@link #get()} and related methods, whether or not
2089 * already completed. This method is designed for use only in
2090 * error recovery actions, and even in such situations may result
2091 * in ongoing dependent completions using established versus
2092 * overwritten outcomes.
2093 *
2094 * @param value the completion value
2095 */
2096 public void obtrudeValue(T value) {
2097 result = (value == null) ? NIL : value;
2098 postComplete();
2099 }
2100
2101 /**
2102 * Forcibly causes subsequent invocations of method {@link #get()}
2103 * and related methods to throw the given exception, whether or
2104 * not already completed. This method is designed for use only in
2105 * recovery actions, and even in such situations may result in
2106 * ongoing dependent completions using established versus
2107 * overwritten outcomes.
2108 *
2109 * @param ex the exception
2110 */
2111 public void obtrudeException(Throwable ex) {
2112 if (ex == null) throw new NullPointerException();
2113 result = new AltResult(ex);
2114 postComplete();
2115 }
2116
2117 /**
2118 * Returns the estimated number of CompletableFutures whose
2119 * completions are awaiting completion of this CompletableFuture.
2120 * This method is designed for use in monitoring system state, not
2121 * for synchronization control.
2122 *
2123 * @return the number of dependent CompletableFutures
2124 */
2125 public int getNumberOfDependents() {
2126 int count = 0;
2127 for (Completion<?> p = completions; p != null; p = p.next)
2128 ++count;
2129 return count;
2130 }
2131
2132 /**
2133 * Returns a string identifying this CompletableFuture, as well as
2134 * its completion state. The state, in brackets, contains the
2135 * String {@code "Completed Normally"} or the String {@code
2136 * "Completed Exceptionally"}, or the String {@code "Not
2137 * completed"} followed by the number of CompletableFutures
2138 * dependent upon its completion, if any.
2139 *
2140 * @return a string identifying this CompletableFuture, as well as its state
2141 */
2142 public String toString() {
2143 Object r = result;
2144 int count;
2145 return super.toString() +
2146 ((r == null) ?
2147 (((count = getNumberOfDependents()) == 0) ?
2148 "[Not completed]" :
2149 "[Not completed, " + count + " dependents]") :
2150 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2151 "[Completed exceptionally]" :
2152 "[Completed normally]"));
2153 }
2154
2155 // Unsafe mechanics
2156 private static final sun.misc.Unsafe UNSAFE;
2157 private static final long RESULT;
2158 private static final long COMPLETIONS;
2159 static {
2160 try {
2161 UNSAFE = sun.misc.Unsafe.getUnsafe();
2162 Class<?> k = CompletableFuture.class;
2163 RESULT = UNSAFE.objectFieldOffset
2164 (k.getDeclaredField("result"));
2165 COMPLETIONS = UNSAFE.objectFieldOffset
2166 (k.getDeclaredField("completions"));
2167 } catch (Exception x) {
2168 throw new Error(x);
2169 }
2170 }
2171 }