ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.205
Committed: Sun Jul 3 15:15:33 2016 UTC (7 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.204: +24 -14 lines
Log Message:
cleanStack: no need to re-read stack after successful stack cas; unlinked should not become false after being true

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.concurrent.locks.LockSupport;
12 import java.util.function.BiConsumer;
13 import java.util.function.BiFunction;
14 import java.util.function.Consumer;
15 import java.util.function.Function;
16 import java.util.function.Supplier;
17
18 /**
19 * A {@link Future} that may be explicitly completed (setting its
20 * value and status), and may be used as a {@link CompletionStage},
21 * supporting dependent functions and actions that trigger upon its
22 * completion.
23 *
24 * <p>When two or more threads attempt to
25 * {@link #complete complete},
26 * {@link #completeExceptionally completeExceptionally}, or
27 * {@link #cancel cancel}
28 * a CompletableFuture, only one of them succeeds.
29 *
30 * <p>In addition to these and related methods for directly
31 * manipulating status and results, CompletableFuture implements
32 * interface {@link CompletionStage} with the following policies: <ul>
33 *
34 * <li>Actions supplied for dependent completions of
35 * <em>non-async</em> methods may be performed by the thread that
36 * completes the current CompletableFuture, or by any other caller of
37 * a completion method.
38 *
39 * <li>All <em>async</em> methods without an explicit Executor
40 * argument are performed using the {@link ForkJoinPool#commonPool()}
41 * (unless it does not support a parallelism level of at least two, in
42 * which case, a new Thread is created to run each task). This may be
43 * overridden for non-static methods in subclasses by defining method
44 * {@link #defaultExecutor()}. To simplify monitoring, debugging,
45 * and tracking, all generated asynchronous tasks are instances of the
46 * marker interface {@link AsynchronousCompletionTask}. Operations
47 * with time-delays can use adapter methods defined in this class, for
48 * example: {@code supplyAsync(supplier, delayedExecutor(timeout,
49 * timeUnit))}. To support methods with delays and timeouts, this
50 * class maintains at most one daemon thread for triggering and
51 * cancelling actions, not for running them.
52 *
53 * <li>All CompletionStage methods are implemented independently of
54 * other public methods, so the behavior of one method is not impacted
55 * by overrides of others in subclasses.
56 *
57 * <li>All CompletionStage methods return CompletableFutures. To
58 * restrict usages to only those methods defined in interface
59 * CompletionStage, use method {@link #minimalCompletionStage}. Or to
60 * ensure only that clients do not themselves modify a future, use
61 * method {@link #copy}.
62 * </ul>
63 *
64 * <p>CompletableFuture also implements {@link Future} with the following
65 * policies: <ul>
66 *
67 * <li>Since (unlike {@link FutureTask}) this class has no direct
68 * control over the computation that causes it to be completed,
69 * cancellation is treated as just another form of exceptional
70 * completion. Method {@link #cancel cancel} has the same effect as
71 * {@code completeExceptionally(new CancellationException())}. Method
72 * {@link #isCompletedExceptionally} can be used to determine if a
73 * CompletableFuture completed in any exceptional fashion.
74 *
75 * <li>In case of exceptional completion with a CompletionException,
76 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
77 * {@link ExecutionException} with the same cause as held in the
78 * corresponding CompletionException. To simplify usage in most
79 * contexts, this class also defines methods {@link #join()} and
80 * {@link #getNow} that instead throw the CompletionException directly
81 * in these cases.
82 * </ul>
83 *
84 * <p>Arguments used to pass a completion result (that is, for
85 * parameters of type {@code T}) for methods accepting them may be
86 * null, but passing a null value for any other parameter will result
87 * in a {@link NullPointerException} being thrown.
88 *
89 * <p>Subclasses of this class should normally override the "virtual
90 * constructor" method {@link #newIncompleteFuture}, which establishes
91 * the concrete type returned by CompletionStage methods. For example,
92 * here is a class that substitutes a different default Executor and
93 * disables the {@code obtrude} methods:
94 *
95 * <pre> {@code
96 * class MyCompletableFuture<T> extends CompletableFuture<T> {
97 * static final Executor myExecutor = ...;
98 * public MyCompletableFuture() { }
99 * public <U> CompletableFuture<U> newIncompleteFuture() {
100 * return new MyCompletableFuture<U>(); }
101 * public Executor defaultExecutor() {
102 * return myExecutor; }
103 * public void obtrudeValue(T value) {
104 * throw new UnsupportedOperationException(); }
105 * public void obtrudeException(Throwable ex) {
106 * throw new UnsupportedOperationException(); }
107 * }}</pre>
108 *
109 * @author Doug Lea
110 * @since 1.8
111 * @param <T> The result type returned by this future's {@code join}
112 * and {@code get} methods
113 */
114 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
115
116 /*
117 * Overview:
118 *
119 * A CompletableFuture may have dependent completion actions,
120 * collected in a linked stack. It atomically completes by CASing
121 * a result field, and then pops off and runs those actions. This
122 * applies across normal vs exceptional outcomes, sync vs async
123 * actions, binary triggers, and various forms of completions.
124 *
125 * Non-nullness of volatile field "result" indicates done. It may
126 * be set directly if known to be thread-confined, else via CAS.
127 * An AltResult is used to box null as a result, as well as to
128 * hold exceptions. Using a single field makes completion simple
129 * to detect and trigger. Result encoding and decoding is
130 * straightforward but tedious and adds to the sprawl of trapping
131 * and associating exceptions with targets. Minor simplifications
132 * rely on (static) NIL (to box null results) being the only
133 * AltResult with a null exception field, so we don't usually need
134 * explicit comparisons. Even though some of the generics casts
135 * are unchecked (see SuppressWarnings annotations), they are
136 * placed to be appropriate even if checked.
137 *
138 * Dependent actions are represented by Completion objects linked
139 * as Treiber stacks headed by field "stack". There are Completion
140 * classes for each kind of action, grouped into:
141 * - single-input (UniCompletion),
142 * - two-input (BiCompletion),
143 * - projected (BiCompletions using exactly one of two inputs),
144 * - shared (CoCompletion, used by the second of two sources),
145 * - zero-input source actions,
146 * - Signallers that unblock waiters.
147 * Class Completion extends ForkJoinTask to enable async execution
148 * (adding no space overhead because we exploit its "tag" methods
149 * to maintain claims). It is also declared as Runnable to allow
150 * usage with arbitrary executors.
151 *
152 * Support for each kind of CompletionStage relies on a separate
153 * class, along with two CompletableFuture methods:
154 *
155 * * A Completion class with name X corresponding to function,
156 * prefaced with "Uni", "Bi", or "Or". Each class contains
157 * fields for source(s), actions, and dependent. They are
158 * boringly similar, differing from others only with respect to
159 * underlying functional forms. We do this so that users don't
160 * encounter layers of adapters in common usages.
161 *
162 * * Boolean CompletableFuture method x(...) (for example
163 * biApply) takes all of the arguments needed to check that an
164 * action is triggerable, and then either runs the action or
165 * arranges its async execution by executing its Completion
166 * argument, if present. The method returns true if known to be
167 * complete.
168 *
169 * * Completion method tryFire(int mode) invokes the associated x
170 * method with its held arguments, and on success cleans up.
171 * The mode argument allows tryFire to be called twice (SYNC,
172 * then ASYNC); the first to screen and trap exceptions while
173 * arranging to execute, and the second when called from a task.
174 * (A few classes are not used async so take slightly different
175 * forms.) The claim() callback suppresses function invocation
176 * if already claimed by another thread.
177 *
178 * * Some classes (for example UniApply) have separate handling
179 * code for when known to be thread-confined ("now" methods) and
180 * for when shared (in tryFire), for efficiency.
181 *
182 * * CompletableFuture method xStage(...) is called from a public
183 * stage method of CompletableFuture f. It screens user
184 * arguments and invokes and/or creates the stage object. If
185 * not async and already triggerable, the action is run
186 * immediately. Otherwise a Completion c is created, and
187 * submitted to the executor if triggerable, or pushed onto f's
188 * stack if not. Completion actions are started via c.tryFire.
189 * We recheck after pushing to a source future's stack to cover
190 * possible races if the source completes while pushing.
191 * Classes with two inputs (for example BiApply) deal with races
192 * across both while pushing actions. The second completion is
193 * a CoCompletion pointing to the first, shared so that at most
194 * one performs the action. The multiple-arity methods allOf
195 * and anyOf do this pairwise to form trees of completions.
196 *
197 * Note that the generic type parameters of methods vary according
198 * to whether "this" is a source, dependent, or completion.
199 *
200 * Method postComplete is called upon completion unless the target
201 * is guaranteed not to be observable (i.e., not yet returned or
202 * linked). Multiple threads can call postComplete, which
203 * atomically pops each dependent action, and tries to trigger it
204 * via method tryFire, in NESTED mode. Triggering can propagate
205 * recursively, so NESTED mode returns its completed dependent (if
206 * one exists) for further processing by its caller (see method
207 * postFire).
208 *
209 * Blocking methods get() and join() rely on Signaller Completions
210 * that wake up waiting threads. The mechanics are similar to
211 * Treiber stack wait-nodes used in FutureTask, Phaser, and
212 * SynchronousQueue. See their internal documentation for
213 * algorithmic details.
214 *
215 * Without precautions, CompletableFutures would be prone to
216 * garbage accumulation as chains of Completions build up, each
217 * pointing back to its sources. So we null out fields as soon as
218 * possible. The screening checks needed anyway harmlessly ignore
219 * null arguments that may have been obtained during races with
220 * threads nulling out fields. We also try to unlink non-isLive
221 * (fired or cancelled) Completions from stacks that might
222 * otherwise never be popped: Method cleanStack always unlinks non
223 * isLive completions from the head of stack; others may
224 * occasionally remain if racing with other cancellations or
225 * removals.
226 *
227 * Completion fields need not be declared as final or volatile
228 * because they are only visible to other threads upon safe
229 * publication.
230 */
231
232 volatile Object result; // Either the result or boxed AltResult; non-null means this future is done
233 volatile Completion stack; // Top of Treiber stack of dependent actions; null when none are linked
234
235 final boolean internalComplete(Object r) { // CAS from null to r; r is stored as given (no encoding applied here)
236 return RESULT.compareAndSet(this, null, r); // true only if this call changed result from null
237 }
238
239 /** Returns true if successfully pushed c onto stack; makes a single attempt and returns false if the head changed meanwhile. */
240 final boolean tryPushStack(Completion c) {
241 Completion h = stack; // snapshot of the current head
242 NEXT.set(c, h); // CAS piggyback: c.next is written before c is published by the CAS below
243 return STACK.compareAndSet(this, h, c); // on failure c.next still refers to the stale head h
244 }
245
246 /** Unconditionally pushes c onto stack, retrying if necessary. */
247 final void pushStack(Completion c) {
248 do {} while (!tryPushStack(c)); // empty-body spin; each attempt re-reads the head inside tryPushStack
249 }
250
251 /* ------------- Encoding and decoding outcomes -------------- */
252
253 static final class AltResult { // See above: boxes a null result (NIL) or holds an exception
254 final Throwable ex; // null only for NIL
255 AltResult(Throwable x) { this.ex = x; } // stores x as given; wrapping is done by the encodeThrowable methods
256 }
257
258 /** The encoding of the null value. */
259 static final AltResult NIL = new AltResult(null); // shared instance; constructed with a null ex
260
261 /** Completes with the null value, unless already completed. Returns true if this call set the result. */
262 final boolean completeNull() {
263 return RESULT.compareAndSet(this, null, NIL); // NIL stands in for null so a non-null result still means done
264 }
265
266 /** Returns the encoding of the given non-exceptional value: NIL for null, otherwise the value itself. */
267 final Object encodeValue(T t) {
268 return (t == null) ? NIL : t; // pure function; does not touch this future's state
269 }
270
271 /** Completes with a non-exceptional result, unless already completed. Returns true if this call set the result. */
272 final boolean completeValue(T t) {
273 return RESULT.compareAndSet(this, null, (t == null) ? NIL : t); // same encoding as encodeValue, inlined
274 }
275
276 /**
277 * Returns the encoding of the given (non-null) exception as a
278 * wrapped CompletionException unless it is one already.
279 */
280 static AltResult encodeThrowable(Throwable x) {
281 return new AltResult((x instanceof CompletionException) ? x : // already a CompletionException: store as-is
282 new CompletionException(x)); // otherwise wrap x in a new CompletionException
283 }
284
285 /** Completes with an exceptional result, unless already completed. Returns true if this call set the result. */
286 final boolean completeThrowable(Throwable x) {
287 return RESULT.compareAndSet(this, null, encodeThrowable(x)); // the AltResult is allocated even if the CAS then fails
288 }
289
290 /**
291 * Returns the encoding of the given (non-null) exception as a
292 * wrapped CompletionException unless it is one already. May
293 * return the given Object r (which must have been the result of a
294 * source future) if it is equivalent, i.e. if this is a simple
295 * relay of an existing CompletionException.
296 */
297 static Object encodeThrowable(Throwable x, Object r) {
298 if (!(x instanceof CompletionException))
299 x = new CompletionException(x); // wrap; falls through to allocate a new AltResult
300 else if (r instanceof AltResult && x == ((AltResult)r).ex)
301 return r; // r already holds this very CompletionException (identity check): reuse r
302 return new AltResult(x);
303 }
304
305 /**
306 * Completes with the given (non-null) exceptional result as a
307 * wrapped CompletionException unless it is one already, unless
308 * already completed. May complete with the given Object r
309 * (which must have been the result of a source future) if it is
310 * equivalent, i.e. if this is a simple propagation of an
311 * existing CompletionException.
312 */
313 final boolean completeThrowable(Throwable x, Object r) {
314 return RESULT.compareAndSet(this, null, encodeThrowable(x, r)); // true only if this call set the result
315 }
316
317 /**
318 * Returns the encoding of the given arguments: if the exception
319 * is non-null, encodes as AltResult. Otherwise uses the given
320 * value, boxed as NIL if null.
321 */
322 Object encodeOutcome(T t, Throwable x) {
323 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); // when x is non-null, t is ignored
324 }
325
326 /**
327 * Returns the encoding of a copied outcome; if exceptional,
328 * rewraps as a CompletionException, else returns argument.
329 */
330 static Object encodeRelay(Object r) {
331 Throwable x;
332 if (r instanceof AltResult
333 && (x = ((AltResult)r).ex) != null // NIL (null ex) is returned unchanged
334 && !(x instanceof CompletionException)) // an existing CompletionException is returned unchanged too
335 r = new AltResult(new CompletionException(x));
336 return r;
337 }
338
339 /**
340 * Completes with r or a copy of r, unless already completed.
341 * If exceptional, r is first coerced to a CompletionException.
342 */
343 final boolean completeRelay(Object r) {
344 return RESULT.compareAndSet(this, null, encodeRelay(r)); // true only if this call set the result
345 }
346
347 /**
348 * Reports result using Future.get conventions.
349 */
350 private static Object reportGet(Object r)
351 throws InterruptedException, ExecutionException {
352 if (r == null) // by convention below, null means interrupted
353 throw new InterruptedException();
354 if (r instanceof AltResult) {
355 Throwable x, cause;
356 if ((x = ((AltResult)r).ex) == null) // NIL: the decoded result is null
357 return null;
358 if (x instanceof CancellationException) // cancellation is rethrown as-is, not wrapped
359 throw (CancellationException)x;
360 if ((x instanceof CompletionException) &&
361 (cause = x.getCause()) != null) // unwrap one CompletionException layer when it has a cause
362 x = cause;
363 throw new ExecutionException(x);
364 }
365 return r; // plain (non-AltResult) value
366 }
367
368 /**
369 * Decodes outcome to return result or throw unchecked exception.
370 */
371 private static Object reportJoin(Object r) {
372 if (r instanceof AltResult) {
373 Throwable x;
374 if ((x = ((AltResult)r).ex) == null) // NIL: the decoded result is null
375 return null;
376 if (x instanceof CancellationException) // rethrown as-is
377 throw (CancellationException)x;
378 if (x instanceof CompletionException) // rethrown as-is; unlike reportGet, the cause is not unwrapped
379 throw (CompletionException)x;
380 throw new CompletionException(x); // any other Throwable is wrapped
381 }
382 return r; // plain value; a null r is simply returned (no interrupted convention here)
383 }
384
385 /* ------------- Async task preliminaries -------------- */
386
387 /**
388 * A marker interface identifying asynchronous tasks produced by
389 * {@code async} methods. This may be useful for monitoring,
390 * debugging, and tracking asynchronous activities.
391 *
392 * @since 1.8
393 */
394 public static interface AsynchronousCompletionTask { // declares no members; serves only as a type tag
395 }
396
397 private static final boolean USE_COMMON_POOL = // evaluated once, at class initialization
398 (ForkJoinPool.getCommonPoolParallelism() > 1); // true when the common pool reports parallelism of at least two
399
400 /**
401 * Default executor -- ForkJoinPool.commonPool() unless it cannot
402 * support parallelism.
403 */
404 private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
405 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); // fallback executor is the nested ThreadPerTaskExecutor class
406
407 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism; starts one new Thread per submitted task. Throws NullPointerException for a null task. */
408 static final class ThreadPerTaskExecutor implements Executor {
409 public void execute(Runnable r) { if (r == null) throw new NullPointerException(); new Thread(r).start(); } // reject null per the Executor.execute contract rather than starting a thread that does nothing
410 }
411
412 /**
413 * Null-checks user executor argument, and translates uses of
414 * commonPool to ASYNC_POOL in case parallelism disabled.
415 */
416 static Executor screenExecutor(Executor e) {
417 if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool()) // identity comparison against the common pool instance
418 return ASYNC_POOL; // substitute the fallback executor
419 if (e == null) throw new NullPointerException();
420 return e; // any other executor is used unchanged
421 }
422
423 // Modes for Completion.tryFire. Signedness matters.
424 static final int SYNC = 0; // mode <= 0: tryFire implementations call claim() before running the action
425 static final int ASYNC = 1; // mode > 0: passed by Completion.run()/exec(); the claim() step is skipped
426 static final int NESTED = -1; // mode < 0: postFire returns the dependent instead of calling postComplete itself
427
428 /* ------------- Base Completion classes and operations -------------- */
429
430 @SuppressWarnings("serial")
431 abstract static class Completion extends ForkJoinTask<Void> // base class of all dependent actions linked on a future's stack
432 implements Runnable, AsynchronousCompletionTask {
433 volatile Completion next; // Treiber stack link
434
435 /**
436 * Performs completion action if triggered, returning a
437 * dependent that may need propagation, if one exists.
438 *
439 * @param mode SYNC, ASYNC, or NESTED
440 */
441 abstract CompletableFuture<?> tryFire(int mode);
442
443 /** Returns true if possibly still triggerable. Used by cleanStack. */
444 abstract boolean isLive();
445
446 public final void run() { tryFire(ASYNC); } // Runnable entry point; the returned dependent is ignored
447 public final boolean exec() { tryFire(ASYNC); return false; } // ForkJoinTask entry point; always returns false
448 public final Void getRawResult() { return null; } // this task carries no result of its own
449 public final void setRawResult(Void v) {} // intentionally a no-op
450 }
451
452 /**
453 * Pops and tries to trigger all reachable dependents. Call only
454 * when known to be done.
455 */
456 final void postComplete() {
457 /*
458 * On each step, variable f holds current dependents to pop
459 * and run. It is extended along only one path at a time,
460 * pushing others to avoid unbounded recursion.
461 */
462 CompletableFuture<?> f = this; Completion h;
463 while ((h = f.stack) != null ||
464 (f != this && (h = (f = this).stack) != null)) { // f's stack is empty: reset f to this future and retry its stack
465 CompletableFuture<?> d; Completion t;
466 if (STACK.compareAndSet(f, h, t = h.next)) { // pop h from f; if the CAS fails, the loop re-reads the head
467 if (t != null) {
468 if (f != this) {
469 pushStack(h); // h came from a nested future with more entries: move it onto this future's stack
470 continue;
471 }
472 NEXT.compareAndSet(h, t, null); // try to detach
473 }
474 f = (d = h.tryFire(NESTED)) == null ? this : d; // continue with the dependent returned by tryFire, else with this
475 }
476 }
477 }
478
479 /** Traverses stack and unlinks one or more dead Completions, if found. */
480 final void cleanStack() {
481 Completion p = stack;
482 // ensure head of stack live
483 for (boolean unlinked = false;;) {
484 if (p == null) // reached the end of the stack: nothing more to do
485 return;
486 else if (p.isLive()) {
487 if (unlinked) // at least one dead head was already removed in this call: done
488 return;
489 else
490 break; // live head and nothing removed yet: go on to scan past the head
491 }
492 else if (STACK.weakCompareAndSetVolatile(this, p, (p = p.next))) // dead head: swing stack to its successor; p advances either way
493 unlinked = true; // never reset to false once set
494 else
495 p = stack; // CAS did not succeed (contention, or spurious failure of the weak CAS): restart from the current head
496 }
497 // try to unlink first non-live
498 for (Completion q = p.next; q != null;) { // p is the last live node seen, q the candidate after it
499 Completion s = q.next;
500 if (q.isLive()) {
501 p = q; // advance both cursors
502 q = s;
503 } else if (NEXT.weakCompareAndSetVolatile(p, q, s)) // splice the dead node q out by linking p directly to s
504 break; // stop after one successful unlink in this phase
505 else
506 q = p.next; // CAS did not succeed: re-read p's successor and retry
507 }
508 }
509
510 /* ------------- One-input Completions -------------- */
511
512 /** A Completion with a source, dependent, and executor. */
513 @SuppressWarnings("serial")
514 abstract static class UniCompletion<T,V> extends Completion {
515 Executor executor; // executor to use (null if none)
516 CompletableFuture<V> dep; // the dependent to complete
517 CompletableFuture<T> src; // source for action
518
519 UniCompletion(Executor executor, CompletableFuture<V> dep,
520 CompletableFuture<T> src) {
521 this.executor = executor; this.dep = dep; this.src = src;
522 }
523
524 /**
525 * Returns true if action can be run. Call only when known to
526 * be triggerable. Uses FJ tag bit to ensure that only one
527 * thread claims ownership. If async, starts as task -- a
528 * later call to tryFire will run action.
529 */
530 final boolean claim() {
531 Executor e = executor; // read before the tag CAS
532 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { // tag 0 -> 1 succeeds for exactly one caller
533 if (e == null)
534 return true; // no executor: the caller runs the action itself
535 executor = null; // disable
536 e.execute(this); // hand off; run()/exec() will later call tryFire(ASYNC)
537 }
538 return false; // either the claim was lost or the action was handed to the executor
539 }
540
541 final boolean isLive() { return dep != null; } // the tryFire implementations in this file null out dep once they have fired
542 }
543
544 /**
545 * Pushes the given completion unless it completes while trying.
546 * Caller should first check that result is null.
547 */
548 final void unipush(Completion c) {
549 if (c != null) {
550 while (!tryPushStack(c)) {
551 if (result != null) { // this future completed while the push was being retried: stop trying
552 NEXT.set(c, null); // clear the link written by the failed tryPushStack
553 break;
554 }
555 }
556 if (result != null) // recheck after the push (or the abandoned push): fire c now if this future is done
557 c.tryFire(SYNC);
558 }
559 }
560
561 /**
562 * Post-processing by dependent after successful UniCompletion
563 * tryFire. Tries to clean stack of source a, and then either runs
564 * postComplete or returns this to caller, depending on mode.
565 */
566 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
567 if (a != null && a.stack != null) {
568 Object r;
569 if ((r = a.result) == null)
570 a.cleanStack(); // source not done: only unlink dead completions from its stack
571 if (mode >= 0 && (r != null || a.result != null)) // not NESTED, and the source is done (a.result is re-read in case it completed meanwhile)
572 a.postComplete();
573 }
574 if (result != null && stack != null) { // this dependent is done and has dependents of its own
575 if (mode < 0)
576 return this; // NESTED: hand this future back to the caller for further processing
577 else
578 postComplete();
579 }
580 return null; // nothing left for the caller to propagate
581 }
582
583 @SuppressWarnings("serial")
584 static final class UniApply<T,V> extends UniCompletion<T,V> { // completion that applies a Function to the source's value and completes dep with the result
585 Function<? super T,? extends V> fn;
586 UniApply(Executor executor, CompletableFuture<V> dep,
587 CompletableFuture<T> src,
588 Function<? super T,? extends V> fn) {
589 super(executor, dep, src); this.fn = fn;
590 }
591 final CompletableFuture<V> tryFire(int mode) {
592 CompletableFuture<V> d; CompletableFuture<T> a;
593 Object r; Throwable x; Function<? super T,? extends V> f;
594 if ((d = dep) == null || (f = fn) == null
595 || (a = src) == null || (r = a.result) == null) // a field was already nulled, or the source is not done yet
596 return null;
597 tryComplete: if (d.result == null) { // skip the action entirely if dep is already completed
598 if (r instanceof AltResult) {
599 if ((x = ((AltResult)r).ex) != null) {
600 d.completeThrowable(x, r); // source failed: relay its exception without calling fn
601 break tryComplete;
602 }
603 r = null; // NIL decodes to a null argument
604 }
605 try {
606 if (mode <= 0 && !claim()) // SYNC/NESTED must win the claim; ASYNC skips this check
607 return null;
608 else {
609 @SuppressWarnings("unchecked") T t = (T) r;
610 d.completeValue(f.apply(t));
611 }
612 } catch (Throwable ex) {
613 d.completeThrowable(ex); // anything thrown inside the try (claim() is inside it too) completes dep exceptionally
614 }
615 }
616 dep = null; src = null; fn = null; // release references; isLive() now returns false
617 return d.postFire(a, mode);
618 }
619 }
620
621 private <V> CompletableFuture<V> uniApplyStage( // creates the dependent future for an apply-style stage on this future
622 Executor e, Function<? super T,? extends V> f) {
623 if (f == null) throw new NullPointerException();
624 Object r;
625 if ((r = result) != null)
626 return uniApplyNow(r, e, f); // already done: take the path that does not touch the stack
627 CompletableFuture<V> d = newIncompleteFuture();
628 unipush(new UniApply<T,V>(e, d, this, f)); // not done: register; unipush rechecks result after pushing
629 return d;
630 }
631
632 private <V> CompletableFuture<V> uniApplyNow( // variant for a source whose (non-null) result r has already been read
633 Object r, Executor e, Function<? super T,? extends V> f) {
634 Throwable x;
635 CompletableFuture<V> d = newIncompleteFuture();
636 if (r instanceof AltResult) {
637 if ((x = ((AltResult)r).ex) != null) {
638 d.result = encodeThrowable(x, r); // source failed: relay; written directly without CAS, d has not been returned yet
639 return d;
640 }
641 r = null; // NIL decodes to a null argument
642 }
643 try {
644 if (e != null) {
645 e.execute(new UniApply<T,V>(null, d, this, f)); // async: the task's tryFire(ASYNC) runs f; the completion's executor field is null
646 } else {
647 @SuppressWarnings("unchecked") T t = (T) r;
648 d.result = d.encodeValue(f.apply(t)); // sync: run f in the calling thread
649 }
650 } catch (Throwable ex) {
651 d.result = encodeThrowable(ex); // thrown by f.apply or by e.execute
652 }
653 return d;
654 }
655
656 @SuppressWarnings("serial")
657 static final class UniAccept<T> extends UniCompletion<T,Void> { // completion that passes the source's value to a Consumer and completes dep with null
658 Consumer<? super T> fn;
659 UniAccept(Executor executor, CompletableFuture<Void> dep,
660 CompletableFuture<T> src, Consumer<? super T> fn) {
661 super(executor, dep, src); this.fn = fn;
662 }
663 final CompletableFuture<Void> tryFire(int mode) {
664 CompletableFuture<Void> d; CompletableFuture<T> a;
665 Object r; Throwable x; Consumer<? super T> f;
666 if ((d = dep) == null || (f = fn) == null
667 || (a = src) == null || (r = a.result) == null) // a field was already nulled, or the source is not done yet
668 return null;
669 tryComplete: if (d.result == null) { // skip the action entirely if dep is already completed
670 if (r instanceof AltResult) {
671 if ((x = ((AltResult)r).ex) != null) {
672 d.completeThrowable(x, r); // source failed: relay its exception without calling fn
673 break tryComplete;
674 }
675 r = null; // NIL decodes to a null argument
676 }
677 try {
678 if (mode <= 0 && !claim()) // SYNC/NESTED must win the claim; ASYNC skips this check
679 return null;
680 else {
681 @SuppressWarnings("unchecked") T t = (T) r;
682 f.accept(t);
683 d.completeNull(); // only reached if accept returned normally
684 }
685 } catch (Throwable ex) {
686 d.completeThrowable(ex); // anything thrown inside the try completes dep exceptionally
687 }
688 }
689 dep = null; src = null; fn = null; // release references; isLive() now returns false
690 return d.postFire(a, mode);
691 }
692 }
693
694 private CompletableFuture<Void> uniAcceptStage(Executor e, // creates the dependent future for an accept-style stage on this future
695 Consumer<? super T> f) {
696 if (f == null) throw new NullPointerException();
697 Object r;
698 if ((r = result) != null)
699 return uniAcceptNow(r, e, f); // already done: take the path that does not touch the stack
700 CompletableFuture<Void> d = newIncompleteFuture();
701 unipush(new UniAccept<T>(e, d, this, f)); // not done: register; unipush rechecks result after pushing
702 return d;
703 }
704
705 private CompletableFuture<Void> uniAcceptNow( // variant for a source whose (non-null) result r has already been read
706 Object r, Executor e, Consumer<? super T> f) {
707 Throwable x;
708 CompletableFuture<Void> d = newIncompleteFuture();
709 if (r instanceof AltResult) {
710 if ((x = ((AltResult)r).ex) != null) {
711 d.result = encodeThrowable(x, r); // source failed: relay; written directly without CAS, d has not been returned yet
712 return d;
713 }
714 r = null; // NIL decodes to a null argument
715 }
716 try {
717 if (e != null) {
718 e.execute(new UniAccept<T>(null, d, this, f)); // async: the task's tryFire(ASYNC) runs f
719 } else {
720 @SuppressWarnings("unchecked") T t = (T) r;
721 f.accept(t); // sync: run f in the calling thread
722 d.result = NIL; // completes with null once accept has returned normally
723 }
724 } catch (Throwable ex) {
725 d.result = encodeThrowable(ex); // thrown by f.accept or by e.execute
726 }
727 return d;
728 }
729
730 @SuppressWarnings("serial")
731 static final class UniRun<T> extends UniCompletion<T,Void> { // completion that runs a Runnable after the source completes and completes dep with null
732 Runnable fn;
733 UniRun(Executor executor, CompletableFuture<Void> dep,
734 CompletableFuture<T> src, Runnable fn) {
735 super(executor, dep, src); this.fn = fn;
736 }
737 final CompletableFuture<Void> tryFire(int mode) {
738 CompletableFuture<Void> d; CompletableFuture<T> a;
739 Object r; Throwable x; Runnable f;
740 if ((d = dep) == null || (f = fn) == null
741 || (a = src) == null || (r = a.result) == null) // a field was already nulled, or the source is not done yet
742 return null;
743 if (d.result == null) { // skip the action entirely if dep is already completed
744 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
745 d.completeThrowable(x, r); // source failed: relay its exception without running fn
746 else
747 try {
748 if (mode <= 0 && !claim()) // SYNC/NESTED must win the claim; ASYNC skips this check
749 return null;
750 else {
751 f.run(); // the source's value itself is not used
752 d.completeNull();
753 }
754 } catch (Throwable ex) {
755 d.completeThrowable(ex); // anything thrown inside the try completes dep exceptionally
756 }
757 }
758 dep = null; src = null; fn = null; // release references; isLive() now returns false
759 return d.postFire(a, mode);
760 }
761 }
762
763 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { // creates the dependent future for a run-style stage on this future
764 if (f == null) throw new NullPointerException();
765 Object r;
766 if ((r = result) != null)
767 return uniRunNow(r, e, f); // already done: take the path that does not touch the stack
768 CompletableFuture<Void> d = newIncompleteFuture();
769 unipush(new UniRun<T>(e, d, this, f)); // not done: register; unipush rechecks result after pushing
770 return d;
771 }
772
773 private CompletableFuture<Void> uniRunNow(Object r, Executor e, Runnable f) { // variant for a source whose (non-null) result r has already been read
774 Throwable x;
775 CompletableFuture<Void> d = newIncompleteFuture();
776 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
777 d.result = encodeThrowable(x, r); // source failed: relay; written directly without CAS, d has not been returned yet
778 else
779 try {
780 if (e != null) {
781 e.execute(new UniRun<T>(null, d, this, f)); // async: the task's tryFire(ASYNC) runs f
782 } else {
783 f.run(); // sync: run f in the calling thread
784 d.result = NIL; // completes with null once run has returned normally
785 }
786 } catch (Throwable ex) {
787 d.result = encodeThrowable(ex); // thrown by f.run or by e.execute
788 }
789 return d;
790 }
791
792 @SuppressWarnings("serial")
793 static final class UniWhenComplete<T> extends UniCompletion<T,T> { // completion that invokes a BiConsumer with the source's value and exception, then relays the outcome to dep
794 BiConsumer<? super T, ? super Throwable> fn;
795 UniWhenComplete(Executor executor, CompletableFuture<T> dep,
796 CompletableFuture<T> src,
797 BiConsumer<? super T, ? super Throwable> fn) {
798 super(executor, dep, src); this.fn = fn;
799 }
800 final CompletableFuture<T> tryFire(int mode) {
801 CompletableFuture<T> d; CompletableFuture<T> a;
802 Object r; BiConsumer<? super T, ? super Throwable> f;
803 if ((d = dep) == null || (f = fn) == null
804 || (a = src) == null || (r = a.result) == null
805 || !d.uniWhenComplete(r, f, mode > 0 ? null : this)) // ASYNC passes null so that claim() is skipped; false means not completed by this call
806 return null;
807 dep = null; src = null; fn = null; // release references; isLive() now returns false
808 return d.postFire(a, mode);
809 }
810 }
811
812 final boolean uniWhenComplete(Object r, // invoked on the dependent; r is the source's non-null encoded result; returns false only when claim() fails
813 BiConsumer<? super T,? super Throwable> f,
814 UniWhenComplete<T> c) {
815 T t; Throwable x = null;
816 if (result == null) { // nothing to do if this dependent is already completed
817 try {
818 if (c != null && !c.claim())
819 return false; // claim lost, or the action was handed to c's executor
820 if (r instanceof AltResult) {
821 x = ((AltResult)r).ex; // null when r is NIL
822 t = null;
823 } else {
824 @SuppressWarnings("unchecked") T tr = (T) r;
825 t = tr;
826 }
827 f.accept(t, x);
828 if (x == null) {
829 internalComplete(r); // source succeeded and the action returned normally: pass r through unchanged
830 return true;
831 }
832 } catch (Throwable ex) {
833 if (x == null)
834 x = ex; // no source exception: the thrown exception becomes the outcome
835 else if (x != ex)
836 x.addSuppressed(ex); // keep the source exception and record the thrown one as suppressed
837 }
838 completeThrowable(x, r); // x is non-null here: the x == null success path returned above
839 }
840 return true;
841 }
842
843 private CompletableFuture<T> uniWhenCompleteStage( // creates the dependent future for a whenComplete-style stage on this future
844 Executor e, BiConsumer<? super T, ? super Throwable> f) {
845 if (f == null) throw new NullPointerException();
846 CompletableFuture<T> d = newIncompleteFuture();
847 Object r;
848 if ((r = result) == null)
849 unipush(new UniWhenComplete<T>(e, d, this, f)); // source not done: register for later
850 else if (e == null)
851 d.uniWhenComplete(r, f, null); // source done, no executor: run in the calling thread (null c skips claim)
852 else {
853 try {
854 e.execute(new UniWhenComplete<T>(null, d, this, f)); // source done: run as a task via tryFire(ASYNC)
855 } catch (Throwable ex) {
856 d.result = encodeThrowable(ex); // e.execute threw: d completes exceptionally (direct write, no CAS)
857 }
858 }
859 return d;
860 }
861
862 @SuppressWarnings("serial")
863 static final class UniHandle<T,V> extends UniCompletion<T,V> { // completion that applies a BiFunction to the source's value and exception and completes dep with its result
864 BiFunction<? super T, Throwable, ? extends V> fn;
865 UniHandle(Executor executor, CompletableFuture<V> dep,
866 CompletableFuture<T> src,
867 BiFunction<? super T, Throwable, ? extends V> fn) {
868 super(executor, dep, src); this.fn = fn;
869 }
870 final CompletableFuture<V> tryFire(int mode) {
871 CompletableFuture<V> d; CompletableFuture<T> a;
872 Object r; BiFunction<? super T, Throwable, ? extends V> f;
873 if ((d = dep) == null || (f = fn) == null
874 || (a = src) == null || (r = a.result) == null
875 || !d.uniHandle(r, f, mode > 0 ? null : this)) // ASYNC passes null so that claim() is skipped; false means not completed by this call
876 return null;
877 dep = null; src = null; fn = null; // release references; isLive() now returns false
878 return d.postFire(a, mode);
879 }
880 }
881
882 final <S> boolean uniHandle(Object r, // invoked on the dependent; r is the source's non-null encoded result; returns false only when claim() fails
883 BiFunction<? super S, Throwable, ? extends T> f,
884 UniHandle<S,T> c) {
885 S s; Throwable x;
886 if (result == null) { // nothing to do if this dependent is already completed
887 try {
888 if (c != null && !c.claim())
889 return false; // claim lost, or the action was handed to c's executor
890 if (r instanceof AltResult) {
891 x = ((AltResult)r).ex; // null when r is NIL
892 s = null;
893 } else {
894 x = null;
895 @SuppressWarnings("unchecked") S ss = (S) r;
896 s = ss;
897 }
898 completeValue(f.apply(s, x)); // f is called for both normal and exceptional source outcomes
899 } catch (Throwable ex) {
900 completeThrowable(ex); // the source exception is not relayed here; only what was thrown inside the try
901 }
902 }
903 return true;
904 }
905
906 private <V> CompletableFuture<V> uniHandleStage(
907 Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
908 if (f == null) throw new NullPointerException();
909 CompletableFuture<V> d = newIncompleteFuture();
910 Object r;
911 if ((r = result) == null)
912 unipush(new UniHandle<T,V>(e, d, this, f));
913 else if (e == null)
914 d.uniHandle(r, f, null);
915 else {
916 try {
917 e.execute(new UniHandle<T,V>(null, d, this, f));
918 } catch (Throwable ex) {
919 d.result = encodeThrowable(ex);
920 }
921 }
922 return d;
923 }
924
925 @SuppressWarnings("serial")
926 static final class UniExceptionally<T> extends UniCompletion<T,T> {
927 Function<? super Throwable, ? extends T> fn;
928 UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src,
929 Function<? super Throwable, ? extends T> fn) {
930 super(null, dep, src); this.fn = fn;
931 }
932 final CompletableFuture<T> tryFire(int mode) { // never ASYNC
933 // assert mode != ASYNC;
934 CompletableFuture<T> d; CompletableFuture<T> a;
935 Object r; Function<? super Throwable, ? extends T> f;
936 if ((d = dep) == null || (f = fn) == null
937 || (a = src) == null || (r = a.result) == null
938 || !d.uniExceptionally(r, f, this))
939 return null;
940 dep = null; src = null; fn = null;
941 return d.postFire(a, mode);
942 }
943 }
944
945 final boolean uniExceptionally(Object r,
946 Function<? super Throwable, ? extends T> f,
947 UniExceptionally<T> c) {
948 Throwable x;
949 if (result == null) {
950 try {
951 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) {
952 if (c != null && !c.claim())
953 return false;
954 completeValue(f.apply(x));
955 } else
956 internalComplete(r);
957 } catch (Throwable ex) {
958 completeThrowable(ex);
959 }
960 }
961 return true;
962 }
963
964 private CompletableFuture<T> uniExceptionallyStage(
965 Function<Throwable, ? extends T> f) {
966 if (f == null) throw new NullPointerException();
967 CompletableFuture<T> d = newIncompleteFuture();
968 Object r;
969 if ((r = result) == null)
970 unipush(new UniExceptionally<T>(d, this, f));
971 else
972 d.uniExceptionally(r, f, null);
973 return d;
974 }
975
976 @SuppressWarnings("serial")
977 static final class UniRelay<T> extends UniCompletion<T,T> {
978 UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) {
979 super(null, dep, src);
980 }
981 final CompletableFuture<T> tryFire(int mode) {
982 CompletableFuture<T> d; CompletableFuture<T> a; Object r;
983 if ((d = dep) == null
984 || (a = src) == null || (r = a.result) == null)
985 return null;
986 if (d.result == null)
987 d.completeRelay(r);
988 src = null; dep = null;
989 return d.postFire(a, mode);
990 }
991 }
992
993 private CompletableFuture<T> uniCopyStage() {
994 Object r;
995 CompletableFuture<T> d = newIncompleteFuture();
996 if ((r = result) != null)
997 d.result = encodeRelay(r);
998 else
999 unipush(new UniRelay<T>(d, this));
1000 return d;
1001 }
1002
1003 private MinimalStage<T> uniAsMinimalStage() {
1004 Object r;
1005 if ((r = result) != null)
1006 return new MinimalStage<T>(encodeRelay(r));
1007 MinimalStage<T> d = new MinimalStage<T>();
1008 unipush(new UniRelay<T>(d, this));
1009 return d;
1010 }
1011
/**
 * Single-source completion for composition: applies a function that
 * yields another stage and arranges for that stage's outcome to be
 * relayed to the dependent.
 */
