ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.202
Committed: Sun Jun 26 22:07:25 2016 UTC (7 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.201: +37 -29 lines
Log Message:
refresh internal documentation

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.invoke.MethodHandles;
10 import java.lang.invoke.VarHandle;
11 import java.util.concurrent.locks.LockSupport;
12 import java.util.function.BiConsumer;
13 import java.util.function.BiFunction;
14 import java.util.function.Consumer;
15 import java.util.function.Function;
16 import java.util.function.Supplier;
17
18 /**
19 * A {@link Future} that may be explicitly completed (setting its
20 * value and status), and may be used as a {@link CompletionStage},
21 * supporting dependent functions and actions that trigger upon its
22 * completion.
23 *
24 * <p>When two or more threads attempt to
25 * {@link #complete complete},
26 * {@link #completeExceptionally completeExceptionally}, or
27 * {@link #cancel cancel}
28 * a CompletableFuture, only one of them succeeds.
29 *
30 * <p>In addition to these and related methods for directly
31 * manipulating status and results, CompletableFuture implements
32 * interface {@link CompletionStage} with the following policies: <ul>
33 *
34 * <li>Actions supplied for dependent completions of
35 * <em>non-async</em> methods may be performed by the thread that
36 * completes the current CompletableFuture, or by any other caller of
37 * a completion method.
38 *
39 * <li>All <em>async</em> methods without an explicit Executor
40 * argument are performed using the {@link ForkJoinPool#commonPool()}
41 * (unless it does not support a parallelism level of at least two, in
42 * which case, a new Thread is created to run each task). This may be
43 * overridden for non-static methods in subclasses by defining method
44 * {@link #defaultExecutor()}. To simplify monitoring, debugging,
45 * and tracking, all generated asynchronous tasks are instances of the
46 * marker interface {@link AsynchronousCompletionTask}. Operations
47 * with time-delays can use adapter methods defined in this class, for
48 * example: {@code supplyAsync(supplier, delayedExecutor(timeout,
49 * timeUnit))}. To support methods with delays and timeouts, this
50 * class maintains at most one daemon thread for triggering and
51 * cancelling actions, not for running them.
52 *
53 * <li>All CompletionStage methods are implemented independently of
54 * other public methods, so the behavior of one method is not impacted
55 * by overrides of others in subclasses.
56 *
57 * <li>All CompletionStage methods return CompletableFutures. To
58 * restrict usages to only those methods defined in interface
59 * CompletionStage, use method {@link #minimalCompletionStage}. Or to
60 * ensure only that clients do not themselves modify a future, use
61 * method {@link #copy}.
62 * </ul>
63 *
64 * <p>CompletableFuture also implements {@link Future} with the following
65 * policies: <ul>
66 *
67 * <li>Since (unlike {@link FutureTask}) this class has no direct
68 * control over the computation that causes it to be completed,
69 * cancellation is treated as just another form of exceptional
70 * completion. Method {@link #cancel cancel} has the same effect as
71 * {@code completeExceptionally(new CancellationException())}. Method
72 * {@link #isCompletedExceptionally} can be used to determine if a
73 * CompletableFuture completed in any exceptional fashion.
74 *
75 * <li>In case of exceptional completion with a CompletionException,
76 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
77 * {@link ExecutionException} with the same cause as held in the
78 * corresponding CompletionException. To simplify usage in most
79 * contexts, this class also defines methods {@link #join()} and
80 * {@link #getNow} that instead throw the CompletionException directly
81 * in these cases.
82 * </ul>
83 *
84 * <p>Arguments used to pass a completion result (that is, for
85 * parameters of type {@code T}) for methods accepting them may be
86 * null, but passing a null value for any other parameter will result
87 * in a {@link NullPointerException} being thrown.
88 *
89 * <p>Subclasses of this class should normally override the "virtual
90 * constructor" method {@link #newIncompleteFuture}, which establishes
91 * the concrete type returned by CompletionStage methods. For example,
92 * here is a class that substitutes a different default Executor and
93 * disables the {@code obtrude} methods:
94 *
95 * <pre> {@code
96 * class MyCompletableFuture<T> extends CompletableFuture<T> {
97 * static final Executor myExecutor = ...;
98 * public MyCompletableFuture() { }
99 * public <U> CompletableFuture<U> newIncompleteFuture() {
100 * return new MyCompletableFuture<U>(); }
101 * public Executor defaultExecutor() {
102 * return myExecutor; }
103 * public void obtrudeValue(T value) {
104 * throw new UnsupportedOperationException(); }
105 * public void obtrudeException(Throwable ex) {
106 * throw new UnsupportedOperationException(); }
107 * }}</pre>
108 *
109 * @author Doug Lea
110 * @since 1.8
111 * @param <T> The result type returned by this future's {@code join}
112 * and {@code get} methods
113 */
114 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
115
116 /*
117 * Overview:
118 *
119 * A CompletableFuture may have dependent completion actions,
120 * collected in a linked stack. It atomically completes by CASing
121 * a result field, and then pops off and runs those actions. This
122 * applies across normal vs exceptional outcomes, sync vs async
123 * actions, binary triggers, and various forms of completions.
124 *
125 * Non-nullness of volatile field "result" indicates done. It may
126 * be set directly if known to be thread-confined, else via CAS.
127 * An AltResult is used to box null as a result, as well as to
128 * hold exceptions. Using a single field makes completion simple
129 * to detect and trigger. Result encoding and decoding is
130 * straightforward but tedious and adds to the sprawl of trapping
131 * and associating exceptions with targets. Minor simplifications
132 * rely on (static) NIL (to box null results) being the only
133 * AltResult with a null exception field, so we don't usually need
134 * explicit comparisons. Even though some of the generics casts
135 * are unchecked (see SuppressWarnings annotations), they are
136 * placed to be appropriate even if checked.
137 *
138 * Dependent actions are represented by Completion objects linked
139 * as Treiber stacks headed by field "stack". There are Completion
140 * classes for each kind of action, grouped into:
141 * - single-input (UniCompletion),
142 * - two-input (BiCompletion),
143 * - projected (BiCompletions using exactly one of two inputs),
144 * - shared (CoCompletion, used by the second of two sources),
145 * - zero-input source actions,
146 * - Signallers that unblock waiters.
147 * Class Completion extends ForkJoinTask to enable async execution
148 * (adding no space overhead because we exploit its "tag" methods
149 * to maintain claims). It is also declared as Runnable to allow
150 * usage with arbitrary executors.
151 *
152 * Support for each kind of CompletionStage relies on a separate
153 * class, along with two CompletableFuture methods:
154 *
155 * * A Completion class with name X corresponding to function,
156 * prefaced with "Uni", "Bi", or "Or". Each class contains
157 * fields for source(s), actions, and dependent. They are
158 * boringly similar, differing from others only with respect to
159 * underlying functional forms. We do this so that users don't
160 * encounter layers of adapters in common usages.
161 *
162 * * Boolean CompletableFuture method x(...) (for example
163 * biApply) takes all of the arguments needed to check that an
164 * action is triggerable, and then either runs the action or
165 * arranges its async execution by executing its Completion
166 * argument, if present. The method returns true if known to be
167 * complete.
168 *
169 * * Completion method tryFire(int mode) invokes the associated x
170 * method with its held arguments, and on success cleans up.
171 * The mode argument allows tryFire to be called twice (SYNC,
172 * then ASYNC); the first to screen and trap exceptions while
173 * arranging to execute, and the second when called from a task.
174 * (A few classes are not used async so take slightly different
175 * forms.) The claim() callback suppresses function invocation
176 * if already claimed by another thread.
177 *
178 * * Some classes (for example UniApply) have separate handling
179 * code for when known to be thread-confined ("now" methods) and
180 * for when shared (in tryFire), for efficiency.
181 *
182 * * CompletableFuture method xStage(...) is called from a public
183 * stage method of CompletableFuture f. It screens user
184 * arguments and invokes and/or creates the stage object. If
185 * not async and already triggerable, the action is run
186 * immediately. Otherwise a Completion c is created, and
187 * submitted to the executor if triggerable, or pushed onto f's
188 * stack if not. Completion actions are started via c.tryFire.
189 * We recheck after pushing to a source future's stack to cover
190 * possible races if the source completes while pushing.
191 * Classes with two inputs (for example BiApply) deal with races
192 * across both while pushing actions. The second completion is
193 * a CoCompletion pointing to the first, shared so that at most
194 * one performs the action. The multiple-arity methods allOf
195 * and anyOf do this pairwise to form trees of completions.
196 *
197 * Note that the generic type parameters of methods vary according
198 * to whether "this" is a source, dependent, or completion.
199 *
200 * Method postComplete is called upon completion unless the target
201 * is guaranteed not to be observable (i.e., not yet returned or
202 * linked). Multiple threads can call postComplete, which
203 * atomically pops each dependent action, and tries to trigger it
204 * via method tryFire, in NESTED mode. Triggering can propagate
205 * recursively, so NESTED mode returns its completed dependent (if
206 * one exists) for further processing by its caller (see method
207 * postFire).
208 *
209 * Blocking methods get() and join() rely on Signaller Completions
210 * that wake up waiting threads. The mechanics are similar to
211 * Treiber stack wait-nodes used in FutureTask, Phaser, and
212 * SynchronousQueue. See their internal documentation for
213 * algorithmic details.
214 *
215 * Without precautions, CompletableFutures would be prone to
216 * garbage accumulation as chains of Completions build up, each
217 * pointing back to its sources. So we null out fields as soon as
218 * possible. The screening checks needed anyway harmlessly ignore
219 * null arguments that may have been obtained during races with
220 * threads nulling out fields. We also try to unlink non-isLive
221 * (fired or cancelled) Completions from stacks that might
222 * otherwise never be popped: Method cleanStack always unlinks non
223 * isLive completions from the head of stack; others may
224 * occasionally remain if racing with other cancellations or
225 * removals.
226 *
227 * Completion fields need not be declared as final or volatile
228 * because they are only visible to other threads upon safe
229 * publication.
230 */
231
// Outcome of this future. Null while incomplete; once set it holds either the
// plain result value or an AltResult (which boxes a null result or an exception).
volatile Object result;       // Either the result or boxed AltResult
// Head of the linked stack of dependent Completion actions; null when empty.
volatile Completion stack;    // Top of Treiber stack of dependent actions
234
235 final boolean internalComplete(Object r) { // CAS from null to r
236 return RESULT.compareAndSet(this, null, r);
237 }
238
239 /** Returns true if successfully pushed c onto stack. */
240 final boolean tryPushStack(Completion c) {
241 Completion h = stack;
242 NEXT.set(c, h); // CAS piggyback
243 return STACK.compareAndSet(this, h, c);
244 }
245
246 /** Unconditionally pushes c onto stack, retrying if necessary. */
247 final void pushStack(Completion c) {
248 do {} while (!tryPushStack(c));
249 }
250
251 /* ------------- Encoding and decoding outcomes -------------- */
252
253 static final class AltResult { // See above
254 final Throwable ex; // null only for NIL
255 AltResult(Throwable x) { this.ex = x; }
256 }
257
/**
 * The encoding of the null value: a shared AltResult whose exception
 * field is null, stored in place of a null result so that a non-null
 * result field can indicate completion.
 */
static final AltResult NIL = new AltResult(null);
260
261 /** Completes with the null value, unless already completed. */
262 final boolean completeNull() {
263 return RESULT.compareAndSet(this, null, NIL);
264 }
265
266 /** Returns the encoding of the given non-exceptional value. */
267 final Object encodeValue(T t) {
268 return (t == null) ? NIL : t;
269 }
270
271 /** Completes with a non-exceptional result, unless already completed. */
272 final boolean completeValue(T t) {
273 return RESULT.compareAndSet(this, null, (t == null) ? NIL : t);
274 }
275
276 /**
277 * Returns the encoding of the given (non-null) exception as a
278 * wrapped CompletionException unless it is one already.
279 */
280 static AltResult encodeThrowable(Throwable x) {
281 return new AltResult((x instanceof CompletionException) ? x :
282 new CompletionException(x));
283 }
284
285 /** Completes with an exceptional result, unless already completed. */
286 final boolean completeThrowable(Throwable x) {
287 return RESULT.compareAndSet(this, null, encodeThrowable(x));
288 }
289
290 /**
291 * Returns the encoding of the given (non-null) exception as a
292 * wrapped CompletionException unless it is one already. May
293 * return the given Object r (which must have been the result of a
294 * source future) if it is equivalent, i.e. if this is a simple
295 * relay of an existing CompletionException.
296 */
297 static Object encodeThrowable(Throwable x, Object r) {
298 if (!(x instanceof CompletionException))
299 x = new CompletionException(x);
300 else if (r instanceof AltResult && x == ((AltResult)r).ex)
301 return r;
302 return new AltResult(x);
303 }
304
305 /**
306 * Completes with the given (non-null) exceptional result as a
307 * wrapped CompletionException unless it is one already, unless
308 * already completed. May complete with the given Object r
309 * (which must have been the result of a source future) if it is
310 * equivalent, i.e. if this is a simple propagation of an
311 * existing CompletionException.
312 */
313 final boolean completeThrowable(Throwable x, Object r) {
314 return RESULT.compareAndSet(this, null, encodeThrowable(x, r));
315 }
316
317 /**
318 * Returns the encoding of the given arguments: if the exception
319 * is non-null, encodes as AltResult. Otherwise uses the given
320 * value, boxed as NIL if null.
321 */
322 Object encodeOutcome(T t, Throwable x) {
323 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x);
324 }
325
326 /**
327 * Returns the encoding of a copied outcome; if exceptional,
328 * rewraps as a CompletionException, else returns argument.
329 */
330 static Object encodeRelay(Object r) {
331 Throwable x;
332 if (r instanceof AltResult
333 && (x = ((AltResult)r).ex) != null
334 && !(x instanceof CompletionException))
335 r = new AltResult(new CompletionException(x));
336 return r;
337 }
338
339 /**
340 * Completes with r or a copy of r, unless already completed.
341 * If exceptional, r is first coerced to a CompletionException.
342 */
343 final boolean completeRelay(Object r) {
344 return RESULT.compareAndSet(this, null, encodeRelay(r));
345 }
346
347 /**
348 * Reports result using Future.get conventions.
349 */
350 private static Object reportGet(Object r)
351 throws InterruptedException, ExecutionException {
352 if (r == null) // by convention below, null means interrupted
353 throw new InterruptedException();
354 if (r instanceof AltResult) {
355 Throwable x, cause;
356 if ((x = ((AltResult)r).ex) == null)
357 return null;
358 if (x instanceof CancellationException)
359 throw (CancellationException)x;
360 if ((x instanceof CompletionException) &&
361 (cause = x.getCause()) != null)
362 x = cause;
363 throw new ExecutionException(x);
364 }
365 return r;
366 }
367
368 /**
369 * Decodes outcome to return result or throw unchecked exception.
370 */
371 private static Object reportJoin(Object r) {
372 if (r instanceof AltResult) {
373 Throwable x;
374 if ((x = ((AltResult)r).ex) == null)
375 return null;
376 if (x instanceof CancellationException)
377 throw (CancellationException)x;
378 if (x instanceof CompletionException)
379 throw (CompletionException)x;
380 throw new CompletionException(x);
381 }
382 return r;
383 }
384
385 /* ------------- Async task preliminaries -------------- */
386
/**
 * A marker interface identifying asynchronous tasks produced by
 * {@code async} methods. This may be useful for monitoring,
 * debugging, and tracking asynchronous activities.
 *
 * <p>The interface declares no methods; it serves only as a type tag.
 *
 * @since 1.8
 */
