ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.128
Committed: Mon Jun 16 20:38:11 2014 UTC (9 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.127: +210 -159 lines
Log Message:
ensure that propagated exceptions reuse the same result object; introduce private CompletableFuture(Object r) constructor; rename completeNil to completeNull;

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.locks.LockSupport;
25
26 /**
27 * A {@link Future} that may be explicitly completed (setting its
28 * value and status), and may be used as a {@link CompletionStage},
29 * supporting dependent functions and actions that trigger upon its
30 * completion.
31 *
32 * <p>When two or more threads attempt to
33 * {@link #complete complete},
34 * {@link #completeExceptionally completeExceptionally}, or
35 * {@link #cancel cancel}
36 * a CompletableFuture, only one of them succeeds.
37 *
38 * <p>In addition to these and related methods for directly
39 * manipulating status and results, CompletableFuture implements
40 * interface {@link CompletionStage} with the following policies: <ul>
41 *
42 * <li>Actions supplied for dependent completions of
43 * <em>non-async</em> methods may be performed by the thread that
44 * completes the current CompletableFuture, or by any other caller of
45 * a completion method.</li>
46 *
47 * <li>All <em>async</em> methods without an explicit Executor
48 * argument are performed using the {@link ForkJoinPool#commonPool()}
49 * (unless it does not support a parallelism level of at least two, in
50 * which case, a new Thread is created to run each task). To simplify
51 * monitoring, debugging, and tracking, all generated asynchronous
52 * tasks are instances of the marker interface {@link
53 * AsynchronousCompletionTask}. </li>
54 *
55 * <li>All CompletionStage methods are implemented independently of
56 * other public methods, so the behavior of one method is not impacted
57 * by overrides of others in subclasses. </li> </ul>
58 *
59 * <p>CompletableFuture also implements {@link Future} with the following
60 * policies: <ul>
61 *
62 * <li>Since (unlike {@link FutureTask}) this class has no direct
63 * control over the computation that causes it to be completed,
64 * cancellation is treated as just another form of exceptional
65 * completion. Method {@link #cancel cancel} has the same effect as
66 * {@code completeExceptionally(new CancellationException())}. Method
67 * {@link #isCompletedExceptionally} can be used to determine if a
68 * CompletableFuture completed in any exceptional fashion.</li>
69 *
70 * <li>In case of exceptional completion with a CompletionException,
71 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
72 * {@link ExecutionException} with the same cause as held in the
73 * corresponding CompletionException. To simplify usage in most
74 * contexts, this class also defines methods {@link #join()} and
75 * {@link #getNow} that instead throw the CompletionException directly
76 * in these cases.</li> </ul>
77 *
78 * @author Doug Lea
79 * @since 1.8
80 */
81 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
82
83 /*
84 * Overview:
85 *
86 * A CompletableFuture may have dependent completion actions,
87 * collected in a linked stack. It atomically completes by CASing
88 * a result field, and then pops off and runs those actions. This
89 * applies across normal vs exceptional outcomes, sync vs async
90 * actions, binary triggers, and various forms of completions.
91 *
92 * Non-nullness of field result (set via CAS) indicates done. An
93 * AltResult is used to box null as a result, as well as to hold
94 * exceptions. Using a single field makes completion simple to
95 * detect and trigger. Encoding and decoding is straightforward
96 * but adds to the sprawl of trapping and associating exceptions
97 * with targets. Minor simplifications rely on (static) NIL (to
98 * box null results) being the only AltResult with a null
99 * exception field, so we don't usually need explicit comparisons.
100 * Even though some of the generics casts are unchecked (see
101 * SuppressWarnings annotations), they are placed to be
102 * appropriate even if checked.
103 *
104 * Dependent actions are represented by Completion objects linked
105 * as Treiber stacks headed by field "stack". There are Completion
106 * classes for each kind of action, grouped into single-input
107 * (UniCompletion), two-input (BiCompletion), projected
108 * (BiCompletions using either (not both) of two inputs), shared
109 * (CoCompletion, used by the second of two sources), zero-input
110 * source actions, and Signallers that unblock waiters. Class
111 * Completion extends ForkJoinTask to enable async execution
112 * (adding no space overhead because we exploit its "tag" methods
113 * to maintain claims). It is also declared as Runnable to allow
114 * usage with arbitrary executors.
115 *
116 * Support for each kind of CompletionStage relies on a separate
117 * class, along with two CompletableFuture methods:
118 *
119 * * A Completion class with name X corresponding to function,
120 * prefaced with "Uni", "Bi", or "Or". Each class contains
121 * fields for source(s), actions, and dependent. They are
122 * boringly similar, differing from others only with respect to
123 * underlying functional forms. We do this so that users don't
124 * encounter layers of adaptors in common usages. We also
125 * include "Relay" classes/methods that don't correspond to user
126 * methods; they copy results from one stage to another.
127 *
128 * * Boolean CompletableFuture method x(...) (for example
129 * uniApply) takes all of the arguments needed to check that an
130 * action is triggerable, and then either runs the action or
131 * arranges its async execution by executing its Completion
132 * argument, if present. The method returns true if known to be
133 * complete.
134 *
135 * * Completion method tryFire(int mode) invokes the associated x
136 * method with its held arguments, and on success cleans up.
137 * The mode argument allows exec to be called twice (SYNC, then
138 * ASYNC); the first to screen and trap exceptions while
139 * arranging to execute, and the second when called from a
140 * task. (A few classes are not used async so take slightly
141 * different forms.) The claim() callback suppresses function
142 * invocation if already claimed by another thread.
143 *
144 * * CompletableFuture method xStage(...) is called from a public
145 * stage method of CompletableFuture x. It screens user
146 * arguments and invokes and/or creates the stage object. If
147 * not async and x is already complete, the action is run
148 * immediately. Otherwise a Completion c is created, pushed to
149 * x's stack (unless done), and started or triggered via
150 * c.tryFire. This also covers races possible if x completes
151 * while pushing. Classes with two inputs (for example BiApply)
152 * deal with races across both while pushing actions. The
153 * second completion is a CoCompletion pointing to the first,
154 * shared so that at most one performs the action. The
155 * multiple-arity methods allOf and anyOf do this pairwise to
156 * form trees of completions.
157 *
158 * Note that the generic type parameters of methods vary according
159 * to whether "this" is a source, dependent, or completion.
160 *
161 * Method postComplete is called upon completion unless the target
162 * is guaranteed not to be observable (i.e., not yet returned or
163 * linked). Multiple threads can call postComplete, which
164 * atomically pops each dependent action, and tries to trigger it
165 * via method exec. Triggering can propagate recursively, so exec
166 * in NESTED mode returns its completed dependent (if one exists)
167 * for further processing by its caller (see method postFire).
168 *
169 * Blocking methods get() and join() rely on Signaller Completions
170 * that wake up waiting threads. The mechanics are similar to
171 * Treiber stack wait-nodes used in FutureTask, Phaser, and
172 * SynchronousQueue. See their internal documentation for
173 * algorithmic details.
174 *
175 * Without precautions, CompletableFutures would be prone to
176 * garbage accumulation as chains of Completions build up, each
177 * pointing back to its sources. So we null out fields as soon as
178 * possible (see especially method Completion.detach). The
179 * screening checks needed anyway harmlessly ignore null arguments
180 * that may have been obtained during races with threads nulling
181 * out fields. We also try to unlink fired Completions from
182 * stacks that might never be popped (see method postFire).
183 * Completion fields need not be declared as final or volatile
184 * because they are only visible to other threads upon safe
185 * publication.
186 */
187
188 volatile Object result; // Either the result or boxed AltResult
189 volatile Completion stack; // Top of Treiber stack of dependent actions
190
191 final boolean internalComplete(Object r) { // CAS from null to r
192 return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
193 }
194
195 final boolean casStack(Completion cmp, Completion val) {
196 return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
197 }
198
199 /* ------------- Encoding and decoding outcomes -------------- */
200
201 static final class AltResult { // See above
202 final Throwable ex; // null only for NIL
203 AltResult(Throwable x) { this.ex = x; }
204 }
205
206 /** The encoding of the null value. */
207 static final AltResult NIL = new AltResult(null);
208
209 /** Completes with the null value, unless already completed. */
210 final boolean completeNull() {
211 return UNSAFE.compareAndSwapObject(this, RESULT, null,
212 NIL);
213 }
214
215 /** Returns the encoding of the given non-exceptional value. */
216 final Object encodeValue(T t) {
217 return (t == null) ? NIL : t;
218 }
219
220 /** Completes with a non-exceptional result, unless already completed. */
221 final boolean completeValue(T t) {
222 return UNSAFE.compareAndSwapObject(this, RESULT, null,
223 (t == null) ? NIL : t);
224 }
225
226 /**
227 * Returns the encoding of the given (non-null) exception as a
228 * wrapped CompletionException unless it is one already.
229 */
230 static AltResult encodeThrowable(Throwable x) {
231 return new AltResult((x instanceof CompletionException) ? x :
232 new CompletionException(x));
233 }
234
235 /** Completes with an exceptional result, unless already completed. */
236 final boolean completeThrowable(Throwable x) {
237 return UNSAFE.compareAndSwapObject(this, RESULT, null,
238 encodeThrowable(x));
239 }
240
241 /**
242 * Returns the encoding of the given (non-null) exception as a
243 * wrapped CompletionException unless it is one already. May
244 * return the given Object r (which must have been the result of a
245 * source future) if it is equivalent, i.e. if this is a simple
246 * relay of an existing CompletionException.
247 */
248 static Object encodeThrowable(Throwable x, Object r) {
249 if (!(x instanceof CompletionException))
250 x = new CompletionException(x);
251 else if (r instanceof AltResult && x == ((AltResult)r).ex)
252 return r;
253 return new AltResult(x);
254 }
255
256 /**
257 * Completes with the given (non-null) exceptional result as a
258 * wrapped CompletionException unless it is one already, unless
259 * already completed. May complete with the given Object r
260 * (which must have been the result of a source future) if it is
261 * equivalent, i.e. if this is a simple propagation of an
262 * existing CompletionException.
263 */
264 final boolean completeThrowable(Throwable x, Object r) {
265 return UNSAFE.compareAndSwapObject(this, RESULT, null,
266 encodeThrowable(x, r));
267 }
268
269 /**
270 * Returns the encoding of the given arguments: if the exception
271 * is non-null, encodes as AltResult. Otherwise uses the given
272 * value, boxed as NIL if null.
273 */
274 Object encodeOutcome(T t, Throwable x) {
275 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x);
276 }
277
278 /**
279 * Returns the encoding of a copied outcome; if exceptional,
280 * rewraps as a CompletionException, else returns argument.
281 */
282 static Object encodeRelay(Object r) {
283 Throwable x;
284 return (((r instanceof AltResult) &&
285 (x = ((AltResult)r).ex) != null &&
286 !(x instanceof CompletionException)) ?
287 new AltResult(new CompletionException(x)) : r);
288 }
289
290 /**
291 * Completes with r or a copy of r, unless already completed.
292 * If exceptional, r is first coerced to a CompletionException.
293 */
294 final boolean completeRelay(Object r) {
295 return UNSAFE.compareAndSwapObject(this, RESULT, null,
296 encodeRelay(r));
297 }
298
299 /**
300 * Reports result using Future.get conventions.
301 */
302 private static <T> T reportGet(Object r)
303 throws InterruptedException, ExecutionException {
304 if (r == null) // by convention below, null means interrupted
305 throw new InterruptedException();
306 if (r instanceof AltResult) {
307 Throwable x, cause;
308 if ((x = ((AltResult)r).ex) == null)
309 return null;
310 if (x instanceof CancellationException)
311 throw (CancellationException)x;
312 if ((x instanceof CompletionException) &&
313 (cause = x.getCause()) != null)
314 x = cause;
315 throw new ExecutionException(x);
316 }
317 @SuppressWarnings("unchecked") T t = (T) r;
318 return t;
319 }
320
321 /**
322 * Decodes outcome to return result or throw unchecked exception.
323 */
324 private static <T> T reportJoin(Object r) {
325 if (r instanceof AltResult) {
326 Throwable x;
327 if ((x = ((AltResult)r).ex) == null)
328 return null;
329 if (x instanceof CancellationException)
330 throw (CancellationException)x;
331 if (x instanceof CompletionException)
332 throw (CompletionException)x;
333 throw new CompletionException(x);
334 }
335 @SuppressWarnings("unchecked") T t = (T) r;
336 return t;
337 }
338
339 /* ------------- Async task preliminaries -------------- */
340
341 /**
342 * A marker interface identifying asynchronous tasks produced by
343 * {@code async} methods. This may be useful for monitoring,
344 * debugging, and tracking asynchronous activities.
345 *
346 * @since 1.8
347 */
348 public static interface AsynchronousCompletionTask {
349 }
350
351 private static final boolean useCommonPool =
352 (ForkJoinPool.getCommonPoolParallelism() > 1);
353
354 /**
355 * Default executor -- ForkJoinPool.commonPool() unless it cannot
356 * support parallelism.
357 */
358 private static final Executor asyncPool = useCommonPool ?
359 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
360
361 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
362 static final class ThreadPerTaskExecutor implements Executor {
363 public void execute(Runnable r) { new Thread(r).start(); }
364 }
365
366 /**
367 * Null-checks user executor argument, and translates uses of
368 * commonPool to asyncPool in case parallelism disabled.
369 */
370 static Executor screenExecutor(Executor e) {
371 if (!useCommonPool && e == ForkJoinPool.commonPool())
372 return asyncPool;
373 if (e == null) throw new NullPointerException();
374 return e;
375 }
376
377 // Modes for Completion.exec. Signedness matters.
378 static final int SYNC = 0;
379 static final int ASYNC = 1;
380 static final int NESTED = -1;
381
382 /* ------------- Base Completion classes and operations -------------- */
383
384 @SuppressWarnings("serial")
385 abstract static class Completion extends ForkJoinTask<Void>
386 implements Runnable, AsynchronousCompletionTask {
387 volatile Completion next; // Treiber stack link
388
389 /**
390 * Performs completion action if triggered, returning a
391 * dependent that may need propagation, if one exists.
392 *
393 * @param mode SYNC, ASYNC, or NESTED
394 */
395 abstract CompletableFuture<?> tryFire(int mode);
396
397 /** Returns true if possibly still triggerable. Used by cleanStack. */
398 abstract boolean isLive();
399
400 public final void run() { tryFire(ASYNC); }
401 public final boolean exec() { tryFire(ASYNC); return true; }
402 public final Void getRawResult() { return null; }
403 public final void setRawResult(Void v) {}
404 }
405
406 /**
407 * Pops and tries to trigger all reachable dependents. Call only
408 * when known to be done.
409 */
410 final void postComplete() {
411 /*
412 * On each step, variable f holds current dependents to pop
413 * and run. It is extended along only one path at a time,
414 * pushing others to avoid unbounded recursion.
415 */
416 CompletableFuture<?> f = this; Completion h;
417 while ((h = f.stack) != null ||
418 (f != this && (h = (f = this).stack) != null)) {
419 CompletableFuture<?> d; Completion t;
420 if (f.casStack(h, t = h.next)) {
421 if (t != null) {
422 if (f != this) { // push
423 do {} while (!casStack(h.next = stack, h));
424 continue;
425 }
426 h.next = null; // detach
427 }
428 f = (d = h.tryFire(NESTED)) == null ? this : d;
429 }
430 }
431 }
432
433 /** Traverses stack and unlinks dead Completions. */
434 final void cleanStack() {
435 for (Completion p = null, q = stack; q != null;) {
436 Completion s = q.next;
437 if (q.isLive()) {
438 p = q;
439 q = s;
440 }
441 else if (p == null) {
442 casStack(q, s);
443 q = stack;
444 }
445 else {
446 p.next = s;
447 if (p.isLive())
448 q = s;
449 else {
450 p = null; // restart
451 q = stack;
452 }
453 }
454 }
455 }
456
457 /* ------------- One-input Completions -------------- */
458
459 /** A Completion with a source, dependent, and executor. */
460 @SuppressWarnings("serial")
461 abstract static class UniCompletion<T,V> extends Completion {
462 Executor executor; // executor to use (null if none)
463 CompletableFuture<V> dep; // the dependent to complete
464 CompletableFuture<T> src; // source for action
465
466 UniCompletion(Executor executor, CompletableFuture<V> dep,
467 CompletableFuture<T> src) {
468 this.executor = executor; this.dep = dep; this.src = src;
469 }
470
471 /**
472 * Returns true if action can be run. Call only when known to
473 * be triggerable. Uses FJ tag bit to ensure that only one
474 * thread claims ownership. If async, starts as task -- a
475 * later call to tryFire will run action.
476 */
477 final boolean claim() {
478 Executor e = executor;
479 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
480 if (e == null)
481 return true;
482 executor = null; // disable
483 e.execute(this);
484 }
485 return false;
486 }
487
488 final boolean isLive() { return dep != null; }
489 }
490
491 /** Pushes the given completion (if it exists) unless done. */
492 final void push(UniCompletion<?,?> c) {
493 if (c != null) {
494 while (result == null && !casStack(c.next = stack, c))
495 c.next = null; // clear on failure
496 }
497 }
498
499 /**
500 * Post-processing by dependent after successful UniCompletion
501 * tryFire. Tries to clean stack of source a, and then either runs
502 * postComplete or returns this to caller, depending on mode.
503 */
504 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
505 if (a != null && a.stack != null) {
506 if (mode < 0 || a.result == null)
507 a.cleanStack();
508 else
509 a.postComplete();
510 }
511 if (result != null && stack != null) {
512 if (mode < 0)
513 return this;
514 else
515 postComplete();
516 }
517 return null;
518 }
519
520 @SuppressWarnings("serial")
521 static final class UniApply<T,V> extends UniCompletion<T,V> {
522 Function<? super T,? extends V> fn;
523 UniApply(Executor executor, CompletableFuture<V> dep,
524 CompletableFuture<T> src,
525 Function<? super T,? extends V> fn) {
526 super(executor, dep, src); this.fn = fn;
527 }
528 final CompletableFuture<V> tryFire(int mode) {
529 CompletableFuture<V> d; CompletableFuture<T> a;
530 if ((d = dep) == null ||
531 !d.uniApply(a = src, fn, mode > 0 ? null : this))
532 return null;
533 dep = null; src = null; fn = null;
534 return d.postFire(a, mode);
535 }
536 }
537
538 final <S> boolean uniApply(CompletableFuture<S> a,
539 Function<? super S,? extends T> f,
540 UniApply<S,T> c) {
541 Object r; Throwable x;
542 if (a == null || (r = a.result) == null || f == null)
543 return false;
544 tryComplete: if (result == null) {
545 if (r instanceof AltResult) {
546 if ((x = ((AltResult)r).ex) != null) {
547 completeThrowable(x, r);
548 break tryComplete;
549 }
550 r = null;
551 }
552 try {
553 if (c != null && !c.claim())
554 return false;
555 @SuppressWarnings("unchecked") S s = (S) r;
556 completeValue(f.apply(s));
557 } catch (Throwable ex) {
558 completeThrowable(ex);
559 }
560 }
561 return true;
562 }
563
564 private <V> CompletableFuture<V> uniApplyStage(
565 Executor e, Function<? super T,? extends V> f) {
566 if (f == null) throw new NullPointerException();
567 CompletableFuture<V> d = new CompletableFuture<V>();
568 if (e != null || !d.uniApply(this, f, null)) {
569 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
570 push(c);
571 c.tryFire(SYNC);
572 }
573 return d;
574 }
575
576 @SuppressWarnings("serial")
577 static final class UniAccept<T> extends UniCompletion<T,Void> {
578 Consumer<? super T> fn;
579 UniAccept(Executor executor, CompletableFuture<Void> dep,
580 CompletableFuture<T> src, Consumer<? super T> fn) {
581 super(executor, dep, src); this.fn = fn;
582 }
583 final CompletableFuture<Void> tryFire(int mode) {
584 CompletableFuture<Void> d; CompletableFuture<T> a;
585 if ((d = dep) == null ||
586 !d.uniAccept(a = src, fn, mode > 0 ? null : this))
587 return null;
588 dep = null; src = null; fn = null;
589 return d.postFire(a, mode);
590 }
591 }
592
593 final <S> boolean uniAccept(CompletableFuture<S> a,
594 Consumer<? super S> f, UniAccept<S> c) {
595 Object r; Throwable x;
596 if (a == null || (r = a.result) == null || f == null)
597 return false;
598 tryComplete: if (result == null) {
599 if (r instanceof AltResult) {
600 if ((x = ((AltResult)r).ex) != null) {
601 completeThrowable(x, r);
602 break tryComplete;
603 }
604 r = null;
605 }
606 try {
607 if (c != null && !c.claim())
608 return false;
609 @SuppressWarnings("unchecked") S s = (S) r;
610 f.accept(s);
611 completeNull();
612 } catch (Throwable ex) {
613 completeThrowable(ex);
614 }
615 }
616 return true;
617 }
618
619 private CompletableFuture<Void> uniAcceptStage(Executor e,
620 Consumer<? super T> f) {
621 if (f == null) throw new NullPointerException();
622 CompletableFuture<Void> d = new CompletableFuture<Void>();
623 if (e != null || !d.uniAccept(this, f, null)) {
624 UniAccept<T> c = new UniAccept<T>(e, d, this, f);
625 push(c);
626 c.tryFire(SYNC);
627 }
628 return d;
629 }
630
631 @SuppressWarnings("serial")
632 static final class UniRun<T> extends UniCompletion<T,Void> {
633 Runnable fn;
634 UniRun(Executor executor, CompletableFuture<Void> dep,
635 CompletableFuture<T> src, Runnable fn) {
636 super(executor, dep, src); this.fn = fn;
637 }
638 final CompletableFuture<Void> tryFire(int mode) {
639 CompletableFuture<Void> d; CompletableFuture<T> a;
640 if ((d = dep) == null ||
641 !d.uniRun(a = src, fn, mode > 0 ? null : this))
642 return null;
643 dep = null; src = null; fn = null;
644 return d.postFire(a, mode);
645 }
646 }
647
648 final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
649 Object r; Throwable x;
650 if (a == null || (r = a.result) == null || f == null)
651 return false;
652 if (result == null) {
653 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
654 completeThrowable(x, r);
655 else
656 try {
657 if (c != null && !c.claim())
658 return false;
659 f.run();
660 completeNull();
661 } catch (Throwable ex) {
662 completeThrowable(ex);
663 }
664 }
665 return true;
666 }
667
668 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
669 if (f == null) throw new NullPointerException();
670 CompletableFuture<Void> d = new CompletableFuture<Void>();
671 if (e != null || !d.uniRun(this, f, null)) {
672 UniRun<T> c = new UniRun<T>(e, d, this, f);
673 push(c);
674 c.tryFire(SYNC);
675 }
676 return d;
677 }
678
679 @SuppressWarnings("serial")
680 static final class UniWhenComplete<T> extends UniCompletion<T,T> {
681 BiConsumer<? super T, ? super Throwable> fn;
682 UniWhenComplete(Executor executor, CompletableFuture<T> dep,
683 CompletableFuture<T> src,
684 BiConsumer<? super T, ? super Throwable> fn) {
685 super(executor, dep, src); this.fn = fn;
686 }
687 final CompletableFuture<T> tryFire(int mode) {
688 CompletableFuture<T> d; CompletableFuture<T> a;
689 if ((d = dep) == null ||
690 !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this))
691 return null;
692 dep = null; src = null; fn = null;
693 return d.postFire(a, mode);
694 }
695 }
696
697 final boolean uniWhenComplete(CompletableFuture<T> a,
698 BiConsumer<? super T,? super Throwable> f,
699 UniWhenComplete<T> c) {
700 Object r; T t; Throwable x = null;
701 if (a == null || (r = a.result) == null || f == null)
702 return false;
703 if (result == null) {
704 try {
705 if (c != null && !c.claim())
706 return false;
707 if (r instanceof AltResult) {
708 x = ((AltResult)r).ex;
709 t = null;
710 } else {
711 @SuppressWarnings("unchecked") T tr = (T) r;
712 t = tr;
713 }
714 f.accept(t, x);
715 if (x == null) {
716 internalComplete(r);
717 return true;
718 }
719 } catch (Throwable ex) {
720 if (x == null)
721 x = ex;
722 }
723 completeThrowable(x, r);
724 }
725 return true;
726 }
727
728 private CompletableFuture<T> uniWhenCompleteStage(
729 Executor e, BiConsumer<? super T, ? super Throwable> f) {
730 if (f == null) throw new NullPointerException();
731 CompletableFuture<T> d = new CompletableFuture<T>();
732 if (e != null || !d.uniWhenComplete(this, f, null)) {
733 UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
734 push(c);
735 c.tryFire(SYNC);
736 }
737 return d;
738 }
739
740 @SuppressWarnings("serial")
741 static final class UniHandle<T,V> extends UniCompletion<T,V> {
742 BiFunction<? super T, Throwable, ? extends V> fn;
743 UniHandle(Executor executor, CompletableFuture<V> dep,
744 CompletableFuture<T> src,
745 BiFunction<? super T, Throwable, ? extends V> fn) {
746 super(executor, dep, src); this.fn = fn;
747 }
748 final CompletableFuture<V> tryFire(int mode) {
749 CompletableFuture<V> d; CompletableFuture<T> a;
750 if ((d = dep) == null ||
751 !d.uniHandle(a = src, fn, mode > 0 ? null : this))
752 return null;
753 dep = null; src = null; fn = null;
754 return d.postFire(a, mode);
755 }
756 }
757
758 final <S> boolean uniHandle(CompletableFuture<S> a,
759 BiFunction<? super S, Throwable, ? extends T> f,
760 UniHandle<S,T> c) {
761 Object r; S s; Throwable x;
762 if (a == null || (r = a.result) == null || f == null)
763 return false;
764 if (result == null) {
765 try {
766 if (c != null && !c.claim())
767 return false;
768 if (r instanceof AltResult) {
769 x = ((AltResult)r).ex;
770 s = null;
771 } else {
772 x = null;
773 @SuppressWarnings("unchecked") S ss = (S) r;
774 s = ss;
775 }
776 completeValue(f.apply(s, x));
777 } catch (Throwable ex) {
778 completeThrowable(ex);
779 }
780 }
781 return true;
782 }
783
784 private <V> CompletableFuture<V> uniHandleStage(
785 Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
786 if (f == null) throw new NullPointerException();
787 CompletableFuture<V> d = new CompletableFuture<V>();
788 if (e != null || !d.uniHandle(this, f, null)) {
789 UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
790 push(c);
791 c.tryFire(SYNC);
792 }
793 return d;
794 }
795
796 @SuppressWarnings("serial")
797 static final class UniExceptionally<T> extends UniCompletion<T,T> {
798 Function<? super Throwable, ? extends T> fn;
799 UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src,
800 Function<? super Throwable, ? extends T> fn) {
801 super(null, dep, src); this.fn = fn;
802 }
803 final CompletableFuture<T> tryFire(int mode) { // never ASYNC
804 // assert mode != ASYNC;
805 CompletableFuture<T> d; CompletableFuture<T> a;
806 if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this))
807 return null;
808 dep = null; src = null; fn = null;
809 return d.postFire(a, mode);
810 }
811 }
812
813 final boolean uniExceptionally(CompletableFuture<T> a,
814 Function<? super Throwable, ? extends T> f,
815 UniExceptionally<T> c) {
816 Object r; Throwable x;
817 if (a == null || (r = a.result) == null || f == null)
818 return false;
819 if (result == null) {
820 try {
821 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) {
822 if (c != null && !c.claim())
823 return false;
824 completeValue(f.apply(x));
825 } else
826 internalComplete(r);
827 } catch (Throwable ex) {
828 completeThrowable(ex);
829 }
830 }
831 return true;
832 }
833
834 private CompletableFuture<T> uniExceptionallyStage(
835 Function<Throwable, ? extends T> f) {
836 if (f == null) throw new NullPointerException();
837 CompletableFuture<T> d = new CompletableFuture<T>();
838 if (!d.uniExceptionally(this, f, null)) {
839 UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
840 push(c);
841 c.tryFire(SYNC);
842 }
843 return d;
844 }
845
846 @SuppressWarnings("serial")
847 static final class UniRelay<T> extends UniCompletion<T,T> { // for Compose
848 UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) {
849 super(null, dep, src);
850 }
851 final CompletableFuture<T> tryFire(int mode) {
852 CompletableFuture<T> d; CompletableFuture<T> a;
853 if ((d = dep) == null || !d.uniRelay(a = src))
854 return null;
855 src = null; dep = null;
856 return d.postFire(a, mode);
857 }
858 }
859
860 final boolean uniRelay(CompletableFuture<T> a) {
861 Object r;
862 if (a == null || (r = a.result) == null)
863 return false;
864 if (result == null) // no need to claim
865 completeRelay(r);
866 return true;
867 }
868
869 @SuppressWarnings("serial")
870 static final class UniCompose<T,V> extends UniCompletion<T,V> {
871 Function<? super T, ? extends CompletionStage<V>> fn;
872 UniCompose(Executor executor, CompletableFuture<V> dep,
873 CompletableFuture<T> src,
874 Function<? super T, ? extends CompletionStage<V>> fn) {
875 super(executor, dep, src); this.fn = fn;
876 }
877 final CompletableFuture<V> tryFire(int mode) {
878 CompletableFuture<V> d; CompletableFuture<T> a;
879 if ((d = dep) == null ||
880 !d.uniCompose(a = src, fn, mode > 0 ? null : this))
881 return null;
882 dep = null; src = null; fn = null;
883 return d.postFire(a, mode);
884 }
885 }
886
887 final <S> boolean uniCompose(
888 CompletableFuture<S> a,
889 Function<? super S, ? extends CompletionStage<T>> f,
890 UniCompose<S,T> c) {
891 Object r; Throwable x;
892 if (a == null || (r = a.result) == null || f == null)
893 return false;
894 tryComplete: if (result == null) {
895 if (r instanceof AltResult) {
896 if ((x = ((AltResult)r).ex) != null) {
897 completeThrowable(x, r);
898 break tryComplete;
899 }
900 r = null;
901 }
902 try {
903 if (c != null && !c.claim())
904 return false;
905 @SuppressWarnings("unchecked") S s = (S) r;
906 CompletableFuture<T> g = f.apply(s).toCompletableFuture();
907 if (g.result == null || !uniRelay(g)) {
908 UniRelay<T> copy = new UniRelay<T>(this, g);
909 g.push(copy);
910 copy.tryFire(SYNC);
911 if (result == null)
912 return false;
913 }
914 } catch (Throwable ex) {
915 completeThrowable(ex);
916 }
917 }
918 return true;
919 }
920
921 private <V> CompletableFuture<V> uniComposeStage(
922 Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
923 if (f == null) throw new NullPointerException();
924 Object r; Throwable x;
925 if (e == null && (r = result) != null) {
926 // try to return function result directly
927 if (r instanceof AltResult) {
928 if ((x = ((AltResult)r).ex) != null) {
929 return new CompletableFuture<V>(encodeThrowable(x, r));
930 }
931 r = null;
932 }
933 try {
934 @SuppressWarnings("unchecked") T t = (T) r;
935 return f.apply(t).toCompletableFuture();
936 } catch (Throwable ex) {
937 return new CompletableFuture<V>(encodeThrowable(ex));
938 }
939 }
940 CompletableFuture<V> d = new CompletableFuture<V>();
941 UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
942 push(c);
943 c.tryFire(SYNC);
944 return d;
945 }
946
947 /* ------------- Two-input Completions -------------- */
948
949 /** A Completion for an action with two sources */
950 @SuppressWarnings("serial")
951 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
952 CompletableFuture<U> snd; // second source for action
953 BiCompletion(Executor executor, CompletableFuture<V> dep,
954 CompletableFuture<T> src, CompletableFuture<U> snd) {
955 super(executor, dep, src); this.snd = snd;
956 }
957 }
958
959 /** A Completion delegating to a BiCompletion */
960 @SuppressWarnings("serial")
961 static final class CoCompletion extends Completion {
962 BiCompletion<?,?,?> base;
963 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; }
964 final CompletableFuture<?> tryFire(int mode) {
965 BiCompletion<?,?,?> c; CompletableFuture<?> d;
966 if ((c = base) == null || (d = c.tryFire(mode)) == null)
967 return null;
968 base = null; // detach
969 return d;
970 }
971 final boolean isLive() {
972 BiCompletion<?,?,?> c;
973 return (c = base) != null && c.dep != null;
974 }
975 }
976
977 /** Pushes completion to this and b unless both done. */
978 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
979 if (c != null) {
980 Object r;
981 while ((r = result) == null && !casStack(c.next = stack, c))
982 c.next = null;
983 if (b != null && b != this && b.result == null) {
984 Completion q = (r != null) ? c : new CoCompletion(c);
985 while (b.result == null && !b.casStack(q.next = b.stack, q))
986 q.next = null;
987 }
988 }
989 }
990
991 /** Post-processing after successful BiCompletion tryFire. */
992 final CompletableFuture<T> postFire(CompletableFuture<?> a,
993 CompletableFuture<?> b, int mode) {
994 if (b != null && b.stack != null) { // clean second source
995 if (mode < 0 || b.result == null)
996 b.cleanStack();
997 else
998 b.postComplete();
999 }
1000 return postFire(a, mode);
1001 }
1002
1003 @SuppressWarnings("serial")
1004 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
1005 BiFunction<? super T,? super U,? extends V> fn;
1006 BiApply(Executor executor, CompletableFuture<V> dep,
1007 CompletableFuture<T> src, CompletableFuture<U> snd,
1008 BiFunction<? super T,? super U,? extends V> fn) {
1009 super(executor, dep, src, snd); this.fn = fn;
1010 }
1011 final CompletableFuture<V> tryFire(int mode) {
1012 CompletableFuture<V> d;
1013 CompletableFuture<T> a;
1014 CompletableFuture<U> b;
1015 if ((d = dep) == null ||
1016 !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this))
1017 return null;
1018 dep = null; src = null; snd = null; fn = null;
1019 return d.postFire(a, b, mode);
1020 }
1021 }
1022
1023 final <R,S> boolean biApply(CompletableFuture<R> a,
1024 CompletableFuture<S> b,
1025 BiFunction<? super R,? super S,? extends T> f,
1026 BiApply<R,S,T> c) {
1027 Object r, s; Throwable x;
1028 if (a == null || (r = a.result) == null ||
1029 b == null || (s = b.result) == null || f == null)
1030 return false;
1031 tryComplete: if (result == null) {
1032 if (r instanceof AltResult) {
1033 if ((x = ((AltResult)r).ex) != null) {
1034 completeThrowable(x, r);
1035 break tryComplete;
1036 }
1037 r = null;
1038 }
1039 if (s instanceof AltResult) {
1040 if ((x = ((AltResult)s).ex) != null) {
1041 completeThrowable(x, s);
1042 break tryComplete;
1043 }
1044 s = null;
1045 }
1046 try {
1047 if (c != null && !c.claim())
1048 return false;
1049 @SuppressWarnings("unchecked") R rr = (R) r;
1050 @SuppressWarnings("unchecked") S ss = (S) s;
1051 completeValue(f.apply(rr, ss));
1052 } catch (Throwable ex) {
1053 completeThrowable(ex);
1054 }
1055 }
1056 return true;
1057 }
1058
1059 private <U,V> CompletableFuture<V> biApplyStage(
1060 Executor e, CompletionStage<U> o,
1061 BiFunction<? super T,? super U,? extends V> f) {
1062 CompletableFuture<U> b;
1063 if (f == null || (b = o.toCompletableFuture()) == null)
1064 throw new NullPointerException();
1065 CompletableFuture<V> d = new CompletableFuture<V>();
1066 if (e != null || !d.biApply(this, b, f, null)) {
1067 BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
1068 bipush(b, c);
1069 c.tryFire(SYNC);
1070 }
1071 return d;
1072 }
1073
1074 @SuppressWarnings("serial")
1075 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> {
1076 BiConsumer<? super T,? super U> fn;
1077 BiAccept(Executor executor, CompletableFuture<Void> dep,
1078 CompletableFuture<T> src, CompletableFuture<U> snd,
1079 BiConsumer<? super T,? super U> fn) {
1080 super(executor, dep, src, snd); this.fn = fn;
1081 }
1082 final CompletableFuture<Void> tryFire(int mode) {
1083 CompletableFuture<Void> d;
1084 CompletableFuture<T> a;
1085 CompletableFuture<U> b;
1086 if ((d = dep) == null ||
1087 !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this))
1088 return null;
1089 dep = null; src = null; snd = null; fn = null;
1090 return d.postFire(a, b, mode);
1091 }
1092 }
1093
1094 final <R,S> boolean biAccept(CompletableFuture<R> a,
1095 CompletableFuture<S> b,
1096 BiConsumer<? super R,? super S> f,
1097 BiAccept<R,S> c) {
1098 Object r, s; Throwable x;
1099 if (a == null || (r = a.result) == null ||
1100 b == null || (s = b.result) == null || f == null)
1101 return false;
1102 tryComplete: if (result == null) {
1103 if (r instanceof AltResult) {
1104 if ((x = ((AltResult)r).ex) != null) {
1105 completeThrowable(x, r);
1106 break tryComplete;
1107 }
1108 r = null;
1109 }
1110 if (s instanceof AltResult) {
1111 if ((x = ((AltResult)s).ex) != null) {
1112 completeThrowable(x, s);
1113 break tryComplete;
1114 }
1115 s = null;
1116 }
1117 try {
1118 if (c != null && !c.claim())
1119 return false;
1120 @SuppressWarnings("unchecked") R rr = (R) r;
1121 @SuppressWarnings("unchecked") S ss = (S) s;
1122 f.accept(rr, ss);
1123 completeNull();
1124 } catch (Throwable ex) {
1125 completeThrowable(ex);
1126 }
1127 }
1128 return true;
1129 }
1130
1131 private <U> CompletableFuture<Void> biAcceptStage(
1132 Executor e, CompletionStage<U> o,
1133 BiConsumer<? super T,? super U> f) {
1134 CompletableFuture<U> b;
1135 if (f == null || (b = o.toCompletableFuture()) == null)
1136 throw new NullPointerException();
1137 CompletableFuture<Void> d = new CompletableFuture<Void>();
1138 if (e != null || !d.biAccept(this, b, f, null)) {
1139 BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
1140 bipush(b, c);
1141 c.tryFire(SYNC);
1142 }
1143 return d;
1144 }
1145
1146 @SuppressWarnings("serial")
1147 static final class BiRun<T,U> extends BiCompletion<T,U,Void> {
1148 Runnable fn;
1149 BiRun(Executor executor, CompletableFuture<Void> dep,
1150 CompletableFuture<T> src,
1151 CompletableFuture<U> snd,
1152 Runnable fn) {
1153 super(executor, dep, src, snd); this.fn = fn;
1154 }
1155 final CompletableFuture<Void> tryFire(int mode) {
1156 CompletableFuture<Void> d;
1157 CompletableFuture<T> a;
1158 CompletableFuture<U> b;
1159 if ((d = dep) == null ||
1160 !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this))
1161 return null;
1162 dep = null; src = null; snd = null; fn = null;
1163 return d.postFire(a, b, mode);
1164 }
1165 }
1166
1167 final boolean biRun(CompletableFuture<?> a, CompletableFuture<?> b,
1168 Runnable f, BiRun<?,?> c) {
1169 Object r, s; Throwable x;
1170 if (a == null || (r = a.result) == null ||
1171 b == null || (s = b.result) == null || f == null)
1172 return false;
1173 if (result == null) {
1174 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
1175 completeThrowable(x, r);
1176 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
1177 completeThrowable(x, s);
1178 else
1179 try {
1180 if (c != null && !c.claim())
1181 return false;
1182 f.run();
1183 completeNull();
1184 } catch (Throwable ex) {
1185 completeThrowable(ex);
1186 }
1187 }
1188 return true;
1189 }
1190
1191 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
1192 Runnable f) {
1193 CompletableFuture<?> b;
1194 if (f == null || (b = o.toCompletableFuture()) == null)
1195 throw new NullPointerException();
1196 CompletableFuture<Void> d = new CompletableFuture<Void>();
1197 if (e != null || !d.biRun(this, b, f, null)) {
1198 BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
1199 bipush(b, c);
1200 c.tryFire(SYNC);
1201 }
1202 return d;
1203 }
1204
1205 @SuppressWarnings("serial")
1206 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And
1207 BiRelay(CompletableFuture<Void> dep,
1208 CompletableFuture<T> src,
1209 CompletableFuture<U> snd) {
1210 super(null, dep, src, snd);
1211 }
1212 final CompletableFuture<Void> tryFire(int mode) {
1213 CompletableFuture<Void> d;
1214 CompletableFuture<T> a;
1215 CompletableFuture<U> b;
1216 if ((d = dep) == null || !d.biRelay(a = src, b = snd))
1217 return null;
1218 src = null; snd = null; dep = null;
1219 return d.postFire(a, b, mode);
1220 }
1221 }
1222
1223 boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
1224 Object r, s; Throwable x;
1225 if (a == null || (r = a.result) == null ||
1226 b == null || (s = b.result) == null)
1227 return false;
1228 if (result == null) {
1229 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
1230 completeThrowable(x, r);
1231 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
1232 completeThrowable(x, s);
1233 else
1234 completeNull();
1235 }
1236 return true;
1237 }
1238
1239 /** Recursively constructs a tree of completions. */
1240 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
1241 int lo, int hi) {
1242 CompletableFuture<Void> d = new CompletableFuture<Void>();
1243 if (lo > hi) // empty
1244 d.result = NIL;
1245 else {
1246 CompletableFuture<?> a, b;
1247 int mid = (lo + hi) >>> 1;
1248 if ((a = (lo == mid ? cfs[lo] :
1249 andTree(cfs, lo, mid))) == null ||
1250 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1251 andTree(cfs, mid+1, hi))) == null)
1252 throw new NullPointerException();
1253 if (!d.biRelay(a, b)) {
1254 BiRelay<?,?> c = new BiRelay<>(d, a, b);
1255 a.bipush(b, c);
1256 c.tryFire(SYNC);
1257 }
1258 }
1259 return d;
1260 }
1261
1262 /* ------------- Projected (Ored) BiCompletions -------------- */
1263
1264 /** Pushes completion to this and b unless either done. */
1265 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
1266 if (c != null) {
1267 while ((b == null || b.result == null) && result == null) {
1268 if (casStack(c.next = stack, c)) {
1269 if (b != null && b != this && b.result == null) {
1270 Completion q = new CoCompletion(c);
1271 while (result == null && b.result == null &&
1272 !b.casStack(q.next = b.stack, q))
1273 q.next = null;
1274 }
1275 break;
1276 }
1277 c.next = null;
1278 }
1279 }
1280 }
1281
1282 @SuppressWarnings("serial")
1283 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> {
1284 Function<? super T,? extends V> fn;
1285 OrApply(Executor executor, CompletableFuture<V> dep,
1286 CompletableFuture<T> src,
1287 CompletableFuture<U> snd,
1288 Function<? super T,? extends V> fn) {
1289 super(executor, dep, src, snd); this.fn = fn;
1290 }
1291 final CompletableFuture<V> tryFire(int mode) {
1292 CompletableFuture<V> d;
1293 CompletableFuture<T> a;
1294 CompletableFuture<U> b;
1295 if ((d = dep) == null ||
1296 !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this))
1297 return null;
1298 dep = null; src = null; snd = null; fn = null;
1299 return d.postFire(a, b, mode);
1300 }
1301 }
1302
1303 final <R,S extends R> boolean orApply(CompletableFuture<R> a,
1304 CompletableFuture<S> b,
1305 Function<? super R, ? extends T> f,
1306 OrApply<R,S,T> c) {
1307 Object r; Throwable x;
1308 if (a == null || b == null ||
1309 ((r = a.result) == null && (r = b.result) == null) || f == null)
1310 return false;
1311 tryComplete: if (result == null) {
1312 try {
1313 if (c != null && !c.claim())
1314 return false;
1315 if (r instanceof AltResult) {
1316 if ((x = ((AltResult)r).ex) != null) {
1317 completeThrowable(x, r);
1318 break tryComplete;
1319 }
1320 r = null;
1321 }
1322 @SuppressWarnings("unchecked") R rr = (R) r;
1323 completeValue(f.apply(rr));
1324 } catch (Throwable ex) {
1325 completeThrowable(ex);
1326 }
1327 }
1328 return true;
1329 }
1330
1331 private <U extends T,V> CompletableFuture<V> orApplyStage(
1332 Executor e, CompletionStage<U> o,
1333 Function<? super T, ? extends V> f) {
1334 CompletableFuture<U> b;
1335 if (f == null || (b = o.toCompletableFuture()) == null)
1336 throw new NullPointerException();
1337 CompletableFuture<V> d = new CompletableFuture<V>();
1338 if (e != null || !d.orApply(this, b, f, null)) {
1339 OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
1340 orpush(b, c);
1341 c.tryFire(SYNC);
1342 }
1343 return d;
1344 }
1345
1346 @SuppressWarnings("serial")
1347 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> {
1348 Consumer<? super T> fn;
1349 OrAccept(Executor executor, CompletableFuture<Void> dep,
1350 CompletableFuture<T> src,
1351 CompletableFuture<U> snd,
1352 Consumer<? super T> fn) {
1353 super(executor, dep, src, snd); this.fn = fn;
1354 }
1355 final CompletableFuture<Void> tryFire(int mode) {
1356 CompletableFuture<Void> d;
1357 CompletableFuture<T> a;
1358 CompletableFuture<U> b;
1359 if ((d = dep) == null ||
1360 !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this))
1361 return null;
1362 dep = null; src = null; snd = null; fn = null;
1363 return d.postFire(a, b, mode);
1364 }
1365 }
1366
1367 final <R,S extends R> boolean orAccept(CompletableFuture<R> a,
1368 CompletableFuture<S> b,
1369 Consumer<? super R> f,
1370 OrAccept<R,S> c) {
1371 Object r; Throwable x;
1372 if (a == null || b == null ||
1373 ((r = a.result) == null && (r = b.result) == null) || f == null)
1374 return false;
1375 tryComplete: if (result == null) {
1376 try {
1377 if (c != null && !c.claim())
1378 return false;
1379 if (r instanceof AltResult) {
1380 if ((x = ((AltResult)r).ex) != null) {
1381 completeThrowable(x, r);
1382 break tryComplete;
1383 }
1384 r = null;
1385 }
1386 @SuppressWarnings("unchecked") R rr = (R) r;
1387 f.accept(rr);
1388 completeNull();
1389 } catch (Throwable ex) {
1390 completeThrowable(ex);
1391 }
1392 }
1393 return true;
1394 }
1395
1396 private <U extends T> CompletableFuture<Void> orAcceptStage(
1397 Executor e, CompletionStage<U> o, Consumer<? super T> f) {
1398 CompletableFuture<U> b;
1399 if (f == null || (b = o.toCompletableFuture()) == null)
1400 throw new NullPointerException();
1401 CompletableFuture<Void> d = new CompletableFuture<Void>();
1402 if (e != null || !d.orAccept(this, b, f, null)) {
1403 OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
1404 orpush(b, c);
1405 c.tryFire(SYNC);
1406 }
1407 return d;
1408 }
1409
1410 @SuppressWarnings("serial")
1411 static final class OrRun<T,U> extends BiCompletion<T,U,Void> {
1412 Runnable fn;
1413 OrRun(Executor executor, CompletableFuture<Void> dep,
1414 CompletableFuture<T> src,
1415 CompletableFuture<U> snd,
1416 Runnable fn) {
1417 super(executor, dep, src, snd); this.fn = fn;
1418 }
1419 final CompletableFuture<Void> tryFire(int mode) {
1420 CompletableFuture<Void> d;
1421 CompletableFuture<T> a;
1422 CompletableFuture<U> b;
1423 if ((d = dep) == null ||
1424 !d.orRun(a = src, b = snd, fn, mode > 0 ? null : this))
1425 return null;
1426 dep = null; src = null; snd = null; fn = null;
1427 return d.postFire(a, b, mode);
1428 }
1429 }
1430
1431 final boolean orRun(CompletableFuture<?> a, CompletableFuture<?> b,
1432 Runnable f, OrRun<?,?> c) {
1433 Object r; Throwable x;
1434 if (a == null || b == null ||
1435 ((r = a.result) == null && (r = b.result) == null) || f == null)
1436 return false;
1437 if (result == null) {
1438 try {
1439 if (c != null && !c.claim())
1440 return false;
1441 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
1442 completeThrowable(x, r);
1443 else {
1444 f.run();
1445 completeNull();
1446 }
1447 } catch (Throwable ex) {
1448 completeThrowable(ex);
1449 }
1450 }
1451 return true;
1452 }
1453
1454 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
1455 Runnable f) {
1456 CompletableFuture<?> b;
1457 if (f == null || (b = o.toCompletableFuture()) == null)
1458 throw new NullPointerException();
1459 CompletableFuture<Void> d = new CompletableFuture<Void>();
1460 if (e != null || !d.orRun(this, b, f, null)) {
1461 OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
1462 orpush(b, c);
1463 c.tryFire(SYNC);
1464 }
1465 return d;
1466 }
1467
1468 @SuppressWarnings("serial")
1469 static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or
1470 OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src,
1471 CompletableFuture<U> snd) {
1472 super(null, dep, src, snd);
1473 }
1474 final CompletableFuture<Object> tryFire(int mode) {
1475 CompletableFuture<Object> d;
1476 CompletableFuture<T> a;
1477 CompletableFuture<U> b;
1478 if ((d = dep) == null || !d.orRelay(a = src, b = snd))
1479 return null;
1480 src = null; snd = null; dep = null;
1481 return d.postFire(a, b, mode);
1482 }
1483 }
1484
1485 final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
1486 Object r;
1487 if (a == null || b == null ||
1488 ((r = a.result) == null && (r = b.result) == null))
1489 return false;
1490 if (result == null)
1491 completeRelay(r);
1492 return true;
1493 }
1494
1495 /** Recursively constructs a tree of completions. */
1496 static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
1497 int lo, int hi) {
1498 CompletableFuture<Object> d = new CompletableFuture<Object>();
1499 if (lo <= hi) {
1500 CompletableFuture<?> a, b;
1501 int mid = (lo + hi) >>> 1;
1502 if ((a = (lo == mid ? cfs[lo] :
1503 orTree(cfs, lo, mid))) == null ||
1504 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1505 orTree(cfs, mid+1, hi))) == null)
1506 throw new NullPointerException();
1507 if (!d.orRelay(a, b)) {
1508 OrRelay<?,?> c = new OrRelay<>(d, a, b);
1509 a.orpush(b, c);
1510 c.tryFire(SYNC);
1511 }
1512 }
1513 return d;
1514 }
1515
1516 /* ------------- Zero-input Async forms -------------- */
1517
1518 @SuppressWarnings("serial")
1519 static final class AsyncSupply<T> extends Completion {
1520 CompletableFuture<T> dep; Supplier<T> fn;
1521 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
1522 this.dep = dep; this.fn = fn;
1523 }
1524
1525 final CompletableFuture<T> tryFire(int alwaysAsync) {
1526 // assert alwaysAsync == ASYNC;
1527 CompletableFuture<T> d; Supplier<T> f;
1528 if ((d = dep) != null && (f = fn) != null) {
1529 dep = null; fn = null;
1530 if (d.result == null) {
1531 try {
1532 d.completeValue(f.get());
1533 } catch (Throwable ex) {
1534 d.completeThrowable(ex);
1535 }
1536 }
1537 d.postComplete();
1538 }
1539 return d;
1540 }
1541 final boolean isLive() { return dep != null; }
1542 }
1543
1544 static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
1545 Supplier<U> f) {
1546 if (f == null) throw new NullPointerException();
1547 CompletableFuture<U> d = new CompletableFuture<U>();
1548 e.execute(new AsyncSupply<U>(d, f));
1549 return d;
1550 }
1551
1552 @SuppressWarnings("serial")
1553 static final class AsyncRun extends Completion {
1554 CompletableFuture<Void> dep; Runnable fn;
1555 AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
1556 this.dep = dep; this.fn = fn;
1557 }
1558
1559 final CompletableFuture<Void> tryFire(int alwaysAsync) {
1560 // assert alwaysAsync == ASYNC;
1561 CompletableFuture<Void> d; Runnable f;
1562 if ((d = dep) != null && (f = fn) != null) {
1563 dep = null; fn = null;
1564 if (d.result == null) {
1565 try {
1566 f.run();
1567 d.completeNull();
1568 } catch (Throwable ex) {
1569 d.completeThrowable(ex);
1570 }
1571 }
1572 d.postComplete();
1573 }
1574 return d;
1575 }
1576 final boolean isLive() { return dep != null; }
1577 }
1578
1579 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
1580 if (f == null) throw new NullPointerException();
1581 CompletableFuture<Void> d = new CompletableFuture<Void>();
1582 e.execute(new AsyncRun(d, f));
1583 return d;
1584 }
1585
1586 /* ------------- Signallers -------------- */
1587
1588 /**
1589 * Completion for recording and releasing a waiting thread. This
1590 * class implements ManagedBlocker to avoid starvation when
1591 * blocking actions pile up in ForkJoinPools.
1592 */
1593 @SuppressWarnings("serial")
1594 static final class Signaller extends Completion
1595 implements ForkJoinPool.ManagedBlocker {
1596 long nanos; // wait time if timed
1597 final long deadline; // non-zero if timed
1598 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
1599 volatile Thread thread;
1600
1601 Signaller(boolean interruptible, long nanos, long deadline) {
1602 this.thread = Thread.currentThread();
1603 this.interruptControl = interruptible ? 1 : 0;
1604 this.nanos = nanos;
1605 this.deadline = deadline;
1606 }
1607 final CompletableFuture<?> tryFire(int ignore) {
1608 Thread w; // no need to atomically claim
1609 if ((w = thread) != null) {
1610 thread = null;
1611 LockSupport.unpark(w);
1612 }
1613 return null;
1614 }
1615 public boolean isReleasable() {
1616 if (thread == null)
1617 return true;
1618 if (Thread.interrupted()) {
1619 int i = interruptControl;
1620 interruptControl = -1;
1621 if (i > 0)
1622 return true;
1623 }
1624 if (deadline != 0L &&
1625 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
1626 thread = null;
1627 return true;
1628 }
1629 return false;
1630 }
1631 public boolean block() {
1632 if (isReleasable())
1633 return true;
1634 else if (deadline == 0L)
1635 LockSupport.park(this);
1636 else if (nanos > 0L)
1637 LockSupport.parkNanos(this, nanos);
1638 return isReleasable();
1639 }
1640 final boolean isLive() { return thread != null; }
1641 }
1642
1643 /**
1644 * Returns raw result after waiting, or null if interruptible and
1645 * interrupted.
1646 */
1647 private Object waitingGet(boolean interruptible) {
1648 Signaller q = null;
1649 boolean queued = false;
1650 int spins = -1;
1651 Object r;
1652 while ((r = result) == null) {
1653 if (spins < 0)
1654 spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1655 1 << 8 : 0; // Use brief spin-wait on multiprocessors
1656 else if (spins > 0) {
1657 if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1658 --spins;
1659 }
1660 else if (q == null)
1661 q = new Signaller(interruptible, 0L, 0L);
1662 else if (!queued)
1663 queued = casStack(q.next = stack, q);
1664 else if (interruptible && q.interruptControl < 0) {
1665 q.thread = null;
1666 cleanStack();
1667 return null;
1668 }
1669 else if (q.thread != null && result == null) {
1670 try {
1671 ForkJoinPool.managedBlock(q);
1672 } catch (InterruptedException ie) {
1673 q.interruptControl = -1;
1674 }
1675 }
1676 }
1677 if (q != null) {
1678 q.thread = null;
1679 if (q.interruptControl < 0) {
1680 if (interruptible)
1681 r = null; // report interruption
1682 else
1683 Thread.currentThread().interrupt();
1684 }
1685 }
1686 postComplete();
1687 return r;
1688 }
1689
1690 /**
1691 * Returns raw result after waiting, or null if interrupted, or
1692 * throws TimeoutException on timeout.
1693 */
1694 private Object timedGet(long nanos) throws TimeoutException {
1695 if (Thread.interrupted())
1696 return null;
1697 if (nanos <= 0L)
1698 throw new TimeoutException();
1699 long d = System.nanoTime() + nanos;
1700 Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
1701 boolean queued = false;
1702 Object r;
1703 while ((r = result) == null) {
1704 if (!queued)
1705 queued = casStack(q.next = stack, q);
1706 else if (q.interruptControl < 0 || q.nanos <= 0L) {
1707 q.thread = null;
1708 cleanStack();
1709 if (q.interruptControl < 0)
1710 return null;
1711 throw new TimeoutException();
1712 }
1713 else if (q.thread != null && result == null) {
1714 try {
1715 ForkJoinPool.managedBlock(q);
1716 } catch (InterruptedException ie) {
1717 q.interruptControl = -1;
1718 }
1719 }
1720 }
1721 if (q.interruptControl < 0)
1722 r = null;
1723 q.thread = null;
1724 postComplete();
1725 return r;
1726 }
1727
1728 /* ------------- public methods -------------- */
1729
1730 /**
1731 * Creates a new incomplete CompletableFuture.
1732 */
1733 public CompletableFuture() {
1734 }
1735
1736 /**
1737 * Creates a new complete CompletableFuture with given encoded result.
1738 */
1739 private CompletableFuture(Object r) {
1740 this.result = r;
1741 }
1742
1743 /**
1744 * Returns a new CompletableFuture that is asynchronously completed
1745 * by a task running in the {@link ForkJoinPool#commonPool()} with
1746 * the value obtained by calling the given Supplier.
1747 *
1748 * @param supplier a function returning the value to be used
1749 * to complete the returned CompletableFuture
1750 * @param <U> the function's return type
1751 * @return the new CompletableFuture
1752 */
1753 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1754 return asyncSupplyStage(asyncPool, supplier);
1755 }
1756
1757 /**
1758 * Returns a new CompletableFuture that is asynchronously completed
1759 * by a task running in the given executor with the value obtained
1760 * by calling the given Supplier.
1761 *
1762 * @param supplier a function returning the value to be used
1763 * to complete the returned CompletableFuture
1764 * @param executor the executor to use for asynchronous execution
1765 * @param <U> the function's return type
1766 * @return the new CompletableFuture
1767 */
1768 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1769 Executor executor) {
1770 return asyncSupplyStage(screenExecutor(executor), supplier);
1771 }
1772
1773 /**
1774 * Returns a new CompletableFuture that is asynchronously completed
1775 * by a task running in the {@link ForkJoinPool#commonPool()} after
1776 * it runs the given action.
1777 *
1778 * @param runnable the action to run before completing the
1779 * returned CompletableFuture
1780 * @return the new CompletableFuture
1781 */
1782 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1783 return asyncRunStage(asyncPool, runnable);
1784 }
1785
1786 /**
1787 * Returns a new CompletableFuture that is asynchronously completed
1788 * by a task running in the given executor after it runs the given
1789 * action.
1790 *
1791 * @param runnable the action to run before completing the
1792 * returned CompletableFuture
1793 * @param executor the executor to use for asynchronous execution
1794 * @return the new CompletableFuture
1795 */
1796 public static CompletableFuture<Void> runAsync(Runnable runnable,
1797 Executor executor) {
1798 return asyncRunStage(screenExecutor(executor), runnable);
1799 }
1800
1801 /**
1802 * Returns a new CompletableFuture that is already completed with
1803 * the given value.
1804 *
1805 * @param value the value
1806 * @param <U> the type of the value
1807 * @return the completed CompletableFuture
1808 */
1809 public static <U> CompletableFuture<U> completedFuture(U value) {
1810 return new CompletableFuture<U>((value == null) ? NIL : value);
1811 }
1812
1813 /**
1814 * Returns {@code true} if completed in any fashion: normally,
1815 * exceptionally, or via cancellation.
1816 *
1817 * @return {@code true} if completed
1818 */
1819 public boolean isDone() {
1820 return result != null;
1821 }
1822
1823 /**
1824 * Waits if necessary for this future to complete, and then
1825 * returns its result.
1826 *
1827 * @return the result value
1828 * @throws CancellationException if this future was cancelled
1829 * @throws ExecutionException if this future completed exceptionally
1830 * @throws InterruptedException if the current thread was interrupted
1831 * while waiting
1832 */
1833 public T get() throws InterruptedException, ExecutionException {
1834 Object r;
1835 return reportGet((r = result) == null ? waitingGet(true) : r);
1836 }
1837
1838 /**
1839 * Waits if necessary for at most the given time for this future
1840 * to complete, and then returns its result, if available.
1841 *
1842 * @param timeout the maximum time to wait
1843 * @param unit the time unit of the timeout argument
1844 * @return the result value
1845 * @throws CancellationException if this future was cancelled
1846 * @throws ExecutionException if this future completed exceptionally
1847 * @throws InterruptedException if the current thread was interrupted
1848 * while waiting
1849 * @throws TimeoutException if the wait timed out
1850 */
1851 public T get(long timeout, TimeUnit unit)
1852 throws InterruptedException, ExecutionException, TimeoutException {
1853 Object r;
1854 long nanos = unit.toNanos(timeout);
1855 return reportGet((r = result) == null ? timedGet(nanos) : r);
1856 }
1857
1858 /**
1859 * Returns the result value when complete, or throws an
1860 * (unchecked) exception if completed exceptionally. To better
1861 * conform with the use of common functional forms, if a
1862 * computation involved in the completion of this
1863 * CompletableFuture threw an exception, this method throws an
1864 * (unchecked) {@link CompletionException} with the underlying
1865 * exception as its cause.
1866 *
1867 * @return the result value
1868 * @throws CancellationException if the computation was cancelled
1869 * @throws CompletionException if this future completed
1870 * exceptionally or a completion computation threw an exception
1871 */
1872 public T join() {
1873 Object r;
1874 return reportJoin((r = result) == null ? waitingGet(false) : r);
1875 }
1876
1877 /**
1878 * Returns the result value (or throws any encountered exception)
1879 * if completed, else returns the given valueIfAbsent.
1880 *
1881 * @param valueIfAbsent the value to return if not completed
1882 * @return the result value, if completed, else the given valueIfAbsent
1883 * @throws CancellationException if the computation was cancelled
1884 * @throws CompletionException if this future completed
1885 * exceptionally or a completion computation threw an exception
1886 */
1887 public T getNow(T valueIfAbsent) {
1888 Object r;
1889 return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
1890 }
1891
1892 /**
1893 * If not already completed, sets the value returned by {@link
1894 * #get()} and related methods to the given value.
1895 *
1896 * @param value the result value
1897 * @return {@code true} if this invocation caused this CompletableFuture
1898 * to transition to a completed state, else {@code false}
1899 */
1900 public boolean complete(T value) {
1901 boolean triggered = completeValue(value);
1902 postComplete();
1903 return triggered;
1904 }
1905
1906 /**
1907 * If not already completed, causes invocations of {@link #get()}
1908 * and related methods to throw the given exception.
1909 *
1910 * @param ex the exception
1911 * @return {@code true} if this invocation caused this CompletableFuture
1912 * to transition to a completed state, else {@code false}
1913 */
1914 public boolean completeExceptionally(Throwable ex) {
1915 if (ex == null) throw new NullPointerException();
1916 boolean triggered = internalComplete(new AltResult(ex));
1917 postComplete();
1918 return triggered;
1919 }
1920
1921 public <U> CompletableFuture<U> thenApply(
1922 Function<? super T,? extends U> fn) {
1923 return uniApplyStage(null, fn);
1924 }
1925
1926 public <U> CompletableFuture<U> thenApplyAsync(
1927 Function<? super T,? extends U> fn) {
1928 return uniApplyStage(asyncPool, fn);
1929 }
1930
1931 public <U> CompletableFuture<U> thenApplyAsync(
1932 Function<? super T,? extends U> fn, Executor executor) {
1933 return uniApplyStage(screenExecutor(executor), fn);
1934 }
1935
1936 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
1937 return uniAcceptStage(null, action);
1938 }
1939
1940 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
1941 return uniAcceptStage(asyncPool, action);
1942 }
1943
1944 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
1945 Executor executor) {
1946 return uniAcceptStage(screenExecutor(executor), action);
1947 }
1948
1949 public CompletableFuture<Void> thenRun(Runnable action) {
1950 return uniRunStage(null, action);
1951 }
1952
1953 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1954 return uniRunStage(asyncPool, action);
1955 }
1956
1957 public CompletableFuture<Void> thenRunAsync(Runnable action,
1958 Executor executor) {
1959 return uniRunStage(screenExecutor(executor), action);
1960 }
1961
1962 public <U,V> CompletableFuture<V> thenCombine(
1963 CompletionStage<? extends U> other,
1964 BiFunction<? super T,? super U,? extends V> fn) {
1965 return biApplyStage(null, other, fn);
1966 }
1967
1968 public <U,V> CompletableFuture<V> thenCombineAsync(
1969 CompletionStage<? extends U> other,
1970 BiFunction<? super T,? super U,? extends V> fn) {
1971 return biApplyStage(asyncPool, other, fn);
1972 }
1973
1974 public <U,V> CompletableFuture<V> thenCombineAsync(
1975 CompletionStage<? extends U> other,
1976 BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
1977 return biApplyStage(screenExecutor(executor), other, fn);
1978 }
1979
1980 public <U> CompletableFuture<Void> thenAcceptBoth(
1981 CompletionStage<? extends U> other,
1982 BiConsumer<? super T, ? super U> action) {
1983 return biAcceptStage(null, other, action);
1984 }
1985
1986 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1987 CompletionStage<? extends U> other,
1988 BiConsumer<? super T, ? super U> action) {
1989 return biAcceptStage(asyncPool, other, action);
1990 }
1991
1992 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1993 CompletionStage<? extends U> other,
1994 BiConsumer<? super T, ? super U> action, Executor executor) {
1995 return biAcceptStage(screenExecutor(executor), other, action);
1996 }
1997
1998 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
1999 Runnable action) {
2000 return biRunStage(null, other, action);
2001 }
2002
2003 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2004 Runnable action) {
2005 return biRunStage(asyncPool, other, action);
2006 }
2007
2008 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2009 Runnable action,
2010 Executor executor) {
2011 return biRunStage(screenExecutor(executor), other, action);
2012 }
2013
2014 public <U> CompletableFuture<U> applyToEither(
2015 CompletionStage<? extends T> other, Function<? super T, U> fn) {
2016 return orApplyStage(null, other, fn);
2017 }
2018
2019 public <U> CompletableFuture<U> applyToEitherAsync(
2020 CompletionStage<? extends T> other, Function<? super T, U> fn) {
2021 return orApplyStage(asyncPool, other, fn);
2022 }
2023
2024 public <U> CompletableFuture<U> applyToEitherAsync(
2025 CompletionStage<? extends T> other, Function<? super T, U> fn,
2026 Executor executor) {
2027 return orApplyStage(screenExecutor(executor), other, fn);
2028 }
2029
2030 public CompletableFuture<Void> acceptEither(
2031 CompletionStage<? extends T> other, Consumer<? super T> action) {
2032 return orAcceptStage(null, other, action);
2033 }
2034
2035 public CompletableFuture<Void> acceptEitherAsync(
2036 CompletionStage<? extends T> other, Consumer<? super T> action) {
2037 return orAcceptStage(asyncPool, other, action);
2038 }
2039
2040 public CompletableFuture<Void> acceptEitherAsync(
2041 CompletionStage<? extends T> other, Consumer<? super T> action,
2042 Executor executor) {
2043 return orAcceptStage(screenExecutor(executor), other, action);
2044 }
2045
2046 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2047 Runnable action) {
2048 return orRunStage(null, other, action);
2049 }
2050
2051 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2052 Runnable action) {
2053 return orRunStage(asyncPool, other, action);
2054 }
2055
2056 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2057 Runnable action,
2058 Executor executor) {
2059 return orRunStage(screenExecutor(executor), other, action);
2060 }
2061
2062 public <U> CompletableFuture<U> thenCompose(
2063 Function<? super T, ? extends CompletionStage<U>> fn) {
2064 return uniComposeStage(null, fn);
2065 }
2066
2067 public <U> CompletableFuture<U> thenComposeAsync(
2068 Function<? super T, ? extends CompletionStage<U>> fn) {
2069 return uniComposeStage(asyncPool, fn);
2070 }
2071
2072 public <U> CompletableFuture<U> thenComposeAsync(
2073 Function<? super T, ? extends CompletionStage<U>> fn,
2074 Executor executor) {
2075 return uniComposeStage(screenExecutor(executor), fn);
2076 }
2077
2078 public CompletableFuture<T> whenComplete(
2079 BiConsumer<? super T, ? super Throwable> action) {
2080 return uniWhenCompleteStage(null, action);
2081 }
2082
2083 public CompletableFuture<T> whenCompleteAsync(
2084 BiConsumer<? super T, ? super Throwable> action) {
2085 return uniWhenCompleteStage(asyncPool, action);
2086 }
2087
2088 public CompletableFuture<T> whenCompleteAsync(
2089 BiConsumer<? super T, ? super Throwable> action, Executor executor) {
2090 return uniWhenCompleteStage(screenExecutor(executor), action);
2091 }
2092
2093 public <U> CompletableFuture<U> handle(
2094 BiFunction<? super T, Throwable, ? extends U> fn) {
2095 return uniHandleStage(null, fn);
2096 }
2097
2098 public <U> CompletableFuture<U> handleAsync(
2099 BiFunction<? super T, Throwable, ? extends U> fn) {
2100 return uniHandleStage(asyncPool, fn);
2101 }
2102
2103 public <U> CompletableFuture<U> handleAsync(
2104 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
2105 return uniHandleStage(screenExecutor(executor), fn);
2106 }
2107
2108 /**
2109 * Returns this CompletableFuture.
2110 *
2111 * @return this CompletableFuture
2112 */
2113 public CompletableFuture<T> toCompletableFuture() {
2114 return this;
2115 }
2116
2117 // not in interface CompletionStage
2118
2119 /**
2120 * Returns a new CompletableFuture that is completed when this
2121 * CompletableFuture completes, with the result of the given
2122 * function of the exception triggering this CompletableFuture's
2123 * completion when it completes exceptionally; otherwise, if this
2124 * CompletableFuture completes normally, then the returned
2125 * CompletableFuture also completes normally with the same value.
2126 * Note: More flexible versions of this functionality are
2127 * available using methods {@code whenComplete} and {@code handle}.
2128 *
2129 * @param fn the function to use to compute the value of the
2130 * returned CompletableFuture if this CompletableFuture completed
2131 * exceptionally
2132 * @return the new CompletableFuture
2133 */
2134 public CompletableFuture<T> exceptionally(
2135 Function<Throwable, ? extends T> fn) {
2136 return uniExceptionallyStage(fn);
2137 }
2138
2139 /* ------------- Arbitrary-arity constructions -------------- */
2140
2141 /**
2142 * Returns a new CompletableFuture that is completed when all of
2143 * the given CompletableFutures complete. If any of the given
2144 * CompletableFutures complete exceptionally, then the returned
2145 * CompletableFuture also does so, with a CompletionException
2146 * holding this exception as its cause. Otherwise, the results,
2147 * if any, of the given CompletableFutures are not reflected in
2148 * the returned CompletableFuture, but may be obtained by
2149 * inspecting them individually. If no CompletableFutures are
2150 * provided, returns a CompletableFuture completed with the value
2151 * {@code null}.
2152 *
2153 * <p>Among the applications of this method is to await completion
2154 * of a set of independent CompletableFutures before continuing a
2155 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2156 * c3).join();}.
2157 *
2158 * @param cfs the CompletableFutures
2159 * @return a new CompletableFuture that is completed when all of the
2160 * given CompletableFutures complete
2161 * @throws NullPointerException if the array or any of its elements are
2162 * {@code null}
2163 */
2164 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2165 return andTree(cfs, 0, cfs.length - 1);
2166 }
2167
2168 /**
2169 * Returns a new CompletableFuture that is completed when any of
2170 * the given CompletableFutures complete, with the same result.
2171 * Otherwise, if it completed exceptionally, the returned
2172 * CompletableFuture also does so, with a CompletionException
2173 * holding this exception as its cause. If no CompletableFutures
2174 * are provided, returns an incomplete CompletableFuture.
2175 *
2176 * @param cfs the CompletableFutures
2177 * @return a new CompletableFuture that is completed with the
2178 * result or exception of any of the given CompletableFutures when
2179 * one completes
2180 * @throws NullPointerException if the array or any of its elements are
2181 * {@code null}
2182 */
2183 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2184 return orTree(cfs, 0, cfs.length - 1);
2185 }
2186
2187 /* ------------- Control and status methods -------------- */
2188
2189 /**
2190 * If not already completed, completes this CompletableFuture with
2191 * a {@link CancellationException}. Dependent CompletableFutures
2192 * that have not already completed will also complete
2193 * exceptionally, with a {@link CompletionException} caused by
2194 * this {@code CancellationException}.
2195 *
2196 * @param mayInterruptIfRunning this value has no effect in this
2197 * implementation because interrupts are not used to control
2198 * processing.
2199 *
2200 * @return {@code true} if this task is now cancelled
2201 */
2202 public boolean cancel(boolean mayInterruptIfRunning) {
2203 boolean cancelled = (result == null) &&
2204 internalComplete(new AltResult(new CancellationException()));
2205 postComplete();
2206 return cancelled || isCancelled();
2207 }
2208
2209 /**
2210 * Returns {@code true} if this CompletableFuture was cancelled
2211 * before it completed normally.
2212 *
2213 * @return {@code true} if this CompletableFuture was cancelled
2214 * before it completed normally
2215 */
2216 public boolean isCancelled() {
2217 Object r;
2218 return ((r = result) instanceof AltResult) &&
2219 (((AltResult)r).ex instanceof CancellationException);
2220 }
2221
2222 /**
2223 * Returns {@code true} if this CompletableFuture completed
2224 * exceptionally, in any way. Possible causes include
2225 * cancellation, explicit invocation of {@code
2226 * completeExceptionally}, and abrupt termination of a
2227 * CompletionStage action.
2228 *
2229 * @return {@code true} if this CompletableFuture completed
2230 * exceptionally
2231 */
2232 public boolean isCompletedExceptionally() {
2233 Object r;
2234 return ((r = result) instanceof AltResult) && r != NIL;
2235 }
2236
2237 /**
2238 * Forcibly sets or resets the value subsequently returned by
2239 * method {@link #get()} and related methods, whether or not
2240 * already completed. This method is designed for use only in
2241 * error recovery actions, and even in such situations may result
2242 * in ongoing dependent completions using established versus
2243 * overwritten outcomes.
2244 *
2245 * @param value the completion value
2246 */
2247 public void obtrudeValue(T value) {
2248 result = (value == null) ? NIL : value;
2249 postComplete();
2250 }
2251
2252 /**
2253 * Forcibly causes subsequent invocations of method {@link #get()}
2254 * and related methods to throw the given exception, whether or
2255 * not already completed. This method is designed for use only in
2256 * error recovery actions, and even in such situations may result
2257 * in ongoing dependent completions using established versus
2258 * overwritten outcomes.
2259 *
2260 * @param ex the exception
2261 * @throws NullPointerException if the exception is null
2262 */
2263 public void obtrudeException(Throwable ex) {
2264 if (ex == null) throw new NullPointerException();
2265 result = new AltResult(ex);
2266 postComplete();
2267 }
2268
2269 /**
2270 * Returns the estimated number of CompletableFutures whose
2271 * completions are awaiting completion of this CompletableFuture.
2272 * This method is designed for use in monitoring system state, not
2273 * for synchronization control.
2274 *
2275 * @return the number of dependent CompletableFutures
2276 */
2277 public int getNumberOfDependents() {
2278 int count = 0;
2279 for (Completion p = stack; p != null; p = p.next)
2280 ++count;
2281 return count;
2282 }
2283
2284 /**
2285 * Returns a string identifying this CompletableFuture, as well as
2286 * its completion state. The state, in brackets, contains the
2287 * String {@code "Completed Normally"} or the String {@code
2288 * "Completed Exceptionally"}, or the String {@code "Not
2289 * completed"} followed by the number of CompletableFutures
2290 * dependent upon its completion, if any.
2291 *
2292 * @return a string identifying this CompletableFuture, as well as its state
2293 */
2294 public String toString() {
2295 Object r = result;
2296 int count;
2297 return super.toString() +
2298 ((r == null) ?
2299 (((count = getNumberOfDependents()) == 0) ?
2300 "[Not completed]" :
2301 "[Not completed, " + count + " dependents]") :
2302 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2303 "[Completed exceptionally]" :
2304 "[Completed normally]"));
2305 }
2306
2307 // Unsafe mechanics
2308 private static final sun.misc.Unsafe UNSAFE;
2309 private static final long RESULT;
2310 private static final long STACK;
2311 static {
2312 try {
2313 UNSAFE = sun.misc.Unsafe.getUnsafe();
2314 Class<?> k = CompletableFuture.class;
2315 RESULT = UNSAFE.objectFieldOffset
2316 (k.getDeclaredField("result"));
2317 STACK = UNSAFE.objectFieldOffset
2318 (k.getDeclaredField("stack"));
2319 } catch (Exception x) {
2320 throw new Error(x);
2321 }
2322 }
2323 }