@SuppressWarnings("serial")
static final class UniCompose<T,V> extends UniCompletion<T,V> {
    // Function producing the stage whose result completes the dependent.
    Function<? super T, ? extends CompletionStage<V>> fn;
    UniCompose(Executor executor, CompletableFuture<V> dep,
               CompletableFuture<T> src,
               Function<? super T, ? extends CompletionStage<V>> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    /**
     * Attempts to run the composition.
     *
     * @param mode firing mode; claim() is consulted only when mode <= 0
     * @return null if dep, fn, src or the source result is missing, if
     *         claim() returns false, or if the inner stage is still
     *         pending and the dependent is not yet complete; otherwise
     *         the value of postFire
     */
    final CompletableFuture<V> tryFire(int mode) {
        CompletableFuture<V> d; CompletableFuture<T> a;
        Function<? super T, ? extends CompletionStage<V>> f;
        Object r; Throwable x;
        if ((d = dep) == null || (f = fn) == null
            || (a = src) == null || (r = a.result) == null)
            return null;
        tryComplete: if (d.result == null) {
            if (r instanceof AltResult) {
                // An AltResult with a non-null ex is propagated without
                // calling the function.
                if ((x = ((AltResult)r).ex) != null) {
                    d.completeThrowable(x, r);
                    break tryComplete;
                }
                // AltResult without an exception: the function sees null.
                r = null;
            }
            try {
                if (mode <= 0 && !claim())
                    return null;
                @SuppressWarnings("unchecked") T t = (T) r;
                CompletableFuture<V> g = f.apply(t).toCompletableFuture();
                // r is reused here to hold the inner future's result.
                if ((r = g.result) != null)
                    d.completeRelay(r);
                else {
                    // Inner future pending: relay its result later.
                    g.unipush(new UniRelay<V>(d, g));
                    // Fields stay set when the dependent is still
                    // incomplete after the push.
                    if (d.result == null)
                        return null;
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}
1055
1056 private <V> CompletableFuture<V> uniComposeStage(
1057 Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
1058 if (f == null) throw new NullPointerException();
1059 CompletableFuture<V> d = newIncompleteFuture();
1060 Object r, s; Throwable x;
1061 if ((r = result) == null)
1062 unipush(new UniCompose<T,V>(e, d, this, f));
1063 else if (e == null) {
1064 if (r instanceof AltResult) {
1065 if ((x = ((AltResult)r).ex) != null) {
1066 d.result = encodeThrowable(x, r);
1067 return d;
1068 }
1069 r = null;
1070 }
1071 try {
1072 @SuppressWarnings("unchecked") T t = (T) r;
1073 CompletableFuture<V> g = f.apply(t).toCompletableFuture();
1074 if ((s = g.result) != null)
1075 d.result = encodeRelay(s);
1076 else {
1077 g.unipush(new UniRelay<V>(d, g));
1078 }
1079 } catch (Throwable ex) {
1080 d.result = encodeThrowable(ex);
1081 }
1082 }
1083 else
1084 try {
1085 e.execute(new UniCompose<T,V>(null, d, this, f));
1086 } catch (Throwable ex) {
1087 d.result = encodeThrowable(ex);
1088 }
1089 return d;
1090 }
1091
1092 /* ------------- Two-input Completions -------------- */
1093
1094 /** A Completion for an action with two sources */
1095 @SuppressWarnings("serial")
1096 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
1097 CompletableFuture<U> snd; // second source for action
1098 BiCompletion(Executor executor, CompletableFuture<V> dep,
1099 CompletableFuture<T> src, CompletableFuture<U> snd) {
1100 super(executor, dep, src); this.snd = snd;
1101 }
1102 }
1103
1104 /** A Completion delegating to a BiCompletion */
1105 @SuppressWarnings("serial")
1106 static final class CoCompletion extends Completion {
1107 BiCompletion<?,?,?> base;
1108 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; }
1109 final CompletableFuture<?> tryFire(int mode) {
1110 BiCompletion<?,?,?> c; CompletableFuture<?> d;
1111 if ((c = base) == null || (d = c.tryFire(mode)) == null)
1112 return null;
1113 base = null; // detach
1114 return d;
1115 }
1116 final boolean isLive() {
1117 BiCompletion<?,?,?> c;
1118 return (c = base) != null
1119 // && c.isLive()
1120 && c.dep != null;
1121 }
1122 }
1123
/**
 * Pushes completion to this and b unless both done.
 * Caller should first check that either result or b.result is null.
 *
 * <p>While this future has no result, the completion is offered to this
 * future's stack with tryPushStack until that succeeds. If this future
 * is found complete before any push succeeds, the completion is pushed
 * onto b alone.
 *
 * @param b the second source
 * @param c the completion to register; nothing happens if null
 */
final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
    if (c != null) {
        while (result == null) {
            if (tryPushStack(c)) {
                // Registered here; b receives a CoCompletion wrapper
                // around c rather than c itself.
                if (b.result == null)
                    b.unipush(new CoCompletion(c));
                // b already has a result: fire c directly if this future
                // has acquired one as well.
                else if (result != null)
                    c.tryFire(SYNC);
                return;
            }
        }
        // This future has a result, so only b needs the completion.
        b.unipush(c);
    }
}
1142
/**
 * Post-processing after successful BiCompletion tryFire.
 *
 * <p>Handles the second source first, then delegates to the
 * single-source postFire for the first source.
 *
 * @param a the first source
 * @param b the second source, may be null
 * @param mode the mode tryFire was invoked with
 * @return the value of {@code postFire(a, mode)}
 */
final CompletableFuture<T> postFire(CompletableFuture<?> a,
                                    CompletableFuture<?> b, int mode) {
    if (b != null && b.stack != null) { // clean second source
        Object r;
        // No result seen on b yet: call cleanStack on it.
        if ((r = b.result) == null)
            b.cleanStack();
        // b.result is read again because it may have been set since the
        // first read; postComplete runs only for non-negative modes.
        if (mode >= 0 && (r != null || b.result != null))
            b.postComplete();
    }
    return postFire(a, mode);
}
1155
1156 @SuppressWarnings("serial")
1157 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
1158 BiFunction<? super T,? super U,? extends V> fn;
1159 BiApply(Executor executor, CompletableFuture<V> dep,
1160 CompletableFuture<T> src, CompletableFuture<U> snd,
1161 BiFunction<? super T,? super U,? extends V> fn) {
1162 super(executor, dep, src, snd); this.fn = fn;
1163 }
1164 final CompletableFuture<V> tryFire(int mode) {
1165 CompletableFuture<V> d;
1166 CompletableFuture<T> a;
1167 CompletableFuture<U> b;
1168 Object r, s; BiFunction<? super T,? super U,? extends V> f;
1169 if ((d = dep) == null || (f = fn) == null
1170 || (a = src) == null || (r = a.result) == null
1171 || (b = snd) == null || (s = b.result) == null
1172 || !d.biApply(r, s, f, mode > 0 ? null : this))
1173 return null;
1174 dep = null; src = null; snd = null; fn = null;
1175 return d.postFire(a, b, mode);
1176 }
1177 }
1178
1179 final <R,S> boolean biApply(Object r, Object s,
1180 BiFunction<? super R,? super S,? extends T> f,
1181 BiApply<R,S,T> c) {
1182 Throwable x;
1183 tryComplete: if (result == null) {
1184 if (r instanceof AltResult) {
1185 if ((x = ((AltResult)r).ex) != null) {
1186 completeThrowable(x, r);
1187 break tryComplete;
1188 }
1189 r = null;
1190 }
1191 if (s instanceof AltResult) {
1192 if ((x = ((AltResult)s).ex) != null) {
1193 completeThrowable(x, s);
1194 break tryComplete;
1195 }
1196 s = null;
1197 }
1198 try {
1199 if (c != null && !c.claim())
1200 return false;
1201 @SuppressWarnings("unchecked") R rr = (R) r;
1202 @SuppressWarnings("unchecked") S ss = (S) s;
1203 completeValue(f.apply(rr, ss));
1204 } catch (Throwable ex) {
1205 completeThrowable(ex);
1206 }
1207 }
1208 return true;
1209 }
1210
1211 private <U,V> CompletableFuture<V> biApplyStage(
1212 Executor e, CompletionStage<U> o,
1213 BiFunction<? super T,? super U,? extends V> f) {
1214 CompletableFuture<U> b; Object r, s;
1215 if (f == null || (b = o.toCompletableFuture()) == null)
1216 throw new NullPointerException();
1217 CompletableFuture<V> d = newIncompleteFuture();
1218 if ((r = result) == null || (s = b.result) == null)
1219 bipush(b, new BiApply<T,U,V>(e, d, this, b, f));
1220 else if (e == null)
1221 d.biApply(r, s, f, null);
1222 else
1223 try {
1224 e.execute(new BiApply<T,U,V>(null, d, this, b, f));
1225 } catch (Throwable ex) {
1226 d.result = encodeThrowable(ex);
1227 }
1228 return d;
1229 }
1230
1231 @SuppressWarnings("serial")
1232 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> {
1233 BiConsumer<? super T,? super U> fn;
1234 BiAccept(Executor executor, CompletableFuture<Void> dep,
1235 CompletableFuture<T> src, CompletableFuture<U> snd,
1236 BiConsumer<? super T,? super U> fn) {
1237 super(executor, dep, src, snd); this.fn = fn;
1238 }
1239 final CompletableFuture<Void> tryFire(int mode) {
1240 CompletableFuture<Void> d;
1241 CompletableFuture<T> a;
1242 CompletableFuture<U> b;
1243 Object r, s; BiConsumer<? super T,? super U> f;
1244 if ((d = dep) == null || (f = fn) == null
1245 || (a = src) == null || (r = a.result) == null
1246 || (b = snd) == null || (s = b.result) == null
1247 || !d.biAccept(r, s, f, mode > 0 ? null : this))
1248 return null;
1249 dep = null; src = null; snd = null; fn = null;
1250 return d.postFire(a, b, mode);
1251 }
1252 }
1253
1254 final <R,S> boolean biAccept(Object r, Object s,
1255 BiConsumer<? super R,? super S> f,
1256 BiAccept<R,S> c) {
1257 Throwable x;
1258 tryComplete: if (result == null) {
1259 if (r instanceof AltResult) {
1260 if ((x = ((AltResult)r).ex) != null) {
1261 completeThrowable(x, r);
1262 break tryComplete;
1263 }
1264 r = null;
1265 }
1266 if (s instanceof AltResult) {
1267 if ((x = ((AltResult)s).ex) != null) {
1268 completeThrowable(x, s);
1269 break tryComplete;
1270 }
1271 s = null;
1272 }
1273 try {
1274 if (c != null && !c.claim())
1275 return false;
1276 @SuppressWarnings("unchecked") R rr = (R) r;
1277 @SuppressWarnings("unchecked") S ss = (S) s;
1278 f.accept(rr, ss);
1279 completeNull();
1280 } catch (Throwable ex) {
1281 completeThrowable(ex);
1282 }
1283 }
1284 return true;
1285 }
1286
1287 private <U> CompletableFuture<Void> biAcceptStage(
1288 Executor e, CompletionStage<U> o,
1289 BiConsumer<? super T,? super U> f) {
1290 CompletableFuture<U> b; Object r, s;
1291 if (f == null || (b = o.toCompletableFuture()) == null)
1292 throw new NullPointerException();
1293 CompletableFuture<Void> d = newIncompleteFuture();
1294 if ((r = result) == null || (s = b.result) == null)
1295 bipush(b, new BiAccept<T,U>(e, d, this, b, f));
1296 else if (e == null)
1297 d.biAccept(r, s, f, null);
1298 else
1299 try {
1300 e.execute(new BiAccept<T,U>(null, d, this, b, f));
1301 } catch (Throwable ex) {
1302 d.result = encodeThrowable(ex);
1303 }
1304 return d;
1305 }
1306
1307 @SuppressWarnings("serial")
1308 static final class BiRun<T,U> extends BiCompletion<T,U,Void> {
1309 Runnable fn;
1310 BiRun(Executor executor, CompletableFuture<Void> dep,
1311 CompletableFuture<T> src, CompletableFuture<U> snd,
1312 Runnable fn) {
1313 super(executor, dep, src, snd); this.fn = fn;
1314 }
1315 final CompletableFuture<Void> tryFire(int mode) {
1316 CompletableFuture<Void> d;
1317 CompletableFuture<T> a;
1318 CompletableFuture<U> b;
1319 Object r, s; Runnable f;
1320 if ((d = dep) == null || (f = fn) == null
1321 || (a = src) == null || (r = a.result) == null
1322 || (b = snd) == null || (s = b.result) == null
1323 || !d.biRun(r, s, f, mode > 0 ? null : this))
1324 return null;
1325 dep = null; src = null; snd = null; fn = null;
1326 return d.postFire(a, b, mode);
1327 }
1328 }
1329
1330 final boolean biRun(Object r, Object s, Runnable f, BiRun<?,?> c) {
1331 Throwable x; Object z;
1332 if (result == null) {
1333 if ((r instanceof AltResult
1334 && (x = ((AltResult)(z = r)).ex) != null) ||
1335 (s instanceof AltResult
1336 && (x = ((AltResult)(z = s)).ex) != null))
1337 completeThrowable(x, z);
1338 else
1339 try {
1340 if (c != null && !c.claim())
1341 return false;
1342 f.run();
1343 completeNull();
1344 } catch (Throwable ex) {
1345 completeThrowable(ex);
1346 }
1347 }
1348 return true;
1349 }
1350
1351 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
1352 Runnable f) {
1353 CompletableFuture<?> b; Object r, s;
1354 if (f == null || (b = o.toCompletableFuture()) == null)
1355 throw new NullPointerException();
1356 CompletableFuture<Void> d = newIncompleteFuture();
1357 if ((r = result) == null || (s = b.result) == null)
1358 bipush(b, new BiRun<>(e, d, this, b, f));
1359 else if (e == null)
1360 d.biRun(r, s, f, null);
1361 else
1362 try {
1363 e.execute(new BiRun<>(null, d, this, b, f));
1364 } catch (Throwable ex) {
1365 d.result = encodeThrowable(ex);
1366 }
1367 return d;
1368 }
1369
1370 @SuppressWarnings("serial")
1371 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And
1372 BiRelay(CompletableFuture<Void> dep,
1373 CompletableFuture<T> src, CompletableFuture<U> snd) {
1374 super(null, dep, src, snd);
1375 }
1376 final CompletableFuture<Void> tryFire(int mode) {
1377 CompletableFuture<Void> d;
1378 CompletableFuture<T> a;
1379 CompletableFuture<U> b;
1380 Object r, s, z; Throwable x;
1381 if ((d = dep) == null
1382 || (a = src) == null || (r = a.result) == null
1383 || (b = snd) == null || (s = b.result) == null)
1384 return null;
1385 if (d.result == null) {
1386 if ((r instanceof AltResult
1387 && (x = ((AltResult)(z = r)).ex) != null) ||
1388 (s instanceof AltResult
1389 && (x = ((AltResult)(z = s)).ex) != null))
1390 d.completeThrowable(x, z);
1391 else
1392 d.completeNull();
1393 }
1394 src = null; snd = null; dep = null;
1395 return d.postFire(a, b, mode);
1396 }
1397 }
1398
1399 /** Recursively constructs a tree of completions. */
1400 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
1401 int lo, int hi) {
1402 CompletableFuture<Void> d = new CompletableFuture<Void>();
1403 if (lo > hi) // empty
1404 d.result = NIL;
1405 else {
1406 CompletableFuture<?> a, b; Object r, s, z; Throwable x;
1407 int mid = (lo + hi) >>> 1;
1408 if ((a = (lo == mid ? cfs[lo] :
1409 andTree(cfs, lo, mid))) == null ||
1410 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1411 andTree(cfs, mid+1, hi))) == null)
1412 throw new NullPointerException();
1413 if ((r = a.result) == null || (s = b.result) == null)
1414 a.bipush(b, new BiRelay<>(d, a, b));
1415 else if ((r instanceof AltResult
1416 && (x = ((AltResult)(z = r)).ex) != null) ||
1417 (s instanceof AltResult
1418 && (x = ((AltResult)(z = s)).ex) != null))
1419 d.result = encodeThrowable(x, z);
1420 else
1421 d.result = NIL;
1422 }
1423 return d;
1424 }
1425
1426 /* ------------- Projected (Ored) BiCompletions -------------- */
1427
/**
 * Pushes completion to this and b unless either done.
 * Caller should first check that result and b.result are both null.
 *
 * @param b the second source
 * @param c the completion to register; nothing happens if null
 */