public static interface AsynchronousCompletionTask {
}
396
/**
 * True when the common pool's parallelism is greater than one, in
 * which case it is used as the default executor (see ASYNC_POOL).
 */
private static final boolean USE_COMMON_POOL =
    (ForkJoinPool.getCommonPoolParallelism() > 1);
399
/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism, in which case a ThreadPerTaskExecutor is
 * used instead.
 */
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
406
407 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
408 static final class ThreadPerTaskExecutor implements Executor {
409 public void execute(Runnable r) { new Thread(r).start(); }
410 }
411
412 /**
413 * Null-checks user executor argument, and translates uses of
414 * commonPool to ASYNC_POOL in case parallelism disabled.
415 */
416 static Executor screenExecutor(Executor e) {
417 if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool())
418 return ASYNC_POOL;
419 if (e == null) throw new NullPointerException();
420 return e;
421 }
422
// Modes for Completion.tryFire. Signedness matters: callers in this file
// branch on mode <= 0 / mode > 0 (whether claim() is still needed) and on
// mode < 0 / mode >= 0 (whether postFire returns the dependent or runs
// postComplete itself).
static final int SYNC   =  0;
static final int ASYNC  =  1;  // passed by Completion.run() and exec()
static final int NESTED = -1;  // passed by postComplete()
427
428 /* ------------- Base Completion classes and operations -------------- */
429
430 @SuppressWarnings("serial")
431 abstract static class Completion extends ForkJoinTask<Void>
432 implements Runnable, AsynchronousCompletionTask {
433 volatile Completion next; // Treiber stack link
434
435 /**
436 * Performs completion action if triggered, returning a
437 * dependent that may need propagation, if one exists.
438 *
439 * @param mode SYNC, ASYNC, or NESTED
440 */
441 abstract CompletableFuture<?> tryFire(int mode);
442
443 /** Returns true if possibly still triggerable. Used by cleanStack. */
444 abstract boolean isLive();
445
446 public final void run() { tryFire(ASYNC); }
447 public final boolean exec() { tryFire(ASYNC); return false; }
448 public final Void getRawResult() { return null; }
449 public final void setRawResult(Void v) {}
450 }
451
/**
 * Pops and tries to trigger all reachable dependents.  Call only
 * when known to be done.
 *
 * <p>Each popped Completion is fired in NESTED mode. When tryFire
 * returns a non-null future, that future's stack is processed next;
 * when it returns null, processing resumes with this future's stack.
 */
final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    // Loop while f has a dependent; once f's stack is empty and f is
    // not this future, switch back to this future's own stack.
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        // Pop h by swinging f's stack head to h's successor.
        if (STACK.compareAndSet(f, h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    // h came from another future that still has further
                    // dependents: move h onto this future's stack and
                    // keep popping f rather than firing h now.
                    pushStack(h);
                    continue;
                }
                NEXT.compareAndSet(h, t, null); // try to detach
            }
            // Fire h; continue with the dependent it returns, if any,
            // otherwise go back to this future.
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}
478
/**
 * Traverses stack and unlinks one or more dead Completions, if found.
 *
 * <p>Non-live Completions at the head are removed first by CASing the
 * stack head. If the last such head CAS did not succeed (or none was
 * attempted), the remainder of the list is scanned and an attempt is
 * made to unlink the first non-live Completion found.
 */
final void cleanStack() {
    boolean unlinked = false;
    Completion p;
    while ((p = stack) != null && !p.isLive()) // ensure head of stack live
        unlinked = STACK.compareAndSet(this, p, p.next);
    if (p != null && !unlinked) {              // try to unlink first nonlive
        // p trails q; p is the last node seen to be live (or the head).
        for (Completion q = p.next; q != null;) {
            Completion s = q.next;
            if (q.isLive()) {
                p = q;
                q = s;
            }
            else {
                // Splice q out by pointing p past it; the result of the
                // CAS is not checked, and the scan stops either way.
                NEXT.compareAndSet(p, q, s);
                break;
            }
        }
    }
}
499
500 /* ------------- One-input Completions -------------- */
501
/**
 * A Completion with a source, dependent, and executor. Subclasses in
 * this file null out dep after firing, which is what isLive() tests.
 */
@SuppressWarnings("serial")
abstract static class UniCompletion<T,V> extends Completion {
    Executor executor;                 // executor to use (null if none)
    CompletableFuture<V> dep;          // the dependent to complete
    CompletableFuture<T> src;          // source for action

    UniCompletion(Executor executor, CompletableFuture<V> dep,
                  CompletableFuture<T> src) {
        this.executor = executor; this.dep = dep; this.src = src;
    }

    /**
     * Returns true if action can be run. Call only when known to
     * be triggerable. Uses FJ tag bit to ensure that only one
     * thread claims ownership.  If async, starts as task -- a
     * later call to tryFire will run action.
     *
     * @return true only if this call won the tag CAS and there is
     *         no executor; false if the claim was lost, or if the
     *         completion was handed to its executor instead
     */
    final boolean claim() {
        Executor e = executor;         // read before attempting the claim
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
            if (e == null)
                return true;
            executor = null; // disable
            e.execute(this);           // the action runs later via run()/exec()
        }
        return false;
    }

    /** Live while the dependent reference has not been cleared. */
    final boolean isLive() { return dep != null; }
}
533
534 /**
535 * Pushes the given completion unless it completes while trying.
536 * Caller should first check that result is null.
537 */
538 final void unipush(Completion c) {
539 if (c != null) {
540 while (!tryPushStack(c)) {
541 if (result != null) {
542 NEXT.set(c, null);
543 break;
544 }
545 }
546 if (result != null)
547 c.tryFire(SYNC);
548 }
549 }
550
/**
 * Post-processing by dependent after successful UniCompletion
 * tryFire.  Tries to clean stack of source a, and then either runs
 * postComplete or returns this to caller, depending on mode.
 *
 * @param a the source future (may be null)
 * @param mode the tryFire mode; negative means NESTED
 * @return this future if mode is negative and this future is
 *         complete with a non-empty stack; otherwise null
 */
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
    if (a != null && a.stack != null) {
        Object r;
        // Source not (yet) observed complete: just prune dead entries.
        if ((r = a.result) == null)
            a.cleanStack();
        // In non-nested modes, run the source's remaining dependents if
        // it is complete (re-reading result in case it completed
        // after the first read).
        if (mode >= 0 && (r != null || a.result != null))
            a.postComplete();
    }
    if (result != null && stack != null) {
        if (mode < 0)
            return this;       // nested: let the caller process this future
        else
            postComplete();
    }
    return null;
}
572
/**
 * Completion that applies a Function to the source's result and
 * completes the dependent with the function's return value.
 */
