ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.129
Committed: Wed Jun 18 04:12:17 2014 UTC (9 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.128: +45 -14 lines
Log Message:
use lazySet on Completion.next when enqueuing or GC-clearing

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.locks.LockSupport;
25
26 /**
27 * A {@link Future} that may be explicitly completed (setting its
28 * value and status), and may be used as a {@link CompletionStage},
29 * supporting dependent functions and actions that trigger upon its
30 * completion.
31 *
32 * <p>When two or more threads attempt to
33 * {@link #complete complete},
34 * {@link #completeExceptionally completeExceptionally}, or
35 * {@link #cancel cancel}
36 * a CompletableFuture, only one of them succeeds.
37 *
38 * <p>In addition to these and related methods for directly
39 * manipulating status and results, CompletableFuture implements
40 * interface {@link CompletionStage} with the following policies: <ul>
41 *
42 * <li>Actions supplied for dependent completions of
43 * <em>non-async</em> methods may be performed by the thread that
44 * completes the current CompletableFuture, or by any other caller of
45 * a completion method.</li>
46 *
47 * <li>All <em>async</em> methods without an explicit Executor
48 * argument are performed using the {@link ForkJoinPool#commonPool()}
49 * (unless it does not support a parallelism level of at least two, in
50 * which case, a new Thread is created to run each task). To simplify
51 * monitoring, debugging, and tracking, all generated asynchronous
52 * tasks are instances of the marker interface {@link
53 * AsynchronousCompletionTask}. </li>
54 *
55 * <li>All CompletionStage methods are implemented independently of
56 * other public methods, so the behavior of one method is not impacted
57 * by overrides of others in subclasses. </li> </ul>
58 *
59 * <p>CompletableFuture also implements {@link Future} with the following
60 * policies: <ul>
61 *
62 * <li>Since (unlike {@link FutureTask}) this class has no direct
63 * control over the computation that causes it to be completed,
64 * cancellation is treated as just another form of exceptional
65 * completion. Method {@link #cancel cancel} has the same effect as
66 * {@code completeExceptionally(new CancellationException())}. Method
67 * {@link #isCompletedExceptionally} can be used to determine if a
68 * CompletableFuture completed in any exceptional fashion.</li>
69 *
70 * <li>In case of exceptional completion with a CompletionException,
71 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
72 * {@link ExecutionException} with the same cause as held in the
73 * corresponding CompletionException. To simplify usage in most
74 * contexts, this class also defines methods {@link #join()} and
75 * {@link #getNow} that instead throw the CompletionException directly
76 * in these cases.</li> </ul>
77 *
78 * @author Doug Lea
79 * @since 1.8
80 */
81 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
82
83 /*
84 * Overview:
85 *
86 * A CompletableFuture may have dependent completion actions,
87 * collected in a linked stack. It atomically completes by CASing
88 * a result field, and then pops off and runs those actions. This
89 * applies across normal vs exceptional outcomes, sync vs async
90 * actions, binary triggers, and various forms of completions.
91 *
92 * Non-nullness of field result (set via CAS) indicates done. An
93 * AltResult is used to box null as a result, as well as to hold
94 * exceptions. Using a single field makes completion simple to
95 * detect and trigger. Encoding and decoding is straightforward
96 * but adds to the sprawl of trapping and associating exceptions
97 * with targets. Minor simplifications rely on (static) NIL (to
98 * box null results) being the only AltResult with a null
99 * exception field, so we don't usually need explicit comparisons.
100 * Even though some of the generics casts are unchecked (see
101 * SuppressWarnings annotations), they are placed to be
102 * appropriate even if checked.
103 *
104 * Dependent actions are represented by Completion objects linked
105 * as Treiber stacks headed by field "stack". There are Completion
106 * classes for each kind of action, grouped into single-input
107 * (UniCompletion), two-input (BiCompletion), projected
108 * (BiCompletions using either (not both) of two inputs), shared
109 * (CoCompletion, used by the second of two sources), zero-input
110 * source actions, and Signallers that unblock waiters. Class
111 * Completion extends ForkJoinTask to enable async execution
112 * (adding no space overhead because we exploit its "tag" methods
113 * to maintain claims). It is also declared as Runnable to allow
114 * usage with arbitrary executors.
115 *
116 * Support for each kind of CompletionStage relies on a separate
117 * class, along with two CompletableFuture methods:
118 *
119 * * A Completion class with name X corresponding to function,
120 * prefaced with "Uni", "Bi", or "Or". Each class contains
121 * fields for source(s), actions, and dependent. They are
122 * boringly similar, differing from others only with respect to
123 * underlying functional forms. We do this so that users don't
124 * encounter layers of adaptors in common usages. We also
125 * include "Relay" classes/methods that don't correspond to user
126 * methods; they copy results from one stage to another.
127 *
128 * * Boolean CompletableFuture method x(...) (for example
129 * uniApply) takes all of the arguments needed to check that an
130 * action is triggerable, and then either runs the action or
131 * arranges its async execution by executing its Completion
132 * argument, if present. The method returns true if known to be
133 * complete.
134 *
135 * * Completion method tryFire(int mode) invokes the associated x
136 * method with its held arguments, and on success cleans up.
137 * The mode argument allows exec to be called twice (SYNC, then
138 * ASYNC); the first to screen and trap exceptions while
139 * arranging to execute, and the second when called from a
140 * task. (A few classes are not used async so take slightly
141 * different forms.) The claim() callback suppresses function
142 * invocation if already claimed by another thread.
143 *
144 * * CompletableFuture method xStage(...) is called from a public
145 * stage method of CompletableFuture x. It screens user
146 * arguments and invokes and/or creates the stage object. If
147 * not async and x is already complete, the action is run
148 * immediately. Otherwise a Completion c is created, pushed to
149 * x's stack (unless done), and started or triggered via
150 * c.tryFire. This also covers races possible if x completes
151 * while pushing. Classes with two inputs (for example BiApply)
152 * deal with races across both while pushing actions. The
153 * second completion is a CoCompletion pointing to the first,
154 * shared so that at most one performs the action. The
155 * multiple-arity methods allOf and anyOf do this pairwise to
156 * form trees of completions.
157 *
158 * Note that the generic type parameters of methods vary according
159 * to whether "this" is a source, dependent, or completion.
160 *
161 * Method postComplete is called upon completion unless the target
162 * is guaranteed not to be observable (i.e., not yet returned or
163 * linked). Multiple threads can call postComplete, which
164 * atomically pops each dependent action, and tries to trigger it
165 * via method exec. Triggering can propagate recursively, so exec
166 * in NESTED mode returns its completed dependent (if one exists)
167 * for further processing by its caller (see method postFire).
168 *
169 * Blocking methods get() and join() rely on Signaller Completions
170 * that wake up waiting threads. The mechanics are similar to
171 * Treiber stack wait-nodes used in FutureTask, Phaser, and
172 * SynchronousQueue. See their internal documentation for
173 * algorithmic details.
174 *
175 * Without precautions, CompletableFutures would be prone to
176 * garbage accumulation as chains of Completions build up, each
177 * pointing back to its sources. So we null out fields as soon as
178 * possible (see especially method Completion.detach). The
179 * screening checks needed anyway harmlessly ignore null arguments
180 * that may have been obtained during races with threads nulling
181 * out fields. We also try to unlink fired Completions from
182 * stacks that might never be popped (see method postFire).
183 * Completion fields need not be declared as final or volatile
184 * because they are only visible to other threads upon safe
185 * publication.
186 */
187
188 volatile Object result; // Either the result or boxed AltResult
189 volatile Completion stack; // Top of Treiber stack of dependent actions
190
191 final boolean internalComplete(Object r) { // CAS from null to r
192 return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
193 }
194
195 final boolean casStack(Completion cmp, Completion val) {
196 return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
197 }
198
199 /** Returns true if successfully pushed c onto stack. */
200 final boolean tryPushStack(Completion c) {
201 Completion h = stack;
202 c.lazySetNext(h);
203 return UNSAFE.compareAndSwapObject(this, STACK, h, c);
204 }
205
206 /** Unconditionally pushes c onto stack, retrying if necessary. */
207 final void pushStack(Completion c) {
208 do {} while (!tryPushStack(c));
209 }
210
211 /* ------------- Encoding and decoding outcomes -------------- */
212
213 static final class AltResult { // See above
214 final Throwable ex; // null only for NIL
215 AltResult(Throwable x) { this.ex = x; }
216 }
217
218 /** The encoding of the null value. */
219 static final AltResult NIL = new AltResult(null);
220
221 /** Completes with the null value, unless already completed. */
222 final boolean completeNull() {
223 return UNSAFE.compareAndSwapObject(this, RESULT, null,
224 NIL);
225 }
226
227 /** Returns the encoding of the given non-exceptional value. */
228 final Object encodeValue(T t) {
229 return (t == null) ? NIL : t;
230 }
231
232 /** Completes with a non-exceptional result, unless already completed. */
233 final boolean completeValue(T t) {
234 return UNSAFE.compareAndSwapObject(this, RESULT, null,
235 (t == null) ? NIL : t);
236 }
237
238 /**
239 * Returns the encoding of the given (non-null) exception as a
240 * wrapped CompletionException unless it is one already.
241 */
242 static AltResult encodeThrowable(Throwable x) {
243 return new AltResult((x instanceof CompletionException) ? x :
244 new CompletionException(x));
245 }
246
247 /** Completes with an exceptional result, unless already completed. */
248 final boolean completeThrowable(Throwable x) {
249 return UNSAFE.compareAndSwapObject(this, RESULT, null,
250 encodeThrowable(x));
251 }
252
253 /**
254 * Returns the encoding of the given (non-null) exception as a
255 * wrapped CompletionException unless it is one already. May
256 * return the given Object r (which must have been the result of a
257 * source future) if it is equivalent, i.e. if this is a simple
258 * relay of an existing CompletionException.
259 */
260 static Object encodeThrowable(Throwable x, Object r) {
261 if (!(x instanceof CompletionException))
262 x = new CompletionException(x);
263 else if (r instanceof AltResult && x == ((AltResult)r).ex)
264 return r;
265 return new AltResult(x);
266 }
267
268 /**
269 * Completes with the given (non-null) exceptional result as a
270 * wrapped CompletionException unless it is one already, unless
271 * already completed. May complete with the given Object r
272 * (which must have been the result of a source future) if it is
273 * equivalent, i.e. if this is a simple propagation of an
274 * existing CompletionException.
275 */
276 final boolean completeThrowable(Throwable x, Object r) {
277 return UNSAFE.compareAndSwapObject(this, RESULT, null,
278 encodeThrowable(x, r));
279 }
280
281 /**
282 * Returns the encoding of the given arguments: if the exception
283 * is non-null, encodes as AltResult. Otherwise uses the given
284 * value, boxed as NIL if null.
285 */
286 Object encodeOutcome(T t, Throwable x) {
287 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x);
288 }
289
290 /**
291 * Returns the encoding of a copied outcome; if exceptional,
292 * rewraps as a CompletionException, else returns argument.
293 */
294 static Object encodeRelay(Object r) {
295 Throwable x;
296 return (((r instanceof AltResult) &&
297 (x = ((AltResult)r).ex) != null &&
298 !(x instanceof CompletionException)) ?
299 new AltResult(new CompletionException(x)) : r);
300 }
301
302 /**
303 * Completes with r or a copy of r, unless already completed.
304 * If exceptional, r is first coerced to a CompletionException.
305 */
306 final boolean completeRelay(Object r) {
307 return UNSAFE.compareAndSwapObject(this, RESULT, null,
308 encodeRelay(r));
309 }
310
311 /**
312 * Reports result using Future.get conventions.
313 */
314 private static <T> T reportGet(Object r)
315 throws InterruptedException, ExecutionException {
316 if (r == null) // by convention below, null means interrupted
317 throw new InterruptedException();
318 if (r instanceof AltResult) {
319 Throwable x, cause;
320 if ((x = ((AltResult)r).ex) == null)
321 return null;
322 if (x instanceof CancellationException)
323 throw (CancellationException)x;
324 if ((x instanceof CompletionException) &&
325 (cause = x.getCause()) != null)
326 x = cause;
327 throw new ExecutionException(x);
328 }
329 @SuppressWarnings("unchecked") T t = (T) r;
330 return t;
331 }
332
333 /**
334 * Decodes outcome to return result or throw unchecked exception.
335 */
336 private static <T> T reportJoin(Object r) {
337 if (r instanceof AltResult) {
338 Throwable x;
339 if ((x = ((AltResult)r).ex) == null)
340 return null;
341 if (x instanceof CancellationException)
342 throw (CancellationException)x;
343 if (x instanceof CompletionException)
344 throw (CompletionException)x;
345 throw new CompletionException(x);
346 }
347 @SuppressWarnings("unchecked") T t = (T) r;
348 return t;
349 }
350
351 /* ------------- Async task preliminaries -------------- */
352
353 /**
354 * A marker interface identifying asynchronous tasks produced by
355 * {@code async} methods. This may be useful for monitoring,
356 * debugging, and tracking asynchronous activities.
357 *
358 * @since 1.8
359 */
360 public static interface AsynchronousCompletionTask {
361 }
362
363 private static final boolean useCommonPool =
364 (ForkJoinPool.getCommonPoolParallelism() > 1);
365
366 /**
367 * Default executor -- ForkJoinPool.commonPool() unless it cannot
368 * support parallelism.
369 */
370 private static final Executor asyncPool = useCommonPool ?
371 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
372
373 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
374 static final class ThreadPerTaskExecutor implements Executor {
375 public void execute(Runnable r) { new Thread(r).start(); }
376 }
377
378 /**
379 * Null-checks user executor argument, and translates uses of
380 * commonPool to asyncPool in case parallelism disabled.
381 */
382 static Executor screenExecutor(Executor e) {
383 if (!useCommonPool && e == ForkJoinPool.commonPool())
384 return asyncPool;
385 if (e == null) throw new NullPointerException();
386 return e;
387 }
388
389 // Modes for Completion.exec. Signedness matters.
390 static final int SYNC = 0;
391 static final int ASYNC = 1;
392 static final int NESTED = -1;
393
394 /* ------------- Base Completion classes and operations -------------- */
395
396 @SuppressWarnings("serial")
397 abstract static class Completion extends ForkJoinTask<Void>
398 implements Runnable, AsynchronousCompletionTask {
399 volatile Completion next; // Treiber stack link
400
401 /**
402 * Performs completion action if triggered, returning a
403 * dependent that may need propagation, if one exists.
404 *
405 * @param mode SYNC, ASYNC, or NESTED
406 */
407 abstract CompletableFuture<?> tryFire(int mode);
408
409 /** Returns true if possibly still triggerable. Used by cleanStack. */
410 abstract boolean isLive();
411
412 public final void run() { tryFire(ASYNC); }
413 public final boolean exec() { tryFire(ASYNC); return true; }
414 public final Void getRawResult() { return null; }
415 public final void setRawResult(Void v) {}
416
417 void lazySetNext(Completion val) {
418 UNSAFE.putOrderedObject(this, NEXT, val);
419 }
420
421 // Unsafe mechanics
422
423 private static final sun.misc.Unsafe UNSAFE;
424 private static final long NEXT;
425
426 static {
427 try {
428 UNSAFE = sun.misc.Unsafe.getUnsafe();
429 NEXT = UNSAFE.objectFieldOffset
430 (Completion.class.getDeclaredField("next"));
431 } catch (Exception e) {
432 throw new Error(e);
433 }
434 }
435 }
436
437 /**
438 * Pops and tries to trigger all reachable dependents. Call only
439 * when known to be done.
440 */
441 final void postComplete() {
442 /*
443 * On each step, variable f holds current dependents to pop
444 * and run. It is extended along only one path at a time,
445 * pushing others to avoid unbounded recursion.
446 */
447 CompletableFuture<?> f = this; Completion h;
448 while ((h = f.stack) != null ||
449 (f != this && (h = (f = this).stack) != null)) {
450 CompletableFuture<?> d; Completion t;
451 if (f.casStack(h, t = h.next)) {
452 if (t != null) {
453 if (f != this) {
454 pushStack(h);
455 continue;
456 }
457 h.next = null; // detach
458 }
459 f = (d = h.tryFire(NESTED)) == null ? this : d;
460 }
461 }
462 }
463
464 /** Traverses stack and unlinks dead Completions. */
465 final void cleanStack() {
466 for (Completion p = null, q = stack; q != null;) {
467 Completion s = q.next;
468 if (q.isLive()) {
469 p = q;
470 q = s;
471 }
472 else if (p == null) {
473 casStack(q, s);
474 q = stack;
475 }
476 else {
477 p.next = s;
478 if (p.isLive())
479 q = s;
480 else {
481 p = null; // restart
482 q = stack;
483 }
484 }
485 }
486 }
487
488 /* ------------- One-input Completions -------------- */
489
490 /** A Completion with a source, dependent, and executor. */
491 @SuppressWarnings("serial")
492 abstract static class UniCompletion<T,V> extends Completion {
493 Executor executor; // executor to use (null if none)
494 CompletableFuture<V> dep; // the dependent to complete
495 CompletableFuture<T> src; // source for action
496
497 UniCompletion(Executor executor, CompletableFuture<V> dep,
498 CompletableFuture<T> src) {
499 this.executor = executor; this.dep = dep; this.src = src;
500 }
501
502 /**
503 * Returns true if action can be run. Call only when known to
504 * be triggerable. Uses FJ tag bit to ensure that only one
505 * thread claims ownership. If async, starts as task -- a
506 * later call to tryFire will run action.
507 */
508 final boolean claim() {
509 Executor e = executor;
510 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
511 if (e == null)
512 return true;
513 executor = null; // disable
514 e.execute(this);
515 }
516 return false;
517 }
518
519 final boolean isLive() { return dep != null; }
520 }
521
522 /** Pushes the given completion (if it exists) unless done. */
523 final void push(UniCompletion<?,?> c) {
524 if (c != null) {
525 while (result == null && !tryPushStack(c))
526 c.lazySetNext(null); // clear on failure
527 }
528 }
529
530 /**
531 * Post-processing by dependent after successful UniCompletion
532 * tryFire. Tries to clean stack of source a, and then either runs
533 * postComplete or returns this to caller, depending on mode.
534 */
535 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
536 if (a != null && a.stack != null) {
537 if (mode < 0 || a.result == null)
538 a.cleanStack();
539 else
540 a.postComplete();
541 }
542 if (result != null && stack != null) {
543 if (mode < 0)
544 return this;
545 else
546 postComplete();
547 }
548 return null;
549 }
550
551 @SuppressWarnings("serial")
552 static final class UniApply<T,V> extends UniCompletion<T,V> {
553 Function<? super T,? extends V> fn;
554 UniApply(Executor executor, CompletableFuture<V> dep,
555 CompletableFuture<T> src,
556 Function<? super T,? extends V> fn) {
557 super(executor, dep, src); this.fn = fn;
558 }
559 final CompletableFuture<V> tryFire(int mode) {
560 CompletableFuture<V> d; CompletableFuture<T> a;
561 if ((d = dep) == null ||
562 !d.uniApply(a = src, fn, mode > 0 ? null : this))
563 return null;
564 dep = null; src = null; fn = null;
565 return d.postFire(a, mode);
566 }
567 }
568
569 final <S> boolean uniApply(CompletableFuture<S> a,
570 Function<? super S,? extends T> f,
571 UniApply<S,T> c) {
572 Object r; Throwable x;
573 if (a == null || (r = a.result) == null || f == null)
574 return false;
575 tryComplete: if (result == null) {
576 if (r instanceof AltResult) {
577 if ((x = ((AltResult)r).ex) != null) {
578 completeThrowable(x, r);
579 break tryComplete;
580 }
581 r = null;
582 }
583 try {
584 if (c != null && !c.claim())
585 return false;
586 @SuppressWarnings("unchecked") S s = (S) r;
587 completeValue(f.apply(s));
588 } catch (Throwable ex) {
589 completeThrowable(ex);
590 }
591 }
592 return true;
593 }
594
595 private <V> CompletableFuture<V> uniApplyStage(
596 Executor e, Function<? super T,? extends V> f) {
597 if (f == null) throw new NullPointerException();
598 CompletableFuture<V> d = new CompletableFuture<V>();
599 if (e != null || !d.uniApply(this, f, null)) {
600 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
601 push(c);
602 c.tryFire(SYNC);
603 }
604 return d;
605 }
606
607 @SuppressWarnings("serial")
608 static final class UniAccept<T> extends UniCompletion<T,Void> {
609 Consumer<? super T> fn;
610 UniAccept(Executor executor, CompletableFuture<Void> dep,
611 CompletableFuture<T> src, Consumer<? super T> fn) {
612 super(executor, dep, src); this.fn = fn;
613 }
614 final CompletableFuture<Void> tryFire(int mode) {
615 CompletableFuture<Void> d; CompletableFuture<T> a;
616 if ((d = dep) == null ||
617 !d.uniAccept(a = src, fn, mode > 0 ? null : this))
618 return null;
619 dep = null; src = null; fn = null;
620 return d.postFire(a, mode);
621 }
622 }
623
624 final <S> boolean uniAccept(CompletableFuture<S> a,
625 Consumer<? super S> f, UniAccept<S> c) {
626 Object r; Throwable x;
627 if (a == null || (r = a.result) == null || f == null)
628 return false;
629 tryComplete: if (result == null) {
630 if (r instanceof AltResult) {
631 if ((x = ((AltResult)r).ex) != null) {
632 completeThrowable(x, r);
633 break tryComplete;
634 }
635 r = null;
636 }
637 try {
638 if (c != null && !c.claim())
639 return false;
640 @SuppressWarnings("unchecked") S s = (S) r;
641 f.accept(s);
642 completeNull();
643 } catch (Throwable ex) {
644 completeThrowable(ex);
645 }
646 }
647 return true;
648 }
649
650 private CompletableFuture<Void> uniAcceptStage(Executor e,
651 Consumer<? super T> f) {
652 if (f == null) throw new NullPointerException();
653 CompletableFuture<Void> d = new CompletableFuture<Void>();
654 if (e != null || !d.uniAccept(this, f, null)) {
655 UniAccept<T> c = new UniAccept<T>(e, d, this, f);
656 push(c);
657 c.tryFire(SYNC);
658 }
659 return d;
660 }
661
662 @SuppressWarnings("serial")
663 static final class UniRun<T> extends UniCompletion<T,Void> {
664 Runnable fn;
665 UniRun(Executor executor, CompletableFuture<Void> dep,
666 CompletableFuture<T> src, Runnable fn) {
667 super(executor, dep, src); this.fn = fn;
668 }
669 final CompletableFuture<Void> tryFire(int mode) {
670 CompletableFuture<Void> d; CompletableFuture<T> a;
671 if ((d = dep) == null ||
672 !d.uniRun(a = src, fn, mode > 0 ? null : this))
673 return null;
674 dep = null; src = null; fn = null;
675 return d.postFire(a, mode);
676 }
677 }
678
679 final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
680 Object r; Throwable x;
681 if (a == null || (r = a.result) == null || f == null)
682 return false;
683 if (result == null) {
684 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
685 completeThrowable(x, r);
686 else
687 try {
688 if (c != null && !c.claim())
689 return false;
690 f.run();
691 completeNull();
692 } catch (Throwable ex) {
693 completeThrowable(ex);
694 }
695 }
696 return true;
697 }
698
699 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
700 if (f == null) throw new NullPointerException();
701 CompletableFuture<Void> d = new CompletableFuture<Void>();
702 if (e != null || !d.uniRun(this, f, null)) {
703 UniRun<T> c = new UniRun<T>(e, d, this, f);
704 push(c);
705 c.tryFire(SYNC);
706 }
707 return d;
708 }
709
710 @SuppressWarnings("serial")
711 static final class UniWhenComplete<T> extends UniCompletion<T,T> {
712 BiConsumer<? super T, ? super Throwable> fn;
713 UniWhenComplete(Executor executor, CompletableFuture<T> dep,
714 CompletableFuture<T> src,
715 BiConsumer<? super T, ? super Throwable> fn) {
716 super(executor, dep, src); this.fn = fn;
717 }
718 final CompletableFuture<T> tryFire(int mode) {
719 CompletableFuture<T> d; CompletableFuture<T> a;
720 if ((d = dep) == null ||
721 !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this))
722 return null;
723 dep = null; src = null; fn = null;
724 return d.postFire(a, mode);
725 }
726 }
727
728 final boolean uniWhenComplete(CompletableFuture<T> a,
729 BiConsumer<? super T,? super Throwable> f,
730 UniWhenComplete<T> c) {
731 Object r; T t; Throwable x = null;
732 if (a == null || (r = a.result) == null || f == null)
733 return false;
734 if (result == null) {
735 try {
736 if (c != null && !c.claim())
737 return false;
738 if (r instanceof AltResult) {
739 x = ((AltResult)r).ex;
740 t = null;
741 } else {
742 @SuppressWarnings("unchecked") T tr = (T) r;
743 t = tr;
744 }
745 f.accept(t, x);
746 if (x == null) {
747 internalComplete(r);
748 return true;
749 }
750 } catch (Throwable ex) {
751 if (x == null)
752 x = ex;
753 }
754 completeThrowable(x, r);
755 }
756 return true;
757 }
758
759 private CompletableFuture<T> uniWhenCompleteStage(
760 Executor e, BiConsumer<? super T, ? super Throwable> f) {
761 if (f == null) throw new NullPointerException();
762 CompletableFuture<T> d = new CompletableFuture<T>();
763 if (e != null || !d.uniWhenComplete(this, f, null)) {
764 UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
765 push(c);
766 c.tryFire(SYNC);
767 }
768 return d;
769 }
770
771 @SuppressWarnings("serial")
772 static final class UniHandle<T,V> extends UniCompletion<T,V> {
773 BiFunction<? super T, Throwable, ? extends V> fn;
774 UniHandle(Executor executor, CompletableFuture<V> dep,
775 CompletableFuture<T> src,
776 BiFunction<? super T, Throwable, ? extends V> fn) {
777 super(executor, dep, src); this.fn = fn;
778 }
779 final CompletableFuture<V> tryFire(int mode) {
780 CompletableFuture<V> d; CompletableFuture<T> a;
781 if ((d = dep) == null ||
782 !d.uniHandle(a = src, fn, mode > 0 ? null : this))
783 return null;
784 dep = null; src = null; fn = null;
785 return d.postFire(a, mode);
786 }
787 }
788
789 final <S> boolean uniHandle(CompletableFuture<S> a,
790 BiFunction<? super S, Throwable, ? extends T> f,
791 UniHandle<S,T> c) {
792 Object r; S s; Throwable x;
793 if (a == null || (r = a.result) == null || f == null)
794 return false;
795 if (result == null) {
796 try {
797 if (c != null && !c.claim())
798 return false;
799 if (r instanceof AltResult) {
800 x = ((AltResult)r).ex;
801 s = null;
802 } else {
803 x = null;
804 @SuppressWarnings("unchecked") S ss = (S) r;
805 s = ss;
806 }
807 completeValue(f.apply(s, x));
808 } catch (Throwable ex) {
809 completeThrowable(ex);
810 }
811 }
812 return true;
813 }
814
815 private <V> CompletableFuture<V> uniHandleStage(
816 Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
817 if (f == null) throw new NullPointerException();
818 CompletableFuture<V> d = new CompletableFuture<V>();
819 if (e != null || !d.uniHandle(this, f, null)) {
820 UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
821 push(c);
822 c.tryFire(SYNC);
823 }
824 return d;
825 }
826
827 @SuppressWarnings("serial")
828 static final class UniExceptionally<T> extends UniCompletion<T,T> {
829 Function<? super Throwable, ? extends T> fn;
830 UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src,
831 Function<? super Throwable, ? extends T> fn) {
832 super(null, dep, src); this.fn = fn;
833 }
834 final CompletableFuture<T> tryFire(int mode) { // never ASYNC
835 // assert mode != ASYNC;
836 CompletableFuture<T> d; CompletableFuture<T> a;
837 if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this))
838 return null;
839 dep = null; src = null; fn = null;
840 return d.postFire(a, mode);
841 }
842 }
843
844 final boolean uniExceptionally(CompletableFuture<T> a,
845 Function<? super Throwable, ? extends T> f,
846 UniExceptionally<T> c) {
847 Object r; Throwable x;
848 if (a == null || (r = a.result) == null || f == null)
849 return false;
850 if (result == null) {
851 try {
852 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) {
853 if (c != null && !c.claim())
854 return false;
855 completeValue(f.apply(x));
856 } else
857 internalComplete(r);
858 } catch (Throwable ex) {
859 completeThrowable(ex);
860 }
861 }
862 return true;
863 }
864
865 private CompletableFuture<T> uniExceptionallyStage(
866 Function<Throwable, ? extends T> f) {
867 if (f == null) throw new NullPointerException();
868 CompletableFuture<T> d = new CompletableFuture<T>();
869 if (!d.uniExceptionally(this, f, null)) {
870 UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
871 push(c);
872 c.tryFire(SYNC);
873 }
874 return d;
875 }
876
877 @SuppressWarnings("serial")
878 static final class UniRelay<T> extends UniCompletion<T,T> { // for Compose
879 UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) {
880 super(null, dep, src);
881 }
882 final CompletableFuture<T> tryFire(int mode) {
883 CompletableFuture<T> d; CompletableFuture<T> a;
884 if ((d = dep) == null || !d.uniRelay(a = src))
885 return null;
886 src = null; dep = null;
887 return d.postFire(a, mode);
888 }
889 }
890
891 final boolean uniRelay(CompletableFuture<T> a) {
892 Object r;
893 if (a == null || (r = a.result) == null)
894 return false;
895 if (result == null) // no need to claim
896 completeRelay(r);
897 return true;
898 }
899
900 @SuppressWarnings("serial")
901 static final class UniCompose<T,V> extends UniCompletion<T,V> {
902 Function<? super T, ? extends CompletionStage<V>> fn;
903 UniCompose(Executor executor, CompletableFuture<V> dep,
904 CompletableFuture<T> src,
905 Function<? super T, ? extends CompletionStage<V>> fn) {
906 super(executor, dep, src); this.fn = fn;
907 }
908 final CompletableFuture<V> tryFire(int mode) {
909 CompletableFuture<V> d; CompletableFuture<T> a;
910 if ((d = dep) == null ||
911 !d.uniCompose(a = src, fn, mode > 0 ? null : this))
912 return null;
913 dep = null; src = null; fn = null;
914 return d.postFire(a, mode);
915 }
916 }
917
918 final <S> boolean uniCompose(
919 CompletableFuture<S> a,
920 Function<? super S, ? extends CompletionStage<T>> f,
921 UniCompose<S,T> c) {
922 Object r; Throwable x;
923 if (a == null || (r = a.result) == null || f == null)
924 return false;
925 tryComplete: if (result == null) {
926 if (r instanceof AltResult) {
927 if ((x = ((AltResult)r).ex) != null) {
928 completeThrowable(x, r);
929 break tryComplete;
930 }
931 r = null;
932 }
933 try {
934 if (c != null && !c.claim())
935 return false;
936 @SuppressWarnings("unchecked") S s = (S) r;
937 CompletableFuture<T> g = f.apply(s).toCompletableFuture();
938 if (g.result == null || !uniRelay(g)) {
939 UniRelay<T> copy = new UniRelay<T>(this, g);
940 g.push(copy);
941 copy.tryFire(SYNC);
942 if (result == null)
943 return false;
944 }
945 } catch (Throwable ex) {
946 completeThrowable(ex);
947 }
948 }
949 return true;
950 }
951
952 private <V> CompletableFuture<V> uniComposeStage(
953 Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
954 if (f == null) throw new NullPointerException();
955 Object r; Throwable x;
956 if (e == null && (r = result) != null) {
957 // try to return function result directly
958 if (r instanceof AltResult) {
959 if ((x = ((AltResult)r).ex) != null) {
960 return new CompletableFuture<V>(encodeThrowable(x, r));
961 }
962 r = null;
963 }
964 try {
965 @SuppressWarnings("unchecked") T t = (T) r;
966 return f.apply(t).toCompletableFuture();
967 } catch (Throwable ex) {
968 return new CompletableFuture<V>(encodeThrowable(ex));
969 }
970 }
971 CompletableFuture<V> d = new CompletableFuture<V>();
972 UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
973 push(c);
974 c.tryFire(SYNC);
975 return d;
976 }
977
978 /* ------------- Two-input Completions -------------- */
979
980 /** A Completion for an action with two sources */
981 @SuppressWarnings("serial")
982 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
983 CompletableFuture<U> snd; // second source for action
984 BiCompletion(Executor executor, CompletableFuture<V> dep,
985 CompletableFuture<T> src, CompletableFuture<U> snd) {
986 super(executor, dep, src); this.snd = snd;
987 }
988 }
989
990 /** A Completion delegating to a BiCompletion */
991 @SuppressWarnings("serial")
992 static final class CoCompletion extends Completion {
993 BiCompletion<?,?,?> base;
994 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; }
995 final CompletableFuture<?> tryFire(int mode) {
996 BiCompletion<?,?,?> c; CompletableFuture<?> d;
997 if ((c = base) == null || (d = c.tryFire(mode)) == null)
998 return null;
999 base = null; // detach
1000 return d;
1001 }
1002 final boolean isLive() {
1003 BiCompletion<?,?,?> c;
1004 return (c = base) != null && c.dep != null;
1005 }
1006 }
1007
1008 /** Pushes completion to this and b unless both done. */
1009 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
1010 if (c != null) {
1011 Object r;
1012 while ((r = result) == null && !tryPushStack(c))
1013 c.lazySetNext(null); // clear on failure
1014 if (b != null && b != this && b.result == null) {
1015 Completion q = (r != null) ? c : new CoCompletion(c);
1016 while (b.result == null && !b.tryPushStack(q))
1017 q.lazySetNext(null); // clear on failure
1018 }
1019 }
1020 }
1021
1022 /** Post-processing after successful BiCompletion tryFire. */
1023 final CompletableFuture<T> postFire(CompletableFuture<?> a,
1024 CompletableFuture<?> b, int mode) {
1025 if (b != null && b.stack != null) { // clean second source
1026 if (mode < 0 || b.result == null)
1027 b.cleanStack();
1028 else
1029 b.postComplete();
1030 }
1031 return postFire(a, mode);
1032 }
1033
1034 @SuppressWarnings("serial")
1035 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
1036 BiFunction<? super T,? super U,? extends V> fn;
1037 BiApply(Executor executor, CompletableFuture<V> dep,
1038 CompletableFuture<T> src, CompletableFuture<U> snd,
1039 BiFunction<? super T,? super U,? extends V> fn) {
1040 super(executor, dep, src, snd); this.fn = fn;
1041 }
1042 final CompletableFuture<V> tryFire(int mode) {
1043 CompletableFuture<V> d;
1044 CompletableFuture<T> a;
1045 CompletableFuture<U> b;
1046 if ((d = dep) == null ||
1047 !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this))
1048 return null;
1049 dep = null; src = null; snd = null; fn = null;
1050 return d.postFire(a, b, mode);
1051 }
1052 }
1053
1054 final <R,S> boolean biApply(CompletableFuture<R> a,
1055 CompletableFuture<S> b,
1056 BiFunction<? super R,? super S,? extends T> f,
1057 BiApply<R,S,T> c) {
1058 Object r, s; Throwable x;
1059 if (a == null || (r = a.result) == null ||
1060 b == null || (s = b.result) == null || f == null)
1061 return false;
1062 tryComplete: if (result == null) {
1063 if (r instanceof AltResult) {
1064 if ((x = ((AltResult)r).ex) != null) {
1065 completeThrowable(x, r);
1066 break tryComplete;
1067 }
1068 r = null;
1069 }
1070 if (s instanceof AltResult) {
1071 if ((x = ((AltResult)s).ex) != null) {
1072 completeThrowable(x, s);
1073 break tryComplete;
1074 }
1075 s = null;
1076 }
1077 try {
1078 if (c != null && !c.claim())
1079 return false;
1080 @SuppressWarnings("unchecked") R rr = (R) r;
1081 @SuppressWarnings("unchecked") S ss = (S) s;
1082 completeValue(f.apply(rr, ss));
1083 } catch (Throwable ex) {
1084 completeThrowable(ex);
1085 }
1086 }
1087 return true;
1088 }
1089
1090 private <U,V> CompletableFuture<V> biApplyStage(
1091 Executor e, CompletionStage<U> o,
1092 BiFunction<? super T,? super U,? extends V> f) {
1093 CompletableFuture<U> b;
1094 if (f == null || (b = o.toCompletableFuture()) == null)
1095 throw new NullPointerException();
1096 CompletableFuture<V> d = new CompletableFuture<V>();
1097 if (e != null || !d.biApply(this, b, f, null)) {
1098 BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
1099 bipush(b, c);
1100 c.tryFire(SYNC);
1101 }
1102 return d;
1103 }
1104
1105 @SuppressWarnings("serial")
1106 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> {
1107 BiConsumer<? super T,? super U> fn;
1108 BiAccept(Executor executor, CompletableFuture<Void> dep,
1109 CompletableFuture<T> src, CompletableFuture<U> snd,
1110 BiConsumer<? super T,? super U> fn) {
1111 super(executor, dep, src, snd); this.fn = fn;
1112 }
1113 final CompletableFuture<Void> tryFire(int mode) {
1114 CompletableFuture<Void> d;
1115 CompletableFuture<T> a;
1116 CompletableFuture<U> b;
1117 if ((d = dep) == null ||
1118 !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this))
1119 return null;
1120 dep = null; src = null; snd = null; fn = null;
1121 return d.postFire(a, b, mode);
1122 }
1123 }
1124
1125 final <R,S> boolean biAccept(CompletableFuture<R> a,
1126 CompletableFuture<S> b,
1127 BiConsumer<? super R,? super S> f,
1128 BiAccept<R,S> c) {
1129 Object r, s; Throwable x;
1130 if (a == null || (r = a.result) == null ||
1131 b == null || (s = b.result) == null || f == null)
1132 return false;
1133 tryComplete: if (result == null) {
1134 if (r instanceof AltResult) {
1135 if ((x = ((AltResult)r).ex) != null) {
1136 completeThrowable(x, r);
1137 break tryComplete;
1138 }
1139 r = null;
1140 }
1141 if (s instanceof AltResult) {
1142 if ((x = ((AltResult)s).ex) != null) {
1143 completeThrowable(x, s);
1144 break tryComplete;
1145 }
1146 s = null;
1147 }
1148 try {
1149 if (c != null && !c.claim())
1150 return false;
1151 @SuppressWarnings("unchecked") R rr = (R) r;
1152 @SuppressWarnings("unchecked") S ss = (S) s;
1153 f.accept(rr, ss);
1154 completeNull();
1155 } catch (Throwable ex) {
1156 completeThrowable(ex);
1157 }
1158 }
1159 return true;
1160 }
1161
1162 private <U> CompletableFuture<Void> biAcceptStage(
1163 Executor e, CompletionStage<U> o,
1164 BiConsumer<? super T,? super U> f) {
1165 CompletableFuture<U> b;
1166 if (f == null || (b = o.toCompletableFuture()) == null)
1167 throw new NullPointerException();
1168 CompletableFuture<Void> d = new CompletableFuture<Void>();
1169 if (e != null || !d.biAccept(this, b, f, null)) {
1170 BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
1171 bipush(b, c);
1172 c.tryFire(SYNC);
1173 }
1174 return d;
1175 }
1176
1177 @SuppressWarnings("serial")
1178 static final class BiRun<T,U> extends BiCompletion<T,U,Void> {
1179 Runnable fn;
1180 BiRun(Executor executor, CompletableFuture<Void> dep,
1181 CompletableFuture<T> src,
1182 CompletableFuture<U> snd,
1183 Runnable fn) {
1184 super(executor, dep, src, snd); this.fn = fn;
1185 }
1186 final CompletableFuture<Void> tryFire(int mode) {
1187 CompletableFuture<Void> d;
1188 CompletableFuture<T> a;
1189 CompletableFuture<U> b;
1190 if ((d = dep) == null ||
1191 !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this))
1192 return null;
1193 dep = null; src = null; snd = null; fn = null;
1194 return d.postFire(a, b, mode);
1195 }
1196 }
1197
1198 final boolean biRun(CompletableFuture<?> a, CompletableFuture<?> b,
1199 Runnable f, BiRun<?,?> c) {
1200 Object r, s; Throwable x;
1201 if (a == null || (r = a.result) == null ||
1202 b == null || (s = b.result) == null || f == null)
1203 return false;
1204 if (result == null) {
1205 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
1206 completeThrowable(x, r);
1207 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
1208 completeThrowable(x, s);
1209 else
1210 try {
1211 if (c != null && !c.claim())
1212 return false;
1213 f.run();
1214 completeNull();
1215 } catch (Throwable ex) {
1216 completeThrowable(ex);
1217 }
1218 }
1219 return true;
1220 }
1221
1222 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
1223 Runnable f) {
1224 CompletableFuture<?> b;
1225 if (f == null || (b = o.toCompletableFuture()) == null)
1226 throw new NullPointerException();
1227 CompletableFuture<Void> d = new CompletableFuture<Void>();
1228 if (e != null || !d.biRun(this, b, f, null)) {
1229 BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
1230 bipush(b, c);
1231 c.tryFire(SYNC);
1232 }
1233 return d;
1234 }
1235
1236 @SuppressWarnings("serial")
1237 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And
1238 BiRelay(CompletableFuture<Void> dep,
1239 CompletableFuture<T> src,
1240 CompletableFuture<U> snd) {
1241 super(null, dep, src, snd);
1242 }
1243 final CompletableFuture<Void> tryFire(int mode) {
1244 CompletableFuture<Void> d;
1245 CompletableFuture<T> a;
1246 CompletableFuture<U> b;
1247 if ((d = dep) == null || !d.biRelay(a = src, b = snd))
1248 return null;
1249 src = null; snd = null; dep = null;
1250 return d.postFire(a, b, mode);
1251 }
1252 }
1253
1254 boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
1255 Object r, s; Throwable x;
1256 if (a == null || (r = a.result) == null ||
1257 b == null || (s = b.result) == null)
1258 return false;
1259 if (result == null) {
1260 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
1261 completeThrowable(x, r);
1262 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
1263 completeThrowable(x, s);
1264 else
1265 completeNull();
1266 }
1267 return true;
1268 }
1269
1270 /** Recursively constructs a tree of completions. */
1271 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
1272 int lo, int hi) {
1273 CompletableFuture<Void> d = new CompletableFuture<Void>();
1274 if (lo > hi) // empty
1275 d.result = NIL;
1276 else {
1277 CompletableFuture<?> a, b;
1278 int mid = (lo + hi) >>> 1;
1279 if ((a = (lo == mid ? cfs[lo] :
1280 andTree(cfs, lo, mid))) == null ||
1281 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1282 andTree(cfs, mid+1, hi))) == null)
1283 throw new NullPointerException();
1284 if (!d.biRelay(a, b)) {
1285 BiRelay<?,?> c = new BiRelay<>(d, a, b);
1286 a.bipush(b, c);
1287 c.tryFire(SYNC);
1288 }
1289 }
1290 return d;
1291 }
1292
1293 /* ------------- Projected (Ored) BiCompletions -------------- */
1294
1295 /** Pushes completion to this and b unless either done. */
1296 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
1297 if (c != null) {
1298 while ((b == null || b.result == null) && result == null) {
1299 if (tryPushStack(c)) {
1300 if (b != null && b != this && b.result == null) {
1301 Completion q = new CoCompletion(c);
1302 while (result == null && b.result == null &&
1303 !b.tryPushStack(q))
1304 q.lazySetNext(null); // clear on failure
1305 }
1306 break;
1307 }
1308 c.lazySetNext(null); // clear on failure
1309 }
1310 }
1311 }
1312
1313 @SuppressWarnings("serial")
1314 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> {
1315 Function<? super T,? extends V> fn;
1316 OrApply(Executor executor, CompletableFuture<V> dep,
1317 CompletableFuture<T> src,
1318 CompletableFuture<U> snd,
1319 Function<? super T,? extends V> fn) {
1320 super(executor, dep, src, snd); this.fn = fn;
1321 }
1322 final CompletableFuture<V> tryFire(int mode) {
1323 CompletableFuture<V> d;
1324 CompletableFuture<T> a;
1325 CompletableFuture<U> b;
1326 if ((d = dep) == null ||
1327 !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this))
1328 return null;
1329 dep = null; src = null; snd = null; fn = null;
1330 return d.postFire(a, b, mode);
1331 }
1332 }
1333
1334 final <R,S extends R> boolean orApply(CompletableFuture<R> a,
1335 CompletableFuture<S> b,
1336 Function<? super R, ? extends T> f,
1337 OrApply<R,S,T> c) {
1338 Object r; Throwable x;
1339 if (a == null || b == null ||
1340 ((r = a.result) == null && (r = b.result) == null) || f == null)
1341 return false;
1342 tryComplete: if (result == null) {
1343 try {
1344 if (c != null && !c.claim())
1345 return false;
1346 if (r instanceof AltResult) {
1347 if ((x = ((AltResult)r).ex) != null) {
1348 completeThrowable(x, r);
1349 break tryComplete;
1350 }
1351 r = null;
1352 }
1353 @SuppressWarnings("unchecked") R rr = (R) r;
1354 completeValue(f.apply(rr));
1355 } catch (Throwable ex) {
1356 completeThrowable(ex);
1357 }
1358 }
1359 return true;
1360 }
1361
1362 private <U extends T,V> CompletableFuture<V> orApplyStage(
1363 Executor e, CompletionStage<U> o,
1364 Function<? super T, ? extends V> f) {
1365 CompletableFuture<U> b;
1366 if (f == null || (b = o.toCompletableFuture()) == null)
1367 throw new NullPointerException();
1368 CompletableFuture<V> d = new CompletableFuture<V>();
1369 if (e != null || !d.orApply(this, b, f, null)) {
1370 OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
1371 orpush(b, c);
1372 c.tryFire(SYNC);
1373 }
1374 return d;
1375 }
1376
1377 @SuppressWarnings("serial")
1378 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> {
1379 Consumer<? super T> fn;
1380 OrAccept(Executor executor, CompletableFuture<Void> dep,
1381 CompletableFuture<T> src,
1382 CompletableFuture<U> snd,
1383 Consumer<? super T> fn) {
1384 super(executor, dep, src, snd); this.fn = fn;
1385 }
1386 final CompletableFuture<Void> tryFire(int mode) {
1387 CompletableFuture<Void> d;
1388 CompletableFuture<T> a;
1389 CompletableFuture<U> b;
1390 if ((d = dep) == null ||
1391 !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this))
1392 return null;
1393 dep = null; src = null; snd = null; fn = null;
1394 return d.postFire(a, b, mode);
1395 }
1396 }
1397
1398 final <R,S extends R> boolean orAccept(CompletableFuture<R> a,
1399 CompletableFuture<S> b,
1400 Consumer<? super R> f,
1401 OrAccept<R,S> c) {
1402 Object r; Throwable x;
1403 if (a == null || b == null ||
1404 ((r = a.result) == null && (r = b.result) == null) || f == null)
1405 return false;
1406 tryComplete: if (result == null) {
1407 try {
1408 if (c != null && !c.claim())
1409 return false;
1410 if (r instanceof AltResult) {
1411 if ((x = ((AltResult)r).ex) != null) {
1412 completeThrowable(x, r);
1413 break tryComplete;
1414 }
1415 r = null;
1416 }
1417 @SuppressWarnings("unchecked") R rr = (R) r;
1418 f.accept(rr);
1419 completeNull();
1420 } catch (Throwable ex) {
1421 completeThrowable(ex);
1422 }
1423 }
1424 return true;
1425 }
1426
1427 private <U extends T> CompletableFuture<Void> orAcceptStage(
1428 Executor e, CompletionStage<U> o, Consumer<? super T> f) {
1429 CompletableFuture<U> b;
1430 if (f == null || (b = o.toCompletableFuture()) == null)
1431 throw new NullPointerException();
1432 CompletableFuture<Void> d = new CompletableFuture<Void>();
1433 if (e != null || !d.orAccept(this, b, f, null)) {
1434 OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
1435 orpush(b, c);
1436 c.tryFire(SYNC);
1437 }
1438 return d;
1439 }
1440
1441 @SuppressWarnings("serial")
1442 static final class OrRun<T,U> extends BiCompletion<T,U,Void> {
1443 Runnable fn;
1444 OrRun(Executor executor, CompletableFuture<Void> dep,
1445 CompletableFuture<T> src,
1446 CompletableFuture<U> snd,
1447 Runnable fn) {
1448 super(executor, dep, src, snd); this.fn = fn;
1449 }
1450 final CompletableFuture<Void> tryFire(int mode) {
1451 CompletableFuture<Void> d;
1452 CompletableFuture<T> a;
1453 CompletableFuture<U> b;
1454 if ((d = dep) == null ||
1455 !d.orRun(a = src, b = snd, fn, mode > 0 ? null : this))
1456 return null;
1457 dep = null; src = null; snd = null; fn = null;
1458 return d.postFire(a, b, mode);
1459 }
1460 }
1461
1462 final boolean orRun(CompletableFuture<?> a, CompletableFuture<?> b,
1463 Runnable f, OrRun<?,?> c) {
1464 Object r; Throwable x;
1465 if (a == null || b == null ||
1466 ((r = a.result) == null && (r = b.result) == null) || f == null)
1467 return false;
1468 if (result == null) {
1469 try {
1470 if (c != null && !c.claim())
1471 return false;
1472 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
1473 completeThrowable(x, r);
1474 else {
1475 f.run();
1476 completeNull();
1477 }
1478 } catch (Throwable ex) {
1479 completeThrowable(ex);
1480 }
1481 }
1482 return true;
1483 }
1484
1485 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
1486 Runnable f) {
1487 CompletableFuture<?> b;
1488 if (f == null || (b = o.toCompletableFuture()) == null)
1489 throw new NullPointerException();
1490 CompletableFuture<Void> d = new CompletableFuture<Void>();
1491 if (e != null || !d.orRun(this, b, f, null)) {
1492 OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
1493 orpush(b, c);
1494 c.tryFire(SYNC);
1495 }
1496 return d;
1497 }
1498
1499 @SuppressWarnings("serial")
1500 static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or
1501 OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src,
1502 CompletableFuture<U> snd) {
1503 super(null, dep, src, snd);
1504 }
1505 final CompletableFuture<Object> tryFire(int mode) {
1506 CompletableFuture<Object> d;
1507 CompletableFuture<T> a;
1508 CompletableFuture<U> b;
1509 if ((d = dep) == null || !d.orRelay(a = src, b = snd))
1510 return null;
1511 src = null; snd = null; dep = null;
1512 return d.postFire(a, b, mode);
1513 }
1514 }
1515
1516 final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
1517 Object r;
1518 if (a == null || b == null ||
1519 ((r = a.result) == null && (r = b.result) == null))
1520 return false;
1521 if (result == null)
1522 completeRelay(r);
1523 return true;
1524 }
1525
1526 /** Recursively constructs a tree of completions. */
1527 static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
1528 int lo, int hi) {
1529 CompletableFuture<Object> d = new CompletableFuture<Object>();
1530 if (lo <= hi) {
1531 CompletableFuture<?> a, b;
1532 int mid = (lo + hi) >>> 1;
1533 if ((a = (lo == mid ? cfs[lo] :
1534 orTree(cfs, lo, mid))) == null ||
1535 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1536 orTree(cfs, mid+1, hi))) == null)
1537 throw new NullPointerException();
1538 if (!d.orRelay(a, b)) {
1539 OrRelay<?,?> c = new OrRelay<>(d, a, b);
1540 a.orpush(b, c);
1541 c.tryFire(SYNC);
1542 }
1543 }
1544 return d;
1545 }
1546
1547 /* ------------- Zero-input Async forms -------------- */
1548
1549 @SuppressWarnings("serial")
1550 static final class AsyncSupply<T> extends Completion {
1551 CompletableFuture<T> dep; Supplier<T> fn;
1552 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
1553 this.dep = dep; this.fn = fn;
1554 }
1555
1556 final CompletableFuture<T> tryFire(int alwaysAsync) {
1557 // assert alwaysAsync == ASYNC;
1558 CompletableFuture<T> d; Supplier<T> f;
1559 if ((d = dep) != null && (f = fn) != null) {
1560 dep = null; fn = null;
1561 if (d.result == null) {
1562 try {
1563 d.completeValue(f.get());
1564 } catch (Throwable ex) {
1565 d.completeThrowable(ex);
1566 }
1567 }
1568 d.postComplete();
1569 }
1570 return d;
1571 }
1572 final boolean isLive() { return dep != null; }
1573 }
1574
1575 static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
1576 Supplier<U> f) {
1577 if (f == null) throw new NullPointerException();
1578 CompletableFuture<U> d = new CompletableFuture<U>();
1579 e.execute(new AsyncSupply<U>(d, f));
1580 return d;
1581 }
1582
1583 @SuppressWarnings("serial")
1584 static final class AsyncRun extends Completion {
1585 CompletableFuture<Void> dep; Runnable fn;
1586 AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
1587 this.dep = dep; this.fn = fn;
1588 }
1589
1590 final CompletableFuture<Void> tryFire(int alwaysAsync) {
1591 // assert alwaysAsync == ASYNC;
1592 CompletableFuture<Void> d; Runnable f;
1593 if ((d = dep) != null && (f = fn) != null) {
1594 dep = null; fn = null;
1595 if (d.result == null) {
1596 try {
1597 f.run();
1598 d.completeNull();
1599 } catch (Throwable ex) {
1600 d.completeThrowable(ex);
1601 }
1602 }
1603 d.postComplete();
1604 }
1605 return d;
1606 }
1607 final boolean isLive() { return dep != null; }
1608 }
1609
1610 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
1611 if (f == null) throw new NullPointerException();
1612 CompletableFuture<Void> d = new CompletableFuture<Void>();
1613 e.execute(new AsyncRun(d, f));
1614 return d;
1615 }
1616
1617 /* ------------- Signallers -------------- */
1618
1619 /**
1620 * Completion for recording and releasing a waiting thread. This
1621 * class implements ManagedBlocker to avoid starvation when
1622 * blocking actions pile up in ForkJoinPools.
1623 */
1624 @SuppressWarnings("serial")
1625 static final class Signaller extends Completion
1626 implements ForkJoinPool.ManagedBlocker {
1627 long nanos; // wait time if timed
1628 final long deadline; // non-zero if timed
1629 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
1630 volatile Thread thread;
1631
1632 Signaller(boolean interruptible, long nanos, long deadline) {
1633 this.thread = Thread.currentThread();
1634 this.interruptControl = interruptible ? 1 : 0;
1635 this.nanos = nanos;
1636 this.deadline = deadline;
1637 }
1638 final CompletableFuture<?> tryFire(int ignore) {
1639 Thread w; // no need to atomically claim
1640 if ((w = thread) != null) {
1641 thread = null;
1642 LockSupport.unpark(w);
1643 }
1644 return null;
1645 }
1646 public boolean isReleasable() {
1647 if (thread == null)
1648 return true;
1649 if (Thread.interrupted()) {
1650 int i = interruptControl;
1651 interruptControl = -1;
1652 if (i > 0)
1653 return true;
1654 }
1655 if (deadline != 0L &&
1656 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
1657 thread = null;
1658 return true;
1659 }
1660 return false;
1661 }
1662 public boolean block() {
1663 if (isReleasable())
1664 return true;
1665 else if (deadline == 0L)
1666 LockSupport.park(this);
1667 else if (nanos > 0L)
1668 LockSupport.parkNanos(this, nanos);
1669 return isReleasable();
1670 }
1671 final boolean isLive() { return thread != null; }
1672 }
1673
1674 /**
1675 * Returns raw result after waiting, or null if interruptible and
1676 * interrupted.
1677 */
1678 private Object waitingGet(boolean interruptible) {
1679 Signaller q = null;
1680 boolean queued = false;
1681 int spins = -1;
1682 Object r;
1683 while ((r = result) == null) {
1684 if (spins < 0)
1685 spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1686 1 << 8 : 0; // Use brief spin-wait on multiprocessors
1687 else if (spins > 0) {
1688 if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1689 --spins;
1690 }
1691 else if (q == null)
1692 q = new Signaller(interruptible, 0L, 0L);
1693 else if (!queued)
1694 queued = tryPushStack(q);
1695 else if (interruptible && q.interruptControl < 0) {
1696 q.thread = null;
1697 cleanStack();
1698 return null;
1699 }
1700 else if (q.thread != null && result == null) {
1701 try {
1702 ForkJoinPool.managedBlock(q);
1703 } catch (InterruptedException ie) {
1704 q.interruptControl = -1;
1705 }
1706 }
1707 }
1708 if (q != null) {
1709 q.thread = null;
1710 if (q.interruptControl < 0) {
1711 if (interruptible)
1712 r = null; // report interruption
1713 else
1714 Thread.currentThread().interrupt();
1715 }
1716 }
1717 postComplete();
1718 return r;
1719 }
1720
1721 /**
1722 * Returns raw result after waiting, or null if interrupted, or
1723 * throws TimeoutException on timeout.
1724 */
1725 private Object timedGet(long nanos) throws TimeoutException {
1726 if (Thread.interrupted())
1727 return null;
1728 if (nanos <= 0L)
1729 throw new TimeoutException();
1730 long d = System.nanoTime() + nanos;
1731 Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
1732 boolean queued = false;
1733 Object r;
1734 while ((r = result) == null) {
1735 if (!queued)
1736 queued = tryPushStack(q);
1737 else if (q.interruptControl < 0 || q.nanos <= 0L) {
1738 q.thread = null;
1739 cleanStack();
1740 if (q.interruptControl < 0)
1741 return null;
1742 throw new TimeoutException();
1743 }
1744 else if (q.thread != null && result == null) {
1745 try {
1746 ForkJoinPool.managedBlock(q);
1747 } catch (InterruptedException ie) {
1748 q.interruptControl = -1;
1749 }
1750 }
1751 }
1752 if (q.interruptControl < 0)
1753 r = null;
1754 q.thread = null;
1755 postComplete();
1756 return r;
1757 }
1758
1759 /* ------------- public methods -------------- */
1760
1761 /**
1762 * Creates a new incomplete CompletableFuture.
1763 */
1764 public CompletableFuture() {
1765 }
1766
1767 /**
1768 * Creates a new complete CompletableFuture with given encoded result.
1769 */
1770 private CompletableFuture(Object r) {
1771 this.result = r;
1772 }
1773
1774 /**
1775 * Returns a new CompletableFuture that is asynchronously completed
1776 * by a task running in the {@link ForkJoinPool#commonPool()} with
1777 * the value obtained by calling the given Supplier.
1778 *
1779 * @param supplier a function returning the value to be used
1780 * to complete the returned CompletableFuture
1781 * @param <U> the function's return type
1782 * @return the new CompletableFuture
1783 */
1784 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1785 return asyncSupplyStage(asyncPool, supplier);
1786 }
1787
1788 /**
1789 * Returns a new CompletableFuture that is asynchronously completed
1790 * by a task running in the given executor with the value obtained
1791 * by calling the given Supplier.
1792 *
1793 * @param supplier a function returning the value to be used
1794 * to complete the returned CompletableFuture
1795 * @param executor the executor to use for asynchronous execution
1796 * @param <U> the function's return type
1797 * @return the new CompletableFuture
1798 */
1799 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1800 Executor executor) {
1801 return asyncSupplyStage(screenExecutor(executor), supplier);
1802 }
1803
1804 /**
1805 * Returns a new CompletableFuture that is asynchronously completed
1806 * by a task running in the {@link ForkJoinPool#commonPool()} after
1807 * it runs the given action.
1808 *
1809 * @param runnable the action to run before completing the
1810 * returned CompletableFuture
1811 * @return the new CompletableFuture
1812 */
1813 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1814 return asyncRunStage(asyncPool, runnable);
1815 }
1816
1817 /**
1818 * Returns a new CompletableFuture that is asynchronously completed
1819 * by a task running in the given executor after it runs the given
1820 * action.
1821 *
1822 * @param runnable the action to run before completing the
1823 * returned CompletableFuture
1824 * @param executor the executor to use for asynchronous execution
1825 * @return the new CompletableFuture
1826 */
1827 public static CompletableFuture<Void> runAsync(Runnable runnable,
1828 Executor executor) {
1829 return asyncRunStage(screenExecutor(executor), runnable);
1830 }
1831
1832 /**
1833 * Returns a new CompletableFuture that is already completed with
1834 * the given value.
1835 *
1836 * @param value the value
1837 * @param <U> the type of the value
1838 * @return the completed CompletableFuture
1839 */
1840 public static <U> CompletableFuture<U> completedFuture(U value) {
1841 return new CompletableFuture<U>((value == null) ? NIL : value);
1842 }
1843
1844 /**
1845 * Returns {@code true} if completed in any fashion: normally,
1846 * exceptionally, or via cancellation.
1847 *
1848 * @return {@code true} if completed
1849 */
1850 public boolean isDone() {
1851 return result != null;
1852 }
1853
1854 /**
1855 * Waits if necessary for this future to complete, and then
1856 * returns its result.
1857 *
1858 * @return the result value
1859 * @throws CancellationException if this future was cancelled
1860 * @throws ExecutionException if this future completed exceptionally
1861 * @throws InterruptedException if the current thread was interrupted
1862 * while waiting
1863 */
1864 public T get() throws InterruptedException, ExecutionException {
1865 Object r;
1866 return reportGet((r = result) == null ? waitingGet(true) : r);
1867 }
1868
1869 /**
1870 * Waits if necessary for at most the given time for this future
1871 * to complete, and then returns its result, if available.
1872 *
1873 * @param timeout the maximum time to wait
1874 * @param unit the time unit of the timeout argument
1875 * @return the result value
1876 * @throws CancellationException if this future was cancelled
1877 * @throws ExecutionException if this future completed exceptionally
1878 * @throws InterruptedException if the current thread was interrupted
1879 * while waiting
1880 * @throws TimeoutException if the wait timed out
1881 */
1882 public T get(long timeout, TimeUnit unit)
1883 throws InterruptedException, ExecutionException, TimeoutException {
1884 Object r;
1885 long nanos = unit.toNanos(timeout);
1886 return reportGet((r = result) == null ? timedGet(nanos) : r);
1887 }
1888
1889 /**
1890 * Returns the result value when complete, or throws an
1891 * (unchecked) exception if completed exceptionally. To better
1892 * conform with the use of common functional forms, if a
1893 * computation involved in the completion of this
1894 * CompletableFuture threw an exception, this method throws an
1895 * (unchecked) {@link CompletionException} with the underlying
1896 * exception as its cause.
1897 *
1898 * @return the result value
1899 * @throws CancellationException if the computation was cancelled
1900 * @throws CompletionException if this future completed
1901 * exceptionally or a completion computation threw an exception
1902 */
1903 public T join() {
1904 Object r;
1905 return reportJoin((r = result) == null ? waitingGet(false) : r);
1906 }
1907
1908 /**
1909 * Returns the result value (or throws any encountered exception)
1910 * if completed, else returns the given valueIfAbsent.
1911 *
1912 * @param valueIfAbsent the value to return if not completed
1913 * @return the result value, if completed, else the given valueIfAbsent
1914 * @throws CancellationException if the computation was cancelled
1915 * @throws CompletionException if this future completed
1916 * exceptionally or a completion computation threw an exception
1917 */
1918 public T getNow(T valueIfAbsent) {
1919 Object r;
1920 return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
1921 }
1922
1923 /**
1924 * If not already completed, sets the value returned by {@link
1925 * #get()} and related methods to the given value.
1926 *
1927 * @param value the result value
1928 * @return {@code true} if this invocation caused this CompletableFuture
1929 * to transition to a completed state, else {@code false}
1930 */
1931 public boolean complete(T value) {
1932 boolean triggered = completeValue(value);
1933 postComplete();
1934 return triggered;
1935 }
1936
1937 /**
1938 * If not already completed, causes invocations of {@link #get()}
1939 * and related methods to throw the given exception.
1940 *
1941 * @param ex the exception
1942 * @return {@code true} if this invocation caused this CompletableFuture
1943 * to transition to a completed state, else {@code false}
1944 */
1945 public boolean completeExceptionally(Throwable ex) {
1946 if (ex == null) throw new NullPointerException();
1947 boolean triggered = internalComplete(new AltResult(ex));
1948 postComplete();
1949 return triggered;
1950 }
1951
1952 public <U> CompletableFuture<U> thenApply(
1953 Function<? super T,? extends U> fn) {
1954 return uniApplyStage(null, fn);
1955 }
1956
1957 public <U> CompletableFuture<U> thenApplyAsync(
1958 Function<? super T,? extends U> fn) {
1959 return uniApplyStage(asyncPool, fn);
1960 }
1961
1962 public <U> CompletableFuture<U> thenApplyAsync(
1963 Function<? super T,? extends U> fn, Executor executor) {
1964 return uniApplyStage(screenExecutor(executor), fn);
1965 }
1966
1967 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
1968 return uniAcceptStage(null, action);
1969 }
1970
1971 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
1972 return uniAcceptStage(asyncPool, action);
1973 }
1974
1975 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
1976 Executor executor) {
1977 return uniAcceptStage(screenExecutor(executor), action);
1978 }
1979
1980 public CompletableFuture<Void> thenRun(Runnable action) {
1981 return uniRunStage(null, action);
1982 }
1983
1984 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1985 return uniRunStage(asyncPool, action);
1986 }
1987
1988 public CompletableFuture<Void> thenRunAsync(Runnable action,
1989 Executor executor) {
1990 return uniRunStage(screenExecutor(executor), action);
1991 }
1992
1993 public <U,V> CompletableFuture<V> thenCombine(
1994 CompletionStage<? extends U> other,
1995 BiFunction<? super T,? super U,? extends V> fn) {
1996 return biApplyStage(null, other, fn);
1997 }
1998
1999 public <U,V> CompletableFuture<V> thenCombineAsync(
2000 CompletionStage<? extends U> other,
2001 BiFunction<? super T,? super U,? extends V> fn) {
2002 return biApplyStage(asyncPool, other, fn);
2003 }
2004
2005 public <U,V> CompletableFuture<V> thenCombineAsync(
2006 CompletionStage<? extends U> other,
2007 BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
2008 return biApplyStage(screenExecutor(executor), other, fn);
2009 }
2010
2011 public <U> CompletableFuture<Void> thenAcceptBoth(
2012 CompletionStage<? extends U> other,
2013 BiConsumer<? super T, ? super U> action) {
2014 return biAcceptStage(null, other, action);
2015 }
2016
2017 public <U> CompletableFuture<Void> thenAcceptBothAsync(
2018 CompletionStage<? extends U> other,
2019 BiConsumer<? super T, ? super U> action) {
2020 return biAcceptStage(asyncPool, other, action);
2021 }
2022
2023 public <U> CompletableFuture<Void> thenAcceptBothAsync(
2024 CompletionStage<? extends U> other,
2025 BiConsumer<? super T, ? super U> action, Executor executor) {
2026 return biAcceptStage(screenExecutor(executor), other, action);
2027 }
2028
2029 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
2030 Runnable action) {
2031 return biRunStage(null, other, action);
2032 }
2033
2034 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2035 Runnable action) {
2036 return biRunStage(asyncPool, other, action);
2037 }
2038
2039 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2040 Runnable action,
2041 Executor executor) {
2042 return biRunStage(screenExecutor(executor), other, action);
2043 }
2044
2045 public <U> CompletableFuture<U> applyToEither(
2046 CompletionStage<? extends T> other, Function<? super T, U> fn) {
2047 return orApplyStage(null, other, fn);
2048 }
2049
2050 public <U> CompletableFuture<U> applyToEitherAsync(
2051 CompletionStage<? extends T> other, Function<? super T, U> fn) {
2052 return orApplyStage(asyncPool, other, fn);
2053 }
2054
2055 public <U> CompletableFuture<U> applyToEitherAsync(
2056 CompletionStage<? extends T> other, Function<? super T, U> fn,
2057 Executor executor) {
2058 return orApplyStage(screenExecutor(executor), other, fn);
2059 }
2060
2061 public CompletableFuture<Void> acceptEither(
2062 CompletionStage<? extends T> other, Consumer<? super T> action) {
2063 return orAcceptStage(null, other, action);
2064 }
2065
2066 public CompletableFuture<Void> acceptEitherAsync(
2067 CompletionStage<? extends T> other, Consumer<? super T> action) {
2068 return orAcceptStage(asyncPool, other, action);
2069 }
2070
2071 public CompletableFuture<Void> acceptEitherAsync(
2072 CompletionStage<? extends T> other, Consumer<? super T> action,
2073 Executor executor) {
2074 return orAcceptStage(screenExecutor(executor), other, action);
2075 }
2076
2077 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2078 Runnable action) {
2079 return orRunStage(null, other, action);
2080 }
2081
2082 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2083 Runnable action) {
2084 return orRunStage(asyncPool, other, action);
2085 }
2086
2087 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2088 Runnable action,
2089 Executor executor) {
2090 return orRunStage(screenExecutor(executor), other, action);
2091 }
2092
2093 public <U> CompletableFuture<U> thenCompose(
2094 Function<? super T, ? extends CompletionStage<U>> fn) {
2095 return uniComposeStage(null, fn);
2096 }
2097
2098 public <U> CompletableFuture<U> thenComposeAsync(
2099 Function<? super T, ? extends CompletionStage<U>> fn) {
2100 return uniComposeStage(asyncPool, fn);
2101 }
2102
2103 public <U> CompletableFuture<U> thenComposeAsync(
2104 Function<? super T, ? extends CompletionStage<U>> fn,
2105 Executor executor) {
2106 return uniComposeStage(screenExecutor(executor), fn);
2107 }
2108
2109 public CompletableFuture<T> whenComplete(
2110 BiConsumer<? super T, ? super Throwable> action) {
2111 return uniWhenCompleteStage(null, action);
2112 }
2113
2114 public CompletableFuture<T> whenCompleteAsync(
2115 BiConsumer<? super T, ? super Throwable> action) {
2116 return uniWhenCompleteStage(asyncPool, action);
2117 }
2118
2119 public CompletableFuture<T> whenCompleteAsync(
2120 BiConsumer<? super T, ? super Throwable> action, Executor executor) {
2121 return uniWhenCompleteStage(screenExecutor(executor), action);
2122 }
2123
2124 public <U> CompletableFuture<U> handle(
2125 BiFunction<? super T, Throwable, ? extends U> fn) {
2126 return uniHandleStage(null, fn);
2127 }
2128
2129 public <U> CompletableFuture<U> handleAsync(
2130 BiFunction<? super T, Throwable, ? extends U> fn) {
2131 return uniHandleStage(asyncPool, fn);
2132 }
2133
2134 public <U> CompletableFuture<U> handleAsync(
2135 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
2136 return uniHandleStage(screenExecutor(executor), fn);
2137 }
2138
2139 /**
2140 * Returns this CompletableFuture.
2141 *
2142 * @return this CompletableFuture
2143 */
2144 public CompletableFuture<T> toCompletableFuture() {
2145 return this;
2146 }
2147
2148 // not in interface CompletionStage
2149
2150 /**
2151 * Returns a new CompletableFuture that is completed when this
2152 * CompletableFuture completes, with the result of the given
2153 * function of the exception triggering this CompletableFuture's
2154 * completion when it completes exceptionally; otherwise, if this
2155 * CompletableFuture completes normally, then the returned
2156 * CompletableFuture also completes normally with the same value.
2157 * Note: More flexible versions of this functionality are
2158 * available using methods {@code whenComplete} and {@code handle}.
2159 *
2160 * @param fn the function to use to compute the value of the
2161 * returned CompletableFuture if this CompletableFuture completed
2162 * exceptionally
2163 * @return the new CompletableFuture
2164 */
2165 public CompletableFuture<T> exceptionally(
2166 Function<Throwable, ? extends T> fn) {
2167 return uniExceptionallyStage(fn);
2168 }
2169
2170 /* ------------- Arbitrary-arity constructions -------------- */
2171
2172 /**
2173 * Returns a new CompletableFuture that is completed when all of
2174 * the given CompletableFutures complete. If any of the given
2175 * CompletableFutures complete exceptionally, then the returned
2176 * CompletableFuture also does so, with a CompletionException
2177 * holding this exception as its cause. Otherwise, the results,
2178 * if any, of the given CompletableFutures are not reflected in
2179 * the returned CompletableFuture, but may be obtained by
2180 * inspecting them individually. If no CompletableFutures are
2181 * provided, returns a CompletableFuture completed with the value
2182 * {@code null}.
2183 *
2184 * <p>Among the applications of this method is to await completion
2185 * of a set of independent CompletableFutures before continuing a
2186 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2187 * c3).join();}.
2188 *
2189 * @param cfs the CompletableFutures
2190 * @return a new CompletableFuture that is completed when all of the
2191 * given CompletableFutures complete
2192 * @throws NullPointerException if the array or any of its elements are
2193 * {@code null}
2194 */
2195 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2196 return andTree(cfs, 0, cfs.length - 1);
2197 }
2198
2199 /**
2200 * Returns a new CompletableFuture that is completed when any of
2201 * the given CompletableFutures complete, with the same result.
2202 * Otherwise, if it completed exceptionally, the returned
2203 * CompletableFuture also does so, with a CompletionException
2204 * holding this exception as its cause. If no CompletableFutures
2205 * are provided, returns an incomplete CompletableFuture.
2206 *
2207 * @param cfs the CompletableFutures
2208 * @return a new CompletableFuture that is completed with the
2209 * result or exception of any of the given CompletableFutures when
2210 * one completes
2211 * @throws NullPointerException if the array or any of its elements are
2212 * {@code null}
2213 */
2214 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2215 return orTree(cfs, 0, cfs.length - 1);
2216 }
2217
2218 /* ------------- Control and status methods -------------- */
2219
2220 /**
2221 * If not already completed, completes this CompletableFuture with
2222 * a {@link CancellationException}. Dependent CompletableFutures
2223 * that have not already completed will also complete
2224 * exceptionally, with a {@link CompletionException} caused by
2225 * this {@code CancellationException}.
2226 *
2227 * @param mayInterruptIfRunning this value has no effect in this
2228 * implementation because interrupts are not used to control
2229 * processing.
2230 *
2231 * @return {@code true} if this task is now cancelled
2232 */
2233 public boolean cancel(boolean mayInterruptIfRunning) {
2234 boolean cancelled = (result == null) &&
2235 internalComplete(new AltResult(new CancellationException()));
2236 postComplete();
2237 return cancelled || isCancelled();
2238 }
2239
2240 /**
2241 * Returns {@code true} if this CompletableFuture was cancelled
2242 * before it completed normally.
2243 *
2244 * @return {@code true} if this CompletableFuture was cancelled
2245 * before it completed normally
2246 */
2247 public boolean isCancelled() {
2248 Object r;
2249 return ((r = result) instanceof AltResult) &&
2250 (((AltResult)r).ex instanceof CancellationException);
2251 }
2252
2253 /**
2254 * Returns {@code true} if this CompletableFuture completed
2255 * exceptionally, in any way. Possible causes include
2256 * cancellation, explicit invocation of {@code
2257 * completeExceptionally}, and abrupt termination of a
2258 * CompletionStage action.
2259 *
2260 * @return {@code true} if this CompletableFuture completed
2261 * exceptionally
2262 */
2263 public boolean isCompletedExceptionally() {
2264 Object r;
2265 return ((r = result) instanceof AltResult) && r != NIL;
2266 }
2267
2268 /**
2269 * Forcibly sets or resets the value subsequently returned by
2270 * method {@link #get()} and related methods, whether or not
2271 * already completed. This method is designed for use only in
2272 * error recovery actions, and even in such situations may result
2273 * in ongoing dependent completions using established versus
2274 * overwritten outcomes.
2275 *
2276 * @param value the completion value
2277 */
2278 public void obtrudeValue(T value) {
2279 result = (value == null) ? NIL : value;
2280 postComplete();
2281 }
2282
2283 /**
2284 * Forcibly causes subsequent invocations of method {@link #get()}
2285 * and related methods to throw the given exception, whether or
2286 * not already completed. This method is designed for use only in
2287 * error recovery actions, and even in such situations may result
2288 * in ongoing dependent completions using established versus
2289 * overwritten outcomes.
2290 *
2291 * @param ex the exception
2292 * @throws NullPointerException if the exception is null
2293 */
2294 public void obtrudeException(Throwable ex) {
2295 if (ex == null) throw new NullPointerException();
2296 result = new AltResult(ex);
2297 postComplete();
2298 }
2299
2300 /**
2301 * Returns the estimated number of CompletableFutures whose
2302 * completions are awaiting completion of this CompletableFuture.
2303 * This method is designed for use in monitoring system state, not
2304 * for synchronization control.
2305 *
2306 * @return the number of dependent CompletableFutures
2307 */
2308 public int getNumberOfDependents() {
2309 int count = 0;
2310 for (Completion p = stack; p != null; p = p.next)
2311 ++count;
2312 return count;
2313 }
2314
2315 /**
2316 * Returns a string identifying this CompletableFuture, as well as
2317 * its completion state. The state, in brackets, contains the
2318 * String {@code "Completed Normally"} or the String {@code
2319 * "Completed Exceptionally"}, or the String {@code "Not
2320 * completed"} followed by the number of CompletableFutures
2321 * dependent upon its completion, if any.
2322 *
2323 * @return a string identifying this CompletableFuture, as well as its state
2324 */
2325 public String toString() {
2326 Object r = result;
2327 int count;
2328 return super.toString() +
2329 ((r == null) ?
2330 (((count = getNumberOfDependents()) == 0) ?
2331 "[Not completed]" :
2332 "[Not completed, " + count + " dependents]") :
2333 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2334 "[Completed exceptionally]" :
2335 "[Completed normally]"));
2336 }
2337
2338 // Unsafe mechanics
2339 private static final sun.misc.Unsafe UNSAFE;
2340 private static final long RESULT;
2341 private static final long STACK;
2342 static {
2343 try {
2344 UNSAFE = sun.misc.Unsafe.getUnsafe();
2345 Class<?> k = CompletableFuture.class;
2346 RESULT = UNSAFE.objectFieldOffset
2347 (k.getDeclaredField("result"));
2348 STACK = UNSAFE.objectFieldOffset
2349 (k.getDeclaredField("stack"));
2350 } catch (Exception x) {
2351 throw new Error(x);
2352 }
2353 }
2354 }