final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
    if (c != null) {
        // Retry the push until it succeeds or this future is seen to
        // have a result.
        while (!tryPushStack(c)) {
            if (result != null) {
                // Giving up on the push: reset c's NEXT link to null.
                NEXT.set(c, null);
                break;
            }
        }
        // A result on this future is enough to fire c directly;
        // otherwise b is given a CoCompletion wrapper around c.
        if (result != null)
            c.tryFire(SYNC);
        else
            b.unipush(new CoCompletion(c));
    }
}
1446
1447 @SuppressWarnings("serial")
1448 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> {
1449 Function<? super T,? extends V> fn;
1450 OrApply(Executor executor, CompletableFuture<V> dep,
1451 CompletableFuture<T> src, CompletableFuture<U> snd,
1452 Function<? super T,? extends V> fn) {
1453 super(executor, dep, src, snd); this.fn = fn;
1454 }
1455 final CompletableFuture<V> tryFire(int mode) {
1456 CompletableFuture<V> d;
1457 CompletableFuture<T> a;
1458 CompletableFuture<U> b;
1459 Object r; Throwable x; Function<? super T,? extends V> f;
1460 if ((d = dep) == null || (f = fn) == null
1461 || (a = src) == null || (b = snd) == null
1462 || ((r = a.result) == null && (r = b.result) == null))
1463 return null;
1464 tryComplete: if (d.result == null) {
1465 try {
1466 if (mode <= 0 && !claim())
1467 return null;
1468 if (r instanceof AltResult) {
1469 if ((x = ((AltResult)r).ex) != null) {
1470 d.completeThrowable(x, r);
1471 break tryComplete;
1472 }
1473 r = null;
1474 }
1475 @SuppressWarnings("unchecked") T t = (T) r;
1476 d.completeValue(f.apply(t));
1477 } catch (Throwable ex) {
1478 d.completeThrowable(ex);
1479 }
1480 }
1481 dep = null; src = null; snd = null; fn = null;
1482 return d.postFire(a, b, mode);
1483 }
1484 }
1485
1486 private <U extends T,V> CompletableFuture<V> orApplyStage(
1487 Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) {
1488 CompletableFuture<U> b;
1489 if (f == null || (b = o.toCompletableFuture()) == null)
1490 throw new NullPointerException();
1491
1492 Object r; CompletableFuture<? extends T> z;
1493 if ((r = (z = this).result) != null ||
1494 (r = (z = b).result) != null)
1495 return z.uniApplyNow(r, e, f);
1496
1497 CompletableFuture<V> d = newIncompleteFuture();
1498 orpush(b, new OrApply<T,U,V>(e, d, this, b, f));
1499 return d;
1500 }
1501
1502 @SuppressWarnings("serial")
1503 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> {
1504 Consumer<? super T> fn;
1505 OrAccept(Executor executor, CompletableFuture<Void> dep,
1506 CompletableFuture<T> src, CompletableFuture<U> snd,
1507 Consumer<? super T> fn) {
1508 super(executor, dep, src, snd); this.fn = fn;
1509 }
1510 final CompletableFuture<Void> tryFire(int mode) {
1511 CompletableFuture<Void> d;
1512 CompletableFuture<T> a;
1513 CompletableFuture<U> b;
1514 Object r; Throwable x; Consumer<? super T> f;
1515 if ((d = dep) == null || (f = fn) == null
1516 || (a = src) == null || (b = snd) == null
1517 || ((r = a.result) == null && (r = b.result) == null))
1518 return null;
1519 tryComplete: if (d.result == null) {
1520 try {
1521 if (mode <= 0 && !claim())
1522 return null;
1523 if (r instanceof AltResult) {
1524 if ((x = ((AltResult)r).ex) != null) {
1525 d.completeThrowable(x, r);
1526 break tryComplete;
1527 }
1528 r = null;
1529 }
1530 @SuppressWarnings("unchecked") T t = (T) r;
1531 f.accept(t);
1532 d.completeNull();
1533 } catch (Throwable ex) {
1534 d.completeThrowable(ex);
1535 }
1536 }
1537 dep = null; src = null; snd = null; fn = null;
1538 return d.postFire(a, b, mode);
1539 }
1540 }
1541
1542 private <U extends T> CompletableFuture<Void> orAcceptStage(
1543 Executor e, CompletionStage<U> o, Consumer<? super T> f) {
1544 CompletableFuture<U> b;
1545 if (f == null || (b = o.toCompletableFuture()) == null)
1546 throw new NullPointerException();
1547
1548 Object r; CompletableFuture<? extends T> z;
1549 if ((r = (z = this).result) != null ||
1550 (r = (z = b).result) != null)
1551 return z.uniAcceptNow(r, e, f);
1552
1553 CompletableFuture<Void> d = newIncompleteFuture();
1554 orpush(b, new OrAccept<T,U>(e, d, this, b, f));
1555 return d;
1556 }
1557
1558 @SuppressWarnings("serial")
1559 static final class OrRun<T,U> extends BiCompletion<T,U,Void> {
1560 Runnable fn;
1561 OrRun(Executor executor, CompletableFuture<Void> dep,
1562 CompletableFuture<T> src, CompletableFuture<U> snd,
1563 Runnable fn) {
1564 super(executor, dep, src, snd); this.fn = fn;
1565 }
1566 final CompletableFuture<Void> tryFire(int mode) {
1567 CompletableFuture<Void> d;
1568 CompletableFuture<T> a;
1569 CompletableFuture<U> b;
1570 Object r; Throwable x; Runnable f;
1571 if ((d = dep) == null || (f = fn) == null
1572 || (a = src) == null || (b = snd) == null
1573 || ((r = a.result) == null && (r = b.result) == null))
1574 return null;
1575 if (d.result == null) {
1576 try {
1577 if (mode <= 0 && !claim())
1578 return null;
1579 else if (r instanceof AltResult
1580 && (x = ((AltResult)r).ex) != null)
1581 d.completeThrowable(x, r);
1582 else {
1583 f.run();
1584 d.completeNull();
1585 }
1586 } catch (Throwable ex) {
1587 d.completeThrowable(ex);
1588 }
1589 }
1590 dep = null; src = null; snd = null; fn = null;
1591 return d.postFire(a, b, mode);
1592 }
1593 }
1594
1595 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
1596 Runnable f) {
1597 CompletableFuture<?> b;
1598 if (f == null || (b = o.toCompletableFuture()) == null)
1599 throw new NullPointerException();
1600
1601 Object r; CompletableFuture<?> z;
1602 if ((r = (z = this).result) != null ||
1603 (r = (z = b).result) != null)
1604 return z.uniRunNow(r, e, f);
1605
1606 CompletableFuture<Void> d = newIncompleteFuture();
1607 orpush(b, new OrRun<>(e, d, this, b, f));
1608 return d;
1609 }
1610
1611 @SuppressWarnings("serial")
1612 static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or
1613 OrRelay(CompletableFuture<Object> dep,
1614 CompletableFuture<T> src, CompletableFuture<U> snd) {
1615 super(null, dep, src, snd);
1616 }
1617 final CompletableFuture<Object> tryFire(int mode) {
1618 CompletableFuture<Object> d;
1619 CompletableFuture<T> a;
1620 CompletableFuture<U> b;
1621 Object r;
1622 if ((d = dep) == null
1623 || (a = src) == null || (b = snd) == null
1624 || ((r = a.result) == null && (r = b.result) == null))
1625 return null;
1626 d.completeRelay(r);
1627 src = null; snd = null; dep = null;
1628 return d.postFire(a, b, mode);
1629 }
1630 }
1631
1632 /** Recursively constructs a tree of completions. */
1633 static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
1634 int lo, int hi) {
1635 CompletableFuture<Object> d = new CompletableFuture<Object>();
1636 if (lo <= hi) {
1637 CompletableFuture<?> a, b; Object r;
1638 int mid = (lo + hi) >>> 1;
1639 if ((a = (lo == mid ? cfs[lo] :
1640 orTree(cfs, lo, mid))) == null ||
1641 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1642 orTree(cfs, mid+1, hi))) == null)
1643 throw new NullPointerException();
1644 if ((r = a.result) != null && (r = b.result) != null)
1645 d.result = encodeRelay(r);
1646 else
1647 a.orpush(b, new OrRelay<>(d, a, b));
1648 }
1649 return d;
1650 }
1651
1652 /* ------------- Zero-input Async forms -------------- */
1653
1654 @SuppressWarnings("serial")
1655 static final class AsyncSupply<T> extends ForkJoinTask<Void>
1656 implements Runnable, AsynchronousCompletionTask {
1657 CompletableFuture<T> dep; Supplier<? extends T> fn;
1658 AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
1659 this.dep = dep; this.fn = fn;
1660 }
1661
1662 public final Void getRawResult() { return null; }
1663 public final void setRawResult(Void v) {}
1664 public final boolean exec() { run(); return false; }
1665
1666 public void run() {
1667 CompletableFuture<T> d; Supplier<? extends T> f;
1668 if ((d = dep) != null && (f = fn) != null) {
1669 dep = null; fn = null;
1670 if (d.result == null) {
1671 try {
1672 d.completeValue(f.get());
1673 } catch (Throwable ex) {
1674 d.completeThrowable(ex);
1675 }
1676 }
1677 d.postComplete();
1678 }
1679 }
1680 }
1681
1682 static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
1683 Supplier<U> f) {
1684 if (f == null) throw new NullPointerException();
1685 CompletableFuture<U> d = new CompletableFuture<U>();
1686 e.execute(new AsyncSupply<U>(d, f));
1687 return d;
1688 }
1689
1690 @SuppressWarnings("serial")
1691 static final class AsyncRun extends ForkJoinTask<Void>
1692 implements Runnable, AsynchronousCompletionTask {
1693 CompletableFuture<Void> dep; Runnable fn;
1694 AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
1695 this.dep = dep; this.fn = fn;
1696 }
1697
1698 public final Void getRawResult() { return null; }
1699 public final void setRawResult(Void v) {}
1700 public final boolean exec() { run(); return false; }
1701
1702 public void run() {
1703 CompletableFuture<Void> d; Runnable f;
1704 if ((d = dep) != null && (f = fn) != null) {
1705 dep = null; fn = null;
1706 if (d.result == null) {
1707 try {
1708 f.run();
1709 d.completeNull();
1710 } catch (Throwable ex) {
1711 d.completeThrowable(ex);
1712 }
1713 }
1714 d.postComplete();
1715 }
1716 }
1717 }
1718
1719 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
1720 if (f == null) throw new NullPointerException();
1721 CompletableFuture<Void> d = new CompletableFuture<Void>();
1722 e.execute(new AsyncRun(d, f));
1723 return d;
1724 }
1725
1726 /* ------------- Signallers -------------- */
1727
/**
 * Completion for recording and releasing a waiting thread.  This
 * class implements ManagedBlocker to avoid starvation when
 * blocking actions pile up in ForkJoinPools.
 *
 * <p>The waiter is the thread that constructs the Signaller. It is
 * released by clearing {@code thread} and unparking it.
 */