@SuppressWarnings("serial")
static final class UniApply<T,V> extends UniCompletion<T,V> {
    Function<? super T,? extends V> fn;   // the function to apply
    UniApply(Executor executor, CompletableFuture<V> dep,
             CompletableFuture<T> src,
             Function<? super T,? extends V> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    /**
     * Fires if the source has a result. Returns null when not yet
     * triggerable (a field was already cleared or the source is
     * incomplete) or when the claim did not succeed; otherwise clears
     * the fields and returns the result of the dependent's postFire.
     */
    final CompletableFuture<V> tryFire(int mode) {
        CompletableFuture<V> d; CompletableFuture<T> a;
        Object r; Throwable x; Function<? super T,? extends V> f;
        if ((d = dep) == null || (f = fn) == null
            || (a = src) == null || (r = a.result) == null)
            return null;
        tryComplete: if (d.result == null) {
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    // Source failed: relay the exception without
                    // invoking the function.
                    d.completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;              // boxed null result
            }
            try {
                // In SYNC/NESTED modes the action must be claimed first.
                if (mode <= 0 && !claim())
                    return null;
                else {
                    @SuppressWarnings("unchecked") T t = (T) r;
                    d.completeValue(f.apply(t));
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; fn = null;   // release references
        return d.postFire(a, mode);
    }
}
610
611 private <V> CompletableFuture<V> uniApplyStage(
612 Executor e, Function<? super T,? extends V> f) {
613 if (f == null) throw new NullPointerException();
614 Object r;
615 if ((r = result) != null)
616 return uniApplyNow(r, e, f);
617 CompletableFuture<V> d = newIncompleteFuture();
618 unipush(new UniApply<T,V>(e, d, this, f));
619 return d;
620 }
621
622 private <V> CompletableFuture<V> uniApplyNow(
623 Object r, Executor e, Function<? super T,? extends V> f) {
624 Throwable x;
625 CompletableFuture<V> d = newIncompleteFuture();
626 if (r instanceof AltResult) {
627 if ((x = ((AltResult)r).ex) != null) {
628 d.result = encodeThrowable(x, r);
629 return d;
630 }
631 r = null;
632 }
633 try {
634 if (e != null) {
635 e.execute(new UniApply<T,V>(null, d, this, f));
636 } else {
637 @SuppressWarnings("unchecked") T t = (T) r;
638 d.result = d.encodeValue(f.apply(t));
639 }
640 } catch (Throwable ex) {
641 d.result = encodeThrowable(ex);
642 }
643 return d;
644 }
645
/**
 * Completion that passes the source's result to a Consumer and then
 * completes the dependent with the null value.
 */
@SuppressWarnings("serial")
static final class UniAccept<T> extends UniCompletion<T,Void> {
    Consumer<? super T> fn;               // the consumer to invoke
    UniAccept(Executor executor, CompletableFuture<Void> dep,
              CompletableFuture<T> src, Consumer<? super T> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    /**
     * Fires if the source has a result. Returns null when not yet
     * triggerable or when the claim did not succeed; otherwise clears
     * the fields and returns the result of the dependent's postFire.
     */
    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d; CompletableFuture<T> a;
        Object r; Throwable x; Consumer<? super T> f;
        if ((d = dep) == null || (f = fn) == null
            || (a = src) == null || (r = a.result) == null)
            return null;
        tryComplete: if (d.result == null) {
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    // Source failed: relay the exception without
                    // invoking the consumer.
                    d.completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;              // boxed null result
            }
            try {
                // In SYNC/NESTED modes the action must be claimed first.
                if (mode <= 0 && !claim())
                    return null;
                else {
                    @SuppressWarnings("unchecked") T t = (T) r;
                    f.accept(t);
                    d.completeNull();
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; fn = null;   // release references
        return d.postFire(a, mode);
    }
}
683
684 private CompletableFuture<Void> uniAcceptStage(Executor e,
685 Consumer<? super T> f) {
686 if (f == null) throw new NullPointerException();
687 Object r;
688 if ((r = result) != null)
689 return uniAcceptNow(r, e, f);
690 CompletableFuture<Void> d = newIncompleteFuture();
691 unipush(new UniAccept<T>(e, d, this, f));
692 return d;
693 }
694
695 private CompletableFuture<Void> uniAcceptNow(
696 Object r, Executor e, Consumer<? super T> f) {
697 Throwable x;
698 CompletableFuture<Void> d = newIncompleteFuture();
699 if (r instanceof AltResult) {
700 if ((x = ((AltResult)r).ex) != null) {
701 d.result = encodeThrowable(x, r);
702 return d;
703 }
704 r = null;
705 }
706 try {
707 if (e != null) {
708 e.execute(new UniAccept<T>(null, d, this, f));
709 } else {
710 @SuppressWarnings("unchecked") T t = (T) r;
711 f.accept(t);
712 d.result = NIL;
713 }
714 } catch (Throwable ex) {
715 d.result = encodeThrowable(ex);
716 }
717 return d;
718 }
719
/**
 * Completion that runs a Runnable once the source has a
 * non-exceptional result and then completes the dependent with the
 * null value.
 */
@SuppressWarnings("serial")
static final class UniRun<T> extends UniCompletion<T,Void> {
    Runnable fn;                          // the action to run
    UniRun(Executor executor, CompletableFuture<Void> dep,
           CompletableFuture<T> src, Runnable fn) {
        super(executor, dep, src); this.fn = fn;
    }
    /**
     * Fires if the source has a result. Returns null when not yet
     * triggerable or when the claim did not succeed; otherwise clears
     * the fields and returns the result of the dependent's postFire.
     */
    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d; CompletableFuture<T> a;
        Object r; Throwable x; Runnable f;
        if ((d = dep) == null || (f = fn) == null
            || (a = src) == null || (r = a.result) == null)
            return null;
        if (d.result == null) {
            // Source failed: relay the exception without running f.
            if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
                d.completeThrowable(x, r);
            else
                try {
                    // In SYNC/NESTED modes the action must be claimed first.
                    if (mode <= 0 && !claim())
                        return null;
                    else {
                        f.run();
                        d.completeNull();
                    }
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
        }
        dep = null; src = null; fn = null;   // release references
        return d.postFire(a, mode);
    }
}
752
753 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
754 if (f == null) throw new NullPointerException();
755 Object r;
756 if ((r = result) != null)
757 return uniRunNow(r, e, f);
758 CompletableFuture<Void> d = newIncompleteFuture();
759 unipush(new UniRun<T>(e, d, this, f));
760 return d;
761 }
762
763 private CompletableFuture<Void> uniRunNow(Object r, Executor e, Runnable f) {
764 Throwable x;
765 CompletableFuture<Void> d = newIncompleteFuture();
766 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
767 d.result = encodeThrowable(x, r);
768 else
769 try {
770 if (e != null) {
771 e.execute(new UniRun<T>(null, d, this, f));
772 } else {
773 f.run();
774 d.result = NIL;
775 }
776 } catch (Throwable ex) {
777 d.result = encodeThrowable(ex);
778 }
779 return d;
780 }
781
/**
 * Completion for a BiConsumer invoked with the source's result and
 * exception; the work is delegated to the dependent's
 * uniWhenComplete method.
 */
@SuppressWarnings("serial")
static final class UniWhenComplete<T> extends UniCompletion<T,T> {
    BiConsumer<? super T, ? super Throwable> fn;   // the action to invoke
    UniWhenComplete(Executor executor, CompletableFuture<T> dep,
                    CompletableFuture<T> src,
                    BiConsumer<? super T, ? super Throwable> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    /**
     * Fires if the source has a result. Passes this completion to
     * uniWhenComplete for claiming unless mode is positive (ASYNC),
     * in which case null is passed and no claim is attempted.
     * Returns null if not triggerable or if uniWhenComplete reports
     * false; otherwise clears the fields and returns the result of
     * the dependent's postFire.
     */
    final CompletableFuture<T> tryFire(int mode) {
        CompletableFuture<T> d; CompletableFuture<T> a;
        Object r; BiConsumer<? super T, ? super Throwable> f;
        if ((d = dep) == null || (f = fn) == null
            || (a = src) == null || (r = a.result) == null
            || !d.uniWhenComplete(r, f, mode > 0 ? null : this))
            return null;
        dep = null; src = null; fn = null;   // release references
        return d.postFire(a, mode);
    }
}
801
/**
 * Invokes f with the decoded source outcome r and completes this
 * (dependent) future, unless it already has a result.
 *
 * @param r the encoded outcome of the source
 * @param f the action to invoke with (value, exception)
 * @param c the completion to claim before running, or null if no
 *        claim is needed
 * @return false only if c is non-null and its claim() returned
 *         false; true otherwise
 */
final boolean uniWhenComplete(Object r,
                              BiConsumer<? super T,? super Throwable> f,
                              UniWhenComplete<T> c) {
    T t; Throwable x = null;
    if (result == null) {
        try {
            if (c != null && !c.claim())
                return false;
            // Decode r into a (value, exception) pair.
            if (r instanceof AltResult) {
                x = ((AltResult)r).ex;
                t = null;
            } else {
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            f.accept(t, x);
            if (x == null) {
                // Source completed normally and the action did not
                // throw: pass the source outcome through unchanged.
                internalComplete(r);
                return true;
            }
        } catch (Throwable ex) {
            // Thrown by the action (or by claim()). With no source
            // exception it becomes the outcome; otherwise it is
            // recorded as suppressed on the source exception.
            if (x == null)
                x = ex;
            else if (x != ex)
                x.addSuppressed(ex);
        }
        completeThrowable(x, r);
    }
    return true;
}
832
833 private CompletableFuture<T> uniWhenCompleteStage(
834 Executor e, BiConsumer<? super T, ? super Throwable> f) {
835 if (f == null) throw new NullPointerException();
836 CompletableFuture<T> d = newIncompleteFuture();
837 Object r;
838 if ((r = result) == null)
839 unipush(new UniWhenComplete<T>(e, d, this, f));
840 else if (e == null)
841 d.uniWhenComplete(r, f, null);
842 else {
843 try {
844 e.execute(new UniWhenComplete<T>(null, d, this, f));
845 } catch (Throwable ex) {
846 d.result = encodeThrowable(ex);
847 }
848 }
849 return d;
850 }
851
/**
 * Completion for a BiFunction invoked with the source's result and
 * exception; the work is delegated to the dependent's uniHandle
 * method.
 */
@SuppressWarnings("serial")
static final class UniHandle<T,V> extends UniCompletion<T,V> {
    BiFunction<? super T, Throwable, ? extends V> fn;   // the handler
    UniHandle(Executor executor, CompletableFuture<V> dep,
              CompletableFuture<T> src,
              BiFunction<? super T, Throwable, ? extends V> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    /**
     * Fires if the source has a result. Passes this completion to
     * uniHandle for claiming unless mode is positive (ASYNC), in
     * which case null is passed and no claim is attempted. Returns
     * null if not triggerable or if uniHandle reports false;
     * otherwise clears the fields and returns the result of the
     * dependent's postFire.
     */
    final CompletableFuture<V> tryFire(int mode) {
        CompletableFuture<V> d; CompletableFuture<T> a;
        Object r; BiFunction<? super T, Throwable, ? extends V> f;
        if ((d = dep) == null || (f = fn) == null
            || (a = src) == null || (r = a.result) == null
            || !d.uniHandle(r, f, mode > 0 ? null : this))
            return null;
        dep = null; src = null; fn = null;   // release references
        return d.postFire(a, mode);
    }
}
871
/**
 * Applies f to the decoded source outcome r and completes this
 * (dependent) future with the function's return value, unless this
 * future already has a result. Anything thrown in the process
 * completes this future exceptionally.
 *
 * @param r the encoded outcome of the source
 * @param f the handler to apply to (value, exception)
 * @param c the completion to claim before running, or null if no
 *        claim is needed
 * @return false only if c is non-null and its claim() returned
 *         false; true otherwise
 */
final <S> boolean uniHandle(Object r,
                            BiFunction<? super S, Throwable, ? extends T> f,
                            UniHandle<S,T> c) {
    S s; Throwable x;
    if (result == null) {
        try {
            if (c != null && !c.claim())
                return false;
            // Decode r into a (value, exception) pair.
            if (r instanceof AltResult) {
                x = ((AltResult)r).ex;
                s = null;
            } else {
                x = null;
                @SuppressWarnings("unchecked") S ss = (S) r;
                s = ss;
            }
            completeValue(f.apply(s, x));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}
895
896 private <V> CompletableFuture<V> uniHandleStage(
897 Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
898 if (f == null) throw new NullPointerException();
899 CompletableFuture<V> d = newIncompleteFuture();
900 Object r;
901 if ((r = result) == null)
902 unipush(new UniHandle<T,V>(e, d, this, f));
903 else if (e == null)
904 d.uniHandle(r, f, null);
905 else {
906 try {
907 e.execute(new UniHandle<T,V>(null, d, this, f));
908 } catch (Throwable ex) {
909 d.result = encodeThrowable(ex);
910 }
911 }
912 return d;
913 }
914
/*
 * Completion that lets a function turn the source's exception into a
 * value for the dependent future. It is always constructed with a null
 * executor, and tryFire always passes this completion to
 * uniExceptionally so that claim() is checked.
 */
915 @SuppressWarnings("serial")
916 static final class UniExceptionally<T> extends UniCompletion<T,T> {
917 Function<? super Throwable, ? extends T> fn;
918 UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src,
919 Function<? super Throwable, ? extends T> fn) {
920 super(null, dep, src); this.fn = fn;
921 }
922 final CompletableFuture<T> tryFire(int mode) { // never ASYNC
923 // assert mode != ASYNC;
924 CompletableFuture<T> d; CompletableFuture<T> a;
925 Object r; Function<? super Throwable, ? extends T> f;
// Not ready (unset field or source still incomplete) or claim lost: report null.
926 if ((d = dep) == null || (f = fn) == null
927 || (a = src) == null || (r = a.result) == null
928 || !d.uniExceptionally(r, f, this))
929 return null;
930 dep = null; src = null; fn = null;
931 return d.postFire(a, mode);
932 }
933 }
934
/*
 * Completes this future, if it has no result yet, from source outcome r.
 * When r is an AltResult carrying a non-null exception, f is applied to
 * that exception and its return value becomes the result; otherwise r is
 * passed to internalComplete unchanged.
 *
 * c: completion to claim before running f, or null to run unclaimed.
 *    The claim is only attempted on the exceptional path.
 *
 * Returns false only when c is non-null and c.claim() returns false.
 */
935 final boolean uniExceptionally(Object r,
936 Function<? super Throwable, ? extends T> f,
937 UniExceptionally<T> c) {
938 Throwable x;
939 if (result == null) {
940 try {
941 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) {
942 if (c != null && !c.claim())
943 return false;
944 completeValue(f.apply(x));
945 } else
946 internalComplete(r);
947 } catch (Throwable ex) {
// Anything thrown inside the try (including by f) becomes this future's outcome.
948 completeThrowable(ex);
949 }
950 }
951 return true;
952 }
953
954 private CompletableFuture<T> uniExceptionallyStage(
955 Function<Throwable, ? extends T> f) {
956 if (f == null) throw new NullPointerException();
957 CompletableFuture<T> d = newIncompleteFuture();
958 Object r;
959 if ((r = result) == null)
960 unipush(new UniExceptionally<T>(d, this, f));
961 else
962 d.uniExceptionally(r, f, null);
963 return d;
964 }
965
/*
 * Completion that forwards the source's result to the dependent future
 * via completeRelay, without running any user function. It is always
 * constructed with a null executor.
 */
966 @SuppressWarnings("serial")
967 static final class UniRelay<T> extends UniCompletion<T,T> {
968 UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) {
969 super(null, dep, src);
970 }
971 final CompletableFuture<T> tryFire(int mode) {
972 CompletableFuture<T> d; CompletableFuture<T> a; Object r;
// Nothing to do until both fields are set and the source has a result.
973 if ((d = dep) == null
974 || (a = src) == null || (r = a.result) == null)
975 return null;
// Only relay when the dependent has no result of its own yet.
976 if (d.result == null)
977 d.completeRelay(r);
978 src = null; dep = null;
979 return d.postFire(a, mode);
980 }
981 }
982
983 private CompletableFuture<T> uniCopyStage() {
984 Object r;
985 CompletableFuture<T> d = newIncompleteFuture();
986 if ((r = result) != null)
987 d.result = encodeRelay(r);
988 else
989 unipush(new UniRelay<T>(d, this));
990 return d;
991 }
992
993 private MinimalStage<T> uniAsMinimalStage() {
994 Object r;
995 if ((r = result) != null)
996 return new MinimalStage<T>(encodeRelay(r));
997 MinimalStage<T> d = new MinimalStage<T>();
998 unipush(new UniRelay<T>(d, this));
999 return d;
1000 }
1001
/*
 * Completion that applies a stage-returning function to the source's
 * value and then makes the dependent future follow the returned stage.
 */
1002 @SuppressWarnings("serial")
1003 static final class UniCompose<T,V> extends UniCompletion<T,V> {
1004 Function<? super T, ? extends CompletionStage<V>> fn;
1005 UniCompose(Executor executor, CompletableFuture<V> dep,
1006 CompletableFuture<T> src,
1007 Function<? super T, ? extends CompletionStage<V>> fn) {
1008 super(executor, dep, src); this.fn = fn;
1009 }
1010 final CompletableFuture<V> tryFire(int mode) {
1011 CompletableFuture<V> d; CompletableFuture<T> a;
1012 Function<? super T, ? extends CompletionStage<V>> f;
1013 Object r; Throwable x;
// Nothing to do until all fields are set and the source has a result.
1014 if ((d = dep) == null || (f = fn) == null
1015 || (a = src) == null || (r = a.result) == null)
1016 return null;
1017 tryComplete: if (d.result == null) {
1018 if (r instanceof AltResult) {
// Exceptional source: propagate the exception without calling fn.
1019 if ((x = ((AltResult)r).ex) != null) {
1020 d.completeThrowable(x, r);
1021 break tryComplete;
1022 }
// AltResult without an exception: the function receives null.
1023 r = null;
1024 }
1025 try {
// For mode <= 0 the function runs only if claim() succeeds.
1026 if (mode <= 0 && !claim())
1027 return null;
1028 @SuppressWarnings("unchecked") T t = (T) r;
1029 CompletableFuture<V> g = f.apply(t).toCompletableFuture();
1030 if ((r = g.result) != null)
1031 d.completeRelay(r);
1032 else {
// Inner future still pending: register a relay on it; if the
// dependent is still incomplete afterwards, return null and leave
// this completion's fields in place.
1033 g.unipush(new UniRelay<V>(d, g));
1034 if (d.result == null)
1035 return null;
1036 }
1037 } catch (Throwable ex) {
1038 d.completeThrowable(ex);
1039 }
1040 }
1041 dep = null; src = null; fn = null;
1042 return d.postFire(a, mode);
1043 }
1044 }
1045
1046 private <V> CompletableFuture<V> uniComposeStage(
1047 Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
1048 if (f == null) throw new NullPointerException();
1049 CompletableFuture<V> d = newIncompleteFuture();
1050 Object r, s; Throwable x;
1051 if ((r = result) == null)
1052 unipush(new UniCompose<T,V>(e, d, this, f));
1053 else if (e == null) {
1054 if (r instanceof AltResult) {
1055 if ((x = ((AltResult)r).ex) != null) {
1056 d.result = encodeThrowable(x, r);
1057 return d;
1058 }
1059 r = null;
1060 }
1061 try {
1062 @SuppressWarnings("unchecked") T t = (T) r;
1063 CompletableFuture<V> g = f.apply(t).toCompletableFuture();
1064 if ((s = g.result) != null)
1065 d.result = encodeRelay(s);
1066 else {
1067 g.unipush(new UniRelay<V>(d, g));
1068 }
1069 } catch (Throwable ex) {
1070 d.result = encodeThrowable(ex);
1071 }
1072 }
1073 else
1074 try {
1075 e.execute(new UniCompose<T,V>(null, d, this, f));
1076 } catch (Throwable ex) {
1077 d.result = encodeThrowable(ex);
1078 }
1079 return d;
1080 }
1081
1082 /* ------------- Two-input Completions -------------- */
1083
1084 /** A Completion for an action with two sources */
1085 @SuppressWarnings("serial")
1086 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
1087 CompletableFuture<U> snd; // second source for action
1088 BiCompletion(Executor executor, CompletableFuture<V> dep,
1089 CompletableFuture<T> src, CompletableFuture<U> snd) {
1090 super(executor, dep, src); this.snd = snd;
1091 }
1092 }
1093
1094 /** A Completion delegating to a BiCompletion */
// bipush and orpush push a CoCompletion onto the second source, so one
// BiCompletion can be triggered from either source's stack.
1095 @SuppressWarnings("serial")
1096 static final class CoCompletion extends Completion {
1097 BiCompletion<?,?,?> base;
1098 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; }
// Forwards to the wrapped completion; drops the reference once the
// wrapped completion reports a non-null result.
1099 final CompletableFuture<?> tryFire(int mode) {
1100 BiCompletion<?,?,?> c; CompletableFuture<?> d;
1101 if ((c = base) == null || (d = c.tryFire(mode)) == null)
1102 return null;
1103 base = null; // detach
1104 return d;
1105 }
// Live while still attached to a base whose dep field has not been cleared.
1106 final boolean isLive() {
1107 BiCompletion<?,?,?> c;
1108 return (c = base) != null
1109 // && c.isLive()
1110 && c.dep != null;
1111 }
1112 }
1113
1114 /**
1115 * Pushes completion to this and b unless both done.
1116 * Caller should first check that either result or b.result is null.
1117 */
1118 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
1119 if (c != null) {
// Keep trying to push onto this future's stack while it is incomplete.
1120 while (result == null) {
1121 if (tryPushStack(c)) {
// Pushed here. If b is still pending, register a delegating
// CoCompletion on b as well; if instead both are now complete,
// fire c directly.
1122 if (b.result == null)
1123 b.unipush(new CoCompletion(c));
1124 else if (result != null)
1125 c.tryFire(SYNC);
1126 return;
1127 }
1128 }
// This future already has a result, so only b needs to hold c.
1129 b.unipush(c);
1130 }
1131 }
1132
1133 /** Post-processing after successful BiCompletion tryFire. */
// Handles the second source b here, then delegates to the one-source
// postFire(a, mode) for the first source and returns its result.
1134 final CompletableFuture<T> postFire(CompletableFuture<?> a,
1135 CompletableFuture<?> b, int mode) {
1136 if (b != null && b.stack != null) { // clean second source
1137 Object r;
// b still incomplete: clean its stack.
1138 if ((r = b.result) == null)
1139 b.cleanStack();
// For mode >= 0, run b.postComplete() if b had a result at the first
// read or has one now.
1140 if (mode >= 0 && (r != null || b.result != null))
1141 b.postComplete();
1142 }
1143 return postFire(a, mode);
1144 }
1145
/*
 * Completion that applies a BiFunction to the values of two sources and
 * completes the dependent future with the function's result. The actual
 * work is done by biApply on the dependent.
 */
1146 @SuppressWarnings("serial")
1147 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
1148 BiFunction<? super T,? super U,? extends V> fn;
1149 BiApply(Executor executor, CompletableFuture<V> dep,
1150 CompletableFuture<T> src, CompletableFuture<U> snd,
1151 BiFunction<? super T,? super U,? extends V> fn) {
1152 super(executor, dep, src, snd); this.fn = fn;
1153 }
// Returns null until every field is set, both sources have results, and
// biApply returns true; then clears the fields and delegates to postFire.
1154 final CompletableFuture<V> tryFire(int mode) {
1155 CompletableFuture<V> d;
1156 CompletableFuture<T> a;
1157 CompletableFuture<U> b;
1158 Object r, s; BiFunction<? super T,? super U,? extends V> f;
1159 if ((d = dep) == null || (f = fn) == null
1160 || (a = src) == null || (r = a.result) == null
1161 || (b = snd) == null || (s = b.result) == null
// For mode > 0 no completion is passed, so biApply skips claim().
1162 || !d.biApply(r, s, f, mode > 0 ? null : this))
1163 return null;
1164 dep = null; src = null; snd = null; fn = null;
1165 return d.postFire(a, b, mode);
1166 }
1167 }
1168
/*
 * Completes this future, if it has no result yet, from two source
 * outcomes r and s. If either carries an exception (r is checked first),
 * that exception is propagated and f is not called; otherwise f is
 * applied to the two decoded values.
 *
 * c: completion to claim before running f, or null to run unclaimed.
 *
 * Returns false only when c is non-null and c.claim() returns false.
 */
1169 final <R,S> boolean biApply(Object r, Object s,
1170 BiFunction<? super R,? super S,? extends T> f,
1171 BiApply<R,S,T> c) {
1172 Throwable x;
1173 tryComplete: if (result == null) {
1174 if (r instanceof AltResult) {
1175 if ((x = ((AltResult)r).ex) != null) {
1176 completeThrowable(x, r);
1177 break tryComplete;
1178 }
// AltResult without an exception: f receives null for this argument.
1179 r = null;
1180 }
1181 if (s instanceof AltResult) {
1182 if ((x = ((AltResult)s).ex) != null) {
1183 completeThrowable(x, s);
1184 break tryComplete;
1185 }
1186 s = null;
1187 }
1188 try {
1189 if (c != null && !c.claim())
1190 return false;
1191 @SuppressWarnings("unchecked") R rr = (R) r;
1192 @SuppressWarnings("unchecked") S ss = (S) s;
1193 completeValue(f.apply(rr, ss));
1194 } catch (Throwable ex) {
1195 completeThrowable(ex);
1196 }
1197 }
1198 return true;
1199 }
1200
1201 private <U,V> CompletableFuture<V> biApplyStage(
1202 Executor e, CompletionStage<U> o,
1203 BiFunction<? super T,? super U,? extends V> f) {
1204 CompletableFuture<U> b; Object r, s;
1205 if (f == null || (b = o.toCompletableFuture()) == null)
1206 throw new NullPointerException();
1207 CompletableFuture<V> d = newIncompleteFuture();
1208 if ((r = result) == null || (s = b.result) == null)
1209 bipush(b, new BiApply<T,U,V>(e, d, this, b, f));
1210 else if (e == null)
1211 d.biApply(r, s, f, null);
1212 else
1213 try {
1214 e.execute(new BiApply<T,U,V>(null, d, this, b, f));
1215 } catch (Throwable ex) {
1216 d.result = encodeThrowable(ex);
1217 }
1218 return d;
1219 }
1220
/*
 * Completion that passes the values of two sources to a BiConsumer.
 * The actual work is done by biAccept on the dependent.
 */
1221 @SuppressWarnings("serial")
1222 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> {
1223 BiConsumer<? super T,? super U> fn;
1224 BiAccept(Executor executor, CompletableFuture<Void> dep,
1225 CompletableFuture<T> src, CompletableFuture<U> snd,
1226 BiConsumer<? super T,? super U> fn) {
1227 super(executor, dep, src, snd); this.fn = fn;
1228 }
// Returns null until every field is set, both sources have results, and
// biAccept returns true; then clears the fields and delegates to postFire.
1229 final CompletableFuture<Void> tryFire(int mode) {
1230 CompletableFuture<Void> d;
1231 CompletableFuture<T> a;
1232 CompletableFuture<U> b;
1233 Object r, s; BiConsumer<? super T,? super U> f;
1234 if ((d = dep) == null || (f = fn) == null
1235 || (a = src) == null || (r = a.result) == null
1236 || (b = snd) == null || (s = b.result) == null
// For mode > 0 no completion is passed, so biAccept skips claim().
1237 || !d.biAccept(r, s, f, mode > 0 ? null : this))
1238 return null;
1239 dep = null; src = null; snd = null; fn = null;
1240 return d.postFire(a, b, mode);
1241 }
1242 }
1243
/*
 * Completes this future, if it has no result yet, from two source
 * outcomes r and s. If either carries an exception (r is checked first),
 * that exception is propagated and f is not called; otherwise f consumes
 * the two decoded values and completeNull() is invoked.
 *
 * c: completion to claim before running f, or null to run unclaimed.
 *
 * Returns false only when c is non-null and c.claim() returns false.
 */
1244 final <R,S> boolean biAccept(Object r, Object s,
1245 BiConsumer<? super R,? super S> f,
1246 BiAccept<R,S> c) {
1247 Throwable x;
1248 tryComplete: if (result == null) {
1249 if (r instanceof AltResult) {
1250 if ((x = ((AltResult)r).ex) != null) {
1251 completeThrowable(x, r);
1252 break tryComplete;
1253 }
// AltResult without an exception: f receives null for this argument.
1254 r = null;
1255 }
1256 if (s instanceof AltResult) {
1257 if ((x = ((AltResult)s).ex) != null) {
1258 completeThrowable(x, s);
1259 break tryComplete;
1260 }
1261 s = null;
1262 }
1263 try {
1264 if (c != null && !c.claim())
1265 return false;
1266 @SuppressWarnings("unchecked") R rr = (R) r;
1267 @SuppressWarnings("unchecked") S ss = (S) s;
1268 f.accept(rr, ss);
1269 completeNull();
1270 } catch (Throwable ex) {
1271 completeThrowable(ex);
1272 }
1273 }
1274 return true;
1275 }
1276
1277 private <U> CompletableFuture<Void> biAcceptStage(
1278 Executor e, CompletionStage<U> o,
1279 BiConsumer<? super T,? super U> f) {
1280 CompletableFuture<U> b; Object r, s;
1281 if (f == null || (b = o.toCompletableFuture()) == null)
1282 throw new NullPointerException();
1283 CompletableFuture<Void> d = newIncompleteFuture();
1284 if ((r = result) == null || (s = b.result) == null)
1285 bipush(b, new BiAccept<T,U>(e, d, this, b, f));
1286 else if (e == null)
1287 d.biAccept(r, s, f, null);
1288 else
1289 try {
1290 e.execute(new BiAccept<T,U>(null, d, this, b, f));
1291 } catch (Throwable ex) {
1292 d.result = encodeThrowable(ex);
1293 }
1294 return d;
1295 }
1296
/*
 * Completion that runs a Runnable once two sources both have results.
 * The actual work is done by biRun on the dependent.
 */
1297 @SuppressWarnings("serial")
1298 static final class BiRun<T,U> extends BiCompletion<T,U,Void> {
1299 Runnable fn;
1300 BiRun(Executor executor, CompletableFuture<Void> dep,
1301 CompletableFuture<T> src,
1302 CompletableFuture<U> snd,
1303 Runnable fn) {
1304 super(executor, dep, src, snd); this.fn = fn;
1305 }
// Returns null until every field is set, both sources have results, and
// biRun returns true; then clears the fields and delegates to postFire.
1306 final CompletableFuture<Void> tryFire(int mode) {
1307 CompletableFuture<Void> d;
1308 CompletableFuture<T> a;
1309 CompletableFuture<U> b;
1310 Object r, s; Runnable f;
1311 if ((d = dep) == null || (f = fn) == null
1312 || (a = src) == null || (r = a.result) == null
1313 || (b = snd) == null || (s = b.result) == null
// For mode > 0 no completion is passed, so biRun skips claim().
1314 || !d.biRun(r, s, f, mode > 0 ? null : this))
1315 return null;
1316 dep = null; src = null; snd = null; fn = null;
1317 return d.postFire(a, b, mode);
1318 }
1319 }
1320
/*
 * Completes this future, if it has no result yet, from two source
 * outcomes r and s. If either carries an exception (r is checked first),
 * that exception is propagated and f is not run; otherwise f runs and
 * completeNull() is invoked.
 *
 * c: completion to claim before running f, or null to run unclaimed.
 *
 * Returns false only when c is non-null and c.claim() returns false.
 */
1321 final boolean biRun(Object r, Object s, Runnable f, BiRun<?,?> c) {
1322 Throwable x; Object z;
1323 if (result == null) {
// z records which outcome supplied the exception x.
1324 if ((r instanceof AltResult
1325 && (x = ((AltResult)(z = r)).ex) != null) ||
1326 (s instanceof AltResult
1327 && (x = ((AltResult)(z = s)).ex) != null))
1328 completeThrowable(x, z);
1329 else
1330 try {
1331 if (c != null && !c.claim())
1332 return false;
1333 f.run();
1334 completeNull();
1335 } catch (Throwable ex) {
1336 completeThrowable(ex);
1337 }
1338 }
1339 return true;
1340 }
1341
1342 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
1343 Runnable f) {
1344 CompletableFuture<?> b; Object r, s;
1345 if (f == null || (b = o.toCompletableFuture()) == null)
1346 throw new NullPointerException();
1347 CompletableFuture<Void> d = newIncompleteFuture();
1348 if ((r = result) == null || (s = b.result) == null)
1349 bipush(b, new BiRun<>(e, d, this, b, f));
1350 else if (e == null)
1351 d.biRun(r, s, f, null);
1352 else
1353 try {
1354 e.execute(new BiRun<>(null, d, this, b, f));
1355 } catch (Throwable ex) {
1356 d.result = encodeThrowable(ex);
1357 }
1358 return d;
1359 }
1360
/*
 * Completion that completes the dependent once both sources have
 * results: with the first exception found (src checked before snd), or
 * with completeNull() when neither carries one. No user function is
 * run and the executor is always null.
 */
1361 @SuppressWarnings("serial")
1362 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And
1363 BiRelay(CompletableFuture<Void> dep,
1364 CompletableFuture<T> src,
1365 CompletableFuture<U> snd) {
1366 super(null, dep, src, snd);
1367 }
1368 final CompletableFuture<Void> tryFire(int mode) {
1369 CompletableFuture<Void> d;
1370 CompletableFuture<T> a;
1371 CompletableFuture<U> b;
1372 Object r, s, z; Throwable x;
// Nothing to do until all fields are set and both sources have results.
1373 if ((d = dep) == null
1374 || (a = src) == null || (r = a.result) == null
1375 || (b = snd) == null || (s = b.result) == null)
1376 return null;
1377 if (d.result == null) {
// z records which outcome supplied the exception x.
1378 if ((r instanceof AltResult
1379 && (x = ((AltResult)(z = r)).ex) != null) ||
1380 (s instanceof AltResult
1381 && (x = ((AltResult)(z = s)).ex) != null))
1382 d.completeThrowable(x, z);
1383 else
1384 d.completeNull();
1385 }
1386 src = null; snd = null; dep = null;
1387 return d.postFire(a, b, mode);
1388 }
1389 }
1390
1391 /** Recursively constructs a tree of completions. */
1392 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
1393 int lo, int hi) {
1394 CompletableFuture<Void> d = new CompletableFuture<Void>();
1395 if (lo > hi) // empty
1396 d.result = NIL;
1397 else {
1398 CompletableFuture<?> a, b; Object r, s, z; Throwable x;
1399 int mid = (lo + hi) >>> 1;
1400 if ((a = (lo == mid ? cfs[lo] :
1401 andTree(cfs, lo, mid))) == null ||
1402 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1403 andTree(cfs, mid+1, hi))) == null)
1404 throw new NullPointerException();
1405 if ((r = a.result) == null || (s = b.result) == null)
1406 a.bipush(b, new BiRelay<>(d, a, b));
1407 else if ((r instanceof AltResult
1408 && (x = ((AltResult)(z = r)).ex) != null) ||
1409 (s instanceof AltResult
1410 && (x = ((AltResult)(z = s)).ex) != null))
1411 d.result = encodeThrowable(x, z);
1412 else
1413 d.result = NIL;
1414 }
1415 return d;
1416 }
1417
1418 /* ------------- Projected (Ored) BiCompletions -------------- */
1419
1420 /**
1421 * Pushes completion to this and b unless either done.
1422 * Caller should first check that result and b.result are both null.
1423 */
1424 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
1425 if (c != null) {
// Retry the push until it succeeds or this future turns out to be
// complete; in the latter case reset c's NEXT link before leaving the loop.
1426 while (!tryPushStack(c)) {
1427 if (result != null) {
1428 NEXT.set(c, null);
1429 break;
1430 }
1431 }
// If this future has a result by now, fire c directly; otherwise also
// register a delegating CoCompletion on b.
1432 if (result != null)
1433 c.tryFire(SYNC);
1434 else
1435 b.unipush(new CoCompletion(c));
1436 }
1437 }
1438
/*
 * Completion that applies a function to the result of whichever of two
 * sources has one (src is read first, then snd) and completes the
 * dependent future with the function's return value.
 */
