root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.112
Committed: Sat May 3 21:05:50 2014 UTC by jsr166
Branch: MAIN
Changes since 1.111: +4 -4 lines
Log Message:
/* => /** for javadoc

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.locks.LockSupport;
25
26 /**
27 * A {@link Future} that may be explicitly completed (setting its
28 * value and status), and may be used as a {@link CompletionStage},
29 * supporting dependent functions and actions that trigger upon its
30 * completion.
31 *
32 * <p>When two or more threads attempt to
33 * {@link #complete complete},
34 * {@link #completeExceptionally completeExceptionally}, or
35 * {@link #cancel cancel}
36 * a CompletableFuture, only one of them succeeds.
37 *
38 * <p>In addition to these and related methods for directly
39 * manipulating status and results, CompletableFuture implements
40 * interface {@link CompletionStage} with the following policies: <ul>
41 *
42 * <li>Actions supplied for dependent completions of
43 * <em>non-async</em> methods may be performed by the thread that
44 * completes the current CompletableFuture, or by any other caller of
45 * a completion method.</li>
46 *
47 * <li>All <em>async</em> methods without an explicit Executor
48 * argument are performed using the {@link ForkJoinPool#commonPool()}
49 * (unless it does not support a parallelism level of at least two, in
50 * which case, a new Thread is used). To simplify monitoring,
51 * debugging, and tracking, all generated asynchronous tasks are
52 * instances of the marker interface {@link
53 * AsynchronousCompletionTask}. </li>
54 *
55 * <li>All CompletionStage methods are implemented independently of
56 * other public methods, so the behavior of one method is not impacted
57 * by overrides of others in subclasses. </li> </ul>
58 *
59 * <p>CompletableFuture also implements {@link Future} with the following
60 * policies: <ul>
61 *
62 * <li>Since (unlike {@link FutureTask}) this class has no direct
63 * control over the computation that causes it to be completed,
64 * cancellation is treated as just another form of exceptional
65 * completion. Method {@link #cancel cancel} has the same effect as
66 * {@code completeExceptionally(new CancellationException())}. Method
67 * {@link #isCompletedExceptionally} can be used to determine if a
68 * CompletableFuture completed in any exceptional fashion.</li>
69 *
70 * <li>In case of exceptional completion with a CompletionException,
71 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
72 * {@link ExecutionException} with the same cause as held in the
73 * corresponding CompletionException. To simplify usage in most
74 * contexts, this class also defines methods {@link #join()} and
75 * {@link #getNow} that instead throw the CompletionException directly
76 * in these cases.</li> </ul>
77 *
78 * @author Doug Lea
79 * @since 1.8
80 */
81 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
82
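
The class documentation above makes several observable promises: only one completion attempt succeeds, cancellation counts as exceptional completion, and get() and join() report the same failure through different exception types. The following is a minimal sketch that exercises those promises through the public API only. The class CompletionPolicyDemo and its method names are hypothetical and are not part of this file.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; only the CompletableFuture calls are real API.
class CompletionPolicyDemo {

    // Only the first of several completion attempts succeeds.
    static boolean firstCompletionWins() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("a");
        boolean second = f.completeExceptionally(new RuntimeException("late"));
        return first && !second && "a".equals(f.join());
    }

    // cancel() is reported as one form of exceptional completion.
    static boolean cancelCountsAsExceptional() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean cancelled = f.cancel(true);
        return cancelled && f.isCancelled() && f.isCompletedExceptionally();
    }

    // get() reports a failure as an ExecutionException carrying the cause.
    static Throwable causeSeenByGet(Throwable failure) {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(failure);
        try {
            f.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore the interrupt flag
            throw new IllegalStateException(e);
        }
    }

    // join() reports the same failure as an unchecked CompletionException.
    static Throwable causeSeenByJoin(Throwable failure) {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(failure);
        try {
            f.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }
}
```

Callers that cannot handle checked exceptions would typically prefer join(), and callers written against the Future interface would keep using get().
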
83 /*
84 * Overview:
85 *
86 * A CompletableFuture may have dependent completion actions,
87 * collected in a linked stack. It atomically completes by CASing
88 * a result field, and then pops off and runs those actions. This
89 * applies across normal vs exceptional outcomes, sync vs async
90 * actions, binary triggers, and various forms of completions.
91 *
92 * Non-nullness of field result (set via CAS) indicates done. An
93 * AltResult is used to box null as a result, as well as to hold
94 * exceptions. Using a single field makes completion simple to
95 * detect and trigger. Encoding and decoding are straightforward
96 * but add vertical sprawl. One minor simplification relies on
97 * the (static) NIL (to box null results) being the only AltResult
98 * with a null exception field, so we don't usually need explicit
99 * comparisons with NIL. Exception propagation mechanics
100 * surrounding decoding rely on unchecked casts of decoded results
101 * really being unchecked, and user type errors being caught at
102 * point of use, as is currently the case in Java. These are
103 * highlighted by using SuppressWarnings annotated temporaries.
104 *
105 * Dependent actions are represented by Completion objects linked
106 * as Treiber stacks headed by field completions. There are four
107 * kinds of Completions: single-source (UniCompletion), two-source
108 * (BiCompletion), shared (CoCompletion, used by the second
109 * source of a BiCompletion), and Signallers that unblock waiters.
110 *
111 * The same patterns of methods and classes are used for each form
112 * of Completion (apply, combine, etc), and are written in a
113 * similar style. For each form X there is, when applicable:
114 *
115 * Method nowX (for example nowApply) that immediately executes
116 * a supplied function and sets result,
117 * Class AsyncX (for example AsyncApply) that calls nowX
118 * from another task,
119 * Class DelayedX (for example DelayedApply) that holds
120 * arguments and calls nowX when ready.
121 *
122 * For each public CompletionStage method M* (for example
123 * thenApply{Async}), there is a method doM (for example
124 * doThenApply) that creates and/or invokes the appropriate form.
125 * Each deals with three cases that can arise when adding a
126 * dependent completion to CompletableFuture f:
127 *
128 * * f is already complete, so the dependent action is run
129 * immediately, via a "now" method, which, if async,
130 * starts the action in a new task.
131 * * f is not complete, so a Completion action is created and
132 * pushed to f's completions. It is triggered via
133 * f.postComplete when f completes.
134 * * f is not complete, but completes while adding the completion
135 * action, so we try to trigger it upon adding (see method
136 * unipush and derivatives) to cover races.
137 *
138 * Methods with two sources (for example thenCombine) must deal
139 * with races across both while pushing actions. The second
140 * completion is a CoCompletion pointing to the first, shared to
141 * ensure that at most one claims and performs the action. The
142 * multiple-arity method allOf does this pairwise to form a tree
143 * of completions. (Method anyOf just uses a depth-one Or tree.)
144 *
145 * Upon setting results, method postComplete is called unless
146 * the target is guaranteed not to be observable (i.e., not yet
147 * returned or linked). Multiple threads can call postComplete,
148 * which atomically pops each dependent action, and tries to
149 * trigger it via method tryAct. Any such action must be performed
150 * only once, even if called from several threads, so Completions
151 * maintain status via CAS, and on success run one of the "now"
152 * methods. Triggering can propagate recursively, so tryAct
153 * returns a completed dependent (if one exists) for further
154 * processing by its caller.
155 *
156 * Blocking methods get() and join() rely on Signaller Completions
157 * that wake up waiting threads. The mechanics are similar to
158 * Treiber stack wait-nodes used in FutureTask, Phaser, and
159 * SynchronousQueue. See their internal documentation for
160 * algorithmic details.
161 *
162 * Without precautions, CompletableFutures would be prone to
163 * garbage accumulation as chains of completions build up, each
164 * pointing back to its sources. So we detach (null out) most
165 * Completion fields as soon as possible. To support this,
166 * internal methods check for and harmlessly ignore null arguments
167 * that may have been obtained during races with threads nulling
168 * out fields. (Some of these checked cases cannot currently
169 * happen.) Fields of Async classes can be but currently are not
170 * fully detached, because they do not in general form cycles.
171 */
172
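
The overview comment above describes a result field completed by a single CAS and a Treiber stack of dependent actions that is popped once the result is set. The following is a heavily simplified sketch of that idea, assuming AtomicReference in place of the Unsafe-based CAS used in this file. MiniFuture, Node, onComplete and getNow are hypothetical names, and the sketch omits exceptions, executors and two-source completions.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the scheme in the overview comment.
final class MiniFuture<T> {
    private static final Object NIL = new Object();   // boxes a null result

    private static final class Node {
        final Runnable action;
        volatile Node next;                            // Treiber stack link
        Node(Runnable action) { this.action = action; }
    }

    private final AtomicReference<Object> result = new AtomicReference<>();
    private final AtomicReference<Node> completions = new AtomicReference<>();

    // A non-null result field means done.
    boolean isDone() { return result.get() != null; }

    // CAS the result from null; only the winning caller's value is kept.
    boolean complete(T value) {
        boolean won = result.compareAndSet(null, value == null ? NIL : value);
        if (won) postComplete();
        return won;
    }

    // Registers an action: pushed while incomplete, run directly once done.
    void onComplete(Runnable action) {
        Node n = new Node(action);
        while (!isDone()) {
            Node h = completions.get();
            n.next = h;
            if (completions.compareAndSet(h, n)) {
                if (isDone()) postComplete();   // completed while pushing
                return;
            }
        }
        action.run();
    }

    // Pops each node with a CAS, so every action runs exactly once.
    private void postComplete() {
        Node h;
        while ((h = completions.get()) != null) {
            if (completions.compareAndSet(h, h.next)) {
                h.next = null;                  // detach
                h.action.run();
            }
        }
    }

    // Decodes the stored result; returns null if incomplete or completed with null.
    @SuppressWarnings("unchecked")
    T getNow() {
        Object r = result.get();
        return (r == null || r == NIL) ? null : (T) r;
    }
}
```

Because the stack is last-in, first-out, actions registered before completion run in reverse registration order in this sketch.
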
173 volatile Object result; // Either the result or boxed AltResult
174 volatile Completion completions; // Treiber stack of dependent actions
175
176 final boolean internalComplete(Object r) { // CAS from null to r
177 return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
178 }
179
180 final boolean casCompletions(Completion cmp, Completion val) {
181 return UNSAFE.compareAndSwapObject(this, COMPLETIONS, cmp, val);
182 }
183
184 /* ------------- Encoding and decoding outcomes -------------- */
185
186 static final class AltResult { // See above
187 final Throwable ex; // null only for NIL
188 AltResult(Throwable x) { this.ex = x; }
189 }
190
191 static final AltResult NIL = new AltResult(null);
192
193 /**
194 * Returns the encoding of the given (non-null) exception as a
195 * wrapped CompletionException unless it is one already.
196 */
197 static AltResult altThrowable(Throwable x) {
198 return new AltResult((x instanceof CompletionException) ? x :
199 new CompletionException(x));
200 }
201
202 /**
203 * Returns the encoding of the given arguments: if the exception
204 * is non-null, encodes as altThrowable. Otherwise uses the given
205 * value, boxed as NIL if null.
206 */
207 static Object encodeOutcome(Object v, Throwable x) {
208 return (x != null) ? altThrowable(x) : (v == null) ? NIL : v;
209 }
210
211 /**
212 * Decodes outcome to return result or throw unchecked exception.
213 */
214 private static <T> T reportJoin(Object r) {
215 if (r instanceof AltResult) {
216 Throwable x;
217 if ((x = ((AltResult)r).ex) == null)
218 return null;
219 if (x instanceof CancellationException)
220 throw (CancellationException)x;
221 if (x instanceof CompletionException)
222 throw (CompletionException)x;
223 throw new CompletionException(x);
224 }
225 @SuppressWarnings("unchecked") T tr = (T) r;
226 return tr;
227 }
228
229 /**
230 * Reports result using Future.get conventions.
231 */
232 private static <T> T reportGet(Object r)
233 throws InterruptedException, ExecutionException {
234 if (r == null) // by convention below, null means interrupted
235 throw new InterruptedException();
236 if (r instanceof AltResult) {
237 Throwable x, cause;
238 if ((x = ((AltResult)r).ex) == null)
239 return null;
240 if (x instanceof CancellationException)
241 throw (CancellationException)x;
242 if ((x instanceof CompletionException) &&
243 (cause = x.getCause()) != null)
244 x = cause;
245 throw new ExecutionException(x);
246 }
247 @SuppressWarnings("unchecked") T tr = (T) r;
248 return tr;
249 }
250
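
The encoding helpers above rely on one invariant: the stored outcome is never null once complete, so null can mean "not yet done". A null value is boxed as NIL and an exception is wrapped in a holder object. The following is a minimal standalone sketch of that encoding, assuming join-style reporting only. OutcomeCodec and Alt are hypothetical stand-ins rather than the classes in this file, and the special handling of CancellationException is left out.

```java
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for the AltResult scheme; not the JDK's own classes.
final class OutcomeCodec {
    static final class Alt {
        final Throwable ex;                      // null only for NIL
        Alt(Throwable ex) { this.ex = ex; }
    }

    static final Alt NIL = new Alt(null);        // boxes a null result

    // Never returns null, so a null field can mean "not yet complete".
    static Object encode(Object value, Throwable failure) {
        if (failure != null)
            return new Alt(failure instanceof CompletionException
                           ? failure : new CompletionException(failure));
        return (value == null) ? NIL : value;
    }

    // Inverse of encode: returns the value or throws the stored exception.
    static Object decode(Object r) {
        if (r instanceof Alt) {
            Throwable x = ((Alt) r).ex;
            if (x == null)
                return null;                     // NIL decodes to null
            if (x instanceof CompletionException)
                throw (CompletionException) x;
            throw new CompletionException(x);    // holder built outside encode
        }
        return r;
    }

    // Convenience for callers that only need to know whether decoding fails.
    static boolean decodeFails(Object r) {
        try {
            decode(r);
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }
}
```
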
251 /* ------------- Async Tasks -------------- */
252
253 /**
254 * A marker interface identifying asynchronous tasks produced by
255 * {@code async} methods. This may be useful for monitoring,
256 * debugging, and tracking asynchronous activities.
257 *
258 * @since 1.8
259 */
260 public static interface AsynchronousCompletionTask {
261 }
262
263 /**
264 * Base class for tasks that can act as either ForkJoinTasks or plain
265 * Runnables. Abstract method compute calls an associated "now"
266 * method. Method exec calls compute if its CompletableFuture is
267 * not already done, and runs completions if done. Fields are not
268 * in general final and can be nulled out after use (but most
269 * currently are not). Classes include serialVersionUIDs even
270 * though they are currently never serialized.
271 */
272 abstract static class Async<T> extends ForkJoinTask<Void>
273 implements Runnable, AsynchronousCompletionTask {
274 CompletableFuture<T> dep; // the CompletableFuture to trigger
275 Async(CompletableFuture<T> dep) { this.dep = dep; }
276
277 abstract void compute(); // call the associated "now" method
278
279 public final boolean exec() {
280 CompletableFuture<T> d;
281 if ((d = dep) != null) {
282 if (d.result == null) // suppress if cancelled
283 compute();
284 if (d.result != null)
285 d.postComplete();
286 dep = null; // detach
287 }
288 return true;
289 }
290 public final Void getRawResult() { return null; }
291 public final void setRawResult(Void v) { }
292 public final void run() { exec(); }
293 private static final long serialVersionUID = 5232453952276885070L;
294 }
295
296 /**
297 * Default executor -- ForkJoinPool.commonPool() unless it cannot
298 * support parallelism.
299 */
300 static final Executor asyncPool =
301 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
302 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
303
304 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
305 static final class ThreadPerTaskExecutor implements Executor {
306 public void execute(Runnable r) { new Thread(r).start(); }
307 }
308
309 /**
310 * Null-checks user executor argument, and translates uses of
311 * commonPool to asyncPool in case parallelism is disabled.
312 */
313 static Executor screenExecutor(Executor e) {
314 if (e == null) throw new NullPointerException();
315 return (e == ForkJoinPool.commonPool()) ? asyncPool : e;
316 }
317
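
The executor selection above can be reproduced outside the class with public API only. The following is a rough sketch, assuming the same rule as asyncPool and screenExecutor: use the common pool when its parallelism exceeds one, otherwise start a new thread per task. AsyncExecutorChoice and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper that mirrors the fallback rule shown in the listing.
class AsyncExecutorChoice {

    // Common pool if it supports parallelism, else one new thread per task.
    static Executor defaultAsyncExecutor() {
        return (ForkJoinPool.getCommonPoolParallelism() > 1)
            ? ForkJoinPool.commonPool()
            : task -> new Thread(task).start();
    }

    // Rejects null and reroutes the common pool through the fallback rule.
    static Executor screen(Executor e) {
        if (e == null) throw new NullPointerException();
        return (e == ForkJoinPool.commonPool()) ? defaultAsyncExecutor() : e;
    }

    // Runs a small computation on the screened executor and waits for it.
    static int runOn(Executor e) {
        return CompletableFuture.supplyAsync(() -> 6 * 7, screen(e)).join();
    }
}
```

Passing Runnable::run as the executor runs the task in the calling thread, which can be handy in tests.
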
318 /* ------------- Completions -------------- */
319
320 abstract static class Completion { // See above
321 volatile Completion next; // Treiber stack link
322
323 /**
324 * Performs completion action if enabled, returning a
325 * completed dependent CompletableFuture, if one exists.
326 */
327 abstract CompletableFuture<?> tryAct();
328 }
329
330 /**
331 * Triggers all reachable enabled dependents. Call only when
332 * known to be done.
333 */
334 final void postComplete() {
335 /*
336 * On each step, variable f holds current completions to pop
337 * and run. It is extended along only one path at a time,
338 * pushing others to avoid StackOverflowErrors on recursion.
339 */
340 CompletableFuture<?> f = this; Completion h;
341 while ((h = f.completions) != null ||
342 (f != this && (h = (f = this).completions) != null)) {
343 CompletableFuture<?> d; Completion t;
344 if (f.casCompletions(h, t = h.next)) {
345 if (t != null) {
346 if (f != this) { // push
347 do {} while (!casCompletions(h.next = completions, h));
348 continue;
349 }
350 h.next = null; // detach
351 }
352 f = (d = h.tryAct()) == null ? this : d;
353 }
354 }
355 }
356
357 /* ------------- One-source Completions -------------- */
358
359 /**
360 * A Completion with a source and dependent. The "dep" field acts
361 * as a claim, nulled out to disable further attempts to
362 * trigger. Fields can only be observed by other threads upon
363 * successful push, and should be nulled out after claim.
364 */
365 abstract static class UniCompletion<T> extends Completion {
366 Executor async; // executor to use (null if none)
367 CompletableFuture<T> dep; // the dependent to complete
368 CompletableFuture<?> src; // source of value for tryAct
369
370 UniCompletion(Executor async, CompletableFuture<T> dep,
371 CompletableFuture<?> src) {
372 this.async = async; this.dep = dep; this.src = src;
373 }
374
375 /** Tries to claim completion action by CASing dep to null */
376 final boolean claim(CompletableFuture<T> d) {
377 return UNSAFE.compareAndSwapObject(this, DEP, d, null);
378 }
379
380 private static final sun.misc.Unsafe UNSAFE;
381 private static final long DEP;
382 static {
383 try {
384 UNSAFE = sun.misc.Unsafe.getUnsafe();
385 Class<?> k = UniCompletion.class;
386 DEP = UNSAFE.objectFieldOffset
387 (k.getDeclaredField("dep"));
388 } catch (Exception x) {
389 throw new Error(x);
390 }
391 }
392 }
393
394 /** Pushes c on to completions, and triggers c if done. */
395 private void unipush(Completion c) {
396 if (c != null) {
397 CompletableFuture<?> d;
398 while (result == null && !casCompletions(c.next = completions, c))
399 c.next = null; // clear on CAS failure
400 if ((d = c.tryAct()) != null) // cover races
401 d.postComplete();
402 if (result != null) // clean stack
403 postComplete();
404 }
405 }
406
407 // Immediate, async, delayed, and routing support for Function/apply
408
409 static <T,U> void nowApply(Executor e, CompletableFuture<U> d, Object r,
410 Function<? super T,? extends U> f) {
411 if (d != null && f != null) {
412 T t; U u; Throwable x;
413 if (r instanceof AltResult) {
414 t = null;
415 x = ((AltResult)r).ex;
416 }
417 else {
418 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
419 x = null;
420 }
421 if (x == null) {
422 try {
423 if (e != null) {
424 e.execute(new AsyncApply<T,U>(d, t, f));
425 return;
426 }
427 u = f.apply(t);
428 } catch (Throwable ex) {
429 x = ex;
430 u = null;
431 }
432 }
433 else
434 u = null;
435 d.internalComplete(encodeOutcome(u, x));
436 }
437 }
438
439 static final class AsyncApply<T,U> extends Async<U> {
440 T arg; Function<? super T,? extends U> fn;
441 AsyncApply(CompletableFuture<U> dep, T arg,
442 Function<? super T,? extends U> fn) {
443 super(dep); this.arg = arg; this.fn = fn;
444 }
445 final void compute() { nowApply(null, dep, arg, fn); }
446 private static final long serialVersionUID = 5232453952276885070L;
447 }
448
449 static final class DelayedApply<T,U> extends UniCompletion<U> {
450 Function<? super T,? extends U> fn;
451 DelayedApply(Executor async, CompletableFuture<U> dep,
452 CompletableFuture<?> src,
453 Function<? super T,? extends U> fn) {
454 super(async, dep, src); this.fn = fn;
455 }
456 final CompletableFuture<?> tryAct() {
457 CompletableFuture<U> d; CompletableFuture<?> a; Object r;
458 if ((d = dep) != null && (a = src) != null &&
459 (r = a.result) != null && claim(d)) {
460 nowApply(async, d, r, fn);
461 src = null; fn = null;
462 if (d.result != null) return d;
463 }
464 return null;
465 }
466 }
467
468 private <U> CompletableFuture<U> doThenApply(
469 Function<? super T,? extends U> fn, Executor e) {
470 if (fn == null) throw new NullPointerException();
471 CompletableFuture<U> d = new CompletableFuture<U>();
472 Object r = result;
473 if (r == null)
474 unipush(new DelayedApply<T,U>(e, d, this, fn));
475 else
476 nowApply(e, d, r, fn);
477 return d;
478 }
479
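
From the caller's side, doThenApply has two visible behaviors: if the source is already complete the function runs during the call, and otherwise it is deferred until the source completes. An exceptional source skips the function entirely. The following is a small sketch of those cases using the public thenApply method. ThenApplyCases is a hypothetical class, and everything runs on one thread so the ordering is deterministic.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; only the CompletableFuture calls are real API.
class ThenApplyCases {

    // Source already complete: the function runs during the thenApply call.
    static boolean runsImmediatelyWhenDone() {
        CompletableFuture<Integer> src = CompletableFuture.completedFuture(2);
        CompletableFuture<Integer> dep = src.thenApply(x -> x * 10);
        return dep.isDone() && dep.join() == 20;
    }

    // Source incomplete: the function is deferred until complete() is called.
    static boolean deferredUntilCompletion() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dep = src.thenApply(x -> x + 1);
        boolean pendingBefore = !dep.isDone();
        src.complete(1);
        return pendingBefore && dep.isDone() && dep.join() == 2;
    }

    // Exceptional source: the function is skipped and the failure propagates.
    static boolean skippedOnFailure() {
        boolean[] called = { false };
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dep =
            src.thenApply(x -> { called[0] = true; return x; });
        src.completeExceptionally(new IllegalStateException("boom"));
        return dep.isCompletedExceptionally() && !called[0];
    }
}
```
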
480 // Consumer/accept
481
482 static <T,U> void nowAccept(Executor e, CompletableFuture<U> d,
483 Object r, Consumer<? super T> f) {
484 if (d != null && f != null) {
485 T t; Throwable x;
486 if (r instanceof AltResult) {
487 t = null;
488 x = ((AltResult)r).ex;
489 }
490 else {
491 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
492 x = null;
493 }
494 if (x == null) {
495 try {
496 if (e != null) {
497 e.execute(new AsyncAccept<T,U>(d, t, f));
498 return;
499 }
500 f.accept(t);
501 } catch (Throwable ex) {
502 x = ex;
503 }
504 }
505 d.internalComplete(encodeOutcome(null, x));
506 }
507 }
508
509 static final class AsyncAccept<T,U> extends Async<U> {
510 T arg; Consumer<? super T> fn;
511 AsyncAccept(CompletableFuture<U> dep, T arg,
512 Consumer<? super T> fn) {
513 super(dep); this.arg = arg; this.fn = fn;
514 }
515 final void compute() { nowAccept(null, dep, arg, fn); }
516 private static final long serialVersionUID = 5232453952276885070L;
517 }
518
519 static final class DelayedAccept<T> extends UniCompletion<Void> {
520 Consumer<? super T> fn;
521 DelayedAccept(Executor async, CompletableFuture<Void> dep,
522 CompletableFuture<?> src, Consumer<? super T> fn) {
523 super(async, dep, src); this.fn = fn;
524 }
525 final CompletableFuture<?> tryAct() {
526 CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
527 if ((d = dep) != null && (a = src) != null &&
528 (r = a.result) != null && claim(d)) {
529 nowAccept(async, d, r, fn);
530 src = null; fn = null;
531 if (d.result != null) return d;
532 }
533 return null;
534 }
535 }
536
537 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
538 Executor e) {
539 if (fn == null) throw new NullPointerException();
540 CompletableFuture<Void> d = new CompletableFuture<Void>();
541 Object r = result;
542 if (r == null)
543 unipush(new DelayedAccept<T>(e, d, this, fn));
544 else
545 nowAccept(e, d, r, fn);
546 return d;
547 }
548
549 // Runnable/run
550
551 static <T> void nowRun(Executor e, CompletableFuture<T> d, Object r,
552 Runnable f) {
553 if (d != null && f != null) {
554 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
555 if (x == null) {
556 try {
557 if (e != null) {
558 e.execute(new AsyncRun<T>(d, f));
559 return;
560 }
561 f.run();
562 } catch (Throwable ex) {
563 x = ex;
564 }
565 }
566 d.internalComplete(encodeOutcome(null, x));
567 }
568 }
569
570 static final class AsyncRun<T> extends Async<T> {
571 Runnable fn;
572 AsyncRun(CompletableFuture<T> dep, Runnable fn) {
573 super(dep); this.fn = fn;
574 }
575 final void compute() { nowRun(null, dep, null, fn); }
576 private static final long serialVersionUID = 5232453952276885070L;
577 }
578
579 static final class DelayedRun extends UniCompletion<Void> {
580 Runnable fn;
581 DelayedRun(Executor async, CompletableFuture<Void> dep,
582 CompletableFuture<?> src, Runnable fn) {
583 super(async, dep, src); this.fn = fn;
584 }
585 final CompletableFuture<?> tryAct() {
586 CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
587 if ((d = dep) != null && (a = src) != null &&
588 (r = a.result) != null && claim(d)) {
589 nowRun(async, d, r, fn);
590 src = null; fn = null; // clear refs
591 if (d.result != null) return d;
592 }
593 return null;
594 }
595 }
596
597 private CompletableFuture<Void> doThenRun(Runnable fn, Executor e) {
598 if (fn == null) throw new NullPointerException();
599 CompletableFuture<Void> d = new CompletableFuture<Void>();
600 Object r = result;
601 if (r == null)
602 unipush(new DelayedRun(e, d, this, fn));
603 else
604 nowRun(e, d, r, fn);
605 return d;
606 }
607
608 // Supplier/get
609
610 static <T> void nowSupply(CompletableFuture<T> d, Supplier<T> f) {
611 if (d != null && f != null) {
612 T t; Throwable x;
613 try {
614 t = f.get();
615 x = null;
616 } catch (Throwable ex) {
617 x = ex;
618 t = null;
619 }
620 d.internalComplete(encodeOutcome(t, x));
621 }
622 }
623
624 static final class AsyncSupply<T> extends Async<T> {
625 Supplier<T> fn;
626 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
627 super(dep); this.fn = fn;
628 }
629 final void compute() { nowSupply(dep, fn); }
630 private static final long serialVersionUID = 5232453952276885070L;
631 }
632
633 // WhenComplete
634
635 static <T> void nowWhen(Executor e, CompletableFuture<T> d, Object r,
636 BiConsumer<? super T,? super Throwable> f) {
637 if (d != null && f != null) {
638 T t; Throwable x, dx;
639 if (r instanceof AltResult) {
640 t = null;
641 x = ((AltResult)r).ex;
642 }
643 else {
644 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
645 x = null;
646 }
647 try {
648 if (e != null) {
649 e.execute(new AsyncWhen<T>(d, r, f));
650 return;
651 }
652 f.accept(t, x);
653 dx = null;
654 } catch (Throwable ex) {
655 dx = ex;
656 }
657 d.internalComplete(encodeOutcome(t, x != null ? x : dx));
658 }
659 }
660
661 static final class AsyncWhen<T> extends Async<T> {
662 Object arg; BiConsumer<? super T,? super Throwable> fn;
663 AsyncWhen(CompletableFuture<T> dep, Object arg,
664 BiConsumer<? super T,? super Throwable> fn) {
665 super(dep); this.arg = arg; this.fn = fn;
666 }
667 final void compute() { nowWhen(null, dep, arg, fn); }
668 private static final long serialVersionUID = 5232453952276885070L;
669 }
670
671 static final class DelayedWhen<T> extends UniCompletion<T> {
672 BiConsumer<? super T, ? super Throwable> fn;
673 DelayedWhen(Executor async, CompletableFuture<T> dep,
674 CompletableFuture<?> src,
675 BiConsumer<? super T, ? super Throwable> fn) {
676 super(async, dep, src); this.fn = fn;
677 }
678 final CompletableFuture<?> tryAct() {
679 CompletableFuture<T> d; CompletableFuture<?> a; Object r;
680 if ((d = dep) != null && (a = src) != null &&
681 (r = a.result) != null && claim(d)) {
682 nowWhen(async, d, r, fn);
683 src = null; fn = null;
684 if (d.result != null) return d;
685 }
686 return null;
687 }
688 }
689
690 private CompletableFuture<T> doWhenComplete(
691 BiConsumer<? super T, ? super Throwable> fn, Executor e) {
692 if (fn == null) throw new NullPointerException();
693 CompletableFuture<T> d = new CompletableFuture<T>();
694 Object r = result;
695 if (r == null)
696 unipush(new DelayedWhen<T>(e, d, this, fn));
697 else
698 nowWhen(e, d, r, fn);
699 return d;
700 }
701
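
The nowWhen logic above always invokes the action with the source's value and exception, then completes the dependent with the source's outcome, or with the action's own exception if the source succeeded but the action threw. The following is a brief sketch of that behavior through the public whenComplete method. WhenCompleteDemo is a hypothetical class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; only the CompletableFuture calls are real API.
class WhenCompleteDemo {

    // Normal source: the action sees (value, null) and the value passes through.
    static boolean passesValueThrough() {
        Object[] seen = new Object[2];
        CompletableFuture<String> src = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> dep =
            src.whenComplete((v, x) -> { seen[0] = v; seen[1] = x; });
        return "ok".equals(dep.join()) && "ok".equals(seen[0]) && seen[1] == null;
    }

    // Failed source: the action sees the failure and the dependent stays failed.
    static boolean keepsFailure() {
        Throwable[] seen = new Throwable[1];
        CompletableFuture<String> src = new CompletableFuture<>();
        src.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<String> dep = src.whenComplete((v, x) -> seen[0] = x);
        return seen[0] != null && dep.isCompletedExceptionally();
    }

    // Normal source, throwing action: the dependent fails with the action's exception.
    static boolean actionFailureIsReported() {
        CompletableFuture<String> dep = CompletableFuture.completedFuture("ok")
            .whenComplete((v, x) -> { throw new IllegalArgumentException("bad action"); });
        try {
            dep.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalArgumentException;
        }
    }
}
```
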
702 // Handle
703
704 static <T,U> void nowHandle(Executor e, CompletableFuture<U> d, Object r,
705 BiFunction<? super T, Throwable, ? extends U> f) {
706 if (d != null && f != null) {
707 T t; U u; Throwable x, dx;
708 if (r instanceof AltResult) {
709 t = null;
710 x = ((AltResult)r).ex;
711 }
712 else {
713 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
714 x = null;
715 }
716 try {
717 if (e != null) {
718 e.execute(new AsyncCombine<T,Throwable,U>(d, t, x, f));
719 return;
720 }
721 u = f.apply(t, x);
722 dx = null;
723 } catch (Throwable ex) {
724 dx = ex;
725 u = null;
726 }
727 d.internalComplete(encodeOutcome(u, dx));
728 }
729 }
730
731 static final class DelayedHandle<T,U> extends UniCompletion<U> {
732 BiFunction<? super T, Throwable, ? extends U> fn;
733 DelayedHandle(Executor async, CompletableFuture<U> dep,
734 CompletableFuture<?> src,
735 BiFunction<? super T, Throwable, ? extends U> fn) {
736 super(async, dep, src); this.fn = fn;
737 }
738 final CompletableFuture<?> tryAct() {
739 CompletableFuture<U> d; CompletableFuture<?> a; Object r;
740 if ((d = dep) != null && (a = src) != null &&
741 (r = a.result) != null && claim(d)) {
742 nowHandle(async, d, r, fn);
743 src = null; fn = null;
744 if (d.result != null) return d;
745 }
746 return null;
747 }
748 }
749
750 private <U> CompletableFuture<U> doHandle(
751 BiFunction<? super T, Throwable, ? extends U> fn,
752 Executor e) {
753 if (fn == null) throw new NullPointerException();
754 CompletableFuture<U> d = new CompletableFuture<U>();
755 Object r = result;
756 if (r == null)
757 unipush(new DelayedHandle<T,U>(e, d, this, fn));
758 else
759 nowHandle(e, d, r, fn);
760 return d;
761 }
762
763 // Exceptionally
764
765 static <T> void nowExceptionally(CompletableFuture<T> d, Object r,
766 Function<? super Throwable, ? extends T> f) {
767 if (d != null && f != null) {
768 T t; Throwable x, dx;
769 if ((r instanceof AltResult) && (x = ((AltResult)r).ex) != null) {
770 try {
771 t = f.apply(x);
772 dx = null;
773 } catch (Throwable ex) {
774 dx = ex;
775 t = null;
776 }
777 }
778 else {
779 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
780 dx = null;
781 }
782 d.internalComplete(encodeOutcome(t, dx));
783 }
784 }
785
786 static final class DelayedExceptionally<T> extends UniCompletion<T> {
787 Function<? super Throwable, ? extends T> fn;
788 DelayedExceptionally(CompletableFuture<T> dep, CompletableFuture<?> src,
789 Function<? super Throwable, ? extends T> fn) {
790 super(null, dep, src); this.fn = fn;
791 }
792 final CompletableFuture<?> tryAct() {
793 CompletableFuture<T> d; CompletableFuture<?> a; Object r;
794 if ((d = dep) != null && (a = src) != null &&
795 (r = a.result) != null && claim(d)) {
796 nowExceptionally(d, r, fn);
797 src = null; fn = null;
798 if (d.result != null) return d;
799 }
800 return null;
801 }
802 }
803
804 private CompletableFuture<T> doExceptionally(
805 Function<Throwable, ? extends T> fn) {
806 if (fn == null) throw new NullPointerException();
807 CompletableFuture<T> d = new CompletableFuture<T>();
808 Object r = result;
809 if (r == null)
810 unipush(new DelayedExceptionally<T>(d, this, fn));
811 else
812 nowExceptionally(d, r, fn);
813 return d;
814 }
815
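
The handle and exceptionally code above differs in when the user function runs: nowHandle always calls it with the value and exception, while nowExceptionally calls it only for an exceptional source and otherwise copies the value. The following is a short sketch contrasting the two through the public API. RecoveryDemo and its helper failed() are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; only the CompletableFuture calls are real API.
class RecoveryDemo {

    // handle() always runs; its return value becomes the dependent's result.
    static String handleOutcome(CompletableFuture<Integer> src) {
        return src.handle((v, x) -> (x == null) ? "value " + v : "recovered").join();
    }

    // exceptionally() runs only on failure; normal values pass through unchanged.
    static int orDefault(CompletableFuture<Integer> src, int fallback) {
        return src.exceptionally(x -> fallback).join();
    }

    // Builds a future that has already completed exceptionally.
    static CompletableFuture<Integer> failed() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }
}
```
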
816 // Identity function used by nowCompose and anyOf
817
818 static <T> void nowCopy(CompletableFuture<T> d, Object r) {
819 if (d != null && d.result == null) {
820 Throwable x;
821 d.internalComplete(((r instanceof AltResult) &&
822 (x = ((AltResult)r).ex) != null &&
823 !(x instanceof CompletionException)) ?
824 new AltResult(new CompletionException(x)): r);
825 }
826 }
827
828 static final class DelayedCopy<T> extends UniCompletion<T> {
829 DelayedCopy(CompletableFuture<T> dep, CompletableFuture<?> src) {
830 super(null, dep, src);
831 }
832 final CompletableFuture<?> tryAct() {
833 CompletableFuture<T> d; CompletableFuture<?> a; Object r;
834 if ((d = dep) != null && (a = src) != null &&
835 (r = a.result) != null && claim(d)) {
836 nowCopy(d, r);
837 src = null;
838 if (d.result != null) return d;
839 }
840 return null;
841 }
842 }
843
844 // Compose
845
846 static <T,U> void nowCompose(Executor e, CompletableFuture<U> d, Object r,
847 Function<? super T, ? extends CompletionStage<U>> f) {
848 if (d != null && f != null) {
849 T t; Throwable x;
850 if (r instanceof AltResult) {
851 t = null;
852 x = ((AltResult)r).ex;
853 }
854 else {
855 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
856 x = null;
857 }
858 if (x == null) {
859 try {
860 if (e != null)
861 e.execute(new AsyncCompose<T,U>(d, t, f));
862 else {
863 CompletableFuture<U> c =
864 f.apply(t).toCompletableFuture();
865 Object s = c.result;
866 if (s == null)
867 c.unipush(new DelayedCopy<U>(d, c));
868 else
869 nowCopy(d, s);
870 }
871 return;
872 } catch (Throwable ex) {
873 x = ex;
874 }
875 }
876 d.internalComplete(encodeOutcome(null, x));
877 }
878 }
879
880 static final class AsyncCompose<T,U> extends Async<U> {
881 T arg; Function<? super T, ? extends CompletionStage<U>> fn;
882 AsyncCompose(CompletableFuture<U> dep, T arg,
883 Function<? super T, ? extends CompletionStage<U>> fn) {
884 super(dep); this.arg = arg; this.fn = fn;
885 }
886 final void compute() { nowCompose(null, dep, arg, fn); }
887 private static final long serialVersionUID = 5232453952276885070L;
888 }
889
890 static final class DelayedCompose<T,U> extends UniCompletion<U> {
891 Function<? super T, ? extends CompletionStage<U>> fn;
892 DelayedCompose(Executor async, CompletableFuture<U> dep,
893 CompletableFuture<?> src,
894 Function<? super T, ? extends CompletionStage<U>> fn) {
895 super(async, dep, src); this.fn = fn;
896 }
897 final CompletableFuture<?> tryAct() {
898 CompletableFuture<U> d; CompletableFuture<?> a; Object r;
899 if ((d = dep) != null && (a = src) != null &&
900 (r = a.result) != null && claim(d)) {
901 nowCompose(async, d, r, fn);
902 src = null; fn = null;
903 if (d.result != null) return d;
904 }
905 return null;
906 }
907 }
908
909 private <U> CompletableFuture<U> doThenCompose(
910 Function<? super T, ? extends CompletionStage<U>> fn, Executor e) {
911 if (fn == null) throw new NullPointerException();
912 Object r = result;
913 if (r == null || e != null) {
914 CompletableFuture<U> d = new CompletableFuture<U>();
915 if (r == null)
916 unipush(new DelayedCompose<T,U>(e, d, this, fn));
917 else
918 nowCompose(e, d, r, fn);
919 return d;
920 }
921 else { // try to return function result
922 T t; Throwable x;
923 if (r instanceof AltResult) {
924 t = null;
925 x = ((AltResult)r).ex;
926 }
927 else {
928 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
929 x = null;
930 }
931 if (x == null) {
932 try {
933 return fn.apply(t).toCompletableFuture();
934 } catch (Throwable ex) {
935 x = ex;
936 }
937 }
938 CompletableFuture<U> d = new CompletableFuture<U>();
939 d.result = encodeOutcome(null, x);
940 return d;
941 }
942 }
943
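
The compose code above applies a function that itself returns a stage and then copies that stage's outcome into the dependent, so callers see a flat future rather than a nested one. The following is a minimal sketch through the public thenCompose method. ComposeDemo and lookup are hypothetical, with lookup standing in for any call that returns a future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; only the CompletableFuture calls are real API.
class ComposeDemo {

    // Hypothetical lookup returning a future; stands in for any async call.
    static CompletableFuture<String> lookup(int id) {
        return CompletableFuture.completedFuture("user-" + id);
    }

    // thenCompose flattens: the dependent is a CompletableFuture<String>.
    static String composed(int id) {
        CompletableFuture<Integer> idFuture = new CompletableFuture<>();
        CompletableFuture<String> name = idFuture.thenCompose(ComposeDemo::lookup);
        idFuture.complete(id);
        return name.join();
    }

    // If the function itself throws, the dependent completes exceptionally.
    static boolean functionFailurePropagates() {
        CompletableFuture<String> dep = CompletableFuture.completedFuture(1)
            .thenCompose(x -> {
                if (x > 0) throw new IllegalStateException("boom");
                return lookup(x);
            });
        return dep.isCompletedExceptionally();
    }
}
```
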
944 /* ------------- Two-source Completions -------------- */
945
946 /** A Completion with two sources */
947 abstract static class BiCompletion<T> extends UniCompletion<T> {
948 CompletableFuture<?> snd; // second source for tryAct
949 BiCompletion(Executor async, CompletableFuture<T> dep,
950 CompletableFuture<?> src, CompletableFuture<?> snd) {
951 super(async, dep, src); this.snd = snd;
952 }
953 }
954
955 /** A Completion delegating to another Completion */
956 static final class CoCompletion extends Completion {
957 Completion completion;
958 CoCompletion(Completion completion) {
959 this.completion = completion;
960 }
961 final CompletableFuture<?> tryAct() {
962 Completion c; CompletableFuture<?> d;
963 if ((c = completion) == null || (d = c.tryAct()) == null)
964 return null;
965 completion = null; // detach
966 return d;
967 }
968 }
969
970 /* ------------- Two-source Anded -------------- */
971
972 /** Pushes c onto this future's completions and o's completions unless both are done. */
973 private void bipushAnded(CompletableFuture<?> o, Completion c) {
974 if (c != null && o != null) {
975 Object r; CompletableFuture<?> d;
976 while ((r = result) == null &&
977 !casCompletions(c.next = completions, c))
978 c.next = null;
979 if (o.result == null) {
980 Completion q = (r != null) ? c : new CoCompletion(c);
981 while (o.result == null &&
982 !o.casCompletions(q.next = o.completions, q))
983 q.next = null;
984 }
985 if ((d = c.tryAct()) != null)
986 d.postComplete();
987 if (o.result != null)
988 o.postComplete();
989 if (result != null)
990 postComplete();
991 }
992 }
993
994 // BiFunction/combine
995
996 static <T,U,V> void nowCombine(Executor e, CompletableFuture<V> d,
997 Object r, Object s,
998 BiFunction<? super T,? super U,? extends V> f) {
999 if (d != null && f != null) {
1000 T t; U u; V v; Throwable x;
1001 if (r instanceof AltResult) {
1002 t = null;
1003 x = ((AltResult)r).ex;
1004 }
1005 else {
1006 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1007 x = null;
1008 }
1009 if (x != null)
1010 u = null;
1011 else if (s instanceof AltResult) {
1012 x = ((AltResult)s).ex;
1013 u = null;
1014 }
1015 else {
1016 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1017 }
1018 if (x == null) {
1019 try {
1020 if (e != null) {
1021 e.execute(new AsyncCombine<T,U,V>(d, t, u, f));
1022 return;
1023 }
1024 v = f.apply(t, u);
1025 } catch (Throwable ex) {
1026 x = ex;
1027 v = null;
1028 }
1029 }
1030 else
1031 v = null;
1032 d.internalComplete(encodeOutcome(v, x));
1033 }
1034 }
1035
1036 static final class AsyncCombine<T,U,V> extends Async<V> {
1037 T arg1; U arg2; BiFunction<? super T,? super U,? extends V> fn;
1038 AsyncCombine(CompletableFuture<V> dep, T arg1, U arg2,
1039 BiFunction<? super T,? super U,? extends V> fn) {
1040 super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1041 }
1042 final void compute() { nowCombine(null, dep, arg1, arg2, fn); }
1043 private static final long serialVersionUID = 5232453952276885070L;
1044 }
1045
1046 static final class DelayedCombine<T,U,V> extends BiCompletion<V> {
1047 BiFunction<? super T,? super U,? extends V> fn;
1048 DelayedCombine(Executor async, CompletableFuture<V> dep,
1049 CompletableFuture<?> src, CompletableFuture<?> snd,
1050 BiFunction<? super T,? super U,? extends V> fn) {
1051 super(async, dep, src, snd); this.fn = fn;
1052 }
1053 final CompletableFuture<?> tryAct() {
1054 CompletableFuture<V> d; CompletableFuture<?> a, b; Object r, s;
1055 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1056 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1057 nowCombine(async, d, r, s, fn);
1058 src = null; snd = null; fn = null;
1059 if (d.result != null) return d;
1060 }
1061 return null;
1062 }
1063 }
1064
1065 private <U,V> CompletableFuture<V> doThenCombine(
1066 CompletableFuture<? extends U> o,
1067 BiFunction<? super T,? super U,? extends V> fn,
1068 Executor e) {
1069 if (o == null || fn == null) throw new NullPointerException();
1070 CompletableFuture<V> d = new CompletableFuture<V>();
1071 Object r = result, s = o.result;
1072 if (r == null || s == null)
1073 bipushAnded(o, new DelayedCombine<T,U,V>(e, d, this, o, fn));
1074 else
1075 nowCombine(e, d, r, s, fn);
1076 return d;
1077 }
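
As a caller-side illustration of the two-source "and" logic in `nowCombine`, here is a small sketch assuming only the public `thenCombine` method. The class name `CombineSketch` and its methods are hypothetical. It shows that the function runs only when both sources complete normally, and that a failed source short-circuits the function.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class CombineSketch {
    // Both sources complete normally, so the BiFunction sees both values.
    static int sum() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        return a.thenCombine(b, Integer::sum).join();
    }

    // If the first source failed, the function is never invoked and the
    // dependent future completes exceptionally.
    static boolean failureSkipsFunction() {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        a.completeExceptionally(new IllegalStateException("first failed"));
        CompletableFuture<Integer> b = CompletableFuture.completedFuture(2);
        AtomicBoolean called = new AtomicBoolean();
        CompletableFuture<Integer> c = a.thenCombine(b, (x, y) -> {
            called.set(true);
            return x + y;
        });
        return c.isCompletedExceptionally() && !called.get();
    }

    public static void main(String[] args) {
        System.out.println(sum());
        System.out.println(failureSkipsFunction());
    }
}
```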
1078
1079 // BiConsumer/AcceptBoth
1080
1081 static <T,U,V> void nowAcceptBoth(Executor e, CompletableFuture<V> d,
1082 Object r, Object s,
1083 BiConsumer<? super T,? super U> f) {
1084 if (d != null && f != null) {
1085 T t; U u; Throwable x;
1086 if (r instanceof AltResult) {
1087 t = null;
1088 x = ((AltResult)r).ex;
1089 }
1090 else {
1091 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1092 x = null;
1093 }
1094 if (x != null)
1095 u = null;
1096 else if (s instanceof AltResult) {
1097 x = ((AltResult)s).ex;
1098 u = null;
1099 }
1100 else {
1101 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1102 }
1103 if (x == null) {
1104 try {
1105 if (e != null) {
1106 e.execute(new AsyncAcceptBoth<T,U,V>(d, t, u, f));
1107 return;
1108 }
1109 f.accept(t, u);
1110 } catch (Throwable ex) {
1111 x = ex;
1112 }
1113 }
1114 d.internalComplete(encodeOutcome(null, x));
1115 }
1116 }
1117
1118 static final class AsyncAcceptBoth<T,U,V> extends Async<V> {
1119 T arg1; U arg2; BiConsumer<? super T,? super U> fn;
1120 AsyncAcceptBoth(CompletableFuture<V> dep, T arg1, U arg2,
1121 BiConsumer<? super T,? super U> fn) {
1122 super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1123 }
1124 final void compute() { nowAcceptBoth(null, dep, arg1, arg2, fn); }
1125 private static final long serialVersionUID = 5232453952276885070L;
1126 }
1127
1128 static final class DelayedAcceptBoth<T,U> extends BiCompletion<Void> {
1129 BiConsumer<? super T,? super U> fn;
1130 DelayedAcceptBoth(Executor async, CompletableFuture<Void> dep,
1131 CompletableFuture<?> src, CompletableFuture<?> snd,
1132 BiConsumer<? super T,? super U> fn) {
1133 super(async, dep, src, snd); this.fn = fn;
1134 }
1135 final CompletableFuture<?> tryAct() {
1136 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1137 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1138 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1139 nowAcceptBoth(async, d, r, s, fn);
1140 src = null; snd = null; fn = null;
1141 if (d.result != null) return d;
1142 }
1143 return null;
1144 }
1145 }
1146
1147 private <U> CompletableFuture<Void> doThenAcceptBoth(
1148 CompletableFuture<? extends U> o,
1149 BiConsumer<? super T, ? super U> fn,
1150 Executor e) {
1151 if (o == null || fn == null) throw new NullPointerException();
1152 CompletableFuture<Void> d = new CompletableFuture<Void>();
1153 Object r = result, s = o.result;
1154 if (r == null || s == null)
1155 bipushAnded(o, new DelayedAcceptBoth<T,U>(e, d, this, o, fn));
1156 else
1157 nowAcceptBoth(e, d, r, s, fn);
1158 return d;
1159 }
1160
1161 // Runnable/both
1162
1163 static final class DelayedRunAfterBoth extends BiCompletion<Void> {
1164 Runnable fn;
1165 DelayedRunAfterBoth(Executor async, CompletableFuture<Void> dep,
1166 CompletableFuture<?> src, CompletableFuture<?> snd,
1167 Runnable fn) {
1168 super(async, dep, src, snd); this.fn = fn;
1169 }
1170 final CompletableFuture<?> tryAct() {
1171 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1172 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1173 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1174 Throwable x = (r instanceof AltResult) ?
1175 ((AltResult)r).ex : null;
1176 nowRun(async, d, (x == null) ? s : r, fn);
1177 src = null; snd = null; fn = null;
1178 if (d.result != null) return d;
1179 }
1180 return null;
1181 }
1182 }
1183
1184 private CompletableFuture<Void> doRunAfterBoth(
1185 CompletableFuture<?> o, Runnable fn, Executor e) {
1186 if (o == null || fn == null) throw new NullPointerException();
1187 CompletableFuture<Void> d = new CompletableFuture<Void>();
1188 Object r = result, s = o.result;
1189 if (r == null || s == null)
1190 bipushAnded(o, new DelayedRunAfterBoth(e, d, this, o, fn));
1191 else {
1192 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1193 nowRun(e, d, (x == null) ? s : r, fn);
1194 }
1195 return d;
1196 }
1197
1198 // allOf
1199
1200 static <T> void nowAnd(CompletableFuture<T> d, Object r, Object s) {
1201 if (d != null) {
1202 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1203 if (x == null && (s instanceof AltResult))
1204 x = ((AltResult)s).ex;
1205 d.internalComplete(encodeOutcome(null, x));
1206 }
1207 }
1208
1209 static final class DelayedAnd extends BiCompletion<Void> {
1210 DelayedAnd(CompletableFuture<Void> dep,
1211 CompletableFuture<?> src, CompletableFuture<?> snd) {
1212 super(null, dep, src, snd);
1213 }
1214 final CompletableFuture<?> tryAct() {
1215 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1216 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1217 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1218 nowAnd(d, r, s);
1219 src = null; snd = null;
1220 if (d.result != null) return d;
1221 }
1222 return null;
1223 }
1224 }
1225
1226 /** Recursively constructs a tree of DelayedAnd completions over cfs[lo..hi]. */
1227 private static CompletableFuture<Void> doAllOf(CompletableFuture<?>[] cfs,
1228 int lo, int hi) {
1229 CompletableFuture<Void> d = new CompletableFuture<Void>();
1230 if (lo > hi) // empty
1231 d.result = NIL;
1232 else {
1233 int mid = (lo + hi) >>> 1;
1234 CompletableFuture<?> fst = (lo == mid ? cfs[lo] :
1235 doAllOf(cfs, lo, mid));
1236 CompletableFuture<?> snd = (lo == hi ? fst : // and fst with self
1237 (hi == mid+1) ? cfs[hi] :
1238 doAllOf(cfs, mid+1, hi));
1239 Object r = fst.result, s = snd.result; // throw NPE if null elements
1240 if (r == null || s == null) {
1241 DelayedAnd a = new DelayedAnd(d, fst, snd);
1242 if (fst == snd)
1243 fst.unipush(a);
1244 else
1245 fst.bipushAnded(snd, a);
1246 }
1247 else
1248 nowAnd(d, r, s);
1249 }
1250 return d;
1251 }
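
The tree built by `doAllOf` backs the public `allOf` method. A common usage pattern, sketched below under the assumption that only public API is available, is to wait on the combined future and then read each component with `join`. The class `AllOfSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class AllOfSketch {
    // Waits for every part, then collects the individual results in order.
    static List<Integer> gather() {
        List<CompletableFuture<Integer>> parts = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            parts.add(CompletableFuture.supplyAsync(() -> n));
        }
        CompletableFuture<Void> all =
            CompletableFuture.allOf(parts.toArray(new CompletableFuture<?>[0]));
        // allOf itself carries no value; each part is already done when this runs.
        return all.thenApply(v -> parts.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()))
            .join();
    }

    // With no arguments the returned future is already complete (the lo > hi case).
    static boolean emptyIsDone() {
        return CompletableFuture.allOf().isDone();
    }

    public static void main(String[] args) {
        System.out.println(gather());
        System.out.println(emptyIsDone());
    }
}
```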
1252
1253 /* ------------- Two-source Ored -------------- */
1254
1255 /** Pushes c onto this future's completions and o's completions unless either is done. */
1256 private void bipushOred(CompletableFuture<?> o, Completion c) {
1257 if (c != null && o != null) {
1258 CompletableFuture<?> d;
1259 while (o.result == null && result == null) {
1260 if (casCompletions(c.next = completions, c)) {
1261 CoCompletion q = new CoCompletion(c);
1262 while (result == null && o.result == null &&
1263 !o.casCompletions(q.next = o.completions, q))
1264 q.next = null;
1265 break;
1266 }
1267 c.next = null;
1268 }
1269 if ((d = c.tryAct()) != null)
1270 d.postComplete();
1271 if (o.result != null)
1272 o.postComplete();
1273 if (result != null)
1274 postComplete();
1275 }
1276 }
1277
1278 // Function/applyEither
1279
1280 static final class DelayedApplyToEither<T,U> extends BiCompletion<U> {
1281 Function<? super T,? extends U> fn;
1282 DelayedApplyToEither(Executor async, CompletableFuture<U> dep,
1283 CompletableFuture<?> src, CompletableFuture<?> snd,
1284 Function<? super T,? extends U> fn) {
1285 super(async, dep, src, snd); this.fn = fn;
1286 }
1287 final CompletableFuture<?> tryAct() {
1288 CompletableFuture<U> d; CompletableFuture<?> a, b; Object r;
1289 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1290 ((r = a.result) != null || (r = b.result) != null) &&
1291 claim(d)) {
1292 nowApply(async, d, r, fn);
1293 src = null; snd = null; fn = null;
1294 if (d.result != null) return d;
1295 }
1296 return null;
1297 }
1298 }
1299
1300 private <U> CompletableFuture<U> doApplyToEither(
1301 CompletableFuture<? extends T> o,
1302 Function<? super T, U> fn, Executor e) {
1303 if (o == null || fn == null) throw new NullPointerException();
1304 CompletableFuture<U> d = new CompletableFuture<U>();
1305 Object r = result;
1306 if (r == null && (r = o.result) == null)
1307 bipushOred(o, new DelayedApplyToEither<T,U>(e, d, this, o, fn));
1308 else
1309 nowApply(e, d, r, fn);
1310 return d;
1311 }
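
For the "or" case, a small caller-side sketch may help. It assumes only the public `applyToEither` method, and the class `EitherSketch` is a hypothetical name. One source is deliberately left incomplete so that the outcome does not depend on timing.

```java
import java.util.concurrent.CompletableFuture;

final class EitherSketch {
    // The function is applied to whichever source has a result; here only
    // "fast" is complete, so its value is the one used.
    static String firstWins() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        return slow.applyToEither(fast, s -> s + "!").join();
    }

    public static void main(String[] args) {
        System.out.println(firstWins());
    }
}
```

This corresponds to the `r == null && (r = o.result) == null` test in `doApplyToEither`: because `o` already has a result, `nowApply` runs immediately and nothing is pushed.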
1312
1313 // Consumer/acceptEither
1314
1315 static final class DelayedAcceptEither<T> extends BiCompletion<Void> {
1316 Consumer<? super T> fn;
1317 DelayedAcceptEither(Executor async, CompletableFuture<Void> dep,
1318 CompletableFuture<?> src, CompletableFuture<?> snd,
1319 Consumer<? super T> fn) {
1320 super(async, dep, src, snd); this.fn = fn;
1321 }
1322 final CompletableFuture<?> tryAct() {
1323 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1324 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1325 ((r = a.result) != null || (r = b.result) != null) &&
1326 claim(d)) {
1327 nowAccept(async, d, r, fn);
1328 src = null; snd = null; fn = null;
1329 if (d.result != null) return d;
1330 }
1331 return null;
1332 }
1333 }
1334
1335 private CompletableFuture<Void> doAcceptEither(
1336 CompletableFuture<? extends T> o,
1337 Consumer<? super T> fn, Executor e) {
1338 if (o == null || fn == null) throw new NullPointerException();
1339 CompletableFuture<Void> d = new CompletableFuture<Void>();
1340 Object r = result;
1341 if (r == null && (r = o.result) == null)
1342 bipushOred(o, new DelayedAcceptEither<T>(e, d, this, o, fn));
1343 else
1344 nowAccept(e, d, r, fn);
1345 return d;
1346 }
1347
1348 // Runnable/runEither
1349
1350 static final class DelayedRunAfterEither extends BiCompletion<Void> {
1351 Runnable fn;
1352 DelayedRunAfterEither(Executor async, CompletableFuture<Void> dep,
1353 CompletableFuture<?> src,
1354 CompletableFuture<?> snd, Runnable fn) {
1355 super(async, dep, src, snd); this.fn = fn;
1356 }
1357 final CompletableFuture<?> tryAct() {
1358 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1359 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1360 ((r = a.result) != null || (r = b.result) != null) &&
1361 claim(d)) {
1362 nowRun(async, d, r, fn);
1363 src = null; snd = null; fn = null;
1364 if (d.result != null) return d;
1365 }
1366 return null;
1367 }
1368 }
1369
1370 private CompletableFuture<Void> doRunAfterEither(
1371 CompletableFuture<?> o, Runnable fn, Executor e) {
1372 if (o == null || fn == null) throw new NullPointerException();
1373 CompletableFuture<Void> d = new CompletableFuture<Void>();
1374 Object r = result;
1375 if (r == null && (r = o.result) == null)
1376 bipushOred(o, new DelayedRunAfterEither(e, d, this, o, fn));
1377 else
1378 nowRun(e, d, r, fn);
1379 return d;
1380 }
1381
1382 /* ------------- Signallers -------------- */
1383
1384 /**
1385 * Heuristic spin value for waitingGet() before blocking on
1386 * multiprocessors
1387 */
1388 static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
1389 1 << 8 : 0);
1390
1391 /**
1392 * Completion for recording and releasing a waiting thread. See
1393 * other classes such as Phaser and SynchronousQueue for more
1394 * detailed explanation. This class implements ManagedBlocker to
1395 * avoid starvation when blocking actions pile up in
1396 * ForkJoinPools.
1397 */
1398 static final class Signaller extends Completion
1399 implements ForkJoinPool.ManagedBlocker {
1400 long nanos; // wait time if timed
1401 final long deadline; // non-zero if timed
1402 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
1403 volatile Thread thread;
1404 Signaller(boolean interruptible, long nanos, long deadline) {
1405 this.thread = Thread.currentThread();
1406 this.interruptControl = interruptible ? 1 : 0;
1407 this.nanos = nanos;
1408 this.deadline = deadline;
1409 }
1410 final CompletableFuture<?> tryAct() {
1411 Thread w = thread;
1412 if (w != null) {
1413 thread = null; // no need to CAS
1414 LockSupport.unpark(w);
1415 }
1416 return null;
1417 }
1418 public boolean isReleasable() {
1419 if (thread == null)
1420 return true;
1421 if (Thread.interrupted()) {
1422 int i = interruptControl;
1423 interruptControl = -1;
1424 if (i > 0)
1425 return true;
1426 }
1427 if (deadline != 0L &&
1428 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
1429 thread = null;
1430 return true;
1431 }
1432 return false;
1433 }
1434 public boolean block() {
1435 if (isReleasable())
1436 return true;
1437 else if (deadline == 0L)
1438 LockSupport.park(this);
1439 else if (nanos > 0L)
1440 LockSupport.parkNanos(this, nanos);
1441 return isReleasable();
1442 }
1443 }
1444
1445 /**
1446 * Returns raw result after waiting, or null if interruptible and
1447 * interrupted.
1448 */
1449 private Object waitingGet(boolean interruptible) {
1450 Signaller q = null;
1451 boolean queued = false;
1452 int spins = SPINS;
1453 Object r;
1454 while ((r = result) == null) {
1455 if (spins > 0) {
1456 if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1457 --spins;
1458 }
1459 else if (q == null)
1460 q = new Signaller(interruptible, 0L, 0L);
1461 else if (!queued)
1462 queued = casCompletions(q.next = completions, q);
1463 else if (interruptible && q.interruptControl < 0) {
1464 q.thread = null;
1465 removeCancelledSignallers();
1466 return null;
1467 }
1468 else if (q.thread != null && result == null) {
1469 try {
1470 ForkJoinPool.managedBlock(q);
1471 } catch (InterruptedException ie) {
1472 q.interruptControl = -1;
1473 }
1474 }
1475 }
1476 if (q != null) {
1477 q.thread = null;
1478 if (q.interruptControl < 0) {
1479 if (interruptible)
1480 r = null; // report interruption
1481 else
1482 Thread.currentThread().interrupt();
1483 }
1484 }
1485 postComplete();
1486 return r;
1487 }
1488
1489 /**
1490 * Returns raw result after waiting, or null if interrupted, or
1491 * throws TimeoutException on timeout.
1492 */
1493 private Object timedGet(long nanos) throws TimeoutException {
1494 if (Thread.interrupted())
1495 return null;
1496 if (nanos <= 0L)
1497 throw new TimeoutException();
1498 long d = System.nanoTime() + nanos;
1499 Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
1500 boolean queued = false;
1501 Object r;
1502 while ((r = result) == null) {
1503 if (!queued)
1504 queued = casCompletions(q.next = completions, q);
1505 else if (q.interruptControl < 0 || q.nanos <= 0L) {
1506 q.thread = null;
1507 removeCancelledSignallers();
1508 if (q.interruptControl < 0)
1509 return null;
1510 throw new TimeoutException();
1511 }
1512 else if (q.thread != null && result == null) {
1513 try {
1514 ForkJoinPool.managedBlock(q);
1515 } catch (InterruptedException ie) {
1516 q.interruptControl = -1;
1517 }
1518 }
1519 }
1520 q.thread = null;
1521 postComplete();
1522 return (q.interruptControl < 0) ? null : r;
1523 }
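
From the caller's side, `timedGet` is reached through `get(long, TimeUnit)`. The sketch below, which assumes only public API and uses the hypothetical class `TimedGetSketch`, shows one way to map the three checked outcomes to a value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedGetSketch {
    // Waits up to the given number of milliseconds and describes what happened.
    static String outcome(CompletableFuture<String> f, long millis) {
        try {
            return f.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(new CompletableFuture<>(), 10));
        System.out.println(outcome(CompletableFuture.completedFuture("ok"), 10));
        CompletableFuture<String> bad = new CompletableFuture<>();
        bad.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(outcome(bad, 10));
    }
}
```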
1524
1525 /**
1526 * Unlinks cancelled Signallers to avoid accumulating garbage.
1527 * Internal nodes are simply unspliced without CAS since it is
1528 * harmless if they are traversed anyway. To avoid effects of
1529 * unsplicing from already removed nodes, the list is retraversed
1530 * in case of an apparent race.
1531 */
1532 private void removeCancelledSignallers() {
1533 for (Completion p = null, q = completions; q != null;) {
1534 Completion s = q.next;
1535 if ((q instanceof Signaller) && ((Signaller)q).thread == null) {
1536 if (p != null) {
1537 p.next = s;
1538 if (!(p instanceof Signaller) ||
1539 ((Signaller)p).thread != null)
1540 break;
1541 }
1542 else if (casCompletions(q, s))
1543 break;
1544 p = null; // restart
1545 q = completions;
1546 }
1547 else {
1548 p = q;
1549 q = s;
1550 }
1551 }
1552 }
1553
1554 /* ------------- public methods -------------- */
1555
1556 /**
1557 * Creates a new incomplete CompletableFuture.
1558 */
1559 public CompletableFuture() {
1560 }
1561
1562 /**
1563 * Returns a new CompletableFuture that is asynchronously completed
1564 * by a task running in the {@link ForkJoinPool#commonPool()} with
1565 * the value obtained by calling the given Supplier.
1566 *
1567 * @param supplier a function returning the value to be used
1568 * to complete the returned CompletableFuture
1569 * @param <U> the function's return type
1570 * @return the new CompletableFuture
1571 */
1572 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1573 if (supplier == null) throw new NullPointerException();
1574 CompletableFuture<U> d = new CompletableFuture<U>();
1575 asyncPool.execute(new AsyncSupply<U>(d, supplier));
1576 return d;
1577 }
1578
1579 /**
1580 * Returns a new CompletableFuture that is asynchronously completed
1581 * by a task running in the given executor with the value obtained
1582 * by calling the given Supplier.
1583 *
1584 * @param supplier a function returning the value to be used
1585 * to complete the returned CompletableFuture
1586 * @param executor the executor to use for asynchronous execution
1587 * @param <U> the function's return type
1588 * @return the new CompletableFuture
1589 */
1590 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1591 Executor executor) {
1592 if (supplier == null) throw new NullPointerException();
1593 Executor e = screenExecutor(executor);
1594 CompletableFuture<U> d = new CompletableFuture<U>();
1595 e.execute(new AsyncSupply<U>(d, supplier));
1596 return d;
1597 }
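
A minimal usage sketch for the executor-taking overload follows. The pool size and the class name `ExecutorSketch` are arbitrary choices made for illustration. The caller owns the executor and is responsible for shutting it down.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ExecutorSketch {
    // Runs the supplier on a caller-provided pool instead of the common pool.
    static int compute() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture.supplyAsync(() -> 6 * 7, pool).join();
        } finally {
            pool.shutdown(); // the future does not manage the executor's lifecycle
        }
    }

    public static void main(String[] args) {
        System.out.println(compute());
    }
}
```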
1598
1599 /**
1600 * Returns a new CompletableFuture that is asynchronously completed
1601 * by a task running in the {@link ForkJoinPool#commonPool()} after
1602 * it runs the given action.
1603 *
1604 * @param runnable the action to run before completing the
1605 * returned CompletableFuture
1606 * @return the new CompletableFuture
1607 */
1608 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1609 if (runnable == null) throw new NullPointerException();
1610 CompletableFuture<Void> d = new CompletableFuture<Void>();
1611 asyncPool.execute(new AsyncRun<Void>(d, runnable));
1612 return d;
1613 }
1614
1615 /**
1616 * Returns a new CompletableFuture that is asynchronously completed
1617 * by a task running in the given executor after it runs the given
1618 * action.
1619 *
1620 * @param runnable the action to run before completing the
1621 * returned CompletableFuture
1622 * @param executor the executor to use for asynchronous execution
1623 * @return the new CompletableFuture
1624 */
1625 public static CompletableFuture<Void> runAsync(Runnable runnable,
1626 Executor executor) {
1627 if (runnable == null) throw new NullPointerException();
1628 Executor e = screenExecutor(executor);
1629 CompletableFuture<Void> d = new CompletableFuture<Void>();
1630 e.execute(new AsyncRun<Void>(d, runnable));
1631 return d;
1632 }
1633
1634 /**
1635 * Returns a new CompletableFuture that is already completed with
1636 * the given value.
1637 *
1638 * @param value the value
1639 * @param <U> the type of the value
1640 * @return the completed CompletableFuture
1641 */
1642 public static <U> CompletableFuture<U> completedFuture(U value) {
1643 CompletableFuture<U> d = new CompletableFuture<U>();
1644 d.result = (value == null) ? NIL : value;
1645 return d;
1646 }
1647
1648 /**
1649 * Returns {@code true} if completed in any fashion: normally,
1650 * exceptionally, or via cancellation.
1651 *
1652 * @return {@code true} if completed
1653 */
1654 public boolean isDone() {
1655 return result != null;
1656 }
1657
1658 /**
1659 * Waits if necessary for this future to complete, and then
1660 * returns its result.
1661 *
1662 * @return the result value
1663 * @throws CancellationException if this future was cancelled
1664 * @throws ExecutionException if this future completed exceptionally
1665 * @throws InterruptedException if the current thread was interrupted
1666 * while waiting
1667 */
1668 public T get() throws InterruptedException, ExecutionException {
1669 Object r;
1670 return reportGet((r = result) == null ? waitingGet(true) : r);
1671 }
1672
1673 /**
1674 * Waits if necessary for at most the given time for this future
1675 * to complete, and then returns its result, if available.
1676 *
1677 * @param timeout the maximum time to wait
1678 * @param unit the time unit of the timeout argument
1679 * @return the result value
1680 * @throws CancellationException if this future was cancelled
1681 * @throws ExecutionException if this future completed exceptionally
1682 * @throws InterruptedException if the current thread was interrupted
1683 * while waiting
1684 * @throws TimeoutException if the wait timed out
1685 */
1686 public T get(long timeout, TimeUnit unit)
1687 throws InterruptedException, ExecutionException, TimeoutException {
1688 Object r;
1689 long nanos = unit.toNanos(timeout);
1690 return reportGet((r = result) == null ? timedGet(nanos) : r);
1691 }
1692
1693 /**
1694 * Returns the result value when complete, or throws an
1695 * (unchecked) exception if completed exceptionally. To better
1696 * conform with the use of common functional forms, if a
1697 * computation involved in the completion of this
1698 * CompletableFuture threw an exception, this method throws an
1699 * (unchecked) {@link CompletionException} with the underlying
1700 * exception as its cause.
1701 *
1702 * @return the result value
1703 * @throws CancellationException if the computation was cancelled
1704 * @throws CompletionException if this future completed
1705 * exceptionally or a completion computation threw an exception
1706 */
1707 public T join() {
1708 Object r;
1709 return reportJoin((r = result) == null ? waitingGet(false) : r);
1710 }
1711
1712 /**
1713 * Returns the result value (or throws any encountered exception)
1714 * if completed, else returns the given valueIfAbsent.
1715 *
1716 * @param valueIfAbsent the value to return if not completed
1717 * @return the result value, if completed, else the given valueIfAbsent
1718 * @throws CancellationException if the computation was cancelled
1719 * @throws CompletionException if this future completed
1720 * exceptionally or a completion computation threw an exception
1721 */
1722 public T getNow(T valueIfAbsent) {
1723 Object r;
1724 return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
1725 }
1726
1727 /**
1728 * If not already completed, sets the value returned by {@link
1729 * #get()} and related methods to the given value.
1730 *
1731 * @param value the result value
1732 * @return {@code true} if this invocation caused this CompletableFuture
1733 * to transition to a completed state, else {@code false}
1734 */
1735 public boolean complete(T value) {
1736 boolean triggered = internalComplete(value == null ? NIL : value);
1737 postComplete();
1738 return triggered;
1739 }
1740
1741 /**
1742 * If not already completed, causes invocations of {@link #get()}
1743 * and related methods to throw the given exception.
1744 *
1745 * @param ex the exception
1746 * @return {@code true} if this invocation caused this CompletableFuture
1747 * to transition to a completed state, else {@code false}
1748 */
1749 public boolean completeExceptionally(Throwable ex) {
1750 if (ex == null) throw new NullPointerException();
1751 boolean triggered = internalComplete(new AltResult(ex));
1752 postComplete();
1753 return triggered;
1754 }
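
The completion methods above can be illustrated with a short sketch. It assumes only public API, and `CompleteSketch` is a hypothetical class name. It shows that only the first completion attempt takes effect, that `join` wraps an explicit failure in a `CompletionException`, and that `getNow` falls back to its argument while the future is incomplete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CompleteSketch {
    // Only the first of several completion attempts returns true.
    static boolean onlyFirstCompletionWins() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("a");
        boolean second = f.complete("b");
        boolean third = f.completeExceptionally(new IllegalStateException("late"));
        return first && !second && !third && "a".equals(f.join());
    }

    // join() reports an explicit failure as an unchecked CompletionException
    // whose cause is the original exception.
    static String joinFailure() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        try {
            return f.join();
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    // getNow() does not block; it returns the fallback if no result is set yet.
    static String getNowFallback() {
        return new CompletableFuture<String>().getNow("absent");
    }

    public static void main(String[] args) {
        System.out.println(onlyFirstCompletionWins());
        System.out.println(joinFailure());
        System.out.println(getNowFallback());
    }
}
```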
1755
1756 public <U> CompletableFuture<U> thenApply(
1757 Function<? super T,? extends U> fn) {
1758 return doThenApply(fn, null);
1759 }
1760
1761 public <U> CompletableFuture<U> thenApplyAsync(
1762 Function<? super T,? extends U> fn) {
1763 return doThenApply(fn, asyncPool);
1764 }
1765
1766 public <U> CompletableFuture<U> thenApplyAsync(
1767 Function<? super T,? extends U> fn, Executor executor) {
1768 return doThenApply(fn, screenExecutor(executor));
1769 }
1770
1771 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
1772 return doThenAccept(action, null);
1773 }
1774
1775 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
1776 return doThenAccept(action, asyncPool);
1777 }
1778
1779 public CompletableFuture<Void> thenAcceptAsync(
1780 Consumer<? super T> action, Executor executor) {
1781 return doThenAccept(action, screenExecutor(executor));
1782 }
1783
1784 public CompletableFuture<Void> thenRun(Runnable action) {
1785 return doThenRun(action, null);
1786 }
1787
1788 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1789 return doThenRun(action, asyncPool);
1790 }
1791
1792 public CompletableFuture<Void> thenRunAsync(
1793 Runnable action, Executor executor) {
1794 return doThenRun(action, screenExecutor(executor));
1795 }
1796
1797 public <U,V> CompletableFuture<V> thenCombine(
1798 CompletionStage<? extends U> other,
1799 BiFunction<? super T,? super U,? extends V> fn) {
1800 return doThenCombine(other.toCompletableFuture(), fn, null);
1801 }
1802
1803 public <U,V> CompletableFuture<V> thenCombineAsync(
1804 CompletionStage<? extends U> other,
1805 BiFunction<? super T,? super U,? extends V> fn) {
1806 return doThenCombine(other.toCompletableFuture(), fn, asyncPool);
1807 }
1808
1809 public <U,V> CompletableFuture<V> thenCombineAsync(
1810 CompletionStage<? extends U> other,
1811 BiFunction<? super T,? super U,? extends V> fn,
1812 Executor executor) {
1813 return doThenCombine(other.toCompletableFuture(), fn,
1814 screenExecutor(executor));
1815 }
1816
1817 public <U> CompletableFuture<Void> thenAcceptBoth(
1818 CompletionStage<? extends U> other,
1819 BiConsumer<? super T, ? super U> action) {
1820 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
1821 }
1822
1823 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1824 CompletionStage<? extends U> other,
1825 BiConsumer<? super T, ? super U> action) {
1826 return doThenAcceptBoth(other.toCompletableFuture(), action, asyncPool);
1827 }
1828
1829 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1830 CompletionStage<? extends U> other,
1831 BiConsumer<? super T, ? super U> action,
1832 Executor executor) {
1833 return doThenAcceptBoth(other.toCompletableFuture(), action,
1834 screenExecutor(executor));
1835 }
1836
1837 public CompletableFuture<Void> runAfterBoth(
1838 CompletionStage<?> other, Runnable action) {
1839 return doRunAfterBoth(other.toCompletableFuture(), action, null);
1840 }
1841
1842 public CompletableFuture<Void> runAfterBothAsync(
1843 CompletionStage<?> other, Runnable action) {
1844 return doRunAfterBoth(other.toCompletableFuture(), action, asyncPool);
1845 }
1846
1847 public CompletableFuture<Void> runAfterBothAsync(
1848 CompletionStage<?> other, Runnable action, Executor executor) {
1849 return doRunAfterBoth(other.toCompletableFuture(), action,
1850 screenExecutor(executor));
1851 }
1852
1853 public <U> CompletableFuture<U> applyToEither(
1854 CompletionStage<? extends T> other, Function<? super T, U> fn) {
1855 return doApplyToEither(other.toCompletableFuture(), fn, null);
1856 }
1857
1858 public <U> CompletableFuture<U> applyToEitherAsync(
1859 CompletionStage<? extends T> other, Function<? super T, U> fn) {
1860 return doApplyToEither(other.toCompletableFuture(), fn, asyncPool);
1861 }
1862
1863 public <U> CompletableFuture<U> applyToEitherAsync
1864 (CompletionStage<? extends T> other, Function<? super T, U> fn,
1865 Executor executor) {
1866 return doApplyToEither(other.toCompletableFuture(), fn,
1867 screenExecutor(executor));
1868 }
1869
1870 public CompletableFuture<Void> acceptEither(
1871 CompletionStage<? extends T> other, Consumer<? super T> action) {
1872 return doAcceptEither(other.toCompletableFuture(), action, null);
1873 }
1874
1875 public CompletableFuture<Void> acceptEitherAsync
1876 (CompletionStage<? extends T> other, Consumer<? super T> action) {
1877 return doAcceptEither(other.toCompletableFuture(), action, asyncPool);
1878 }
1879
1880 public CompletableFuture<Void> acceptEitherAsync(
1881 CompletionStage<? extends T> other, Consumer<? super T> action,
1882 Executor executor) {
1883 return doAcceptEither(other.toCompletableFuture(), action,
1884 screenExecutor(executor));
1885 }
1886
    public CompletableFuture<Void> runAfterEither(
        CompletionStage<?> other, Runnable action) {
        return doRunAfterEither(other.toCompletableFuture(), action, null);
    }

    public CompletableFuture<Void> runAfterEitherAsync(
        CompletionStage<?> other, Runnable action) {
        return doRunAfterEither(other.toCompletableFuture(), action, asyncPool);
    }

    public CompletableFuture<Void> runAfterEitherAsync(
        CompletionStage<?> other, Runnable action, Executor executor) {
        return doRunAfterEither(other.toCompletableFuture(), action,
                                screenExecutor(executor));
    }

    public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return doThenCompose(fn, null);
    }

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return doThenCompose(fn, asyncPool);
    }

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor) {
        return doThenCompose(fn, screenExecutor(executor));
    }

    public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return doWhenComplete(action, null);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return doWhenComplete(action, asyncPool);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor) {
        return doWhenComplete(action, screenExecutor(executor));
    }

    public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return doHandle(fn, null);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return doHandle(fn, asyncPool);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
        return doHandle(fn, screenExecutor(executor));
    }

    /**
     * Returns this CompletableFuture.
     *
     * @return this CompletableFuture
     */
    public CompletableFuture<T> toCompletableFuture() {
        return this;
    }

    // not in interface CompletionStage

    /**
     * Returns a new CompletableFuture that is completed when this
     * CompletableFuture completes, with the result of the given
     * function of the exception triggering this CompletableFuture's
     * completion when it completes exceptionally; otherwise, if this
     * CompletableFuture completes normally, then the returned
     * CompletableFuture also completes normally with the same value.
     * Note: More flexible versions of this functionality are
     * available using methods {@code whenComplete} and {@code handle}.
     *
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture if this CompletableFuture completed
     * exceptionally
     * @return the new CompletableFuture
     */
    public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return doExceptionally(fn);
    }

    /* ------------- Arbitrary-arity constructions -------------- */

    /**
     * Returns a new CompletableFuture that is completed when all of
     * the given CompletableFutures complete. If any of the given
     * CompletableFutures complete exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException
     * holding this exception as its cause. Otherwise, the results,
     * if any, of the given CompletableFutures are not reflected in
     * the returned CompletableFuture, but may be obtained by
     * inspecting them individually. If no CompletableFutures are
     * provided, returns a CompletableFuture completed with the value
     * {@code null}.
     *
     * <p>Among the applications of this method is to await completion
     * of a set of independent CompletableFutures before continuing a
     * program, as in: {@code CompletableFuture.allOf(c1, c2,
     * c3).join();}.
     *
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed when all of the
     * given CompletableFutures complete
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return doAllOf(cfs, 0, cfs.length - 1);
    }

    /**
     * Returns a new CompletableFuture that is completed when any of
     * the given CompletableFutures completes, with the same result.
     * If that CompletableFuture completed exceptionally, the returned
     * CompletableFuture also does so, with a CompletionException
     * holding this exception as its cause. If no CompletableFutures
     * are provided, returns an incomplete CompletableFuture.
     *
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed with the
     * result or exception of any of the given CompletableFutures when
     * one completes
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        CompletableFuture<Object> d = new CompletableFuture<Object>();
        for (int i = 0; i < cfs.length; ++i) {
            CompletableFuture<?> c = cfs[i];
            Object r = c.result; // throw NPE if null element
            if (d.result == null) {
                if (r == null)
                    c.unipush(new DelayedCopy<Object>(d, c));
                else
                    nowCopy(d, r);
            }
        }
        return d;
    }

    /* ------------- Control and status methods -------------- */

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = (result == null) &&
            internalComplete(new AltResult(new CancellationException()));
        postComplete();
        return cancelled || isCancelled();
    }

    /**
     * Returns {@code true} if this CompletableFuture was cancelled
     * before it completed normally.
     *
     * @return {@code true} if this CompletableFuture was cancelled
     * before it completed normally
     */
    public boolean isCancelled() {
        Object r;
        return ((r = result) instanceof AltResult) &&
            (((AltResult)r).ex instanceof CancellationException);
    }

    /**
     * Returns {@code true} if this CompletableFuture completed
     * exceptionally, in any way. Possible causes include
     * cancellation, explicit invocation of {@code
     * completeExceptionally}, and abrupt termination of a
     * CompletionStage action.
     *
     * @return {@code true} if this CompletableFuture completed
     * exceptionally
     */
    public boolean isCompletedExceptionally() {
        Object r;
        return ((r = result) instanceof AltResult) && r != NIL;
    }

    /**
     * Forcibly sets or resets the value subsequently returned by
     * method {@link #get()} and related methods, whether or not
     * already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param value the completion value
     */
    public void obtrudeValue(T value) {
        result = (value == null) ? NIL : value;
        postComplete();
    }

    /**
     * Forcibly causes subsequent invocations of method {@link #get()}
     * and related methods to throw the given exception, whether or
     * not already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param ex the exception
     * @throws NullPointerException if the exception is {@code null}
     */
    public void obtrudeException(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        result = new AltResult(ex);
        postComplete();
    }

    /**
     * Returns the estimated number of CompletableFutures whose
     * completions are awaiting completion of this CompletableFuture.
     * This method is designed for use in monitoring system state, not
     * for synchronization control.
     *
     * @return the number of dependent CompletableFutures
     */
    public int getNumberOfDependents() {
        int count = 0;
        for (Completion p = completions; p != null; p = p.next)
            ++count;
        return count;
    }

    /**
     * Returns a string identifying this CompletableFuture, as well as
     * its completion state. The state, in brackets, contains the
     * String {@code "Completed normally"} or the String {@code
     * "Completed exceptionally"}, or the String {@code "Not
     * completed"} followed by the number of CompletableFutures
     * dependent upon its completion, if any.
     *
     * @return a string identifying this CompletableFuture, as well as its state
     */
    public String toString() {
        Object r = result;
        int count;
        return super.toString() +
            ((r == null) ?
             (((count = getNumberOfDependents()) == 0) ?
              "[Not completed]" :
              "[Not completed, " + count + " dependents]") :
             (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
              "[Completed exceptionally]" :
              "[Completed normally]"));
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long RESULT;
    private static final long COMPLETIONS;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = CompletableFuture.class;
            RESULT = UNSAFE.objectFieldOffset
                (k.getDeclaredField("result"));
            COMPLETIONS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("completions"));
        } catch (Exception x) {
            throw new Error(x);
        }
    }
}
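
The behaviour documented for `exceptionally`, `allOf`, `anyOf` and `cancel` above can be exercised from client code roughly as follows. This is a minimal sketch, not part of the jsr166 sources: the class name `CompletableFutureSketch` and its method names are hypothetical, and it uses only the public `java.util.concurrent` API.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; only public CompletableFuture API is used.
public class CompletableFutureSketch {

    // exceptionally: a failed source is mapped to a fallback value.
    public static String recoverWithFallback() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return failed.exceptionally(ex -> "fallback").join();
    }

    // allOf: the returned future carries no values, so results are
    // read from the individual inputs after it completes.
    public static int sumAfterAll() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture.allOf(a, b).join();
        return a.join() + b.join();
    }

    // anyOf: completes with the result of whichever input is complete;
    // here only the second input has a value.
    public static Object firstOfAny() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("ready");
        return CompletableFuture.anyOf(pending, done).join();
    }

    // cancel: the source reports isCancelled(), and a dependent completes
    // exceptionally with a CompletionException caused by the cancellation.
    public static boolean dependentFailsAfterCancel() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);
        boolean cancelled = source.cancel(false); // the argument has no effect
        try {
            dependent.join();
            return false; // not expected: the dependent should have failed
        } catch (CompletionException e) {
            return cancelled
                && source.isCancelled()
                && dependent.isCompletedExceptionally()
                && e.getCause() instanceof CancellationException;
        }
    }

    public static void main(String[] args) {
        System.out.println(recoverWithFallback());
        System.out.println(sumAfterAll());
        System.out.println(firstOfAny());
        System.out.println(dependentFailsAfterCancel());
    }
}
```

Each helper returns a value rather than printing it, so the documented outcome of each method can be checked directly by a caller.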