@SuppressWarnings("serial")
static final class Signaller extends Completion
    implements ForkJoinPool.ManagedBlocker {
    long nanos;                    // remaining wait time if timed
    final long deadline;           // non-zero if timed
    final boolean interruptible;   // whether an interrupt ends the wait
    boolean interrupted;           // set once an interrupt has been seen
    volatile Thread thread;        // the waiter; null once released

    Signaller(boolean interruptible, long nanos, long deadline) {
        this.thread = Thread.currentThread();
        this.interruptible = interruptible;
        this.nanos = nanos;
        this.deadline = deadline;
    }
    /** Releases the waiter, if any, by unparking it; always returns null. */
    final CompletableFuture<?> tryFire(int ignore) {
        Thread w; // no need to atomically claim
        if ((w = thread) != null) {
            thread = null;
            LockSupport.unpark(w);
        }
        return null;
    }
    /**
     * Returns true when blocking is no longer needed: an interrupt was
     * seen and this signaller is interruptible, a timed wait has run out,
     * or the waiter has been released. As side effects this clears the
     * thread's interrupt status (recording it in {@code interrupted}) and
     * refreshes {@code nanos} for timed waits.
     */
    public boolean isReleasable() {
        if (Thread.interrupted())
            interrupted = true;
        return ((interrupted && interruptible) ||
                (deadline != 0L &&
                 (nanos <= 0L ||
                  (nanos = deadline - System.nanoTime()) <= 0L)) ||
                thread == null);
    }
    /** Parks, timed when a deadline is set, until isReleasable() holds. */
    public boolean block() {
        while (!isReleasable()) {
            if (deadline == 0L)
                LockSupport.park(this);
            else
                LockSupport.parkNanos(this, nanos);
        }
        return true;
    }
    /** Live as long as the waiter has not been released. */
    final boolean isLive() { return thread != null; }
}
1776
/**
 * Returns raw result after waiting, or null if interruptible and
 * interrupted.
 *
 * <p>Each pass of the loop takes one step while no result is present:
 * create the Signaller, push it onto the stack, then block on it.
 *
 * @param interruptible whether an interrupt ends the wait
 * @return the raw result, or null if the wait ended without one
 */