1439 @SuppressWarnings("serial")
1440 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> {
1441 Function<? super T,? extends V> fn;
1442 OrApply(Executor executor, CompletableFuture<V> dep,
1443 CompletableFuture<T> src,
1444 CompletableFuture<U> snd,
1445 Function<? super T,? extends V> fn) {
1446 super(executor, dep, src, snd); this.fn = fn;
1447 }
1448 final CompletableFuture<V> tryFire(int mode) {
1449 CompletableFuture<V> d;
1450 CompletableFuture<T> a;
1451 CompletableFuture<U> b;
1452 Object r; Throwable x; Function<? super T,? extends V> f;
// Nothing to do until all fields are set and at least one source has a result.
1453 if ((d = dep) == null || (f = fn) == null
1454 || (a = src) == null || (b = snd) == null
1455 || ((r = a.result) == null && (r = b.result) == null))
1456 return null;
1457 tryComplete: if (d.result == null) {
1458 try {
// For mode <= 0 proceed only if claim() succeeds.
1459 if (mode <= 0 && !claim())
1460 return null;
1461 if (r instanceof AltResult) {
// Exceptional outcome: propagate it without calling fn.
1462 if ((x = ((AltResult)r).ex) != null) {
1463 d.completeThrowable(x, r);
1464 break tryComplete;
1465 }
1466 r = null;
1467 }
1468 @SuppressWarnings("unchecked") T t = (T) r;
1469 d.completeValue(f.apply(t));
1470 } catch (Throwable ex) {
1471 d.completeThrowable(ex);
1472 }
1473 }
1474 dep = null; src = null; snd = null; fn = null;
1475 return d.postFire(a, b, mode);
1476 }
1477 }
1478
1479 private <U extends T,V> CompletableFuture<V> orApplyStage(
1480 Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) {
1481 CompletableFuture<U> b;
1482 if (f == null || (b = o.toCompletableFuture()) == null)
1483 throw new NullPointerException();
1484
1485 Object r; CompletableFuture<? extends T> z;
1486 if ((r = (z = this).result) != null ||
1487 (r = (z = b).result) != null)
1488 return z.uniApplyNow(r, e, f);
1489
1490 CompletableFuture<V> d = newIncompleteFuture();
1491 orpush(b, new OrApply<T,U,V>(e, d, this, b, f));
1492 return d;
1493 }
1494
/*
 * Completion that passes the result of whichever of two sources has one
 * (src is read first, then snd) to a Consumer and then completes the
 * dependent future via completeNull().
 */
