ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.106
Committed: Fri May 2 23:29:33 2014 UTC (10 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.105: +4 -4 lines
Log Message:
coding style

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.locks.LockSupport;
25
26 /**
27 * A {@link Future} that may be explicitly completed (setting its
28 * value and status), and may be used as a {@link CompletionStage},
29 * supporting dependent functions and actions that trigger upon its
30 * completion.
31 *
32 * <p>When two or more threads attempt to
33 * {@link #complete complete},
34 * {@link #completeExceptionally completeExceptionally}, or
35 * {@link #cancel cancel}
36 * a CompletableFuture, only one of them succeeds.
37 *
38 * <p>In addition to these and related methods for directly
39 * manipulating status and results, CompletableFuture implements
40 * interface {@link CompletionStage} with the following policies: <ul>
41 *
42 * <li>Actions supplied for dependent completions of
43 * <em>non-async</em> methods may be performed by the thread that
44 * completes the current CompletableFuture, or by any other caller of
45 * a completion method.</li>
46 *
47 * <li>All <em>async</em> methods without an explicit Executor
48 * argument are performed using the {@link ForkJoinPool#commonPool()}
49 * (unless it does not support a parallelism level of at least two, in
50 * which case, a new Thread is used). To simplify monitoring,
51 * debugging, and tracking, all generated asynchronous tasks are
52 * instances of the marker interface {@link
53 * AsynchronousCompletionTask}. </li>
54 *
55 * <li>All CompletionStage methods are implemented independently of
56 * other public methods, so the behavior of one method is not impacted
57 * by overrides of others in subclasses. </li> </ul>
58 *
59 * <p>CompletableFuture also implements {@link Future} with the following
60 * policies: <ul>
61 *
62 * <li>Since (unlike {@link FutureTask}) this class has no direct
63 * control over the computation that causes it to be completed,
64 * cancellation is treated as just another form of exceptional
65 * completion. Method {@link #cancel cancel} has the same effect as
66 * {@code completeExceptionally(new CancellationException())}. Method
67 * {@link #isCompletedExceptionally} can be used to determine if a
68 * CompletableFuture completed in any exceptional fashion.</li>
69 *
70 * <li>In case of exceptional completion with a CompletionException,
71 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
72 * {@link ExecutionException} with the same cause as held in the
73 * corresponding CompletionException. To simplify usage in most
74 * contexts, this class also defines methods {@link #join()} and
75 * {@link #getNow} that instead throw the CompletionException directly
76 * in these cases.</li> </ul>
77 *
78 * @author Doug Lea
79 * @since 1.8
80 */
81 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
82
83 /*
84 * Overview:
85 *
86 * A CompletableFuture may have dependent completion actions,
87 * collected in a linked stack. It atomically completes by CASing
88 * a result field, and then pops off and runs those actions. This
89 * applies across normal vs exceptional outcomes, sync vs async
90 * actions, binary triggers, and various forms of completions.
91 *
92 * Non-nullness of field result (set via CAS) indicates done. An
93 * AltResult is used to box null as a result, as well as to hold
94 * exceptions. Using a single field makes completion simple to
95 * detect and trigger. Encoding and decoding is straightforward
96 * but adds vertical sprawl. One minor simplification relies on
97 * the (static) NIL (to box null results) being the only AltResult
98 * with a null exception field, so we don't usually need explicit
99 * comparisons with NIL. Exception propagation mechanics
100 * surrounding decoding rely on unchecked casts of decoded results
101 * really being unchecked, and user type errors being caught at
102 * point of use, as is currently the case in Java. These are
103 * highlighted by using SuppressWarnings annotated temporaries.
104 *
105 * Dependent actions are represented by Completion objects linked
106 * as Treiber stacks headed by field completions. There are four
107 * kinds of Completions: single-source (UniCompletion), two-source
108 * (BiCompletion), shared (CoBiCompletion, used by the second
109 * source of a BiCompletion), and Signallers that unblock waiters.
110 *
111 * The same patterns of methods and classes are used for each form
112 * of Completion (apply, combine, etc), and are written in a
113 * similar style. For each form X there is, when applicable:
114 *
115 * * Method nowX (for example nowApply) that immediately executes
116 * a supplied function and sets result
117 * * Class AsyncX class (for example AsyncApply) that calls nowX
118 * from another task,
119 * * Class DelayedX (for example DelayedApply) that holds
120 * arguments and calls Xnow when ready.
121 *
122 * For each public CompletionStage method M* (for example
123 * thenApply{Async}), there is a method doM (for example
124 * doThenApply) that creates and/or invokes the appropriate form.
125 * Each deals with three cases that can arise when adding a
126 * dependent completion to CompletableFuture f:
127 *
128 * * f is already complete, so the dependent action is run
129 * immediately, via a "now" method, which, if async,
130 * starts the action in a new task.
131 * * f is not complete, so a Completion action is created and
132 * pushed to f's completions. It is triggered via
133 * f.postComplete when f completes.
134 * * f is not complete, but completes while adding the completion
135 * action, so we try to trigger it upon adding (see method
136 * unipush and derivatives) to cover races.
137 *
138 * Methods with two sources (for example thenCombine) must deal
139 * with races across both while pushing actions. The second
140 * completion is an CoBiCompletion pointing to the first, shared
141 * to ensure that at most one claims and performs the action. The
142 * multiple-arity method allOf does this pairwise to form a tree
143 * of completions. (Method anyOf just uses a depth-one Or tree.)
144 *
145 * Upon setting results, method postComplete is called unless
146 * the target is guaranteed not to be observable (i.e., not yet
147 * returned or linked). Multiple threads can call postComplete,
148 * which atomically pops each dependent action, and tries to
149 * trigger it via method tryAct. Any such action must be performed
150 * only once, even if called from several threads, so Completions
151 * maintain status via CAS, and on success run one of the "now"
152 * methods. Triggering can propagate recursively, so tryAct
153 * returns a completed dependent (if one exists) for further
154 * processing by its caller.
155 *
156 * Blocking methods get() and join() rely on Signaller Completions
157 * that wake up waiting threads. The mechanics are similar to
158 * Treiber stack wait-nodes used in FutureTask, Phaser, and
159 * SynchronousQueue. See their internal documentation for
160 * algorithmic details.
161 *
162 * Without precautions, CompletableFutures would be prone to
163 * garbage accumulation as chains of completions build up, each
164 * pointing back to its sources. So we detach (null out) most
165 * Completion fields as soon as possible. To support this,
166 * internal methods check for and harmlessly ignore null arguments
167 * that may have been obtained during races with threads nulling
168 * out fields. (Some of these checked cases cannot currently
169 * happen.) Fields of Async classes can be but currently are not
170 * fully detached, because they do not in general form cycles.
171 */
172
173 volatile Object result; // Either the result or boxed AltResult
174 volatile Completion<?> completions; // Treiber stack of dependent actions
175
176 final boolean internalComplete(Object r) { // CAS from null to r
177 return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
178 }
179
180 final boolean casCompletions(Completion<?> cmp, Completion<?> val) {
181 return UNSAFE.compareAndSwapObject(this, COMPLETIONS, cmp, val);
182 }
183
184 /* ------------- Encoding and decoding outcomes -------------- */
185
186 static final class AltResult { // See above
187 final Throwable ex; // null only for NIL
188 AltResult(Throwable x) { this.ex = x; }
189 }
190
191 static final AltResult NIL = new AltResult(null);
192
193 /**
194 * Returns the encoding of the given (non-null) exception as a
195 * wrapped CompletionException unless it is one already.
196 */
197 static AltResult altThrowable(Throwable x) {
198 return new AltResult((x instanceof CompletionException) ? x :
199 new CompletionException(x));
200 }
201
202 /**
203 * Returns the encoding of the given arguments: if the exception
204 * is non-null, encodes as altThrowable. Otherwise uses the given
205 * value, boxed as NIL if null.
206 */
207 static Object encodeOutcome(Object v, Throwable x) {
208 return (x != null) ? altThrowable(x) : (v == null) ? NIL : v;
209 }
210
211 /**
212 * Decodes outcome to return result or throw unchecked exception
213 */
214 private static <T> T reportJoin(Object r) {
215 if (r instanceof AltResult) {
216 Throwable x;
217 if ((x = ((AltResult)r).ex) == null)
218 return null;
219 if (x instanceof CancellationException)
220 throw (CancellationException)x;
221 if (x instanceof CompletionException)
222 throw (CompletionException)x;
223 throw new CompletionException(x);
224 }
225 @SuppressWarnings("unchecked") T tr = (T) r;
226 return tr;
227 }
228
229 /**
230 * Reports result using Future.get conventions
231 */
232 private static <T> T reportGet(Object r)
233 throws InterruptedException, ExecutionException {
234 if (r == null) // by convention below, null means interrupted
235 throw new InterruptedException();
236 if (r instanceof AltResult) {
237 Throwable x, cause;
238 if ((x = ((AltResult)r).ex) == null)
239 return null;
240 if (x instanceof CancellationException)
241 throw (CancellationException)x;
242 if ((x instanceof CompletionException) &&
243 (cause = x.getCause()) != null)
244 x = cause;
245 throw new ExecutionException(x);
246 }
247 @SuppressWarnings("unchecked") T tr = (T) r;
248 return tr;
249 }
250
251 /* ------------- Async Tasks -------------- */
252
253 /**
254 * Default executor -- ForkJoinPool.commonPool() unless it cannot
255 * support parallelism.
256 */
257 static final Executor asyncPool =
258 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
259 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
260
261 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
262 static final class ThreadPerTaskExecutor implements Executor {
263 public void execute(Runnable r) { new Thread(r).start(); }
264 }
265
266 /**
267 * A marker interface identifying asynchronous tasks produced by
268 * {@code async} methods. This may be useful for monitoring,
269 * debugging, and tracking asynchronous activities.
270 *
271 * @since 1.8
272 */
273 public static interface AsynchronousCompletionTask {
274 }
275
276 /**
277 * Base class for tasks that can act as either FJ or plain
278 * Runnables. Abstract method compute calls an associated "now"
279 * method. Method exec calls compute if its CompletableFuture is
280 * not already done, and runs completions if done. Fields are not
281 * in general final and can be nulled out after use (but most
282 * currently are not). Classes include serialVersionUIDs even
283 * though they are currently never serialized.
284 */
285 abstract static class Async<T> extends ForkJoinTask<Void>
286 implements Runnable, AsynchronousCompletionTask {
287 CompletableFuture<T> dep; // the CompletableFuture to trigger
288 Async(CompletableFuture<T> dep) { this.dep = dep; }
289
290 abstract void compute(); // call the associated "now" method
291
292 public final boolean exec() {
293 CompletableFuture<T> d;
294 if ((d = dep) != null) {
295 if (d.result == null) // suppress if cancelled
296 compute();
297 if (d.result != null)
298 d.postComplete();
299 dep = null; // detach
300 }
301 return true;
302 }
303 public final Void getRawResult() { return null; }
304 public final void setRawResult(Void v) { }
305 public final void run() { exec(); }
306 private static final long serialVersionUID = 5232453952276885070L;
307 }
308
309 /* ------------- Completions -------------- */
310
311 abstract static class Completion<T> { // See above
312 volatile Completion<?> next; // Treiber stack link
313
314 /**
315 * Performs completion action if enabled, returning a
316 * completed dependent Completablefuture, if one exists.
317 */
318 abstract CompletableFuture<?> tryAct();
319 }
320
321 /**
322 * Triggers all reaachble enabled dependents. Call only when
323 * known to be done.
324 */
325 final void postComplete() {
326 /*
327 * On each step, variable f holds current completions to pop
328 * and run. It is extended along only one path at a time,
329 * pushing others to avoid StackOverflowErrors on recursion.
330 */
331 CompletableFuture<?> f = this; Completion<?> h;
332 while ((h = f.completions) != null ||
333 (f != this && (h = (f = this).completions) != null)) {
334 CompletableFuture<?> d; Completion<?> t;
335 if (f.casCompletions(h, t = h.next)) {
336 if (t != null) {
337 if (f != this) { // push
338 do {} while (!casCompletions(h.next = completions, h));
339 continue;
340 }
341 h.next = null; // detach
342 }
343 f = (d = h.tryAct()) == null ? this : d;
344 }
345 }
346 }
347
348 /* ------------- One-source Completions -------------- */
349
350 /**
351 * A Completion with a source and dependent. The "dep" field acts
352 * as a claim, nulled out to disable further attempts to
353 * trigger. Fields can only be observed by other threads upon
354 * successful push; and should be nulled out after claim.
355 */
356 abstract static class UniCompletion<T> extends Completion<T> {
357 Executor async; // executor to use (null if none)
358 CompletableFuture<T> dep; // the dependent to complete
359 CompletableFuture<?> src; // source of value for tryAct
360
361 UniCompletion(Executor async, CompletableFuture<T> dep,
362 CompletableFuture<?> src) {
363 this.async = async; this.dep = dep; this.src = src;
364 }
365
366 /** Tries to claim completion action by CASing dep to null */
367 final boolean claim(CompletableFuture<T> d) {
368 return UNSAFE.compareAndSwapObject(this, DEP, d, null);
369 }
370
371 private static final sun.misc.Unsafe UNSAFE;
372 private static final long DEP;
373 static {
374 try {
375 UNSAFE = sun.misc.Unsafe.getUnsafe();
376 Class<?> k = UniCompletion.class;
377 DEP = UNSAFE.objectFieldOffset
378 (k.getDeclaredField("dep"));
379 } catch (Exception x) {
380 throw new Error(x);
381 }
382 }
383 }
384
385 /** Pushes c on to completions, and triggers c if done. */
386 private void unipush(UniCompletion<?> c) {
387 if (c != null) {
388 CompletableFuture<?> d;
389 while (result == null && !casCompletions(c.next = completions, c))
390 c.next = null; // clear on CAS failure
391 if ((d = c.tryAct()) != null) // cover races
392 d.postComplete();
393 if (result != null) // clean stack
394 postComplete();
395 }
396 }
397
398 // Immediate, async, delayed, and routing support for Function/apply
399
400 static <T,U> void nowApply(Executor e, CompletableFuture<U> d, Object r,
401 Function<? super T,? extends U> f) {
402 if (d != null && f != null) {
403 T t; U u; Throwable x;
404 if (r instanceof AltResult) {
405 t = null;
406 x = ((AltResult)r).ex;
407 }
408 else {
409 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
410 x = null;
411 }
412 if (x == null) {
413 try {
414 if (e != null) {
415 e.execute(new AsyncApply<T,U>(d, t, f));
416 return;
417 }
418 u = f.apply(t);
419 } catch (Throwable ex) {
420 x = ex;
421 u = null;
422 }
423 }
424 else
425 u = null;
426 d.internalComplete(encodeOutcome(u, x));
427 }
428 }
429
430 static final class AsyncApply<T,U> extends Async<U> {
431 T arg; Function<? super T,? extends U> fn;
432 AsyncApply(CompletableFuture<U> dep, T arg,
433 Function<? super T,? extends U> fn) {
434 super(dep); this.arg = arg; this.fn = fn;
435 }
436 final void compute() { nowApply(null, dep, arg, fn); }
437 private static final long serialVersionUID = 5232453952276885070L;
438 }
439
440 static final class DelayedApply<T,U> extends UniCompletion<U> {
441 Function<? super T,? extends U> fn;
442 DelayedApply(Executor async, CompletableFuture<U> dep,
443 CompletableFuture<?> src,
444 Function<? super T,? extends U> fn) {
445 super(async, dep, src); this.fn = fn;
446 }
447 final CompletableFuture<?> tryAct() {
448 CompletableFuture<U> d; CompletableFuture<?> a; Object r;
449 if ((d = dep) != null && (a = src) != null &&
450 (r = a.result) != null && claim(d)) {
451 nowApply(async, d, r, fn);
452 src = null; fn = null;
453 if (d.result != null) return d;
454 }
455 return null;
456 }
457 }
458
459 private <U> CompletableFuture<U> doThenApply(
460 Function<? super T,? extends U> fn, Executor e) {
461 if (fn == null) throw new NullPointerException();
462 CompletableFuture<U> d = new CompletableFuture<U>();
463 Object r = result;
464 if (r == null)
465 unipush(new DelayedApply<T,U>(e, d, this, fn));
466 else
467 nowApply(e, d, r, fn);
468 return d;
469 }
470
471 // Consumer/accept
472
473 static <T,U> void nowAccept(Executor e, CompletableFuture<U> d,
474 Object r, Consumer<? super T> f) {
475 if (d != null && f != null) {
476 T t; Throwable x;
477 if (r instanceof AltResult) {
478 t = null;
479 x = ((AltResult)r).ex;
480 }
481 else {
482 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
483 x = null;
484 }
485 if (x == null) {
486 try {
487 if (e != null) {
488 e.execute(new AsyncAccept<T,U>(d, t, f));
489 return;
490 }
491 f.accept(t);
492 } catch (Throwable ex) {
493 x = ex;
494 }
495 }
496 d.internalComplete(encodeOutcome(null, x));
497 }
498 }
499
500 static final class AsyncAccept<T,U> extends Async<U> {
501 T arg; Consumer<? super T> fn;
502 AsyncAccept(CompletableFuture<U> dep, T arg,
503 Consumer<? super T> fn) {
504 super(dep); this.arg = arg; this.fn = fn;
505 }
506 final void compute() { nowAccept(null, dep, arg, fn); }
507 private static final long serialVersionUID = 5232453952276885070L;
508 }
509
510 static final class DelayedAccept<T> extends UniCompletion<Void> {
511 Consumer<? super T> fn;
512 DelayedAccept(Executor async, CompletableFuture<Void> dep,
513 CompletableFuture<?> src, Consumer<? super T> fn) {
514 super(async, dep, src); this.fn = fn;
515 }
516 final CompletableFuture<?> tryAct() {
517 CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
518 if ((d = dep) != null && (a = src) != null &&
519 (r = a.result) != null && claim(d)) {
520 nowAccept(async, d, r, fn);
521 src = null; fn = null;
522 if (d.result != null) return d;
523 }
524 return null;
525 }
526 }
527
528 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
529 Executor e) {
530 if (fn == null) throw new NullPointerException();
531 CompletableFuture<Void> d = new CompletableFuture<Void>();
532 Object r = result;
533 if (r == null)
534 unipush(new DelayedAccept<T>(e, d, this, fn));
535 else
536 nowAccept(e, d, r, fn);
537 return d;
538 }
539
540 // Runnable/run
541
542 static <T> void nowRun(Executor e, CompletableFuture<T> d, Object r,
543 Runnable f) {
544 if (d != null && f != null) {
545 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
546 if (x == null) {
547 try {
548 if (e != null) {
549 e.execute(new AsyncRun<T>(d, f));
550 return;
551 }
552 f.run();
553 } catch (Throwable ex) {
554 x = ex;
555 }
556 }
557 d.internalComplete(encodeOutcome(null, x));
558 }
559 }
560
561 static final class AsyncRun<T> extends Async<T> {
562 Runnable fn;
563 AsyncRun(CompletableFuture<T> dep, Runnable fn) {
564 super(dep); this.fn = fn;
565 }
566 final void compute() { nowRun(null, dep, null, fn); }
567 private static final long serialVersionUID = 5232453952276885070L;
568 }
569
570 static final class DelayedRun extends UniCompletion<Void> {
571 Runnable fn;
572 DelayedRun(Executor async, CompletableFuture<Void> dep,
573 CompletableFuture<?> src, Runnable fn) {
574 super(async, dep, src); this.fn = fn;
575 }
576 final CompletableFuture<?> tryAct() {
577 CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
578 if ((d = dep) != null && (a = src) != null &&
579 (r = a.result) != null && claim(d)) {
580 nowRun(async, d, r, fn);
581 src = null; fn = null; // clear refs
582 if (d.result != null) return d;
583 }
584 return null;
585 }
586 }
587
588 private CompletableFuture<Void> doThenRun(Runnable fn, Executor e) {
589 if (fn == null) throw new NullPointerException();
590 CompletableFuture<Void> d = new CompletableFuture<Void>();
591 Object r = result;
592 if (r == null)
593 unipush(new DelayedRun(e, d, this, fn));
594 else
595 nowRun(e, d, r, fn);
596 return d;
597 }
598
599 // Supplier/get
600
601 static <T> void nowSupply(CompletableFuture<T> d, Supplier<T> f) {
602 if (d != null && f != null) {
603 T t; Throwable x;
604 try {
605 t = f.get();
606 x = null;
607 } catch (Throwable ex) {
608 x = ex;
609 t = null;
610 }
611 d.internalComplete(encodeOutcome(t, x));
612 }
613 }
614
615 static final class AsyncSupply<T> extends Async<T> {
616 Supplier<T> fn;
617 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
618 super(dep); this.fn = fn;
619 }
620 final void compute() { nowSupply(dep, fn); }
621 private static final long serialVersionUID = 5232453952276885070L;
622 }
623
624 // WhenComplete
625
626 static <T> void nowWhen(Executor e, CompletableFuture<T> d, Object r,
627 BiConsumer<? super T,? super Throwable> f) {
628 if (d != null && f != null) {
629 T t; Throwable x, dx;
630 if (r instanceof AltResult) {
631 t = null;
632 x = ((AltResult)r).ex;
633 }
634 else {
635 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
636 x = null;
637 }
638 try {
639 if (e != null) {
640 e.execute(new AsyncWhen<T>(d, r, f));
641 return;
642 }
643 f.accept(t, x);
644 dx = null;
645 } catch (Throwable ex) {
646 dx = ex;
647 }
648 d.internalComplete(encodeOutcome(t, x != null ? x : dx));
649 }
650 }
651
652 static final class AsyncWhen<T> extends Async<T> {
653 Object arg; BiConsumer<? super T,? super Throwable> fn;
654 AsyncWhen(CompletableFuture<T> dep, Object arg,
655 BiConsumer<? super T,? super Throwable> fn) {
656 super(dep); this.arg = arg; this.fn = fn;
657 }
658 final void compute() { nowWhen(null, dep, arg, fn); }
659 private static final long serialVersionUID = 5232453952276885070L;
660 }
661
662 static final class DelayedWhen<T> extends UniCompletion<T> {
663 BiConsumer<? super T, ? super Throwable> fn;
664 DelayedWhen(Executor async, CompletableFuture<T> dep,
665 CompletableFuture<?> src,
666 BiConsumer<? super T, ? super Throwable> fn) {
667 super(async, dep, src); this.fn = fn;
668 }
669 final CompletableFuture<?> tryAct() {
670 CompletableFuture<T> d; CompletableFuture<?> a; Object r;
671 if ((d = dep) != null && (a = src) != null &&
672 (r = a.result) != null && claim(d)) {
673 nowWhen(async, d, r, fn);
674 src = null; fn = null;
675 if (d.result != null) return d;
676 }
677 return null;
678 }
679 }
680
681 private CompletableFuture<T> doWhenComplete(
682 BiConsumer<? super T, ? super Throwable> fn, Executor e) {
683 if (fn == null) throw new NullPointerException();
684 CompletableFuture<T> d = new CompletableFuture<T>();
685 Object r = result;
686 if (r == null)
687 unipush(new DelayedWhen<T>(e, d, this, fn));
688 else
689 nowWhen(e, d, r, fn);
690 return d;
691 }
692
693 // Handle
694
695 static <T,U> void nowHandle(Executor e, CompletableFuture<U> d, Object r,
696 BiFunction<? super T, Throwable, ? extends U> f) {
697 if (d != null && f != null) {
698 T t; U u; Throwable x, dx;
699 if (r instanceof AltResult) {
700 t = null;
701 x = ((AltResult)r).ex;
702 }
703 else {
704 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
705 x = null;
706 }
707 try {
708 if (e != null) {
709 e.execute(new AsyncCombine<T,Throwable,U>(d, t, x, f));
710 return;
711 }
712 u = f.apply(t, x);
713 dx = null;
714 } catch (Throwable ex) {
715 dx = ex;
716 u = null;
717 }
718 d.internalComplete(encodeOutcome(u, dx));
719 }
720 }
721
722 static final class DelayedHandle<T,U> extends UniCompletion<U> {
723 BiFunction<? super T, Throwable, ? extends U> fn;
724 DelayedHandle(Executor async, CompletableFuture<U> dep,
725 CompletableFuture<?> src,
726 BiFunction<? super T, Throwable, ? extends U> fn) {
727 super(async, dep, src); this.fn = fn;
728 }
729 final CompletableFuture<?> tryAct() {
730 CompletableFuture<U> d; CompletableFuture<?> a; Object r;
731 if ((d = dep) != null && (a = src) != null &&
732 (r = a.result) != null && claim(d)) {
733 nowHandle(async, d, r, fn);
734 src = null; fn = null;
735 if (d.result != null) return d;
736 }
737 return null;
738 }
739 }
740
741 private <U> CompletableFuture<U> doHandle(
742 BiFunction<? super T, Throwable, ? extends U> fn,
743 Executor e) {
744 if (fn == null) throw new NullPointerException();
745 CompletableFuture<U> d = new CompletableFuture<U>();
746 Object r = result;
747 if (r == null)
748 unipush(new DelayedHandle<T,U>(e, d, this, fn));
749 else
750 nowHandle(e, d, r, fn);
751 return d;
752 }
753
754 // Exceptionally
755
756 static <T> void nowExceptionally(CompletableFuture<T> d, Object r,
757 Function<? super Throwable, ? extends T> f) {
758 if (d != null && f != null) {
759 T t; Throwable x, dx;
760 if ((r instanceof AltResult) && (x = ((AltResult)r).ex) != null) {
761 try {
762 t = f.apply(x);
763 dx = null;
764 } catch (Throwable ex) {
765 dx = ex;
766 t = null;
767 }
768 }
769 else {
770 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
771 dx = null;
772 }
773 d.internalComplete(encodeOutcome(t, dx));
774 }
775 }
776
777 static final class DelayedExceptionally<T> extends UniCompletion<T> {
778 Function<? super Throwable, ? extends T> fn;
779 DelayedExceptionally(CompletableFuture<T> dep, CompletableFuture<?> src,
780 Function<? super Throwable, ? extends T> fn) {
781 super(null, dep, src); this.fn = fn;
782 }
783 final CompletableFuture<?> tryAct() {
784 CompletableFuture<T> d; CompletableFuture<?> a; Object r;
785 if ((d = dep) != null && (a = src) != null &&
786 (r = a.result) != null && claim(d)) {
787 nowExceptionally(d, r, fn);
788 src = null; fn = null;
789 if (d.result != null) return d;
790 }
791 return null;
792 }
793 }
794
795 private CompletableFuture<T> doExceptionally(
796 Function<Throwable, ? extends T> fn) {
797 if (fn == null) throw new NullPointerException();
798 CompletableFuture<T> d = new CompletableFuture<T>();
799 Object r = result;
800 if (r == null)
801 unipush(new DelayedExceptionally<T>(d, this, fn));
802 else
803 nowExceptionally(d, r, fn);
804 return d;
805 }
806
807 // Identity function used by nowCompose and anyOf
808
809 static <T> void nowCopy(CompletableFuture<T> d, Object r) {
810 if (d != null && d.result == null) {
811 Throwable x;
812 d.internalComplete(((r instanceof AltResult) &&
813 (x = ((AltResult)r).ex) != null &&
814 !(x instanceof CompletionException)) ?
815 new AltResult(new CompletionException(x)): r);
816 }
817 }
818
819 static final class DelayedCopy<T> extends UniCompletion<T> {
820 DelayedCopy(CompletableFuture<T> dep, CompletableFuture<?> src) {
821 super(null, dep, src);
822 }
823 final CompletableFuture<?> tryAct() {
824 CompletableFuture<T> d; CompletableFuture<?> a; Object r;
825 if ((d = dep) != null && (a = src) != null &&
826 (r = a.result) != null && claim(d)) {
827 nowCopy(d, r);
828 src = null;
829 if (d.result != null) return d;
830 }
831 return null;
832 }
833 }
834
835 // Compose
836
837 static <T,U> void nowCompose(Executor e, CompletableFuture<U> d, Object r,
838 Function<? super T, ? extends CompletionStage<U>> f) {
839 if (d != null && f != null) {
840 T t; Throwable x;
841 if (r instanceof AltResult) {
842 t = null;
843 x = ((AltResult)r).ex;
844 }
845 else {
846 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
847 x = null;
848 }
849 if (x == null) {
850 try {
851 if (e != null)
852 e.execute(new AsyncCompose<T,U>(d, t, f));
853 else {
854 CompletableFuture<U> c =
855 f.apply(t).toCompletableFuture();
856 Object s = c.result;
857 if (s == null)
858 c.unipush(new DelayedCopy<U>(d, c));
859 else
860 nowCopy(d, s);
861 }
862 return;
863 } catch (Throwable ex) {
864 x = ex;
865 }
866 }
867 d.internalComplete(encodeOutcome(null, x));
868 }
869 }
870
871 static final class AsyncCompose<T,U> extends Async<U> {
872 T arg; Function<? super T, ? extends CompletionStage<U>> fn;
873 AsyncCompose(CompletableFuture<U> dep, T arg,
874 Function<? super T, ? extends CompletionStage<U>> fn) {
875 super(dep); this.arg = arg; this.fn = fn;
876 }
877 final void compute() { nowCompose(null, dep, arg, fn); }
878 private static final long serialVersionUID = 5232453952276885070L;
879 }
880
881 static final class DelayedCompose<T,U> extends UniCompletion<U> {
882 Function<? super T, ? extends CompletionStage<U>> fn;
883 DelayedCompose(Executor async, CompletableFuture<U> dep,
884 CompletableFuture<?> src,
885 Function<? super T, ? extends CompletionStage<U>> fn) {
886 super(async, dep, src); this.fn = fn;
887 }
888 final CompletableFuture<?> tryAct() {
889 CompletableFuture<U> d; CompletableFuture<?> a; Object r;
890 if ((d = dep) != null && (a = src) != null &&
891 (r = a.result) != null && claim(d)) {
892 nowCompose(async, d, r, fn);
893 src = null; fn = null;
894 if (d.result != null) return d;
895 }
896 return null;
897 }
898 }
899
900 private <U> CompletableFuture<U> doThenCompose(
901 Function<? super T, ? extends CompletionStage<U>> fn, Executor e) {
902 if (fn == null) throw new NullPointerException();
903 Object r = result;
904 if (r == null || e != null) {
905 CompletableFuture<U> d = new CompletableFuture<U>();
906 if (r == null)
907 unipush(new DelayedCompose<T,U>(e, d, this, fn));
908 else
909 nowCompose(e, d, r, fn);
910 return d;
911 }
912 else { // try to return function result
913 T t; Throwable x;
914 if (r instanceof AltResult) {
915 t = null;
916 x = ((AltResult)r).ex;
917 }
918 else {
919 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
920 x = null;
921 }
922 if (x == null) {
923 try {
924 return fn.apply(t).toCompletableFuture();
925 } catch (Throwable ex) {
926 x = ex;
927 }
928 }
929 CompletableFuture<U> d = new CompletableFuture<U>();
930 d.result = encodeOutcome(null, x);
931 return d;
932 }
933 }
934
935 /* ------------- Two-source Completions -------------- */
936
937 /** A Completion with two sources */
938 abstract static class BiCompletion<T> extends UniCompletion<T> {
939 CompletableFuture<?> snd; // second source for tryAct
940 BiCompletion(Executor async, CompletableFuture<T> dep,
941 CompletableFuture<?> src, CompletableFuture<?> snd) {
942 super(async, dep, src); this.snd = snd;
943 }
944 }
945
946 /** A Completion delegating to a shared BiCompletion */
947 static final class CoBiCompletion<T> extends Completion<T> {
948 BiCompletion<T> completion;
949 CoBiCompletion(BiCompletion<T> completion) {
950 this.completion = completion;
951 }
952 final CompletableFuture<?> tryAct() {
953 BiCompletion<T> c;
954 return (c = completion) == null ? null : c.tryAct();
955 }
956 }
957
958 /* ------------- Two-source Anded -------------- */
959
960 /* Pushes c on to completions and o's completions unless both done. */
961 private <U> void bipushAnded(CompletableFuture<?> o, BiCompletion<U> c) {
962 if (c != null && o != null) {
963 Object r; CompletableFuture<?> d;
964 while ((r = result) == null &&
965 !casCompletions(c.next = completions, c))
966 c.next = null;
967 if (o.result == null) {
968 Completion<U> q = (r != null) ? c : new CoBiCompletion<U>(c);
969 while (o.result == null &&
970 !o.casCompletions(q.next = o.completions, q))
971 q.next = null;
972 }
973 if ((d = c.tryAct()) != null)
974 d.postComplete();
975 if (o.result != null)
976 o.postComplete();
977 if (result != null)
978 postComplete();
979 }
980 }
981
982 // BiFunction/combine
983
984 static <T,U,V> void nowCombine(Executor e, CompletableFuture<V> d,
985 Object r, Object s,
986 BiFunction<? super T,? super U,? extends V> f) {
987 if (d != null && f != null) {
988 T t; U u; V v; Throwable x;
989 if (r instanceof AltResult) {
990 t = null;
991 x = ((AltResult)r).ex;
992 }
993 else {
994 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
995 x = null;
996 }
997 if (x != null)
998 u = null;
999 else if (s instanceof AltResult) {
1000 x = ((AltResult)s).ex;
1001 u = null;
1002 }
1003 else {
1004 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1005 }
1006 if (x == null) {
1007 try {
1008 if (e != null) {
1009 e.execute(new AsyncCombine<T,U,V>(d, t, u, f));
1010 return;
1011 }
1012 v = f.apply(t, u);
1013 } catch (Throwable ex) {
1014 x = ex;
1015 v = null;
1016 }
1017 }
1018 else
1019 v = null;
1020 d.internalComplete(encodeOutcome(v, x));
1021 }
1022 }
1023
1024 static final class AsyncCombine<T,U,V> extends Async<V> {
1025 T arg1; U arg2; BiFunction<? super T,? super U,? extends V> fn;
1026 AsyncCombine(CompletableFuture<V> dep, T arg1, U arg2,
1027 BiFunction<? super T,? super U,? extends V> fn) {
1028 super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1029 }
1030 final void compute() { nowCombine(null, dep, arg1, arg2, fn); }
1031 private static final long serialVersionUID = 5232453952276885070L;
1032 }
1033
1034 static final class DelayedCombine<T,U,V> extends BiCompletion<V> {
1035 BiFunction<? super T,? super U,? extends V> fn;
1036 DelayedCombine(Executor async, CompletableFuture<V> dep,
1037 CompletableFuture<?> src, CompletableFuture<?> snd,
1038 BiFunction<? super T,? super U,? extends V> fn) {
1039 super(async, dep, src, snd); this.fn = fn;
1040 }
1041 final CompletableFuture<?> tryAct() {
1042 CompletableFuture<V> d; CompletableFuture<?> a, b; Object r, s;
1043 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1044 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1045 nowCombine(async, d, r, s, fn);
1046 src = null; snd = null; fn = null;
1047 if (d.result != null) return d;
1048 }
1049 return null;
1050 }
1051 }
1052
1053 private <U,V> CompletableFuture<V> doThenCombine(
1054 CompletableFuture<? extends U> o,
1055 BiFunction<? super T,? super U,? extends V> fn,
1056 Executor e) {
1057 if (o == null || fn == null) throw new NullPointerException();
1058 CompletableFuture<V> d = new CompletableFuture<V>();
1059 Object r = result, s = o.result;
1060 if (r == null || s == null)
1061 bipushAnded(o, new DelayedCombine<T,U,V>(e, d, this, o, fn));
1062 else
1063 nowCombine(e, d, r, s, fn);
1064 return d;
1065 }
1066
1067 // BiConsumer/AcceptBoth
1068
1069 static <T,U,V> void nowAcceptBoth(Executor e, CompletableFuture<V> d,
1070 Object r, Object s,
1071 BiConsumer<? super T,? super U> f) {
1072 if (d != null && f != null) {
1073 T t; U u; Throwable x;
1074 if (r instanceof AltResult) {
1075 t = null;
1076 x = ((AltResult)r).ex;
1077 }
1078 else {
1079 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1080 x = null;
1081 }
1082 if (x != null)
1083 u = null;
1084 else if (s instanceof AltResult) {
1085 x = ((AltResult)s).ex;
1086 u = null;
1087 }
1088 else {
1089 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1090 }
1091 if (x == null) {
1092 try {
1093 if (e != null) {
1094 e.execute(new AsyncAcceptBoth<T,U,V>(d, t, u, f));
1095 return;
1096 }
1097 f.accept(t, u);
1098 } catch (Throwable ex) {
1099 x = ex;
1100 }
1101 }
1102 d.internalComplete(encodeOutcome(null, x));
1103 }
1104 }
1105
1106 static final class AsyncAcceptBoth<T,U,V> extends Async<V> {
1107 T arg1; U arg2; BiConsumer<? super T,? super U> fn;
1108 AsyncAcceptBoth(CompletableFuture<V> dep, T arg1, U arg2,
1109 BiConsumer<? super T,? super U> fn) {
1110 super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1111 }
1112 final void compute() { nowAcceptBoth(null, dep, arg1, arg2, fn); }
1113 private static final long serialVersionUID = 5232453952276885070L;
1114 }
1115
1116 static final class DelayedAcceptBoth<T,U> extends BiCompletion<Void> {
1117 BiConsumer<? super T,? super U> fn;
1118 DelayedAcceptBoth(Executor async, CompletableFuture<Void> dep,
1119 CompletableFuture<?> src, CompletableFuture<?> snd,
1120 BiConsumer<? super T,? super U> fn) {
1121 super(async, dep, src, snd); this.fn = fn;
1122 }
1123 final CompletableFuture<?> tryAct() {
1124 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1125 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1126 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1127 nowAcceptBoth(async, d, r, s, fn);
1128 src = null; snd = null; fn = null;
1129 if (d.result != null) return d;
1130 }
1131 return null;
1132 }
1133 }
1134
1135 private <U> CompletableFuture<Void> doThenAcceptBoth(
1136 CompletableFuture<? extends U> o,
1137 BiConsumer<? super T, ? super U> fn,
1138 Executor e) {
1139 if (o == null || fn == null) throw new NullPointerException();
1140 CompletableFuture<Void> d = new CompletableFuture<Void>();
1141 Object r = result, s = o.result;
1142 if (r == null || s == null)
1143 bipushAnded(o, new DelayedAcceptBoth<T,U>(e, d, this, o, fn));
1144 else
1145 nowAcceptBoth(e, d, r, s, fn);
1146 return d;
1147 }
1148
1149 // Runnable/both
1150
1151 static final class DelayedRunAfterBoth extends BiCompletion<Void> {
1152 Runnable fn;
1153 DelayedRunAfterBoth(Executor async, CompletableFuture<Void> dep,
1154 CompletableFuture<?> src, CompletableFuture<?> snd,
1155 Runnable fn) {
1156 super(async, dep, src, snd); this.fn = fn;
1157 }
1158 final CompletableFuture<?> tryAct() {
1159 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1160 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1161 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1162 Throwable x = (r instanceof AltResult) ?
1163 ((AltResult)r).ex : null;
1164 nowRun(async, d, (x == null) ? s : r, fn);
1165 src = null; snd = null; fn = null;
1166 if (d.result != null) return d;
1167 }
1168 return null;
1169 }
1170 }
1171
1172 private CompletableFuture<Void> doRunAfterBoth(
1173 CompletableFuture<?> o, Runnable fn, Executor e) {
1174 if (o == null || fn == null) throw new NullPointerException();
1175 CompletableFuture<Void> d = new CompletableFuture<Void>();
1176 Object r = result, s = o.result;
1177 if (r == null || s == null)
1178 bipushAnded(o, new DelayedRunAfterBoth(e, d, this, o, fn));
1179 else {
1180 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1181 nowRun(e, d, (x == null) ? s : r, fn);
1182 }
1183 return d;
1184 }
1185
1186 // allOf
1187
1188 static <T> void nowAnd(CompletableFuture<T> d, Object r, Object s) {
1189 if (d != null) {
1190 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1191 if (x == null && (s instanceof AltResult))
1192 x = ((AltResult)s).ex;
1193 d.internalComplete(encodeOutcome(null, x));
1194 }
1195 }
1196
1197 static final class DelayedAnd extends BiCompletion<Void> {
1198 DelayedAnd(CompletableFuture<Void> dep,
1199 CompletableFuture<?> src, CompletableFuture<?> snd) {
1200 super(null, dep, src, snd);
1201 }
1202 final CompletableFuture<?> tryAct() {
1203 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1204 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1205 (r = a.result) != null && (s = b.result) != null && claim(d)) {
1206 nowAnd(d, r, s);
1207 src = null; snd = null;
1208 if (d.result != null) return d;
1209 }
1210 return null;
1211 }
1212 }
1213
1214 /** Recursively constructs a tree of And completions */
1215 private static CompletableFuture<Void> doAllOf(CompletableFuture<?>[] cfs,
1216 int lo, int hi) {
1217 CompletableFuture<Void> d = new CompletableFuture<Void>();
1218 if (lo > hi) // empty
1219 d.result = NIL;
1220 else {
1221 int mid = (lo + hi) >>> 1;
1222 CompletableFuture<?> fst = (lo == mid ? cfs[lo] :
1223 doAllOf(cfs, lo, mid));
1224 CompletableFuture<?> snd = (lo == hi ? fst : // and fst with self
1225 (hi == mid+1) ? cfs[hi] :
1226 doAllOf(cfs, mid+1, hi));
1227 Object r = fst.result, s = snd.result; // throw NPE if null elements
1228 if (r == null || s == null) {
1229 DelayedAnd a = new DelayedAnd(d, fst, snd);
1230 if (fst == snd)
1231 fst.unipush(a);
1232 else
1233 fst.bipushAnded(snd, a);
1234 }
1235 else
1236 nowAnd(d, r, s);
1237 }
1238 return d;
1239 }
1240
1241 /* ------------- Two-source Ored -------------- */
1242
1243 /* Pushes c on to completions and o's completions unless either done. */
1244 private <U> void bipushOred(CompletableFuture<?> o, BiCompletion<U> c) {
1245 if (c != null && o != null) {
1246 CompletableFuture<?> d;
1247 while (o.result == null && result == null) {
1248 if (casCompletions(c.next = completions, c)) {
1249 CoBiCompletion<U> q = new CoBiCompletion<U>(c);
1250 while (result == null && o.result == null &&
1251 !o.casCompletions(q.next = o.completions, q))
1252 q.next = null;
1253 break;
1254 }
1255 c.next = null;
1256 }
1257 if ((d = c.tryAct()) != null)
1258 d.postComplete();
1259 if (o.result != null)
1260 o.postComplete();
1261 if (result != null)
1262 postComplete();
1263 }
1264 }
1265
1266 // Function/applyEither
1267
1268 static final class DelayedApplyToEither<T,U> extends BiCompletion<U> {
1269 Function<? super T,? extends U> fn;
1270 DelayedApplyToEither(Executor async, CompletableFuture<U> dep,
1271 CompletableFuture<?> src, CompletableFuture<?> snd,
1272 Function<? super T,? extends U> fn) {
1273 super(async, dep, src, snd); this.fn = fn;
1274 }
1275 final CompletableFuture<?> tryAct() {
1276 CompletableFuture<U> d; CompletableFuture<?> a, b; Object r;
1277 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1278 ((r = a.result) != null || (r = b.result) != null) &&
1279 claim(d)) {
1280 nowApply(async, d, r, fn);
1281 src = null; snd = null; fn = null;
1282 if (d.result != null) return d;
1283 }
1284 return null;
1285 }
1286 }
1287
1288 private <U> CompletableFuture<U> doApplyToEither(
1289 CompletableFuture<? extends T> o,
1290 Function<? super T, U> fn, Executor e) {
1291 if (o == null || fn == null) throw new NullPointerException();
1292 CompletableFuture<U> d = new CompletableFuture<U>();
1293 Object r = result;
1294 if (r == null && (r = o.result) == null)
1295 bipushOred(o, new DelayedApplyToEither<T,U>(e, d, this, o, fn));
1296 else
1297 nowApply(e, d, r, fn);
1298 return d;
1299 }
1300
1301 // Consumer/acceptEither
1302
1303 static final class DelayedAcceptEither<T> extends BiCompletion<Void> {
1304 Consumer<? super T> fn;
1305 DelayedAcceptEither(Executor async, CompletableFuture<Void> dep,
1306 CompletableFuture<?> src, CompletableFuture<?> snd,
1307 Consumer<? super T> fn) {
1308 super(async, dep, src, snd); this.fn = fn;
1309 }
1310 final CompletableFuture<?> tryAct() {
1311 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1312 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1313 ((r = a.result) != null || (r = b.result) != null) &&
1314 claim(d)) {
1315 nowAccept(async, d, r, fn);
1316 src = null; snd = null; fn = null;
1317 if (d.result != null) return d;
1318 }
1319 return null;
1320 }
1321 }
1322
1323 private CompletableFuture<Void> doAcceptEither(
1324 CompletableFuture<? extends T> o,
1325 Consumer<? super T> fn, Executor e) {
1326 if (o == null || fn == null) throw new NullPointerException();
1327 CompletableFuture<Void> d = new CompletableFuture<Void>();
1328 Object r = result;
1329 if (r == null && (r = o.result) == null)
1330 bipushOred(o, new DelayedAcceptEither<T>(e, d, this, o, fn));
1331 else
1332 nowAccept(e, d, r, fn);
1333 return d;
1334 }
1335
1336 // Runnable/runEither
1337
1338 static final class DelayedRunAfterEither extends BiCompletion<Void> {
1339 Runnable fn;
1340 DelayedRunAfterEither(Executor async, CompletableFuture<Void> dep,
1341 CompletableFuture<?> src,
1342 CompletableFuture<?> snd, Runnable fn) {
1343 super(async, dep, src, snd); this.fn = fn;
1344 }
1345 final CompletableFuture<?> tryAct() {
1346 CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1347 if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1348 ((r = a.result) != null || (r = b.result) != null) &&
1349 claim(d)) {
1350 nowRun(async, d, r, fn);
1351 src = null; snd = null; fn = null;
1352 if (d.result != null) return d;
1353 }
1354 return null;
1355 }
1356 }
1357
1358 private CompletableFuture<Void> doRunAfterEither(
1359 CompletableFuture<?> o, Runnable fn, Executor e) {
1360 if (o == null || fn == null) throw new NullPointerException();
1361 CompletableFuture<Void> d = new CompletableFuture<Void>();
1362 Object r = result;
1363 if (r == null && (r = o.result) == null)
1364 bipushOred(o, new DelayedRunAfterEither(e, d, this, o, fn));
1365 else
1366 nowRun(e, d, r, fn);
1367 return d;
1368 }
1369
1370 /* ------------- Signallers -------------- */
1371
1372 /**
1373 * Heuristic spin value for waitingGet() before blocking on
1374 * multiprocessors
1375 */
1376 static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
1377 1 << 8 : 0);
1378
1379 /**
1380 * Completion for recording and releasing a waiting thread. See
1381 * other classes such as Phaser and SynchronousQueue for more
1382 * detailed explanation. This class implements ManagedBlocker to
1383 * avoid starvation when blocking actions pile up in
1384 * ForkJoinPools.
1385 */
1386 static final class Signaller extends Completion<Void>
1387 implements ForkJoinPool.ManagedBlocker {
1388 long nanos; // wait time if timed
1389 final long deadline; // non-zero if timed
1390 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
1391 volatile Thread thread;
1392 Signaller(boolean interruptible, long nanos, long deadline) {
1393 this.thread = Thread.currentThread();
1394 this.interruptControl = interruptible ? 1 : 0;
1395 this.nanos = nanos;
1396 this.deadline = deadline;
1397 }
1398 final CompletableFuture<?> tryAct() {
1399 Thread w = thread;
1400 if (w != null) {
1401 thread = null; // no need to CAS
1402 LockSupport.unpark(w);
1403 }
1404 return null;
1405 }
1406 public boolean isReleasable() {
1407 if (thread == null)
1408 return true;
1409 if (Thread.interrupted()) {
1410 int i = interruptControl;
1411 interruptControl = -1;
1412 if (i > 0)
1413 return true;
1414 }
1415 if (deadline != 0L &&
1416 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
1417 thread = null;
1418 return true;
1419 }
1420 return false;
1421 }
1422 public boolean block() {
1423 if (isReleasable())
1424 return true;
1425 else if (deadline == 0L)
1426 LockSupport.park(this);
1427 else if (nanos > 0L)
1428 LockSupport.parkNanos(this, nanos);
1429 return isReleasable();
1430 }
1431 }
1432
1433 /**
1434 * Returns raw result after waiting, or null if interruptible and
1435 * interrupted.
1436 */
1437 private Object waitingGet(boolean interruptible) {
1438 Signaller q = null;
1439 boolean queued = false;
1440 int spins = SPINS;
1441 Object r;
1442 while ((r = result) == null) {
1443 if (spins > 0) {
1444 if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1445 --spins;
1446 }
1447 else if (q == null)
1448 q = new Signaller(interruptible, 0L, 0L);
1449 else if (!queued)
1450 queued = casCompletions(q.next = completions, q);
1451 else if (interruptible && q.interruptControl < 0) {
1452 q.thread = null;
1453 removeCancelledSignallers();
1454 return null;
1455 }
1456 else if (q.thread != null && result == null) {
1457 try {
1458 ForkJoinPool.managedBlock(q);
1459 } catch (InterruptedException ie) {
1460 q.interruptControl = -1;
1461 }
1462 }
1463 }
1464 if (q != null) {
1465 q.thread = null;
1466 if (q.interruptControl < 0) {
1467 if (interruptible)
1468 r = null; // report interruption
1469 else
1470 Thread.currentThread().interrupt();
1471 }
1472 }
1473 postComplete();
1474 return r;
1475 }
1476
1477 /**
1478 * Returns raw result after waiting, or null if interrupted, or
1479 * throws TimeoutException on timeout.
1480 */
1481 private Object timedGet(long nanos) throws TimeoutException {
1482 if (Thread.interrupted())
1483 return null;
1484 if (nanos <= 0L)
1485 throw new TimeoutException();
1486 long d = System.nanoTime() + nanos;
1487 Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
1488 boolean queued = false;
1489 Object r;
1490 while ((r = result) == null) {
1491 if (!queued)
1492 queued = casCompletions(q.next = completions, q);
1493 else if (q.interruptControl < 0 || q.nanos <= 0L) {
1494 q.thread = null;
1495 removeCancelledSignallers();
1496 if (q.interruptControl < 0)
1497 return null;
1498 throw new TimeoutException();
1499 }
1500 else if (q.thread != null && result == null) {
1501 try {
1502 ForkJoinPool.managedBlock(q);
1503 } catch (InterruptedException ie) {
1504 q.interruptControl = -1;
1505 }
1506 }
1507 }
1508 q.thread = null;
1509 postComplete();
1510 return (q.interruptControl < 0) ? null : r;
1511 }
1512
1513 /**
1514 * Unlinks cancelled Signallers to avoid accumulating garbage.
1515 * Internal nodes are simply unspliced without CAS since it is
1516 * harmless if they are traversed anyway. To avoid effects of
1517 * unsplicing from already removed nodes, the list is retraversed
1518 * in case of an apparent race.
1519 */
1520 private void removeCancelledSignallers() {
1521 for (Completion<?> p = null, q = completions; q != null;) {
1522 Completion<?> s = q.next;
1523 if ((q instanceof Signaller) && ((Signaller)q).thread == null) {
1524 if (p != null) {
1525 p.next = s;
1526 if (!(p instanceof Signaller) ||
1527 ((Signaller)p).thread != null)
1528 break;
1529 }
1530 else if (casCompletions(q, s))
1531 break;
1532 p = null; // restart
1533 q = completions;
1534 }
1535 else {
1536 p = q;
1537 q = s;
1538 }
1539 }
1540 }
1541
1542 /* ------------- public methods -------------- */
1543
1544 /**
1545 * Creates a new incomplete CompletableFuture.
1546 */
1547 public CompletableFuture() {
1548 }
1549
1550 /**
1551 * Returns a new CompletableFuture that is asynchronously completed
1552 * by a task running in the {@link ForkJoinPool#commonPool()} with
1553 * the value obtained by calling the given Supplier.
1554 *
1555 * @param supplier a function returning the value to be used
1556 * to complete the returned CompletableFuture
1557 * @param <U> the function's return type
1558 * @return the new CompletableFuture
1559 */
1560 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1561 if (supplier == null) throw new NullPointerException();
1562 CompletableFuture<U> d = new CompletableFuture<U>();
1563 asyncPool.execute(new AsyncSupply<U>(d, supplier));
1564 return d;
1565 }
1566
1567 /**
1568 * Returns a new CompletableFuture that is asynchronously completed
1569 * by a task running in the given executor with the value obtained
1570 * by calling the given Supplier.
1571 *
1572 * @param supplier a function returning the value to be used
1573 * to complete the returned CompletableFuture
1574 * @param executor the executor to use for asynchronous execution
1575 * @param <U> the function's return type
1576 * @return the new CompletableFuture
1577 */
1578 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1579 Executor executor) {
1580 if (executor == null || supplier == null)
1581 throw new NullPointerException();
1582 CompletableFuture<U> d = new CompletableFuture<U>();
1583 executor.execute(new AsyncSupply<U>(d, supplier));
1584 return d;
1585 }
1586
1587 /**
1588 * Returns a new CompletableFuture that is asynchronously completed
1589 * by a task running in the {@link ForkJoinPool#commonPool()} after
1590 * it runs the given action.
1591 *
1592 * @param runnable the action to run before completing the
1593 * returned CompletableFuture
1594 * @return the new CompletableFuture
1595 */
1596 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1597 if (runnable == null) throw new NullPointerException();
1598 CompletableFuture<Void> d = new CompletableFuture<Void>();
1599 asyncPool.execute(new AsyncRun<Void>(d, runnable));
1600 return d;
1601 }
1602
1603 /**
1604 * Returns a new CompletableFuture that is asynchronously completed
1605 * by a task running in the given executor after it runs the given
1606 * action.
1607 *
1608 * @param runnable the action to run before completing the
1609 * returned CompletableFuture
1610 * @param executor the executor to use for asynchronous execution
1611 * @return the new CompletableFuture
1612 */
1613 public static CompletableFuture<Void> runAsync(Runnable runnable,
1614 Executor executor) {
1615 if (executor == null || runnable == null)
1616 throw new NullPointerException();
1617 CompletableFuture<Void> d = new CompletableFuture<Void>();
1618 executor.execute(new AsyncRun<Void>(d, runnable));
1619 return d;
1620 }
1621
1622 /**
1623 * Returns a new CompletableFuture that is already completed with
1624 * the given value.
1625 *
1626 * @param value the value
1627 * @param <U> the type of the value
1628 * @return the completed CompletableFuture
1629 */
1630 public static <U> CompletableFuture<U> completedFuture(U value) {
1631 CompletableFuture<U> d = new CompletableFuture<U>();
1632 d.result = (value == null) ? NIL : value;
1633 return d;
1634 }
1635
1636 /**
1637 * Returns {@code true} if completed in any fashion: normally,
1638 * exceptionally, or via cancellation.
1639 *
1640 * @return {@code true} if completed
1641 */
1642 public boolean isDone() {
1643 return result != null;
1644 }
1645
1646 /**
1647 * Waits if necessary for this future to complete, and then
1648 * returns its result.
1649 *
1650 * @return the result value
1651 * @throws CancellationException if this future was cancelled
1652 * @throws ExecutionException if this future completed exceptionally
1653 * @throws InterruptedException if the current thread was interrupted
1654 * while waiting
1655 */
1656 public T get() throws InterruptedException, ExecutionException {
1657 Object r;
1658 return reportGet((r = result) == null ? waitingGet(true) : r);
1659 }
1660
1661 /**
1662 * Waits if necessary for at most the given time for this future
1663 * to complete, and then returns its result, if available.
1664 *
1665 * @param timeout the maximum time to wait
1666 * @param unit the time unit of the timeout argument
1667 * @return the result value
1668 * @throws CancellationException if this future was cancelled
1669 * @throws ExecutionException if this future completed exceptionally
1670 * @throws InterruptedException if the current thread was interrupted
1671 * while waiting
1672 * @throws TimeoutException if the wait timed out
1673 */
1674 public T get(long timeout, TimeUnit unit)
1675 throws InterruptedException, ExecutionException, TimeoutException {
1676 Object r;
1677 long nanos = unit.toNanos(timeout);
1678 return reportGet((r = result) == null ? timedGet(nanos) : r);
1679 }
1680
1681 /**
1682 * Returns the result value when complete, or throws an
1683 * (unchecked) exception if completed exceptionally. To better
1684 * conform with the use of common functional forms, if a
1685 * computation involved in the completion of this
1686 * CompletableFuture threw an exception, this method throws an
1687 * (unchecked) {@link CompletionException} with the underlying
1688 * exception as its cause.
1689 *
1690 * @return the result value
1691 * @throws CancellationException if the computation was cancelled
1692 * @throws CompletionException if this future completed
1693 * exceptionally or a completion computation threw an exception
1694 */
1695 public T join() {
1696 Object r;
1697 return reportJoin((r = result) == null ? waitingGet(false) : r);
1698 }
1699
1700 /**
1701 * Returns the result value (or throws any encountered exception)
1702 * if completed, else returns the given valueIfAbsent.
1703 *
1704 * @param valueIfAbsent the value to return if not completed
1705 * @return the result value, if completed, else the given valueIfAbsent
1706 * @throws CancellationException if the computation was cancelled
1707 * @throws CompletionException if this future completed
1708 * exceptionally or a completion computation threw an exception
1709 */
1710 public T getNow(T valueIfAbsent) {
1711 Object r;
1712 return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
1713 }
1714
1715 /**
1716 * If not already completed, sets the value returned by {@link
1717 * #get()} and related methods to the given value.
1718 *
1719 * @param value the result value
1720 * @return {@code true} if this invocation caused this CompletableFuture
1721 * to transition to a completed state, else {@code false}
1722 */
1723 public boolean complete(T value) {
1724 boolean triggered = internalComplete(value == null ? NIL : value);
1725 postComplete();
1726 return triggered;
1727 }
1728
1729 /**
1730 * If not already completed, causes invocations of {@link #get()}
1731 * and related methods to throw the given exception.
1732 *
1733 * @param ex the exception
1734 * @return {@code true} if this invocation caused this CompletableFuture
1735 * to transition to a completed state, else {@code false}
1736 */
1737 public boolean completeExceptionally(Throwable ex) {
1738 if (ex == null) throw new NullPointerException();
1739 boolean triggered = internalComplete(new AltResult(ex));
1740 postComplete();
1741 return triggered;
1742 }
1743
1744 public <U> CompletableFuture<U> thenApply(
1745 Function<? super T,? extends U> fn) {
1746 return doThenApply(fn, null);
1747 }
1748
1749 public <U> CompletableFuture<U> thenApplyAsync(
1750 Function<? super T,? extends U> fn) {
1751 return doThenApply(fn, asyncPool);
1752 }
1753
1754 public <U> CompletableFuture<U> thenApplyAsync(
1755 Function<? super T,? extends U> fn, Executor executor) {
1756 if (executor == null) throw new NullPointerException();
1757 return doThenApply(fn, executor);
1758 }
1759
1760 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
1761 return doThenAccept(action, null);
1762 }
1763
1764 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
1765 return doThenAccept(action, asyncPool);
1766 }
1767
1768 public CompletableFuture<Void> thenAcceptAsync(
1769 Consumer<? super T> action, Executor executor) {
1770 if (executor == null) throw new NullPointerException();
1771 return doThenAccept(action, executor);
1772 }
1773
1774 public CompletableFuture<Void> thenRun(Runnable action) {
1775 return doThenRun(action, null);
1776 }
1777
1778 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1779 return doThenRun(action, asyncPool);
1780 }
1781
1782 public CompletableFuture<Void> thenRunAsync(
1783 Runnable action, Executor executor) {
1784 if (executor == null) throw new NullPointerException();
1785 return doThenRun(action, executor);
1786 }
1787
1788 public <U,V> CompletableFuture<V> thenCombine(
1789 CompletionStage<? extends U> other,
1790 BiFunction<? super T,? super U,? extends V> fn) {
1791 return doThenCombine(other.toCompletableFuture(), fn, null);
1792 }
1793
1794 public <U,V> CompletableFuture<V> thenCombineAsync(
1795 CompletionStage<? extends U> other,
1796 BiFunction<? super T,? super U,? extends V> fn) {
1797 return doThenCombine(other.toCompletableFuture(), fn, asyncPool);
1798 }
1799
1800 public <U,V> CompletableFuture<V> thenCombineAsync(
1801 CompletionStage<? extends U> other,
1802 BiFunction<? super T,? super U,? extends V> fn,
1803 Executor executor) {
1804 if (executor == null) throw new NullPointerException();
1805 return doThenCombine(other.toCompletableFuture(), fn, executor);
1806 }
1807
1808 public <U> CompletableFuture<Void> thenAcceptBoth(
1809 CompletionStage<? extends U> other,
1810 BiConsumer<? super T, ? super U> action) {
1811 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
1812 }
1813
1814 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1815 CompletionStage<? extends U> other,
1816 BiConsumer<? super T, ? super U> action) {
1817 return doThenAcceptBoth(other.toCompletableFuture(), action, asyncPool);
1818 }
1819
1820 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1821 CompletionStage<? extends U> other,
1822 BiConsumer<? super T, ? super U> action,
1823 Executor executor) {
1824 if (executor == null) throw new NullPointerException();
1825 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
1826 }
1827
1828 public CompletableFuture<Void> runAfterBoth(
1829 CompletionStage<?> other, Runnable action) {
1830 return doRunAfterBoth(other.toCompletableFuture(), action, null);
1831 }
1832
1833 public CompletableFuture<Void> runAfterBothAsync(
1834 CompletionStage<?> other, Runnable action) {
1835 return doRunAfterBoth(other.toCompletableFuture(), action, asyncPool);
1836 }
1837
1838 public CompletableFuture<Void> runAfterBothAsync(
1839 CompletionStage<?> other, Runnable action, Executor executor) {
1840 if (executor == null) throw new NullPointerException();
1841 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
1842 }
1843
1844 public <U> CompletableFuture<U> applyToEither(
1845 CompletionStage<? extends T> other, Function<? super T, U> fn) {
1846 return doApplyToEither(other.toCompletableFuture(), fn, null);
1847 }
1848
1849 public <U> CompletableFuture<U> applyToEitherAsync(
1850 CompletionStage<? extends T> other, Function<? super T, U> fn) {
1851 return doApplyToEither(other.toCompletableFuture(), fn, asyncPool);
1852 }
1853
1854 public <U> CompletableFuture<U> applyToEitherAsync
1855 (CompletionStage<? extends T> other, Function<? super T, U> fn,
1856 Executor executor) {
1857 if (executor == null) throw new NullPointerException();
1858 return doApplyToEither(other.toCompletableFuture(), fn, executor);
1859 }
1860
1861 public CompletableFuture<Void> acceptEither(
1862 CompletionStage<? extends T> other, Consumer<? super T> action) {
1863 return doAcceptEither(other.toCompletableFuture(), action, null);
1864 }
1865
1866 public CompletableFuture<Void> acceptEitherAsync
1867 (CompletionStage<? extends T> other, Consumer<? super T> action) {
1868 return doAcceptEither(other.toCompletableFuture(), action, asyncPool);
1869 }
1870
1871 public CompletableFuture<Void> acceptEitherAsync(
1872 CompletionStage<? extends T> other, Consumer<? super T> action,
1873 Executor executor) {
1874 if (executor == null) throw new NullPointerException();
1875 return doAcceptEither(other.toCompletableFuture(), action, executor);
1876 }
1877
1878 public CompletableFuture<Void> runAfterEither(
1879 CompletionStage<?> other, Runnable action) {
1880 return doRunAfterEither(other.toCompletableFuture(), action, null);
1881 }
1882
1883 public CompletableFuture<Void> runAfterEitherAsync(
1884 CompletionStage<?> other, Runnable action) {
1885 return doRunAfterEither(other.toCompletableFuture(), action, asyncPool);
1886 }
1887
1888 public CompletableFuture<Void> runAfterEitherAsync(
1889 CompletionStage<?> other, Runnable action, Executor executor) {
1890 if (executor == null) throw new NullPointerException();
1891 return doRunAfterEither(other.toCompletableFuture(), action, executor);
1892 }
1893
1894 public <U> CompletableFuture<U> thenCompose
1895 (Function<? super T, ? extends CompletionStage<U>> fn) {
1896 return doThenCompose(fn, null);
1897 }
1898
1899 public <U> CompletableFuture<U> thenComposeAsync(
1900 Function<? super T, ? extends CompletionStage<U>> fn) {
1901 return doThenCompose(fn, asyncPool);
1902 }
1903
1904 public <U> CompletableFuture<U> thenComposeAsync(
1905 Function<? super T, ? extends CompletionStage<U>> fn,
1906 Executor executor) {
1907 if (executor == null) throw new NullPointerException();
1908 return doThenCompose(fn, executor);
1909 }
1910
1911 public CompletableFuture<T> whenComplete(
1912 BiConsumer<? super T, ? super Throwable> action) {
1913 return doWhenComplete(action, null);
1914 }
1915
1916 public CompletableFuture<T> whenCompleteAsync(
1917 BiConsumer<? super T, ? super Throwable> action) {
1918 return doWhenComplete(action, asyncPool);
1919 }
1920
1921 public CompletableFuture<T> whenCompleteAsync(
1922 BiConsumer<? super T, ? super Throwable> action, Executor executor) {
1923 if (executor == null) throw new NullPointerException();
1924 return doWhenComplete(action, executor);
1925 }
1926
1927 public <U> CompletableFuture<U> handle(
1928 BiFunction<? super T, Throwable, ? extends U> fn) {
1929 return doHandle(fn, null);
1930 }
1931
1932 public <U> CompletableFuture<U> handleAsync(
1933 BiFunction<? super T, Throwable, ? extends U> fn) {
1934 return doHandle(fn, asyncPool);
1935 }
1936
1937 public <U> CompletableFuture<U> handleAsync(
1938 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
1939 if (executor == null) throw new NullPointerException();
1940 return doHandle(fn, executor);
1941 }
1942
1943 /**
1944 * Returns this CompletableFuture
1945 *
1946 * @return this CompletableFuture
1947 */
1948 public CompletableFuture<T> toCompletableFuture() {
1949 return this;
1950 }
1951
1952 // not in interface CompletionStage
1953
1954 /**
1955 * Returns a new CompletableFuture that is completed when this
1956 * CompletableFuture completes, with the result of the given
1957 * function of the exception triggering this CompletableFuture's
1958 * completion when it completes exceptionally; otherwise, if this
1959 * CompletableFuture completes normally, then the returned
1960 * CompletableFuture also completes normally with the same value.
1961 * Note: More flexible versions of this functionality are
1962 * available using methods {@code whenComplete} and {@code handle}.
1963 *
1964 * @param fn the function to use to compute the value of the
1965 * returned CompletableFuture if this CompletableFuture completed
1966 * exceptionally
1967 * @return the new CompletableFuture
1968 */
1969 public CompletableFuture<T> exceptionally(
1970 Function<Throwable, ? extends T> fn) {
1971 return doExceptionally(fn);
1972 }
1973
1974 /* ------------- Arbitrary-arity constructions -------------- */
1975
1976 /**
1977 * Returns a new CompletableFuture that is completed when all of
1978 * the given CompletableFutures complete. If any of the given
1979 * CompletableFutures complete exceptionally, then the returned
1980 * CompletableFuture also does so, with a CompletionException
1981 * holding this exception as its cause. Otherwise, the results,
1982 * if any, of the given CompletableFutures are not reflected in
1983 * the returned CompletableFuture, but may be obtained by
1984 * inspecting them individually. If no CompletableFutures are
1985 * provided, returns a CompletableFuture completed with the value
1986 * {@code null}.
1987 *
1988 * <p>Among the applications of this method is to await completion
1989 * of a set of independent CompletableFutures before continuing a
1990 * program, as in: {@code CompletableFuture.allOf(c1, c2,
1991 * c3).join();}.
1992 *
1993 * @param cfs the CompletableFutures
1994 * @return a new CompletableFuture that is completed when all of the
1995 * given CompletableFutures complete
1996 * @throws NullPointerException if the array or any of its elements are
1997 * {@code null}
1998 */
1999 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2000 return doAllOf(cfs, 0, cfs.length - 1);
2001 }
2002
2003 /**
2004 * Returns a new CompletableFuture that is completed when any of
2005 * the given CompletableFutures complete, with the same result.
2006 * Otherwise, if it completed exceptionally, the returned
2007 * CompletableFuture also does so, with a CompletionException
2008 * holding this exception as its cause. If no CompletableFutures
2009 * are provided, returns an incomplete CompletableFuture.
2010 *
2011 * @param cfs the CompletableFutures
2012 * @return a new CompletableFuture that is completed with the
2013 * result or exception of any of the given CompletableFutures when
2014 * one completes
2015 * @throws NullPointerException if the array or any of its elements are
2016 * {@code null}
2017 */
2018 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2019 CompletableFuture<Object> d = new CompletableFuture<Object>();
2020 for (int i = 0; i < cfs.length; ++i) {
2021 CompletableFuture<?> c = cfs[i];
2022 Object r = c.result; // throw NPE if null element
2023 if (d.result == null) {
2024 if (r == null)
2025 c.unipush(new DelayedCopy<Object>(d, c));
2026 else
2027 nowCopy(d, r);
2028 }
2029 }
2030 return d;
2031 }
2032
2033 /* ------------- Control and status methods -------------- */
2034
2035 /**
2036 * If not already completed, completes this CompletableFuture with
2037 * a {@link CancellationException}. Dependent CompletableFutures
2038 * that have not already completed will also complete
2039 * exceptionally, with a {@link CompletionException} caused by
2040 * this {@code CancellationException}.
2041 *
2042 * @param mayInterruptIfRunning this value has no effect in this
2043 * implementation because interrupts are not used to control
2044 * processing.
2045 *
2046 * @return {@code true} if this task is now cancelled
2047 */
2048 public boolean cancel(boolean mayInterruptIfRunning) {
2049 boolean cancelled = (result == null) &&
2050 internalComplete(new AltResult(new CancellationException()));
2051 postComplete();
2052 return cancelled || isCancelled();
2053 }
2054
2055 /**
2056 * Returns {@code true} if this CompletableFuture was cancelled
2057 * before it completed normally.
2058 *
2059 * @return {@code true} if this CompletableFuture was cancelled
2060 * before it completed normally
2061 */
2062 public boolean isCancelled() {
2063 Object r;
2064 return ((r = result) instanceof AltResult) &&
2065 (((AltResult)r).ex instanceof CancellationException);
2066 }
2067
2068 /**
2069 * Returns {@code true} if this CompletableFuture completed
2070 * exceptionally, in any way. Possible causes include
2071 * cancellation, explicit invocation of {@code
2072 * completeExceptionally}, and abrupt termination of a
2073 * CompletionStage action.
2074 *
2075 * @return {@code true} if this CompletableFuture completed
2076 * exceptionally
2077 */
2078 public boolean isCompletedExceptionally() {
2079 Object r;
2080 return ((r = result) instanceof AltResult) && r != NIL;
2081 }
2082
2083 /**
2084 * Forcibly sets or resets the value subsequently returned by
2085 * method {@link #get()} and related methods, whether or not
2086 * already completed. This method is designed for use only in
2087 * error recovery actions, and even in such situations may result
2088 * in ongoing dependent completions using established versus
2089 * overwritten outcomes.
2090 *
2091 * @param value the completion value
2092 */
2093 public void obtrudeValue(T value) {
2094 result = (value == null) ? NIL : value;
2095 postComplete();
2096 }
2097
2098 /**
2099 * Forcibly causes subsequent invocations of method {@link #get()}
2100 * and related methods to throw the given exception, whether or
2101 * not already completed. This method is designed for use only in
2102 * recovery actions, and even in such situations may result in
2103 * ongoing dependent completions using established versus
2104 * overwritten outcomes.
2105 *
2106 * @param ex the exception
2107 */
2108 public void obtrudeException(Throwable ex) {
2109 if (ex == null) throw new NullPointerException();
2110 result = new AltResult(ex);
2111 postComplete();
2112 }
2113
2114 /**
2115 * Returns the estimated number of CompletableFutures whose
2116 * completions are awaiting completion of this CompletableFuture.
2117 * This method is designed for use in monitoring system state, not
2118 * for synchronization control.
2119 *
2120 * @return the number of dependent CompletableFutures
2121 */
2122 public int getNumberOfDependents() {
2123 int count = 0;
2124 for (Completion<?> p = completions; p != null; p = p.next)
2125 ++count;
2126 return count;
2127 }
2128
2129 /**
2130 * Returns a string identifying this CompletableFuture, as well as
2131 * its completion state. The state, in brackets, contains the
2132 * String {@code "Completed Normally"} or the String {@code
2133 * "Completed Exceptionally"}, or the String {@code "Not
2134 * completed"} followed by the number of CompletableFutures
2135 * dependent upon its completion, if any.
2136 *
2137 * @return a string identifying this CompletableFuture, as well as its state
2138 */
2139 public String toString() {
2140 Object r = result;
2141 int count;
2142 return super.toString() +
2143 ((r == null) ?
2144 (((count = getNumberOfDependents()) == 0) ?
2145 "[Not completed]" :
2146 "[Not completed, " + count + " dependents]") :
2147 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2148 "[Completed exceptionally]" :
2149 "[Completed normally]"));
2150 }
2151
2152 // Unsafe mechanics
2153 private static final sun.misc.Unsafe UNSAFE;
2154 private static final long RESULT;
2155 private static final long COMPLETIONS;
2156 static {
2157 try {
2158 UNSAFE = sun.misc.Unsafe.getUnsafe();
2159 Class<?> k = CompletableFuture.class;
2160 RESULT = UNSAFE.objectFieldOffset
2161 (k.getDeclaredField("result"));
2162 COMPLETIONS = UNSAFE.objectFieldOffset
2163 (k.getDeclaredField("completions"));
2164 } catch (Exception x) {
2165 throw new Error(x);
2166 }
2167 }
2168 }