private Object waitingGet(boolean interruptible) {
    Signaller q = null;
    boolean queued = false;
    Object r;
    while ((r = result) == null) {
        if (q == null) {
            // First pass: create the signaller (untimed: nanos and
            // deadline are both 0).
            q = new Signaller(interruptible, 0L, 0L);
            if (Thread.currentThread() instanceof ForkJoinWorkerThread)
                ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
        }
        else if (!queued)
            // Retried on later passes until the push succeeds.
            queued = tryPushStack(q);
        else {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ie) { // currently cannot happen
                q.interrupted = true;
            }
            if (q.interrupted && interruptible)
                break;
        }
    }
    if (q != null && queued) {
        q.thread = null;
        // A non-interruptible wait re-asserts an interrupt it observed.
        if (!interruptible && q.interrupted)
            Thread.currentThread().interrupt();
        // Leaving without a result: call cleanStack.
        if (r == null)
            cleanStack();
    }
    // result is re-read because it may have been set after the loop
    // exited.
    if (r != null || (r = result) != null)
        postComplete();
    return r;
}
1814
/**
 * Returns raw result after waiting, or null if interrupted, or
 * throws TimeoutException on timeout.
 *
 * <p>The thread's interrupt status is tested and cleared on entry. A
 * non-positive {@code nanos} goes straight to the TimeoutException
 * without waiting.
 *
 * @param nanos maximum time to wait, in nanoseconds
 * @return the raw result, or null if interrupted
 * @throws TimeoutException if no result arrived and no interrupt was
 *         recorded
 */