1495 @SuppressWarnings("serial")
1496 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> {
1497 Consumer<? super T> fn;
1498 OrAccept(Executor executor, CompletableFuture<Void> dep,
1499 CompletableFuture<T> src,
1500 CompletableFuture<U> snd,
1501 Consumer<? super T> fn) {
1502 super(executor, dep, src, snd); this.fn = fn;
1503 }
1504 final CompletableFuture<Void> tryFire(int mode) {
1505 CompletableFuture<Void> d;
1506 CompletableFuture<T> a;
1507 CompletableFuture<U> b;
1508 Object r; Throwable x; Consumer<? super T> f;
// Nothing to do until all fields are set and at least one source has a result.
1509 if ((d = dep) == null || (f = fn) == null
1510 || (a = src) == null || (b = snd) == null
1511 || ((r = a.result) == null && (r = b.result) == null))
1512 return null;
1513 tryComplete: if (d.result == null) {
1514 try {
// For mode <= 0 proceed only if claim() succeeds.
1515 if (mode <= 0 && !claim())
1516 return null;
1517 if (r instanceof AltResult) {
// Exceptional outcome: propagate it without calling fn.
1518 if ((x = ((AltResult)r).ex) != null) {
1519 d.completeThrowable(x, r);
1520 break tryComplete;
1521 }
1522 r = null;
1523 }
1524 @SuppressWarnings("unchecked") T t = (T) r;
1525 f.accept(t);
1526 d.completeNull();
1527 } catch (Throwable ex) {
1528 d.completeThrowable(ex);
1529 }
1530 }
1531 dep = null; src = null; snd = null; fn = null;
1532 return d.postFire(a, b, mode);
1533 }
1534 }
1535
1536 private <U extends T> CompletableFuture<Void> orAcceptStage(
1537 Executor e, CompletionStage<U> o, Consumer<? super T> f) {
1538 CompletableFuture<U> b;
1539 if (f == null || (b = o.toCompletableFuture()) == null)
1540 throw new NullPointerException();
1541
1542 Object r; CompletableFuture<? extends T> z;
1543 if ((r = (z = this).result) != null ||
1544 (r = (z = b).result) != null)
1545 return z.uniAcceptNow(r, e, f);
1546
1547 CompletableFuture<Void> d = newIncompleteFuture();
1548 orpush(b, new OrAccept<T,U>(e, d, this, b, f));
1549 return d;
1550 }
1551
/*
 * Completion that runs a Runnable once either of two sources has a
 * result (src is read first, then snd) and then completes the dependent
 * future via completeNull(). An exceptional outcome is propagated
 * instead and the Runnable is not run.
 */
1552 @SuppressWarnings("serial")
1553 static final class OrRun<T,U> extends BiCompletion<T,U,Void> {
1554 Runnable fn;
1555 OrRun(Executor executor, CompletableFuture<Void> dep,
1556 CompletableFuture<T> src,
1557 CompletableFuture<U> snd,
1558 Runnable fn) {
1559 super(executor, dep, src, snd); this.fn = fn;
1560 }
1561 final CompletableFuture<Void> tryFire(int mode) {
1562 CompletableFuture<Void> d;
1563 CompletableFuture<T> a;
1564 CompletableFuture<U> b;
1565 Object r; Throwable x; Runnable f;
// Nothing to do until all fields are set and at least one source has a result.
1566 if ((d = dep) == null || (f = fn) == null
1567 || (a = src) == null || (b = snd) == null
1568 || ((r = a.result) == null && (r = b.result) == null))
1569 return null;
1570 if (d.result == null) {
1571 try {
// For mode <= 0 proceed only if claim() succeeds.
1572 if (mode <= 0 && !claim())
1573 return null;
1574 else if (r instanceof AltResult
1575 && (x = ((AltResult)r).ex) != null)
1576 d.completeThrowable(x, r);
1577 else {
1578 f.run();
1579 d.completeNull();
1580 }
1581 } catch (Throwable ex) {
1582 d.completeThrowable(ex);
1583 }
1584 }
1585 dep = null; src = null; snd = null; fn = null;
1586 return d.postFire(a, b, mode);
1587 }
1588 }
1589
1590 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
1591 Runnable f) {
1592 CompletableFuture<?> b;
1593 if (f == null || (b = o.toCompletableFuture()) == null)
1594 throw new NullPointerException();
1595
1596 Object r; CompletableFuture<?> z;
1597 if ((r = (z = this).result) != null ||
1598 (r = (z = b).result) != null)
1599 return z.uniRunNow(r, e, f);
1600
1601 CompletableFuture<Void> d = newIncompleteFuture();
1602 orpush(b, new OrRun<>(e, d, this, b, f));
1603 return d;
1604 }
1605
/*
 * Completion that relays to the dependent the result of whichever of
 * two sources has one (src is read first, then snd). No user function
 * is run and the executor is always null.
 */
1606 @SuppressWarnings("serial")
1607 static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or
1608 OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src,
1609 CompletableFuture<U> snd) {
1610 super(null, dep, src, snd);
1611 }
1612 final CompletableFuture<Object> tryFire(int mode) {
1613 CompletableFuture<Object> d;
1614 CompletableFuture<T> a;
1615 CompletableFuture<U> b;
1616 Object r;
// Nothing to do until all fields are set and at least one source has a result.
1617 if ((d = dep) == null
1618 || (a = src) == null || (b = snd) == null
1619 || ((r = a.result) == null && (r = b.result) == null))
1620 return null;
1621 d.completeRelay(r);
1622 src = null; snd = null; dep = null;
1623 return d.postFire(a, b, mode);
1624 }
1625 }
1626
1627 /** Recursively constructs a tree of completions. */
1628 static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
1629 int lo, int hi) {
1630 CompletableFuture<Object> d = new CompletableFuture<Object>();
1631 if (lo <= hi) {
1632 CompletableFuture<?> a, b; Object r;
1633 int mid = (lo + hi) >>> 1;
1634 if ((a = (lo == mid ? cfs[lo] :
1635 orTree(cfs, lo, mid))) == null ||
1636 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1637 orTree(cfs, mid+1, hi))) == null)
1638 throw new NullPointerException();
1639 if ((r = a.result) != null && (r = b.result) != null)
1640 d.result = encodeRelay(r);
1641 else
1642 a.orpush(b, new OrRelay<>(d, a, b));
1643 }
1644 return d;
1645 }
1646
1647 /* ------------- Zero-input Async forms -------------- */
1648
/*
 * Task that completes a future with the value obtained from a Supplier.
 * It is both a ForkJoinTask and a Runnable, so it can be handed to any
 * Executor.
 */
1649 @SuppressWarnings("serial")
1650 static final class AsyncSupply<T> extends ForkJoinTask<Void>
1651 implements Runnable, AsynchronousCompletionTask {
1652 CompletableFuture<T> dep; Supplier<? extends T> fn;
1653 AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
1654 this.dep = dep; this.fn = fn;
1655 }
1656
1657 public final Void getRawResult() { return null; }
1658 public final void setRawResult(Void v) {}
// The ForkJoinTask entry point just delegates to run().
1659 public final boolean exec() { run(); return false; }
1660
1661 public void run() {
1662 CompletableFuture<T> d; Supplier<? extends T> f;
1663 if ((d = dep) != null && (f = fn) != null) {
// Clear the fields before doing the work.
1664 dep = null; fn = null;
// Skip the supplier if the future already has a result.
1665 if (d.result == null) {
1666 try {
1667 d.completeValue(f.get());
1668 } catch (Throwable ex) {
1669 d.completeThrowable(ex);
1670 }
1671 }
1672 d.postComplete();
1673 }
1674 }
1675 }
1676
1677 static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
1678 Supplier<U> f) {
1679 if (f == null) throw new NullPointerException();
1680 CompletableFuture<U> d = new CompletableFuture<U>();
1681 e.execute(new AsyncSupply<U>(d, f));
1682 return d;
1683 }
1684
/*
 * Task that runs a Runnable and then completes a future via
 * completeNull(). It is both a ForkJoinTask and a Runnable, so it can
 * be handed to any Executor.
 */
1685 @SuppressWarnings("serial")
1686 static final class AsyncRun extends ForkJoinTask<Void>
1687 implements Runnable, AsynchronousCompletionTask {
1688 CompletableFuture<Void> dep; Runnable fn;
1689 AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
1690 this.dep = dep; this.fn = fn;
1691 }
1692
1693 public final Void getRawResult() { return null; }
1694 public final void setRawResult(Void v) {}
// The ForkJoinTask entry point just delegates to run().
1695 public final boolean exec() { run(); return false; }
1696
1697 public void run() {
1698 CompletableFuture<Void> d; Runnable f;
1699 if ((d = dep) != null && (f = fn) != null) {
// Clear the fields before doing the work.
1700 dep = null; fn = null;
// Skip the action if the future already has a result.
1701 if (d.result == null) {
1702 try {
1703 f.run();
1704 d.completeNull();
1705 } catch (Throwable ex) {
1706 d.completeThrowable(ex);
1707 }
1708 }
1709 d.postComplete();
1710 }
1711 }
1712 }
1713
1714 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
1715 if (f == null) throw new NullPointerException();
1716 CompletableFuture<Void> d = new CompletableFuture<Void>();
1717 e.execute(new AsyncRun(d, f));
1718 return d;
1719 }
1720
1721 /* ------------- Signallers -------------- */
1722
1723 /**
1724 * Completion for recording and releasing a waiting thread. This
1725 * class implements ManagedBlocker to avoid starvation when
1726 * blocking actions pile up in ForkJoinPools.
1727 */
1728 @SuppressWarnings("serial")
1729 static final class Signaller extends Completion
1730 implements ForkJoinPool.ManagedBlocker {
1731 long nanos; // remaining wait time if timed
1732 final long deadline; // non-zero if timed
1733 final boolean interruptible;
1734 boolean interrupted;
1735 volatile Thread thread;
1736
// Records the constructing thread as the waiter.
1737 Signaller(boolean interruptible, long nanos, long deadline) {
1738 this.thread = Thread.currentThread();
1739 this.interruptible = interruptible;
1740 this.nanos = nanos;
1741 this.deadline = deadline;
1742 }
// Wakes the recorded thread, if any, and clears the field. Always
// returns null.
1743 final CompletableFuture<?> tryFire(int ignore) {
1744 Thread w; // no need to atomically claim
1745 if ((w = thread) != null) {
1746 thread = null;
1747 LockSupport.unpark(w);
1748 }
1749 return null;
1750 }
// Thread.interrupted() clears the caller's interrupt status, so it is
// remembered in the interrupted field. Releasable when interrupted and
// interruptible, when a timed wait has run out (nanos is refreshed from
// the deadline as a side effect), or when thread has been cleared.
1751 public boolean isReleasable() {
1752 if (Thread.interrupted())
1753 interrupted = true;
1754 return ((interrupted && interruptible) ||
1755 (deadline != 0L &&
1756 (nanos <= 0L ||
1757 (nanos = deadline - System.nanoTime()) <= 0L)) ||
1758 thread == null);
1759 }
// Parks (untimed when deadline is 0, else for the remaining nanos)
// until isReleasable() holds.
1760 public boolean block() {
1761 while (!isReleasable()) {
1762 if (deadline == 0L)
1763 LockSupport.park(this);
1764 else
1765 LockSupport.parkNanos(this, nanos);
1766 }
1767 return true;
1768 }
1769 final boolean isLive() { return thread != null; }
1770 }
1771
1772 /**
1773 * Returns raw result after waiting, or null if interruptible and
1774 * interrupted.
1775 */
1776 private Object waitingGet(boolean interruptible) {
1777 Signaller q = null;
1778 boolean queued = false;
1779 Object r;
// Each pass re-reads result and then performs one step: create the
// Signaller, push it onto the stack, or block.
1780 while ((r = result) == null) {
1781 if (q == null) {
1782 q = new Signaller(interruptible, 0L, 0L);
// On a ForkJoinWorkerThread, call helpAsyncBlocker before blocking.
1783 if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1784 ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1785 }
1786 else if (!queued)
1787 queued = tryPushStack(q);
1788 else {
1789 try {
1790 ForkJoinPool.managedBlock(q);
1791 } catch (InterruptedException ie) { // currently cannot happen
1792 q.interrupted = true;
1793 }
1794 if (q.interrupted && interruptible)
1795 break;
1796 }
1797 }
1798 if (q != null && queued) {
1799 q.thread = null;
// A non-interruptible wait that saw an interrupt re-asserts it on the
// current thread.
1800 if (!interruptible && q.interrupted)
1801 Thread.currentThread().interrupt();
// Leaving without a result: clean the stack.
1802 if (r == null)
1803 cleanStack();
1804 }
1805 if (r != null || (r = result) != null)
1806 postComplete();
1807 return r;
1808 }
1809
1810 /**
1811 * Returns raw result after waiting, or null if interrupted, or
1812 * throws TimeoutException on timeout.
1813 */
1814 private Object timedGet(long nanos) throws TimeoutException {
// Thread.interrupted() also clears the interrupt status.
1815 if (Thread.interrupted())
1816 return null;
// A non-positive timeout skips the wait and falls through to the
// TimeoutException at the end.
1817 if (nanos > 0L) {
1818 long d = System.nanoTime() + nanos;
1819 long deadline = (d == 0L) ? 1L : d; // avoid 0
1820 Signaller q = null;
1821 boolean queued = false;
1822 Object r;
1823 while ((r = result) == null) { // similar to untimed
1824 if (q == null) {
1825 q = new Signaller(true, nanos, deadline);
1826 if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1827 ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1828 }
1829 else if (!queued)
1830 queued = tryPushStack(q);
// Time has run out.
1831 else if (q.nanos <= 0L)
1832 break;
1833 else {
1834 try {
1835 ForkJoinPool.managedBlock(q);
1836 } catch (InterruptedException ie) {
1837 q.interrupted = true;
1838 }
1839 if (q.interrupted)
1840 break;
1841 }
1842 }
1843 if (q != null && queued) {
1844 q.thread = null;
// Leaving without a result: clean the stack.
1845 if (r == null)
1846 cleanStack();
1847 }
1848 if (r != null || (r = result) != null)
1849 postComplete();
// Return the result if there is one, or null if the wait was
// interrupted; otherwise fall through to the timeout.
1850 if (r != null || (q != null && q.interrupted))
1851 return r;
1852 }
1853 throw new TimeoutException();
1854 }
1855
1856 /* ------------- public methods -------------- */
1857
1858 /**
1859 * Creates a new incomplete CompletableFuture.
 * The constructor body is empty.
1860 */
1861 public CompletableFuture() {
1862 }
1863
/**
 * Creates a CompletableFuture whose result field is set to the given,
 * already encoded, result.
 *
 * @param r the encoded result to store
 */