private Object timedGet(long nanos) throws TimeoutException {
    if (Thread.interrupted())
        return null;
    if (nanos > 0L) {
        long d = System.nanoTime() + nanos;
        long deadline = (d == 0L) ? 1L : d; // avoid 0
        Signaller q = null;
        boolean queued = false;
        Object r;
        while ((r = result) == null) { // similar to untimed
            if (q == null) {
                q = new Signaller(true, nanos, deadline);
                if (Thread.currentThread() instanceof ForkJoinWorkerThread)
                    ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
            }
            else if (!queued)
                queued = tryPushStack(q);
            // Remaining time, as tracked by the signaller, is used up.
            else if (q.nanos <= 0L)
                break;
            else {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interrupted = true;
                }
                if (q.interrupted)
                    break;
            }
        }
        if (q != null && queued) {
            q.thread = null;
            // Leaving without a result: call cleanStack.
            if (r == null)
                cleanStack();
        }
        // result is re-read because it may have been set after the loop
        // exited.
        if (r != null || (r = result) != null)
            postComplete();
        // r may still be null here, but only when an interrupt was
        // recorded.
        if (r != null || (q != null && q.interrupted))
            return r;
    }
    throw new TimeoutException();
}
1860
1861 /* ------------- public methods -------------- */
1862
1863 /**
1864 * Creates a new incomplete CompletableFuture.
1865 */
1866 public CompletableFuture() {
1867 }
1868
/**
 * Constructs a CompletableFuture that is already complete, holding the
 * given encoded result.
 *
 * @param r the encoded result to install
 */
CompletableFuture(Object r) {
    result = r;
}
1875
1876 /**
1877 * Returns a new CompletableFuture that is asynchronously completed
1878 * by a task running in the {@link ForkJoinPool#commonPool()} with
1879 * the value obtained by calling the given Supplier.
1880 *
1881 * @param supplier a function returning the value to be used
1882 * to complete the returned CompletableFuture
1883 * @param <U> the function's return type
1884 * @return the new CompletableFuture
1885 */
1886 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1887 return asyncSupplyStage(ASYNC_POOL, supplier);
1888 }
1889
1890 /**
1891 * Returns a new CompletableFuture that is asynchronously completed
1892 * by a task running in the given executor with the value obtained
1893 * by calling the given Supplier.
1894 *
1895 * @param supplier a function returning the value to be used
1896 * to complete the returned CompletableFuture
1897 * @param executor the executor to use for asynchronous execution
1898 * @param <U> the function's return type
1899 * @return the new CompletableFuture
1900 */
1901 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1902 Executor executor) {
1903 return asyncSupplyStage(screenExecutor(executor), supplier);
1904 }
1905
1906 /**
1907 * Returns a new CompletableFuture that is asynchronously completed
1908 * by a task running in the {@link ForkJoinPool#commonPool()} after
1909 * it runs the given action.
1910 *
1911 * @param runnable the action to run before completing the
1912 * returned CompletableFuture
1913 * @return the new CompletableFuture
1914 */
1915 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1916 return asyncRunStage(ASYNC_POOL, runnable);
1917 }
1918
1919 /**
1920 * Returns a new CompletableFuture that is asynchronously completed
1921 * by a task running in the given executor after it runs the given
1922 * action.
1923 *
1924 * @param runnable the action to run before completing the
1925 * returned CompletableFuture
1926 * @param executor the executor to use for asynchronous execution
1927 * @return the new CompletableFuture
1928 */
1929 public static CompletableFuture<Void> runAsync(Runnable runnable,
1930 Executor executor) {
1931 return asyncRunStage(screenExecutor(executor), runnable);
1932 }
1933
1934 /**
1935 * Returns a new CompletableFuture that is already completed with
1936 * the given value.
1937 *
1938 * @param value the value
1939 * @param <U> the type of the value
1940 * @return the completed CompletableFuture
1941 */
1942 public static <U> CompletableFuture<U> completedFuture(U value) {
1943 return new CompletableFuture<U>((value == null) ? NIL : value);
1944 }
1945
1946 /**
1947 * Returns {@code true} if completed in any fashion: normally,
1948 * exceptionally, or via cancellation.
1949 *
1950 * @return {@code true} if completed
1951 */
1952 public boolean isDone() {
1953 return result != null;
1954 }
1955
1956 /**
1957 * Waits if necessary for this future to complete, and then
1958 * returns its result.
1959 *
1960 * @return the result value
1961 * @throws CancellationException if this future was cancelled
1962 * @throws ExecutionException if this future completed exceptionally
1963 * @throws InterruptedException if the current thread was interrupted
1964 * while waiting
1965 */
1966 @SuppressWarnings("unchecked")
1967 public T get() throws InterruptedException, ExecutionException {
1968 Object r;
1969 if ((r = result) == null)
1970 r = waitingGet(true);
1971 return (T) reportGet(r);
1972 }
1973
1974 /**
1975 * Waits if necessary for at most the given time for this future
1976 * to complete, and then returns its result, if available.
1977 *
1978 * @param timeout the maximum time to wait
1979 * @param unit the time unit of the timeout argument
1980 * @return the result value
1981 * @throws CancellationException if this future was cancelled
1982 * @throws ExecutionException if this future completed exceptionally
1983 * @throws InterruptedException if the current thread was interrupted
1984 * while waiting
1985 * @throws TimeoutException if the wait timed out
1986 */
1987 @SuppressWarnings("unchecked")
1988 public T get(long timeout, TimeUnit unit)
1989 throws InterruptedException, ExecutionException, TimeoutException {
1990 long nanos = unit.toNanos(timeout);
1991 Object r;
1992 if ((r = result) == null)
1993 r = timedGet(nanos);
1994 return (T) reportGet(r);
1995 }
1996
1997 /**
1998 * Returns the result value when complete, or throws an
1999 * (unchecked) exception if completed exceptionally. To better
2000 * conform with the use of common functional forms, if a
2001 * computation involved in the completion of this
2002 * CompletableFuture threw an exception, this method throws an
2003 * (unchecked) {@link CompletionException} with the underlying
2004 * exception as its cause.
2005 *
2006 * @return the result value
2007 * @throws CancellationException if the computation was cancelled
2008 * @throws CompletionException if this future completed
2009 * exceptionally or a completion computation threw an exception
2010 */
2011 @SuppressWarnings("unchecked")
2012 public T join() {
2013 Object r;
2014 if ((r = result) == null)
2015 r = waitingGet(false);
2016 return (T) reportJoin(r);
2017 }
2018
2019 /**
2020 * Returns the result value (or throws any encountered exception)
2021 * if completed, else returns the given valueIfAbsent.
2022 *
2023 * @param valueIfAbsent the value to return if not completed
2024 * @return the result value, if completed, else the given valueIfAbsent
2025 * @throws CancellationException if the computation was cancelled
2026 * @throws CompletionException if this future completed
2027 * exceptionally or a completion computation threw an exception
2028 */
2029 @SuppressWarnings("unchecked")
2030 public T getNow(T valueIfAbsent) {
2031 Object r;
2032 return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r);
2033 }
2034
2035 /**
2036 * If not already completed, sets the value returned by {@link
2037 * #get()} and related methods to the given value.
2038 *
2039 * @param value the result value
2040 * @return {@code true} if this invocation caused this CompletableFuture
2041 * to transition to a completed state, else {@code false}
2042 */
2043 public boolean complete(T value) {
2044 boolean triggered = completeValue(value);
2045 postComplete();
2046 return triggered;
2047 }
2048
2049 /**
2050 * If not already completed, causes invocations of {@link #get()}
2051 * and related methods to throw the given exception.
2052 *
2053 * @param ex the exception
2054 * @return {@code true} if this invocation caused this CompletableFuture
2055 * to transition to a completed state, else {@code false}
2056 */
2057 public boolean completeExceptionally(Throwable ex) {
2058 if (ex == null) throw new NullPointerException();
2059 boolean triggered = internalComplete(new AltResult(ex));
2060 postComplete();
2061 return triggered;
2062 }
2063
2064 public <U> CompletableFuture<U> thenApply(
2065 Function<? super T,? extends U> fn) {
2066 return uniApplyStage(null, fn);
2067 }
2068
2069 public <U> CompletableFuture<U> thenApplyAsync(
2070 Function<? super T,? extends U> fn) {
2071 return uniApplyStage(defaultExecutor(), fn);
2072 }
2073
2074 public <U> CompletableFuture<U> thenApplyAsync(
2075 Function<? super T,? extends U> fn, Executor executor) {
2076 return uniApplyStage(screenExecutor(executor), fn);
2077 }
2078
2079 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
2080 return uniAcceptStage(null, action);
2081 }
2082
2083 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
2084 return uniAcceptStage(defaultExecutor(), action);
2085 }
2086
2087 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
2088 Executor executor) {
2089 return uniAcceptStage(screenExecutor(executor), action);
2090 }
2091
2092 public CompletableFuture<Void> thenRun(Runnable action) {
2093 return uniRunStage(null, action);
2094 }
2095
2096 public CompletableFuture<Void> thenRunAsync(Runnable action) {
2097 return uniRunStage(defaultExecutor(), action);
2098 }
2099
2100 public CompletableFuture<Void> thenRunAsync(Runnable action,
2101 Executor executor) {
2102 return uniRunStage(screenExecutor(executor), action);
2103 }
2104
2105 public <U,V> CompletableFuture<V> thenCombine(
2106 CompletionStage<? extends U> other,
2107 BiFunction<? super T,? super U,? extends V> fn) {
2108 return biApplyStage(null, other, fn);
2109 }
2110
2111 public <U,V> CompletableFuture<V> thenCombineAsync(
2112 CompletionStage<? extends U> other,
2113 BiFunction<? super T,? super U,? extends V> fn) {
2114 return biApplyStage(defaultExecutor(), other, fn);
2115 }
2116
2117 public <U,V> CompletableFuture<V> thenCombineAsync(
2118 CompletionStage<? extends U> other,
2119 BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
2120 return biApplyStage(screenExecutor(executor), other, fn);
2121 }
2122
2123 public <U> CompletableFuture<Void> thenAcceptBoth(
2124 CompletionStage<? extends U> other,
2125 BiConsumer<? super T, ? super U> action) {
2126 return biAcceptStage(null, other, action);
2127 }
2128
2129 public <U> CompletableFuture<Void> thenAcceptBothAsync(
2130 CompletionStage<? extends U> other,
2131 BiConsumer<? super T, ? super U> action) {
2132 return biAcceptStage(defaultExecutor(), other, action);
2133 }
2134
2135 public <U> CompletableFuture<Void> thenAcceptBothAsync(
2136 CompletionStage<? extends U> other,
2137 BiConsumer<? super T, ? super U> action, Executor executor) {
2138 return biAcceptStage(screenExecutor(executor), other, action);
2139 }
2140
2141 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
2142 Runnable action) {
2143 return biRunStage(null, other, action);
2144 }
2145
2146 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2147 Runnable action) {
2148 return biRunStage(defaultExecutor(), other, action);
2149 }
2150
2151 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2152 Runnable action,
2153 Executor executor) {
2154 return biRunStage(screenExecutor(executor), other, action);
2155 }
2156
2157 public <U> CompletableFuture<U> applyToEither(
2158 CompletionStage<? extends T> other, Function<? super T, U> fn) {
2159 return orApplyStage(null, other, fn);
2160 }
2161
2162 public <U> CompletableFuture<U> applyToEitherAsync(
2163 CompletionStage<? extends T> other, Function<? super T, U> fn) {
2164 return orApplyStage(defaultExecutor(), other, fn);
2165 }
2166
2167 public <U> CompletableFuture<U> applyToEitherAsync(
2168 CompletionStage<? extends T> other, Function<? super T, U> fn,
2169 Executor executor) {
2170 return orApplyStage(screenExecutor(executor), other, fn);
2171 }
2172
2173 public CompletableFuture<Void> acceptEither(
2174 CompletionStage<? extends T> other, Consumer<? super T> action) {
2175 return orAcceptStage(null, other, action);
2176 }
2177
2178 public CompletableFuture<Void> acceptEitherAsync(
2179 CompletionStage<? extends T> other, Consumer<? super T> action) {
2180 return orAcceptStage(defaultExecutor(), other, action);
2181 }
2182
2183 public CompletableFuture<Void> acceptEitherAsync(
2184 CompletionStage<? extends T> other, Consumer<? super T> action,
2185 Executor executor) {
2186 return orAcceptStage(screenExecutor(executor), other, action);
2187 }
2188
2189 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2190 Runnable action) {
2191 return orRunStage(null, other, action);
2192 }
2193
2194 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2195 Runnable action) {
2196 return orRunStage(defaultExecutor(), other, action);
2197 }
2198
2199 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2200 Runnable action,
2201 Executor executor) {
2202 return orRunStage(screenExecutor(executor), other, action);
2203 }
2204
2205 public <U> CompletableFuture<U> thenCompose(
2206 Function<? super T, ? extends CompletionStage<U>> fn) {
2207 return uniComposeStage(null, fn);
2208 }
2209
2210 public <U> CompletableFuture<U> thenComposeAsync(
2211 Function<? super T, ? extends CompletionStage<U>> fn) {
2212 return uniComposeStage(defaultExecutor(), fn);
2213 }
2214
2215 public <U> CompletableFuture<U> thenComposeAsync(
2216 Function<? super T, ? extends CompletionStage<U>> fn,
2217 Executor executor) {
2218 return uniComposeStage(screenExecutor(executor), fn);
2219 }
2220
2221 public CompletableFuture<T> whenComplete(
2222 BiConsumer<? super T, ? super Throwable> action) {
2223 return uniWhenCompleteStage(null, action);
2224 }
2225
2226 public CompletableFuture<T> whenCompleteAsync(
2227 BiConsumer<? super T, ? super Throwable> action) {
2228 return uniWhenCompleteStage(defaultExecutor(), action);
2229 }
2230
2231 public CompletableFuture<T> whenCompleteAsync(
2232 BiConsumer<? super T, ? super Throwable> action, Executor executor) {
2233 return uniWhenCompleteStage(screenExecutor(executor), action);
2234 }
2235
2236 public <U> CompletableFuture<U> handle(
2237 BiFunction<? super T, Throwable, ? extends U> fn) {
2238 return uniHandleStage(null, fn);
2239 }
2240
2241 public <U> CompletableFuture<U> handleAsync(
2242 BiFunction<? super T, Throwable, ? extends U> fn) {
2243 return uniHandleStage(defaultExecutor(), fn);
2244 }
2245
2246 public <U> CompletableFuture<U> handleAsync(
2247 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
2248 return uniHandleStage(screenExecutor(executor), fn);
2249 }
2250
2251 /**
2252 * Returns this CompletableFuture.
2253 *
2254 * @return this CompletableFuture
2255 */
2256 public CompletableFuture<T> toCompletableFuture() {
2257 return this;
2258 }
2259
2260 // not in interface CompletionStage
2261
2262 /**
2263 * Returns a new CompletableFuture that is completed when this
2264 * CompletableFuture completes, with the result of the given
2265 * function of the exception triggering this CompletableFuture's
2266 * completion when it completes exceptionally; otherwise, if this
2267 * CompletableFuture completes normally, then the returned
2268 * CompletableFuture also completes normally with the same value.
2269 * Note: More flexible versions of this functionality are
2270 * available using methods {@code whenComplete} and {@code handle}.
2271 *
2272 * @param fn the function to use to compute the value of the
2273 * returned CompletableFuture if this CompletableFuture completed
2274 * exceptionally
2275 * @return the new CompletableFuture
2276 */
2277 public CompletableFuture<T> exceptionally(
2278 Function<Throwable, ? extends T> fn) {
2279 return uniExceptionallyStage(fn);
2280 }
2281
2282
2283 /* ------------- Arbitrary-arity constructions -------------- */
2284
2285 /**
2286 * Returns a new CompletableFuture that is completed when all of
2287 * the given CompletableFutures complete. If any of the given
2288 * CompletableFutures complete exceptionally, then the returned
2289 * CompletableFuture also does so, with a CompletionException
2290 * holding this exception as its cause. Otherwise, the results,
2291 * if any, of the given CompletableFutures are not reflected in
2292 * the returned CompletableFuture, but may be obtained by
2293 * inspecting them individually. If no CompletableFutures are
2294 * provided, returns a CompletableFuture completed with the value
2295 * {@code null}.
2296 *
2297 * <p>Among the applications of this method is to await completion
2298 * of a set of independent CompletableFutures before continuing a
2299 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2300 * c3).join();}.
2301 *
2302 * @param cfs the CompletableFutures
2303 * @return a new CompletableFuture that is completed when all of the
2304 * given CompletableFutures complete
2305 * @throws NullPointerException if the array or any of its elements are
2306 * {@code null}
2307 */
2308 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2309 return andTree(cfs, 0, cfs.length - 1);
2310 }
2311
2312 /**
2313 * Returns a new CompletableFuture that is completed when any of
2314 * the given CompletableFutures complete, with the same result.
2315 * Otherwise, if it completed exceptionally, the returned
2316 * CompletableFuture also does so, with a CompletionException
2317 * holding this exception as its cause. If no CompletableFutures
2318 * are provided, returns an incomplete CompletableFuture.
2319 *
2320 * @param cfs the CompletableFutures
2321 * @return a new CompletableFuture that is completed with the
2322 * result or exception of any of the given CompletableFutures when
2323 * one completes
2324 * @throws NullPointerException if the array or any of its elements are
2325 * {@code null}
2326 */
2327 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2328 return orTree(cfs, 0, cfs.length - 1);
2329 }
2330
2331 /* ------------- Control and status methods -------------- */
2332
2333 /**
2334 * If not already completed, completes this CompletableFuture with
2335 * a {@link CancellationException}. Dependent CompletableFutures
2336 * that have not already completed will also complete
2337 * exceptionally, with a {@link CompletionException} caused by
2338 * this {@code CancellationException}.
2339 *
2340 * @param mayInterruptIfRunning this value has no effect in this
2341 * implementation because interrupts are not used to control
2342 * processing.
2343 *
2344 * @return {@code true} if this task is now cancelled
2345 */
2346 public boolean cancel(boolean mayInterruptIfRunning) {
2347 boolean cancelled = (result == null) &&
2348 internalComplete(new AltResult(new CancellationException()));
2349 postComplete();
2350 return cancelled || isCancelled();
2351 }
2352
2353 /**
2354 * Returns {@code true} if this CompletableFuture was cancelled
2355 * before it completed normally.
2356 *
2357 * @return {@code true} if this CompletableFuture was cancelled
2358 * before it completed normally
2359 */
2360 public boolean isCancelled() {
2361 Object r;
2362 return ((r = result) instanceof AltResult) &&
2363 (((AltResult)r).ex instanceof CancellationException);
2364 }
2365
2366 /**
2367 * Returns {@code true} if this CompletableFuture completed
2368 * exceptionally, in any way. Possible causes include
2369 * cancellation, explicit invocation of {@code
2370 * completeExceptionally}, and abrupt termination of a
2371 * CompletionStage action.
2372 *
2373 * @return {@code true} if this CompletableFuture completed
2374 * exceptionally
2375 */
2376 public boolean isCompletedExceptionally() {
2377 Object r;
2378 return ((r = result) instanceof AltResult) && r != NIL;
2379 }
2380
2381 /**
2382 * Forcibly sets or resets the value subsequently returned by
2383 * method {@link #get()} and related methods, whether or not
2384 * already completed. This method is designed for use only in
2385 * error recovery actions, and even in such situations may result
2386 * in ongoing dependent completions using established versus
2387 * overwritten outcomes.
2388 *
2389 * @param value the completion value
2390 */
2391 public void obtrudeValue(T value) {
2392 result = (value == null) ? NIL : value;
2393 postComplete();
2394 }
2395
2396 /**
2397 * Forcibly causes subsequent invocations of method {@link #get()}
2398 * and related methods to throw the given exception, whether or
2399 * not already completed. This method is designed for use only in
2400 * error recovery actions, and even in such situations may result
2401 * in ongoing dependent completions using established versus
2402 * overwritten outcomes.
2403 *
2404 * @param ex the exception
2405 * @throws NullPointerException if the exception is null
2406 */
2407 public void obtrudeException(Throwable ex) {
2408 if (ex == null) throw new NullPointerException();
2409 result = new AltResult(ex);
2410 postComplete();
2411 }
2412
2413 /**
2414 * Returns the estimated number of CompletableFutures whose
2415 * completions are awaiting completion of this CompletableFuture.
2416 * This method is designed for use in monitoring system state, not
2417 * for synchronization control.
2418 *
2419 * @return the number of dependent CompletableFutures
2420 */
2421 public int getNumberOfDependents() {
2422 int count = 0;
2423 for (Completion p = stack; p != null; p = p.next)
2424 ++count;
2425 return count;
2426 }
2427
2428 /**
2429 * Returns a string identifying this CompletableFuture, as well as
2430 * its completion state. The state, in brackets, contains the
2431 * String {@code "Completed Normally"} or the String {@code
2432 * "Completed Exceptionally"}, or the String {@code "Not
2433 * completed"} followed by the number of CompletableFutures
2434 * dependent upon its completion, if any.
2435 *
2436 * @return a string identifying this CompletableFuture, as well as its state
2437 */
2438 public String toString() {
2439 Object r = result;
2440 int count = 0; // avoid call to getNumberOfDependents in case disabled
2441 for (Completion p = stack; p != null; p = p.next)
2442 ++count;
2443 return super.toString() +
2444 ((r == null) ?
2445 ((count == 0) ?
2446 "[Not completed]" :
2447 "[Not completed, " + count + " dependents]") :
2448 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2449 "[Completed exceptionally]" :
2450 "[Completed normally]"));
2451 }
2452
2453 // jdk9 additions
2454
2455 /**
2456 * Returns a new incomplete CompletableFuture of the type to be
2457 * returned by a CompletionStage method. Subclasses should
2458 * normally override this method to return an instance of the same
2459 * class as this CompletableFuture. The default implementation
2460 * returns an instance of class CompletableFuture.
2461 *
2462 * @param <U> the type of the value
2463 * @return a new CompletableFuture
2464 * @since 9
2465 */
2466 public <U> CompletableFuture<U> newIncompleteFuture() {
2467 return new CompletableFuture<U>();
2468 }
2469
2470 /**
2471 * Returns the default Executor used for async methods that do not
2472 * specify an Executor. This class uses the {@link
2473 * ForkJoinPool#commonPool()} if it supports more than one
2474 * parallel thread, or else an Executor using one thread per async
2475 * task. This method may be overridden in subclasses to return
2476 * an Executor that provides at least one independent thread.
2477 *
2478 * @return the executor
2479 * @since 9
2480 */
2481 public Executor defaultExecutor() {
2482 return ASYNC_POOL;
2483 }
2484
2485 /**
2486 * Returns a new CompletableFuture that is completed normally with
2487 * the same value as this CompletableFuture when it completes
2488 * normally. If this CompletableFuture completes exceptionally,
2489 * then the returned CompletableFuture completes exceptionally
2490 * with a CompletionException with this exception as cause. The
2491 * behavior is equivalent to {@code thenApply(x -> x)}. This
2492 * method may be useful as a form of "defensive copying", to
2493 * prevent clients from completing, while still being able to
2494 * arrange dependent actions.
2495 *
2496 * @return the new CompletableFuture
2497 * @since 9
2498 */
2499 public CompletableFuture<T> copy() {
2500 return uniCopyStage();
2501 }
2502
2503 /**
2504 * Returns a new CompletionStage that is completed normally with
2505 * the same value as this CompletableFuture when it completes
2506 * normally, and cannot be independently completed or otherwise
2507 * used in ways not defined by the methods of interface {@link
2508 * CompletionStage}. If this CompletableFuture completes
2509 * exceptionally, then the returned CompletionStage completes
2510 * exceptionally with a CompletionException with this exception as
2511 * cause.
2512 *
2513 * @return the new CompletionStage
2514 * @since 9
2515 */
2516 public CompletionStage<T> minimalCompletionStage() {
2517 return uniAsMinimalStage();
2518 }
2519
2520 /**
2521 * Completes this CompletableFuture with the result of
2522 * the given Supplier function invoked from an asynchronous
2523 * task using the given executor.
2524 *
2525 * @param supplier a function returning the value to be used
2526 * to complete this CompletableFuture
2527 * @param executor the executor to use for asynchronous execution
2528 * @return this CompletableFuture
2529 * @since 9
2530 */
2531 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier,
2532 Executor executor) {
2533 if (supplier == null || executor == null)
2534 throw new NullPointerException();
2535 executor.execute(new AsyncSupply<T>(this, supplier));
2536 return this;
2537 }
2538
2539 /**
2540 * Completes this CompletableFuture with the result of the given
2541 * Supplier function invoked from an asynchronous task using the
2542 * default executor.
2543 *
2544 * @param supplier a function returning the value to be used
2545 * to complete this CompletableFuture
2546 * @return this CompletableFuture
2547 * @since 9
2548 */
2549 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
2550 return completeAsync(supplier, defaultExecutor());
2551 }
2552
2553 /**
2554 * Exceptionally completes this CompletableFuture with
2555 * a {@link TimeoutException} if not otherwise completed
2556 * before the given timeout.
2557 *
2558 * @param timeout how long to wait before completing exceptionally
2559 * with a TimeoutException, in units of {@code unit}
2560 * @param unit a {@code TimeUnit} determining how to interpret the
2561 * {@code timeout} parameter
2562 * @return this CompletableFuture
2563 * @since 9
2564 */
2565 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
2566 if (unit == null)
2567 throw new NullPointerException();
2568 if (result == null)
2569 whenComplete(new Canceller(Delayer.delay(new Timeout(this),
2570 timeout, unit)));
2571 return this;
2572 }
2573
2574 /**
2575 * Completes this CompletableFuture with the given value if not
2576 * otherwise completed before the given timeout.
2577 *
2578 * @param value the value to use upon timeout
2579 * @param timeout how long to wait before completing normally
2580 * with the given value, in units of {@code unit}
2581 * @param unit a {@code TimeUnit} determining how to interpret the
2582 * {@code timeout} parameter
2583 * @return this CompletableFuture
2584 * @since 9
2585 */
2586 public CompletableFuture<T> completeOnTimeout(T value, long timeout,
2587 TimeUnit unit) {
2588 if (unit == null)
2589 throw new NullPointerException();
2590 if (result == null)
2591 whenComplete(new Canceller(Delayer.delay(
2592 new DelayedCompleter<T>(this, value),
2593 timeout, unit)));
2594 return this;
2595 }
2596
2597 /**
2598 * Returns a new Executor that submits a task to the given base
2599 * executor after the given delay (or no delay if non-positive).
2600 * Each delay commences upon invocation of the returned executor's
2601 * {@code execute} method.
2602 *
2603 * @param delay how long to delay, in units of {@code unit}
2604 * @param unit a {@code TimeUnit} determining how to interpret the
2605 * {@code delay} parameter
2606 * @param executor the base executor
2607 * @return the new delayed executor
2608 * @since 9
2609 */
2610 public static Executor delayedExecutor(long delay, TimeUnit unit,
2611 Executor executor) {
2612 if (unit == null || executor == null)
2613 throw new NullPointerException();
2614 return new DelayedExecutor(delay, unit, executor);
2615 }
2616
2617 /**
2618 * Returns a new Executor that submits a task to the default
2619 * executor after the given delay (or no delay if non-positive).
2620 * Each delay commences upon invocation of the returned executor's
2621 * {@code execute} method.
2622 *
2623 * @param delay how long to delay, in units of {@code unit}
2624 * @param unit a {@code TimeUnit} determining how to interpret the
2625 * {@code delay} parameter
2626 * @return the new delayed executor
2627 * @since 9
2628 */
2629 public static Executor delayedExecutor(long delay, TimeUnit unit) {
2630 if (unit == null)
2631 throw new NullPointerException();
2632 return new DelayedExecutor(delay, unit, ASYNC_POOL);
2633 }
2634
2635 /**
2636 * Returns a new CompletionStage that is already completed with
2637 * the given value and supports only those methods in
2638 * interface {@link CompletionStage}.
2639 *
2640 * @param value the value
2641 * @param <U> the type of the value
2642 * @return the completed CompletionStage
2643 * @since 9
2644 */
2645 public static <U> CompletionStage<U> completedStage(U value) {
2646 return new MinimalStage<U>((value == null) ? NIL : value);
2647 }
2648
2649 /**
2650 * Returns a new CompletableFuture that is already completed
2651 * exceptionally with the given exception.
2652 *
2653 * @param ex the exception
2654 * @param <U> the type of the value
2655 * @return the exceptionally completed CompletableFuture
2656 * @since 9
2657 */
2658 public static <U> CompletableFuture<U> failedFuture(Throwable ex) {
2659 if (ex == null) throw new NullPointerException();
2660 return new CompletableFuture<U>(new AltResult(ex));
2661 }
2662
2663 /**
2664 * Returns a new CompletionStage that is already completed
2665 * exceptionally with the given exception and supports only those
2666 * methods in interface {@link CompletionStage}.
2667 *
2668 * @param ex the exception
2669 * @param <U> the type of the value
2670 * @return the exceptionally completed CompletionStage
2671 * @since 9
2672 */
2673 public static <U> CompletionStage<U> failedStage(Throwable ex) {
2674 if (ex == null) throw new NullPointerException();
2675 return new MinimalStage<U>(new AltResult(ex));
2676 }
2677
2678 /**
2679 * Singleton delay scheduler, used only for starting and
2680 * cancelling tasks.
2681 */
2682 static final class Delayer {
2683 static ScheduledFuture<?> delay(Runnable command, long delay,
2684 TimeUnit unit) {
2685 return delayer.schedule(command, delay, unit);
2686 }
2687
2688 static final class DaemonThreadFactory implements ThreadFactory {
2689 public Thread newThread(Runnable r) {
2690 Thread t = new Thread(r);
2691 t.setDaemon(true);
2692 t.setName("CompletableFutureDelayScheduler");
2693 return t;
2694 }
2695 }
2696
2697 static final ScheduledThreadPoolExecutor delayer;
2698 static {
2699 (delayer = new ScheduledThreadPoolExecutor(
2700 1, new DaemonThreadFactory())).
2701 setRemoveOnCancelPolicy(true);
2702 }
2703 }
2704
2705 // Little class-ified lambdas to better support monitoring
2706
2707 static final class DelayedExecutor implements Executor {
2708 final long delay;
2709 final TimeUnit unit;
2710 final Executor executor;
2711 DelayedExecutor(long delay, TimeUnit unit, Executor executor) {
2712 this.delay = delay; this.unit = unit; this.executor = executor;
2713 }
2714 public void execute(Runnable r) {
2715 Delayer.delay(new TaskSubmitter(executor, r), delay, unit);
2716 }
2717 }
2718
2719 /** Action to submit user task */
2720 static final class TaskSubmitter implements Runnable {
2721 final Executor executor;
2722 final Runnable action;
2723 TaskSubmitter(Executor executor, Runnable action) {
2724 this.executor = executor;
2725 this.action = action;
2726 }
2727 public void run() { executor.execute(action); }
2728 }
2729
2730 /** Action to completeExceptionally on timeout */
2731 static final class Timeout implements Runnable {
2732 final CompletableFuture<?> f;
2733 Timeout(CompletableFuture<?> f) { this.f = f; }
2734 public void run() {
2735 if (f != null && !f.isDone())
2736 f.completeExceptionally(new TimeoutException());
2737 }
2738 }
2739
2740 /** Action to complete on timeout */
2741 static final class DelayedCompleter<U> implements Runnable {
2742 final CompletableFuture<U> f;
2743 final U u;
2744 DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }
2745 public void run() {
2746 if (f != null)
2747 f.complete(u);
2748 }
2749 }
2750
2751 /** Action to cancel unneeded timeouts */
2752 static final class Canceller implements BiConsumer<Object, Throwable> {
2753 final Future<?> f;
2754 Canceller(Future<?> f) { this.f = f; }
2755 public void accept(Object ignore, Throwable ex) {
2756 if (ex == null && f != null && !f.isDone())
2757 f.cancel(false);
2758 }
2759 }
2760
2761 /**
2762 * A subclass that just throws UOE for most non-CompletionStage methods.
2763 */
2764 static final class MinimalStage<T> extends CompletableFuture<T> {
2765 MinimalStage() { }
2766 MinimalStage(Object r) { super(r); }
2767 @Override public <U> CompletableFuture<U> newIncompleteFuture() {
2768 return new MinimalStage<U>(); }
2769 @Override public T get() {
2770 throw new UnsupportedOperationException(); }
2771 @Override public T get(long timeout, TimeUnit unit) {
2772 throw new UnsupportedOperationException(); }
2773 @Override public T getNow(T valueIfAbsent) {
2774 throw new UnsupportedOperationException(); }
2775 @Override public T join() {
2776 throw new UnsupportedOperationException(); }
2777 @Override public boolean complete(T value) {
2778 throw new UnsupportedOperationException(); }
2779 @Override public boolean completeExceptionally(Throwable ex) {
2780 throw new UnsupportedOperationException(); }
2781 @Override public boolean cancel(boolean mayInterruptIfRunning) {
2782 throw new UnsupportedOperationException(); }
2783 @Override public void obtrudeValue(T value) {
2784 throw new UnsupportedOperationException(); }
2785 @Override public void obtrudeException(Throwable ex) {
2786 throw new UnsupportedOperationException(); }
2787 @Override public boolean isDone() {
2788 throw new UnsupportedOperationException(); }
2789 @Override public boolean isCancelled() {
2790 throw new UnsupportedOperationException(); }
2791 @Override public boolean isCompletedExceptionally() {
2792 throw new UnsupportedOperationException(); }
2793 @Override public int getNumberOfDependents() {
2794 throw new UnsupportedOperationException(); }
2795 @Override public CompletableFuture<T> completeAsync
2796 (Supplier<? extends T> supplier, Executor executor) {
2797 throw new UnsupportedOperationException(); }
2798 @Override public CompletableFuture<T> completeAsync
2799 (Supplier<? extends T> supplier) {
2800 throw new UnsupportedOperationException(); }
2801 @Override public CompletableFuture<T> orTimeout
2802 (long timeout, TimeUnit unit) {
2803 throw new UnsupportedOperationException(); }
2804 @Override public CompletableFuture<T> completeOnTimeout
2805 (T value, long timeout, TimeUnit unit) {
2806 throw new UnsupportedOperationException(); }
2807 }
2808
// VarHandle mechanics: handles onto the "result" and "stack" fields of
// CompletableFuture and the "next" field of Completion.
private static final VarHandle RESULT;
private static final VarHandle STACK;
private static final VarHandle NEXT;
static {
    try {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();
        RESULT = lookup.findVarHandle(
            CompletableFuture.class, "result", Object.class);
        STACK = lookup.findVarHandle(
            CompletableFuture.class, "stack", Completion.class);
        NEXT = lookup.findVarHandle(
            Completion.class, "next", Completion.class);
    } catch (ReflectiveOperationException ex) {
        // A failed lookup is unrecoverable for this class.
        throw new Error(ex);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> preloaded = LockSupport.class;
}
2827 }