CompletableFuture(Object r) {
    result = r;
}
1870
1871 /**
1872 * Returns a new CompletableFuture that is asynchronously completed
1873 * by a task running in the {@link ForkJoinPool#commonPool()} with
1874 * the value obtained by calling the given Supplier.
1875 *
1876 * @param supplier a function returning the value to be used
1877 * to complete the returned CompletableFuture
1878 * @param <U> the function's return type
1879 * @return the new CompletableFuture
1880 */
1881 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1882 return asyncSupplyStage(ASYNC_POOL, supplier);
1883 }
1884
1885 /**
1886 * Returns a new CompletableFuture that is asynchronously completed
1887 * by a task running in the given executor with the value obtained
1888 * by calling the given Supplier.
1889 *
1890 * @param supplier a function returning the value to be used
1891 * to complete the returned CompletableFuture
1892 * @param executor the executor to use for asynchronous execution
1893 * @param <U> the function's return type
1894 * @return the new CompletableFuture
1895 */
1896 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1897 Executor executor) {
1898 return asyncSupplyStage(screenExecutor(executor), supplier);
1899 }
1900
1901 /**
1902 * Returns a new CompletableFuture that is asynchronously completed
1903 * by a task running in the {@link ForkJoinPool#commonPool()} after
1904 * it runs the given action.
1905 *
1906 * @param runnable the action to run before completing the
1907 * returned CompletableFuture
1908 * @return the new CompletableFuture
1909 */
1910 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1911 return asyncRunStage(ASYNC_POOL, runnable);
1912 }
1913
1914 /**
1915 * Returns a new CompletableFuture that is asynchronously completed
1916 * by a task running in the given executor after it runs the given
1917 * action.
1918 *
1919 * @param runnable the action to run before completing the
1920 * returned CompletableFuture
1921 * @param executor the executor to use for asynchronous execution
1922 * @return the new CompletableFuture
1923 */
1924 public static CompletableFuture<Void> runAsync(Runnable runnable,
1925 Executor executor) {
1926 return asyncRunStage(screenExecutor(executor), runnable);
1927 }
1928
1929 /**
1930 * Returns a new CompletableFuture that is already completed with
1931 * the given value.
1932 *
1933 * @param value the value
1934 * @param <U> the type of the value
1935 * @return the completed CompletableFuture
1936 */
1937 public static <U> CompletableFuture<U> completedFuture(U value) {
1938 return new CompletableFuture<U>((value == null) ? NIL : value);
1939 }
1940
1941 /**
1942 * Returns {@code true} if completed in any fashion: normally,
1943 * exceptionally, or via cancellation.
1944 *
1945 * @return {@code true} if completed
1946 */
1947 public boolean isDone() {
1948 return result != null;
1949 }
1950
1951 /**
1952 * Waits if necessary for this future to complete, and then
1953 * returns its result.
1954 *
1955 * @return the result value
1956 * @throws CancellationException if this future was cancelled
1957 * @throws ExecutionException if this future completed exceptionally
1958 * @throws InterruptedException if the current thread was interrupted
1959 * while waiting
1960 */
1961 @SuppressWarnings("unchecked")
1962 public T get() throws InterruptedException, ExecutionException {
1963 Object r;
1964 if ((r = result) == null)
1965 r = waitingGet(true);
1966 return (T) reportGet(r);
1967 }
1968
1969 /**
1970 * Waits if necessary for at most the given time for this future
1971 * to complete, and then returns its result, if available.
1972 *
1973 * @param timeout the maximum time to wait
1974 * @param unit the time unit of the timeout argument
1975 * @return the result value
1976 * @throws CancellationException if this future was cancelled
1977 * @throws ExecutionException if this future completed exceptionally
1978 * @throws InterruptedException if the current thread was interrupted
1979 * while waiting
1980 * @throws TimeoutException if the wait timed out
1981 */
1982 @SuppressWarnings("unchecked")
1983 public T get(long timeout, TimeUnit unit)
1984 throws InterruptedException, ExecutionException, TimeoutException {
1985 long nanos = unit.toNanos(timeout);
1986 Object r;
1987 if ((r = result) == null)
1988 r = timedGet(nanos);
1989 return (T) reportGet(r);
1990 }
1991
1992 /**
1993 * Returns the result value when complete, or throws an
1994 * (unchecked) exception if completed exceptionally. To better
1995 * conform with the use of common functional forms, if a
1996 * computation involved in the completion of this
1997 * CompletableFuture threw an exception, this method throws an
1998 * (unchecked) {@link CompletionException} with the underlying
1999 * exception as its cause.
2000 *
2001 * @return the result value
2002 * @throws CancellationException if the computation was cancelled
2003 * @throws CompletionException if this future completed
2004 * exceptionally or a completion computation threw an exception
2005 */
2006 @SuppressWarnings("unchecked")
2007 public T join() {
2008 Object r;
2009 if ((r = result) == null)
2010 r = waitingGet(false);
2011 return (T) reportJoin(r);
2012 }
2013
2014 /**
2015 * Returns the result value (or throws any encountered exception)
2016 * if completed, else returns the given valueIfAbsent.
2017 *
2018 * @param valueIfAbsent the value to return if not completed
2019 * @return the result value, if completed, else the given valueIfAbsent
2020 * @throws CancellationException if the computation was cancelled
2021 * @throws CompletionException if this future completed
2022 * exceptionally or a completion computation threw an exception
2023 */
2024 @SuppressWarnings("unchecked")
2025 public T getNow(T valueIfAbsent) {
2026 Object r;
2027 return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r);
2028 }
2029
2030 /**
2031 * If not already completed, sets the value returned by {@link
2032 * #get()} and related methods to the given value.
2033 *
2034 * @param value the result value
2035 * @return {@code true} if this invocation caused this CompletableFuture
2036 * to transition to a completed state, else {@code false}
2037 */
2038 public boolean complete(T value) {
2039 boolean triggered = completeValue(value);
2040 postComplete();
2041 return triggered;
2042 }
2043
2044 /**
2045 * If not already completed, causes invocations of {@link #get()}
2046 * and related methods to throw the given exception.
2047 *
2048 * @param ex the exception
2049 * @return {@code true} if this invocation caused this CompletableFuture
2050 * to transition to a completed state, else {@code false}
2051 */
2052 public boolean completeExceptionally(Throwable ex) {
2053 if (ex == null) throw new NullPointerException();
2054 boolean triggered = internalComplete(new AltResult(ex));
2055 postComplete();
2056 return triggered;
2057 }
2058
2059 public <U> CompletableFuture<U> thenApply(
2060 Function<? super T,? extends U> fn) {
2061 return uniApplyStage(null, fn);
2062 }
2063
2064 public <U> CompletableFuture<U> thenApplyAsync(
2065 Function<? super T,? extends U> fn) {
2066 return uniApplyStage(defaultExecutor(), fn);
2067 }
2068
2069 public <U> CompletableFuture<U> thenApplyAsync(
2070 Function<? super T,? extends U> fn, Executor executor) {
2071 return uniApplyStage(screenExecutor(executor), fn);
2072 }
2073
2074 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
2075 return uniAcceptStage(null, action);
2076 }
2077
2078 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
2079 return uniAcceptStage(defaultExecutor(), action);
2080 }
2081
2082 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
2083 Executor executor) {
2084 return uniAcceptStage(screenExecutor(executor), action);
2085 }
2086
2087 public CompletableFuture<Void> thenRun(Runnable action) {
2088 return uniRunStage(null, action);
2089 }
2090
2091 public CompletableFuture<Void> thenRunAsync(Runnable action) {
2092 return uniRunStage(defaultExecutor(), action);
2093 }
2094
2095 public CompletableFuture<Void> thenRunAsync(Runnable action,
2096 Executor executor) {
2097 return uniRunStage(screenExecutor(executor), action);
2098 }
2099
2100 public <U,V> CompletableFuture<V> thenCombine(
2101 CompletionStage<? extends U> other,
2102 BiFunction<? super T,? super U,? extends V> fn) {
2103 return biApplyStage(null, other, fn);
2104 }
2105
2106 public <U,V> CompletableFuture<V> thenCombineAsync(
2107 CompletionStage<? extends U> other,
2108 BiFunction<? super T,? super U,? extends V> fn) {
2109 return biApplyStage(defaultExecutor(), other, fn);
2110 }
2111
2112 public <U,V> CompletableFuture<V> thenCombineAsync(
2113 CompletionStage<? extends U> other,
2114 BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
2115 return biApplyStage(screenExecutor(executor), other, fn);
2116 }
2117
2118 public <U> CompletableFuture<Void> thenAcceptBoth(
2119 CompletionStage<? extends U> other,
2120 BiConsumer<? super T, ? super U> action) {
2121 return biAcceptStage(null, other, action);
2122 }
2123
2124 public <U> CompletableFuture<Void> thenAcceptBothAsync(
2125 CompletionStage<? extends U> other,
2126 BiConsumer<? super T, ? super U> action) {
2127 return biAcceptStage(defaultExecutor(), other, action);
2128 }
2129
2130 public <U> CompletableFuture<Void> thenAcceptBothAsync(
2131 CompletionStage<? extends U> other,
2132 BiConsumer<? super T, ? super U> action, Executor executor) {
2133 return biAcceptStage(screenExecutor(executor), other, action);
2134 }
2135
2136 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
2137 Runnable action) {
2138 return biRunStage(null, other, action);
2139 }
2140
2141 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2142 Runnable action) {
2143 return biRunStage(defaultExecutor(), other, action);
2144 }
2145
2146 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2147 Runnable action,
2148 Executor executor) {
2149 return biRunStage(screenExecutor(executor), other, action);
2150 }
2151
2152 public <U> CompletableFuture<U> applyToEither(
2153 CompletionStage<? extends T> other, Function<? super T, U> fn) {
2154 return orApplyStage(null, other, fn);
2155 }
2156
2157 public <U> CompletableFuture<U> applyToEitherAsync(
2158 CompletionStage<? extends T> other, Function<? super T, U> fn) {
2159 return orApplyStage(defaultExecutor(), other, fn);
2160 }
2161
2162 public <U> CompletableFuture<U> applyToEitherAsync(
2163 CompletionStage<? extends T> other, Function<? super T, U> fn,
2164 Executor executor) {
2165 return orApplyStage(screenExecutor(executor), other, fn);
2166 }
2167
2168 public CompletableFuture<Void> acceptEither(
2169 CompletionStage<? extends T> other, Consumer<? super T> action) {
2170 return orAcceptStage(null, other, action);
2171 }
2172
2173 public CompletableFuture<Void> acceptEitherAsync(
2174 CompletionStage<? extends T> other, Consumer<? super T> action) {
2175 return orAcceptStage(defaultExecutor(), other, action);
2176 }
2177
2178 public CompletableFuture<Void> acceptEitherAsync(
2179 CompletionStage<? extends T> other, Consumer<? super T> action,
2180 Executor executor) {
2181 return orAcceptStage(screenExecutor(executor), other, action);
2182 }
2183
2184 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2185 Runnable action) {
2186 return orRunStage(null, other, action);
2187 }
2188
2189 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2190 Runnable action) {
2191 return orRunStage(defaultExecutor(), other, action);
2192 }
2193
2194 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2195 Runnable action,
2196 Executor executor) {
2197 return orRunStage(screenExecutor(executor), other, action);
2198 }
2199
2200 public <U> CompletableFuture<U> thenCompose(
2201 Function<? super T, ? extends CompletionStage<U>> fn) {
2202 return uniComposeStage(null, fn);
2203 }
2204
2205 public <U> CompletableFuture<U> thenComposeAsync(
2206 Function<? super T, ? extends CompletionStage<U>> fn) {
2207 return uniComposeStage(defaultExecutor(), fn);
2208 }
2209
2210 public <U> CompletableFuture<U> thenComposeAsync(
2211 Function<? super T, ? extends CompletionStage<U>> fn,
2212 Executor executor) {
2213 return uniComposeStage(screenExecutor(executor), fn);
2214 }
2215
2216 public CompletableFuture<T> whenComplete(
2217 BiConsumer<? super T, ? super Throwable> action) {
2218 return uniWhenCompleteStage(null, action);
2219 }
2220
2221 public CompletableFuture<T> whenCompleteAsync(
2222 BiConsumer<? super T, ? super Throwable> action) {
2223 return uniWhenCompleteStage(defaultExecutor(), action);
2224 }
2225
2226 public CompletableFuture<T> whenCompleteAsync(
2227 BiConsumer<? super T, ? super Throwable> action, Executor executor) {
2228 return uniWhenCompleteStage(screenExecutor(executor), action);
2229 }
2230
2231 public <U> CompletableFuture<U> handle(
2232 BiFunction<? super T, Throwable, ? extends U> fn) {
2233 return uniHandleStage(null, fn);
2234 }
2235
2236 public <U> CompletableFuture<U> handleAsync(
2237 BiFunction<? super T, Throwable, ? extends U> fn) {
2238 return uniHandleStage(defaultExecutor(), fn);
2239 }
2240
2241 public <U> CompletableFuture<U> handleAsync(
2242 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
2243 return uniHandleStage(screenExecutor(executor), fn);
2244 }
2245
2246 /**
2247 * Returns this CompletableFuture.
2248 *
2249 * @return this CompletableFuture
2250 */
2251 public CompletableFuture<T> toCompletableFuture() {
2252 return this;
2253 }
2254
2255 // not in interface CompletionStage
2256
2257 /**
2258 * Returns a new CompletableFuture that is completed when this
2259 * CompletableFuture completes, with the result of the given
2260 * function of the exception triggering this CompletableFuture's
2261 * completion when it completes exceptionally; otherwise, if this
2262 * CompletableFuture completes normally, then the returned
2263 * CompletableFuture also completes normally with the same value.
2264 * Note: More flexible versions of this functionality are
2265 * available using methods {@code whenComplete} and {@code handle}.
2266 *
2267 * @param fn the function to use to compute the value of the
2268 * returned CompletableFuture if this CompletableFuture completed
2269 * exceptionally
2270 * @return the new CompletableFuture
2271 */
2272 public CompletableFuture<T> exceptionally(
2273 Function<Throwable, ? extends T> fn) {
2274 return uniExceptionallyStage(fn);
2275 }
2276
2277
2278 /* ------------- Arbitrary-arity constructions -------------- */
2279
2280 /**
2281 * Returns a new CompletableFuture that is completed when all of
2282 * the given CompletableFutures complete. If any of the given
2283 * CompletableFutures complete exceptionally, then the returned
2284 * CompletableFuture also does so, with a CompletionException
2285 * holding this exception as its cause. Otherwise, the results,
2286 * if any, of the given CompletableFutures are not reflected in
2287 * the returned CompletableFuture, but may be obtained by
2288 * inspecting them individually. If no CompletableFutures are
2289 * provided, returns a CompletableFuture completed with the value
2290 * {@code null}.
2291 *
2292 * <p>Among the applications of this method is to await completion
2293 * of a set of independent CompletableFutures before continuing a
2294 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2295 * c3).join();}.
2296 *
2297 * @param cfs the CompletableFutures
2298 * @return a new CompletableFuture that is completed when all of the
2299 * given CompletableFutures complete
2300 * @throws NullPointerException if the array or any of its elements are
2301 * {@code null}
2302 */
2303 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2304 return andTree(cfs, 0, cfs.length - 1);
2305 }
2306
2307 /**
2308 * Returns a new CompletableFuture that is completed when any of
2309 * the given CompletableFutures complete, with the same result.
2310 * Otherwise, if it completed exceptionally, the returned
2311 * CompletableFuture also does so, with a CompletionException
2312 * holding this exception as its cause. If no CompletableFutures
2313 * are provided, returns an incomplete CompletableFuture.
2314 *
2315 * @param cfs the CompletableFutures
2316 * @return a new CompletableFuture that is completed with the
2317 * result or exception of any of the given CompletableFutures when
2318 * one completes
2319 * @throws NullPointerException if the array or any of its elements are
2320 * {@code null}
2321 */
2322 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2323 return orTree(cfs, 0, cfs.length - 1);
2324 }
2325
2326 /* ------------- Control and status methods -------------- */
2327
2328 /**
2329 * If not already completed, completes this CompletableFuture with
2330 * a {@link CancellationException}. Dependent CompletableFutures
2331 * that have not already completed will also complete
2332 * exceptionally, with a {@link CompletionException} caused by
2333 * this {@code CancellationException}.
2334 *
2335 * @param mayInterruptIfRunning this value has no effect in this
2336 * implementation because interrupts are not used to control
2337 * processing.
2338 *
2339 * @return {@code true} if this task is now cancelled
2340 */
2341 public boolean cancel(boolean mayInterruptIfRunning) {
2342 boolean cancelled = (result == null) &&
2343 internalComplete(new AltResult(new CancellationException()));
2344 postComplete();
2345 return cancelled || isCancelled();
2346 }
2347
2348 /**
2349 * Returns {@code true} if this CompletableFuture was cancelled
2350 * before it completed normally.
2351 *
2352 * @return {@code true} if this CompletableFuture was cancelled
2353 * before it completed normally
2354 */
2355 public boolean isCancelled() {
2356 Object r;
2357 return ((r = result) instanceof AltResult) &&
2358 (((AltResult)r).ex instanceof CancellationException);
2359 }
2360
2361 /**
2362 * Returns {@code true} if this CompletableFuture completed
2363 * exceptionally, in any way. Possible causes include
2364 * cancellation, explicit invocation of {@code
2365 * completeExceptionally}, and abrupt termination of a
2366 * CompletionStage action.
2367 *
2368 * @return {@code true} if this CompletableFuture completed
2369 * exceptionally
2370 */
2371 public boolean isCompletedExceptionally() {
2372 Object r;
2373 return ((r = result) instanceof AltResult) && r != NIL;
2374 }
2375
2376 /**
2377 * Forcibly sets or resets the value subsequently returned by
2378 * method {@link #get()} and related methods, whether or not
2379 * already completed. This method is designed for use only in
2380 * error recovery actions, and even in such situations may result
2381 * in ongoing dependent completions using established versus
2382 * overwritten outcomes.
2383 *
2384 * @param value the completion value
2385 */
2386 public void obtrudeValue(T value) {
2387 result = (value == null) ? NIL : value;
2388 postComplete();
2389 }
2390
2391 /**
2392 * Forcibly causes subsequent invocations of method {@link #get()}
2393 * and related methods to throw the given exception, whether or
2394 * not already completed. This method is designed for use only in
2395 * error recovery actions, and even in such situations may result
2396 * in ongoing dependent completions using established versus
2397 * overwritten outcomes.
2398 *
2399 * @param ex the exception
2400 * @throws NullPointerException if the exception is null
2401 */
2402 public void obtrudeException(Throwable ex) {
2403 if (ex == null) throw new NullPointerException();
2404 result = new AltResult(ex);
2405 postComplete();
2406 }
2407
2408 /**
2409 * Returns the estimated number of CompletableFutures whose
2410 * completions are awaiting completion of this CompletableFuture.
2411 * This method is designed for use in monitoring system state, not
2412 * for synchronization control.
2413 *
2414 * @return the number of dependent CompletableFutures
2415 */
2416 public int getNumberOfDependents() {
2417 int count = 0;
2418 for (Completion p = stack; p != null; p = p.next)
2419 ++count;
2420 return count;
2421 }
2422
2423 /**
2424 * Returns a string identifying this CompletableFuture, as well as
2425 * its completion state. The state, in brackets, contains the
2426 * String {@code "Completed Normally"} or the String {@code
2427 * "Completed Exceptionally"}, or the String {@code "Not
2428 * completed"} followed by the number of CompletableFutures
2429 * dependent upon its completion, if any.
2430 *
2431 * @return a string identifying this CompletableFuture, as well as its state
2432 */
2433 public String toString() {
2434 Object r = result;
2435 int count = 0; // avoid call to getNumberOfDependents in case disabled
2436 for (Completion p = stack; p != null; p = p.next)
2437 ++count;
2438 return super.toString() +
2439 ((r == null) ?
2440 ((count == 0) ?
2441 "[Not completed]" :
2442 "[Not completed, " + count + " dependents]") :
2443 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2444 "[Completed exceptionally]" :
2445 "[Completed normally]"));
2446 }
2447
2448 // jdk9 additions
2449
2450 /**
2451 * Returns a new incomplete CompletableFuture of the type to be
2452 * returned by a CompletionStage method. Subclasses should
2453 * normally override this method to return an instance of the same
2454 * class as this CompletableFuture. The default implementation
2455 * returns an instance of class CompletableFuture.
2456 *
2457 * @param <U> the type of the value
2458 * @return a new CompletableFuture
2459 * @since 9
2460 */
2461 public <U> CompletableFuture<U> newIncompleteFuture() {
2462 return new CompletableFuture<U>();
2463 }
2464
2465 /**
2466 * Returns the default Executor used for async methods that do not
2467 * specify an Executor. This class uses the {@link
2468 * ForkJoinPool#commonPool()} if it supports more than one
2469 * parallel thread, or else an Executor using one thread per async
2470 * task. This method may be overridden in subclasses to return
2471 * an Executor that provides at least one independent thread.
2472 *
2473 * @return the executor
2474 * @since 9
2475 */
2476 public Executor defaultExecutor() {
2477 return ASYNC_POOL;
2478 }
2479
2480 /**
2481 * Returns a new CompletableFuture that is completed normally with
2482 * the same value as this CompletableFuture when it completes
2483 * normally. If this CompletableFuture completes exceptionally,
2484 * then the returned CompletableFuture completes exceptionally
2485 * with a CompletionException with this exception as cause. The
2486 * behavior is equivalent to {@code thenApply(x -> x)}. This
2487 * method may be useful as a form of "defensive copying", to
2488 * prevent clients from completing, while still being able to
2489 * arrange dependent actions.
2490 *
2491 * @return the new CompletableFuture
2492 * @since 9
2493 */
2494 public CompletableFuture<T> copy() {
2495 return uniCopyStage();
2496 }
2497
2498 /**
2499 * Returns a new CompletionStage that is completed normally with
2500 * the same value as this CompletableFuture when it completes
2501 * normally, and cannot be independently completed or otherwise
2502 * used in ways not defined by the methods of interface {@link
2503 * CompletionStage}. If this CompletableFuture completes
2504 * exceptionally, then the returned CompletionStage completes
2505 * exceptionally with a CompletionException with this exception as
2506 * cause.
2507 *
2508 * @return the new CompletionStage
2509 * @since 9
2510 */
2511 public CompletionStage<T> minimalCompletionStage() {
2512 return uniAsMinimalStage();
2513 }
2514
2515 /**
2516 * Completes this CompletableFuture with the result of
2517 * the given Supplier function invoked from an asynchronous
2518 * task using the given executor.
2519 *
2520 * @param supplier a function returning the value to be used
2521 * to complete this CompletableFuture
2522 * @param executor the executor to use for asynchronous execution
2523 * @return this CompletableFuture
2524 * @since 9
2525 */
2526 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier,
2527 Executor executor) {
2528 if (supplier == null || executor == null)
2529 throw new NullPointerException();
2530 executor.execute(new AsyncSupply<T>(this, supplier));
2531 return this;
2532 }
2533
2534 /**
2535 * Completes this CompletableFuture with the result of the given
2536 * Supplier function invoked from an asynchronous task using the
2537 * default executor.
2538 *
2539 * @param supplier a function returning the value to be used
2540 * to complete this CompletableFuture
2541 * @return this CompletableFuture
2542 * @since 9
2543 */
2544 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
2545 return completeAsync(supplier, defaultExecutor());
2546 }
2547
2548 /**
2549 * Exceptionally completes this CompletableFuture with
2550 * a {@link TimeoutException} if not otherwise completed
2551 * before the given timeout.
2552 *
2553 * @param timeout how long to wait before completing exceptionally
2554 * with a TimeoutException, in units of {@code unit}
2555 * @param unit a {@code TimeUnit} determining how to interpret the
2556 * {@code timeout} parameter
2557 * @return this CompletableFuture
2558 * @since 9
2559 */
2560 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
2561 if (unit == null)
2562 throw new NullPointerException();
2563 if (result == null)
2564 whenComplete(new Canceller(Delayer.delay(new Timeout(this),
2565 timeout, unit)));
2566 return this;
2567 }
2568
2569 /**
2570 * Completes this CompletableFuture with the given value if not
2571 * otherwise completed before the given timeout.
2572 *
2573 * @param value the value to use upon timeout
2574 * @param timeout how long to wait before completing normally
2575 * with the given value, in units of {@code unit}
2576 * @param unit a {@code TimeUnit} determining how to interpret the
2577 * {@code timeout} parameter
2578 * @return this CompletableFuture
2579 * @since 9
2580 */
2581 public CompletableFuture<T> completeOnTimeout(T value, long timeout,
2582 TimeUnit unit) {
2583 if (unit == null)
2584 throw new NullPointerException();
2585 if (result == null)
2586 whenComplete(new Canceller(Delayer.delay(
2587 new DelayedCompleter<T>(this, value),
2588 timeout, unit)));
2589 return this;
2590 }
2591
2592 /**
2593 * Returns a new Executor that submits a task to the given base
2594 * executor after the given delay (or no delay if non-positive).
2595 * Each delay commences upon invocation of the returned executor's
2596 * {@code execute} method.
2597 *
2598 * @param delay how long to delay, in units of {@code unit}
2599 * @param unit a {@code TimeUnit} determining how to interpret the
2600 * {@code delay} parameter
2601 * @param executor the base executor
2602 * @return the new delayed executor
2603 * @since 9
2604 */
2605 public static Executor delayedExecutor(long delay, TimeUnit unit,
2606 Executor executor) {
2607 if (unit == null || executor == null)
2608 throw new NullPointerException();
2609 return new DelayedExecutor(delay, unit, executor);
2610 }
2611
2612 /**
2613 * Returns a new Executor that submits a task to the default
2614 * executor after the given delay (or no delay if non-positive).
2615 * Each delay commences upon invocation of the returned executor's
2616 * {@code execute} method.
2617 *
2618 * @param delay how long to delay, in units of {@code unit}
2619 * @param unit a {@code TimeUnit} determining how to interpret the
2620 * {@code delay} parameter
2621 * @return the new delayed executor
2622 * @since 9
2623 */
2624 public static Executor delayedExecutor(long delay, TimeUnit unit) {
2625 if (unit == null)
2626 throw new NullPointerException();
2627 return new DelayedExecutor(delay, unit, ASYNC_POOL);
2628 }
2629
2630 /**
2631 * Returns a new CompletionStage that is already completed with
2632 * the given value and supports only those methods in
2633 * interface {@link CompletionStage}.
2634 *
2635 * @param value the value
2636 * @param <U> the type of the value
2637 * @return the completed CompletionStage
2638 * @since 9
2639 */
2640 public static <U> CompletionStage<U> completedStage(U value) {
2641 return new MinimalStage<U>((value == null) ? NIL : value);
2642 }
2643
2644 /**
2645 * Returns a new CompletableFuture that is already completed
2646 * exceptionally with the given exception.
2647 *
2648 * @param ex the exception
2649 * @param <U> the type of the value
2650 * @return the exceptionally completed CompletableFuture
2651 * @since 9
2652 */
2653 public static <U> CompletableFuture<U> failedFuture(Throwable ex) {
2654 if (ex == null) throw new NullPointerException();
2655 return new CompletableFuture<U>(new AltResult(ex));
2656 }
2657
2658 /**
2659 * Returns a new CompletionStage that is already completed
2660 * exceptionally with the given exception and supports only those
2661 * methods in interface {@link CompletionStage}.
2662 *
2663 * @param ex the exception
2664 * @param <U> the type of the value
2665 * @return the exceptionally completed CompletionStage
2666 * @since 9
2667 */
2668 public static <U> CompletionStage<U> failedStage(Throwable ex) {
2669 if (ex == null) throw new NullPointerException();
2670 return new MinimalStage<U>(new AltResult(ex));
2671 }
2672
2673 /**
2674 * Singleton delay scheduler, used only for starting and
2675 * cancelling tasks.
2676 */
2677 static final class Delayer {
2678 static ScheduledFuture<?> delay(Runnable command, long delay,
2679 TimeUnit unit) {
2680 return delayer.schedule(command, delay, unit);
2681 }
2682
2683 static final class DaemonThreadFactory implements ThreadFactory {
2684 public Thread newThread(Runnable r) {
2685 Thread t = new Thread(r);
2686 t.setDaemon(true);
2687 t.setName("CompletableFutureDelayScheduler");
2688 return t;
2689 }
2690 }
2691
2692 static final ScheduledThreadPoolExecutor delayer;
2693 static {
2694 (delayer = new ScheduledThreadPoolExecutor(
2695 1, new DaemonThreadFactory())).
2696 setRemoveOnCancelPolicy(true);
2697 }
2698 }
2699
2700 // Little class-ified lambdas to better support monitoring
2701
2702 static final class DelayedExecutor implements Executor {
2703 final long delay;
2704 final TimeUnit unit;
2705 final Executor executor;
2706 DelayedExecutor(long delay, TimeUnit unit, Executor executor) {
2707 this.delay = delay; this.unit = unit; this.executor = executor;
2708 }
2709 public void execute(Runnable r) {
2710 Delayer.delay(new TaskSubmitter(executor, r), delay, unit);
2711 }
2712 }
2713
2714 /** Action to submit user task */
2715 static final class TaskSubmitter implements Runnable {
2716 final Executor executor;
2717 final Runnable action;
2718 TaskSubmitter(Executor executor, Runnable action) {
2719 this.executor = executor;
2720 this.action = action;
2721 }
2722 public void run() { executor.execute(action); }
2723 }
2724
2725 /** Action to completeExceptionally on timeout */
2726 static final class Timeout implements Runnable {
2727 final CompletableFuture<?> f;
2728 Timeout(CompletableFuture<?> f) { this.f = f; }
2729 public void run() {
2730 if (f != null && !f.isDone())
2731 f.completeExceptionally(new TimeoutException());
2732 }
2733 }
2734
2735 /** Action to complete on timeout */
2736 static final class DelayedCompleter<U> implements Runnable {
2737 final CompletableFuture<U> f;
2738 final U u;
2739 DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }
2740 public void run() {
2741 if (f != null)
2742 f.complete(u);
2743 }
2744 }
2745
2746 /** Action to cancel unneeded timeouts */
2747 static final class Canceller implements BiConsumer<Object, Throwable> {
2748 final Future<?> f;
2749 Canceller(Future<?> f) { this.f = f; }
2750 public void accept(Object ignore, Throwable ex) {
2751 if (ex == null && f != null && !f.isDone())
2752 f.cancel(false);
2753 }
2754 }
2755
2756 /**
2757 * A subclass that just throws UOE for most non-CompletionStage methods.
2758 */
2759 static final class MinimalStage<T> extends CompletableFuture<T> {
2760 MinimalStage() { }
2761 MinimalStage(Object r) { super(r); }
2762 @Override public <U> CompletableFuture<U> newIncompleteFuture() {
2763 return new MinimalStage<U>(); }
2764 @Override public T get() {
2765 throw new UnsupportedOperationException(); }
2766 @Override public T get(long timeout, TimeUnit unit) {
2767 throw new UnsupportedOperationException(); }
2768 @Override public T getNow(T valueIfAbsent) {
2769 throw new UnsupportedOperationException(); }
2770 @Override public T join() {
2771 throw new UnsupportedOperationException(); }
2772 @Override public boolean complete(T value) {
2773 throw new UnsupportedOperationException(); }
2774 @Override public boolean completeExceptionally(Throwable ex) {
2775 throw new UnsupportedOperationException(); }
2776 @Override public boolean cancel(boolean mayInterruptIfRunning) {
2777 throw new UnsupportedOperationException(); }
2778 @Override public void obtrudeValue(T value) {
2779 throw new UnsupportedOperationException(); }
2780 @Override public void obtrudeException(Throwable ex) {
2781 throw new UnsupportedOperationException(); }
2782 @Override public boolean isDone() {
2783 throw new UnsupportedOperationException(); }
2784 @Override public boolean isCancelled() {
2785 throw new UnsupportedOperationException(); }
2786 @Override public boolean isCompletedExceptionally() {
2787 throw new UnsupportedOperationException(); }
2788 @Override public int getNumberOfDependents() {
2789 throw new UnsupportedOperationException(); }
2790 @Override public CompletableFuture<T> completeAsync
2791 (Supplier<? extends T> supplier, Executor executor) {
2792 throw new UnsupportedOperationException(); }
2793 @Override public CompletableFuture<T> completeAsync
2794 (Supplier<? extends T> supplier) {
2795 throw new UnsupportedOperationException(); }
2796 @Override public CompletableFuture<T> orTimeout
2797 (long timeout, TimeUnit unit) {
2798 throw new UnsupportedOperationException(); }
2799 @Override public CompletableFuture<T> completeOnTimeout
2800 (T value, long timeout, TimeUnit unit) {
2801 throw new UnsupportedOperationException(); }
2802 }
2803
// VarHandle mechanics: handles for the "result" and "stack" fields of
// CompletableFuture and the "next" field of Completion.
private static final VarHandle RESULT;
private static final VarHandle STACK;
private static final VarHandle NEXT;
static {
    try {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();
        RESULT = lookup.findVarHandle(
            CompletableFuture.class, "result", Object.class);
        STACK = lookup.findVarHandle(
            CompletableFuture.class, "stack", Completion.class);
        NEXT = lookup.findVarHandle(
            Completion.class, "next", Completion.class);
    } catch (ReflectiveOperationException e) {
        // A lookup failure here is rethrown as an Error.
        throw new Error(e);
    }

    // Reference LockSupport here to reduce the risk of rare disastrous
    // classloading in the first call to LockSupport.park:
    // https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}
2822 }