ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.27
Committed: Wed Jan 2 07:50:57 2013 UTC (11 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.26: +4 -4 lines
Log Message:
@code

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Block;
10 import java.util.function.BiBlock;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.locks.LockSupport;
25
26 /**
27 * A {@link Future} that may be explicitly completed (setting its
28 * value and status), and may include dependent functions and actions
29 * that trigger upon its completion.
30 *
31 * <p>Similar methods are available for Functions, Blocks, and
32 * Runnables, depending on whether actions require arguments and/or
33 * produce results. Functions and actions supplied for dependent
34 * completions (mainly using methods with prefix {@code then}) may be
35 * performed by the thread that completes the current
36 * CompletableFuture, or by any other caller of these methods. There
37 * are no guarantees about the order of processing completions unless
38 * constrained by these methods.
39 *
40 * <p>When two or more threads attempt to {@link #complete} or {@link
41 * #completeExceptionally} a CompletableFuture, only one of them
42 * succeeds.
43 *
44 * <p>Upon exceptional completion, or when a completion entails
45 * computation of a function or action, and it terminates abruptly
46 * with an (unchecked) exception or error, then further completions
47 * act as {@code completeExceptionally} with a {@link
48 * CompletionException} holding that exception as its cause. If a
49 * CompletableFuture completes exceptionally, and is not followed by a
50 * {@link #exceptionally} or {@link #handle} completion, then all of
51 * its dependents (and their dependents) also complete exceptionally
52 * with CompletionExceptions holding the ultimate cause. In case of a
53 * CompletionException, methods {@link #get()} and {@link #get(long,
54 * TimeUnit)} throw a {@link ExecutionException} with the same cause
55 * as would be held in the corresponding CompletionException. However,
56 * in these cases, methods {@link #join()} and {@link #getNow} throw
57 * the CompletionException, which simplifies usage especially within
58 * other completion functions.
59 *
60 * <p>CompletableFutures themselves do not execute asynchronously.
61 * However, the {@code async} methods provide commonly useful ways to
62 * commence asynchronous processing, using either a given {@link
63 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
64 * function or action that will result in the completion of a new
65 * CompletableFuture.
66 *
67 * @author Doug Lea
68 * @since 1.8
69 */
70 public class CompletableFuture<T> implements Future<T> {
71 /*
72 * Overview:
73 *
74 * 1. Non-nullness of field result (set via CAS) indicates
75 * done. An AltResult is used to box null as a result, as well as
76 * to hold exceptions. Using a single field makes completion fast
77 * and simple to detect and trigger, at the expense of a lot of
78 * encoding and decoding that infiltrates many methods. One minor
79 * simplification relies on the (static) NIL (to box null results)
80 * being the only AltResult with a null exception field, so we
81 * don't usually need explicit comparisons with NIL.
82 *
83 * 2. Waiters are held in a Treiber stack similar to the one used
84 * in FutureTask, Phaser, and SynchronousQueue. See their
85 * internal documentation for details.
86 *
87 * 3. Completions are also kept in a list/stack, and pulled off
88 * and run when completion is triggered. (We could in fact use the
89 * same stack as for waiters, but would give up the potential
90 * parallelism obtained because woken waiters help release/run
91 * others (see method postComplete).) Because post-processing may
92 * race with direct calls, completions extend AtomicInteger so
93 * callers can claim the action via compareAndSet(0, 1). The
94 * Completion.run methods are all written in a boringly similar
95 * uniform way (that sometimes includes unnecessary-looking
96 * checks, kept to maintain uniformity). There are enough
97 * dimensions upon which they differ that factoring to use common
98 * code isn't worthwhile.
99 *
100 * 4. The exported then/and/or methods do support a bit of
101 * factoring (see thenFunction, andBlock, etc). They must cope
102 * with the intrinsic races surrounding addition of a dependent
103 * action versus performing the action directly because the task
104 * is already complete. For example, a CF may not be complete
105 * upon entry, so a dependent completion is added, but by the time
106 * it is added, the target CF is complete, so must be directly
107 * executed. This is all done while avoiding unnecessary object
108 * construction in safe-bypass cases.
109 */
110
111 // preliminary class definitions
112
113 static final class AltResult {
114 final Throwable ex; // null only for NIL
115 AltResult(Throwable ex) { this.ex = ex; }
116 }
117
118 static final AltResult NIL = new AltResult(null);
119
120 /**
121 * Simple linked list nodes to record waiting threads in a Treiber
122 * stack. See other classes such as Phaser and SynchronousQueue
123 * for more detailed explanation.
124 */
125 static final class WaitNode {
126 volatile Thread thread;
127 volatile WaitNode next;
128 }
129
130 /**
131 * Simple linked list nodes to record completions, used in
132 * basically the same way as WaitNodes. (We separate nodes from
133 * the Completions themselves mainly because for the And and Or
134 * methods, the same Completion object resides in two lists.)
135 */
136 static final class CompletionNode {
137 final Completion completion;
138 volatile CompletionNode next;
139 CompletionNode(Completion completion) { this.completion = completion; }
140 }
141
142
143 // Fields
144
// Either the plain result or a boxed AltResult (NIL for a null
// result, otherwise an exception holder). Non-null means done.
volatile Object result;
// Treiber stack of threads blocked on get(); drained by postComplete().
volatile WaitNode waiters;
// Treiber stack of dependent completions; drained by postComplete().
volatile CompletionNode completions;
148
149 // Basic utilities for triggering and processing completions
150 // (The Completion and Async classes and internal methods that
151 // use them are defined after the public methods.)
152
153 /**
154 * Removes and signals all waiting threads and runs all completions.
155 */
156 final void postComplete() {
157 WaitNode q; Thread t;
158 while ((q = waiters) != null) {
159 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
160 (t = q.thread) != null) {
161 q.thread = null;
162 LockSupport.unpark(t);
163 }
164 }
165
166 CompletionNode h; Completion c;
167 while ((h = completions) != null) {
168 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
169 (c = h.completion) != null)
170 c.run();
171 }
172 }
173
174 /**
175 * Triggers completion with the encoding of the given arguments:
176 * if the exception is non-null, encodes it as a wrapped
177 * CompletionException unless it is one already. Otherwise uses
178 * the given result, boxed as NIL if null.
179 */
180 final void internalComplete(Object v, Throwable ex) {
181 if (result == null)
182 UNSAFE.compareAndSwapObject
183 (this, RESULT, null,
184 (ex == null) ? (v == null) ? NIL : v :
185 new AltResult((ex instanceof CompletionException) ? ex :
186 new CompletionException(ex)));
187 postComplete(); // help out even if not triggered
188 }
189
190 /**
191 * If triggered, helps release and/or process completions.
192 */
193 final void helpPostComplete() {
194 if (result != null)
195 postComplete();
196 }
197
198 // public methods
199
/**
 * Creates a new CompletableFuture that is not yet completed.
 */
public CompletableFuture() {
    super();
}
205
206 /**
207 * Asynchronously executes in the {@link
208 * ForkJoinPool#commonPool()}, a task that completes the returned
209 * CompletableFuture with the result of the given Supplier.
210 *
211 * @param supplier a function returning the value to be used
212 * to complete the returned CompletableFuture
213 * @return the CompletableFuture
214 */
215 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
216 if (supplier == null) throw new NullPointerException();
217 CompletableFuture<U> f = new CompletableFuture<U>();
218 ForkJoinPool.commonPool().
219 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
220 return f;
221 }
222
223 /**
224 * Asynchronously executes using the given executor, a task that
225 * completes the returned CompletableFuture with the result of the
226 * given Supplier.
227 *
228 * @param supplier a function returning the value to be used
229 * to complete the returned CompletableFuture
230 * @param executor the executor to use for asynchronous execution
231 * @return the CompletableFuture
232 */
233 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
234 Executor executor) {
235 if (executor == null || supplier == null)
236 throw new NullPointerException();
237 CompletableFuture<U> f = new CompletableFuture<U>();
238 executor.execute(new AsyncSupplier<U>(supplier, f));
239 return f;
240 }
241
242 /**
243 * Asynchronously executes in the {@link
244 * ForkJoinPool#commonPool()} a task that runs the given action,
245 * and then completes the returned CompletableFuture.
246 *
247 * @param runnable the action to run before completing the
248 * returned CompletableFuture
249 * @return the CompletableFuture
250 */
251 public static CompletableFuture<Void> runAsync(Runnable runnable) {
252 if (runnable == null) throw new NullPointerException();
253 CompletableFuture<Void> f = new CompletableFuture<Void>();
254 ForkJoinPool.commonPool().
255 execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
256 return f;
257 }
258
259 /**
260 * Asynchronously executes using the given executor, a task that
261 * runs the given action, and then completes the returned
262 * CompletableFuture.
263 *
264 * @param runnable the action to run before completing the
265 * returned CompletableFuture
266 * @param executor the executor to use for asynchronous execution
267 * @return the CompletableFuture
268 */
269 public static CompletableFuture<Void> runAsync(Runnable runnable,
270 Executor executor) {
271 if (executor == null || runnable == null)
272 throw new NullPointerException();
273 CompletableFuture<Void> f = new CompletableFuture<Void>();
274 executor.execute(new AsyncRunnable(runnable, f));
275 return f;
276 }
277
278 /**
279 * Returns {@code true} if completed in any fashion: normally,
280 * exceptionally, or via cancellation.
281 *
282 * @return {@code true} if completed
283 */
284 public boolean isDone() {
285 return result != null;
286 }
287
288 /**
289 * Suppresses warnings and is slightly safer than a plain cast.
290 */
291 @SuppressWarnings("unchecked")
292 T castResult(Object r) {
293 assert ! (r == null || (r instanceof AltResult));
294 return (T)r;
295 }
296
297 /**
298 * Waits if necessary for the computation to complete, and then
299 * retrieves its result.
300 *
301 * @return the computed result
302 * @throws CancellationException if the computation was cancelled
303 * @throws ExecutionException if the computation threw an
304 * exception
305 * @throws InterruptedException if the current thread was interrupted
306 * while waiting
307 */
308 public T get() throws InterruptedException, ExecutionException {
309 Object r; Throwable ex, cause;
310 if ((r = result) == null && (r = waitingGet(true)) == null)
311 throw new InterruptedException();
312 if (r instanceof AltResult) {
313 if ((ex = ((AltResult)r).ex) != null) {
314 if (ex instanceof CancellationException)
315 throw (CancellationException)ex;
316 if ((ex instanceof CompletionException) &&
317 (cause = ex.getCause()) != null)
318 ex = cause;
319 throw new ExecutionException(ex);
320 }
321 return null;
322 }
323 return castResult(r);
324 }
325
326 /**
327 * Waits if necessary for at most the given time for completion,
328 * and then retrieves its result, if available.
329 *
330 * @param timeout the maximum time to wait
331 * @param unit the time unit of the timeout argument
332 * @return the computed result
333 * @throws CancellationException if the computation was cancelled
334 * @throws ExecutionException if the computation threw an
335 * exception
336 * @throws InterruptedException if the current thread was interrupted
337 * while waiting
338 * @throws TimeoutException if the wait timed out
339 */
340 public T get(long timeout, TimeUnit unit)
341 throws InterruptedException, ExecutionException, TimeoutException {
342 Object r; Throwable ex, cause;
343 long nanos = unit.toNanos(timeout);
344 if (Thread.interrupted())
345 throw new InterruptedException();
346 if ((r = result) == null)
347 r = timedAwaitDone(nanos);
348 if (r instanceof AltResult) {
349 if ((ex = ((AltResult)r).ex) != null) {
350 if (ex instanceof CancellationException)
351 throw (CancellationException)ex;
352 if ((ex instanceof CompletionException) &&
353 (cause = ex.getCause()) != null)
354 ex = cause;
355 throw new ExecutionException(ex);
356 }
357 return null;
358 }
359 return castResult(r);
360 }
361
362 /**
363 * Returns the result value when complete, or throws an
364 * (unchecked) exception if completed exceptionally. To better
365 * conform with the use of common functional forms, if a
366 * computation involved in the completion of this
367 * CompletableFuture threw an exception, this method throws an
368 * (unchecked) {@link CompletionException} with the underlying
369 * exception as its cause.
370 *
371 * @return the result value
372 * @throws CancellationException if the computation was cancelled
373 * @throws CompletionException if a completion computation threw
374 * an exception
375 */
376 public T join() {
377 Object r; Throwable ex;
378 if ((r = result) == null)
379 r = waitingGet(false);
380 if (r instanceof AltResult) {
381 if ((ex = ((AltResult)r).ex) != null) {
382 if (ex instanceof CancellationException)
383 throw (CancellationException)ex;
384 if (ex instanceof CompletionException)
385 throw (CompletionException)ex;
386 throw new CompletionException(ex);
387 }
388 return null;
389 }
390 return castResult(r);
391 }
392
393 /**
394 * Returns the result value (or throws any encountered exception)
395 * if completed, else returns the given valueIfAbsent.
396 *
397 * @param valueIfAbsent the value to return if not completed
398 * @return the result value, if completed, else the given valueIfAbsent
399 * @throws CancellationException if the computation was cancelled
400 * @throws CompletionException if a completion computation threw
401 * an exception
402 */
403 public T getNow(T valueIfAbsent) {
404 Object r; Throwable ex;
405 if ((r = result) == null)
406 return valueIfAbsent;
407 if (r instanceof AltResult) {
408 if ((ex = ((AltResult)r).ex) != null) {
409 if (ex instanceof CancellationException)
410 throw (CancellationException)ex;
411 if (ex instanceof CompletionException)
412 throw (CompletionException)ex;
413 throw new CompletionException(ex);
414 }
415 return null;
416 }
417 return castResult(r);
418 }
419
420 /**
421 * If not already completed, sets the value returned by {@link
422 * #get()} and related methods to the given value.
423 *
424 * @param value the result value
425 * @return {@code true} if this invocation caused this CompletableFuture
426 * to transition to a completed state, else {@code false}
427 */
428 public boolean complete(T value) {
429 boolean triggered =
430 result == null &&
431 UNSAFE.compareAndSwapObject(this, RESULT, null,
432 value == null ? NIL : value);
433 postComplete();
434 return triggered;
435 }
436
437 /**
438 * If not already completed, causes invocations of {@link #get()}
439 * and related methods to throw the given exception.
440 *
441 * @param ex the exception
442 * @return {@code true} if this invocation caused this CompletableFuture
443 * to transition to a completed state, else {@code false}
444 */
445 public boolean completeExceptionally(Throwable ex) {
446 if (ex == null) throw new NullPointerException();
447 boolean triggered =
448 result == null &&
449 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
450 postComplete();
451 return triggered;
452 }
453
454 /**
455 * Creates and returns a CompletableFuture that is completed with
456 * the result of the given function of this CompletableFuture.
457 * If this CompletableFuture completes exceptionally,
458 * then the returned CompletableFuture also does so,
459 * with a CompletionException holding this exception as
460 * its cause.
461 *
462 * @param fn the function to use to compute the value of
463 * the returned CompletableFuture
464 * @return the new CompletableFuture
465 */
466 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
467 return thenFunction(fn, null);
468 }
469
470 /**
471 * Creates and returns a CompletableFuture that is asynchronously
472 * completed using the {@link ForkJoinPool#commonPool()} with the
473 * result of the given function of this CompletableFuture. If
474 * this CompletableFuture completes exceptionally, then the
475 * returned CompletableFuture also does so, with a
476 * CompletionException holding this exception as its cause.
477 *
478 * @param fn the function to use to compute the value of
479 * the returned CompletableFuture
480 * @return the new CompletableFuture
481 */
482 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
483 return thenFunction(fn, ForkJoinPool.commonPool());
484 }
485
486 /**
487 * Creates and returns a CompletableFuture that is asynchronously
488 * completed using the given executor with the result of the given
489 * function of this CompletableFuture. If this CompletableFuture
490 * completes exceptionally, then the returned CompletableFuture
491 * also does so, with a CompletionException holding this exception as
492 * its cause.
493 *
494 * @param fn the function to use to compute the value of
495 * the returned CompletableFuture
496 * @param executor the executor to use for asynchronous execution
497 * @return the new CompletableFuture
498 */
499 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
500 Executor executor) {
501 if (executor == null) throw new NullPointerException();
502 return thenFunction(fn, executor);
503 }
504
505 /**
506 * Creates and returns a CompletableFuture that is completed after
507 * performing the given action with this CompletableFuture's
508 * result when it completes. If this CompletableFuture
509 * completes exceptionally, then the returned CompletableFuture
510 * also does so, with a CompletionException holding this exception as
511 * its cause.
512 *
513 * @param block the action to perform before completing the
514 * returned CompletableFuture
515 * @return the new CompletableFuture
516 */
517 public CompletableFuture<Void> thenAccept(Block<? super T> block) {
518 return thenBlock(block, null);
519 }
520
521 /**
522 * Creates and returns a CompletableFuture that is asynchronously
523 * completed using the {@link ForkJoinPool#commonPool()} with this
524 * CompletableFuture's result when it completes. If this
525 * CompletableFuture completes exceptionally, then the returned
526 * CompletableFuture also does so, with a CompletionException holding
527 * this exception as its cause.
528 *
529 * @param block the action to perform before completing the
530 * returned CompletableFuture
531 * @return the new CompletableFuture
532 */
533 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
534 return thenBlock(block, ForkJoinPool.commonPool());
535 }
536
537 /**
538 * Creates and returns a CompletableFuture that is asynchronously
539 * completed using the given executor with this
540 * CompletableFuture's result when it completes. If this
541 * CompletableFuture completes exceptionally, then the returned
542 * CompletableFuture also does so, with a CompletionException holding
543 * this exception as its cause.
544 *
545 * @param block the action to perform before completing the
546 * returned CompletableFuture
547 * @param executor the executor to use for asynchronous execution
548 * @return the new CompletableFuture
549 */
550 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
551 Executor executor) {
552 if (executor == null) throw new NullPointerException();
553 return thenBlock(block, executor);
554 }
555
556 /**
557 * Creates and returns a CompletableFuture that is completed after
558 * performing the given action when this CompletableFuture
559 * completes. If this CompletableFuture completes exceptionally,
560 * then the returned CompletableFuture also does so, with a
561 * CompletionException holding this exception as its cause.
562 *
563 * @param action the action to perform before completing the
564 * returned CompletableFuture
565 * @return the new CompletableFuture
566 */
567 public CompletableFuture<Void> thenRun(Runnable action) {
568 return thenRunnable(action, null);
569 }
570
571 /**
572 * Creates and returns a CompletableFuture that is asynchronously
573 * completed using the {@link ForkJoinPool#commonPool()} after
574 * performing the given action when this CompletableFuture
575 * completes. If this CompletableFuture completes exceptionally,
576 * then the returned CompletableFuture also does so, with a
577 * CompletionException holding this exception as its cause.
578 *
579 * @param action the action to perform before completing the
580 * returned CompletableFuture
581 * @return the new CompletableFuture
582 */
583 public CompletableFuture<Void> thenRunAsync(Runnable action) {
584 return thenRunnable(action, ForkJoinPool.commonPool());
585 }
586
587 /**
588 * Creates and returns a CompletableFuture that is asynchronously
589 * completed using the given executor after performing the given
590 * action when this CompletableFuture completes. If this
591 * CompletableFuture completes exceptionally, then the returned
592 * CompletableFuture also does so, with a CompletionException holding
593 * this exception as its cause.
594 *
595 * @param action the action to perform before completing the
596 * returned CompletableFuture
597 * @param executor the executor to use for asynchronous execution
598 * @return the new CompletableFuture
599 */
600 public CompletableFuture<Void> thenRunAsync(Runnable action,
601 Executor executor) {
602 if (executor == null) throw new NullPointerException();
603 return thenRunnable(action, executor);
604 }
605
606 /**
607 * Creates and returns a CompletableFuture that is completed with
608 * the result of the given function of this and the other given
609 * CompletableFuture's results when both complete. If this or
610 * the other CompletableFuture complete exceptionally, then the
611 * returned CompletableFuture also does so, with a
612 * CompletionException holding the exception as its cause.
613 *
614 * @param other the other CompletableFuture
615 * @param fn the function to use to compute the value of
616 * the returned CompletableFuture
617 * @return the new CompletableFuture
618 */
619 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
620 BiFunction<? super T,? super U,? extends V> fn) {
621 return andFunction(other, fn, null);
622 }
623
624 /**
625 * Creates and returns a CompletableFuture that is asynchronously
626 * completed using the {@link ForkJoinPool#commonPool()} with
627 * the result of the given function of this and the other given
628 * CompletableFuture's results when both complete. If this or
629 * the other CompletableFuture complete exceptionally, then the
630 * returned CompletableFuture also does so, with a
631 * CompletionException holding the exception as its cause.
632 *
633 * @param other the other CompletableFuture
634 * @param fn the function to use to compute the value of
635 * the returned CompletableFuture
636 * @return the new CompletableFuture
637 */
638 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
639 BiFunction<? super T,? super U,? extends V> fn) {
640 return andFunction(other, fn, ForkJoinPool.commonPool());
641 }
642
643 /**
644 * Creates and returns a CompletableFuture that is
645 * asynchronously completed using the given executor with the
646 * result of the given function of this and the other given
647 * CompletableFuture's results when both complete. If this or
648 * the other CompletableFuture complete exceptionally, then the
649 * returned CompletableFuture also does so, with a
650 * CompletionException holding the exception as its cause.
651 *
652 * @param other the other CompletableFuture
653 * @param fn the function to use to compute the value of
654 * the returned CompletableFuture
655 * @param executor the executor to use for asynchronous execution
656 * @return the new CompletableFuture
657 */
658
659 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
660 BiFunction<? super T,? super U,? extends V> fn,
661 Executor executor) {
662 if (executor == null) throw new NullPointerException();
663 return andFunction(other, fn, executor);
664 }
665
666 /**
667 * Creates and returns a CompletableFuture that is completed with
668 * the results of this and the other given CompletableFuture if
669 * both complete. If this and/or the other CompletableFuture
670 * complete exceptionally, then the returned CompletableFuture
671 * also does so, with a CompletionException holding one of these
672 * exceptions as its cause.
673 *
674 * @param other the other CompletableFuture
675 * @param block the action to perform before completing the
676 * returned CompletableFuture
677 * @return the new CompletableFuture
678 */
679 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
680 BiBlock<? super T, ? super U> block) {
681 return andBlock(other, block, null);
682 }
683
684 /**
685 * Creates and returns a CompletableFuture that is completed
686 * asynchronously using the {@link ForkJoinPool#commonPool()} with
687 * the results of this and the other given CompletableFuture when
688 * both complete. If this and/or the other CompletableFuture
689 * complete exceptionally, then the returned CompletableFuture
690 * also does so, with a CompletionException holding one of these
691 * exceptions as its cause.
692 *
693 * @param other the other CompletableFuture
694 * @param block the action to perform before completing the
695 * returned CompletableFuture
696 * @return the new CompletableFuture
697 */
698 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
699 BiBlock<? super T, ? super U> block) {
700 return andBlock(other, block, ForkJoinPool.commonPool());
701 }
702
703 /**
704 * Creates and returns a CompletableFuture that is completed
705 * asynchronously using the given executor with the results of
706 * this and the other given CompletableFuture when both complete.
707 * If this and/or the other CompletableFuture complete
708 * exceptionally, then the returned CompletableFuture also does
709 * so, with a CompletionException holding one of these exceptions as
710 * its cause.
711 *
712 * @param other the other CompletableFuture
713 * @param block the action to perform before completing the
714 * returned CompletableFuture
715 * @param executor the executor to use for asynchronous execution
716 * @return the new CompletableFuture
717 */
718 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
719 BiBlock<? super T, ? super U> block,
720 Executor executor) {
721 if (executor == null) throw new NullPointerException();
722 return andBlock(other, block, executor);
723 }
724
725 /**
726 * Creates and returns a CompletableFuture that is completed
727 * when this and the other given CompletableFuture both
728 * complete. If this and/or the other CompletableFuture complete
729 * exceptionally, then the returned CompletableFuture also does
730 * so, with a CompletionException holding one of these exceptions as
731 * its cause.
732 *
733 * @param other the other CompletableFuture
734 * @param action the action to perform before completing the
735 * returned CompletableFuture
736 * @return the new CompletableFuture
737 */
738 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
739 Runnable action) {
740 return andRunnable(other, action, null);
741 }
742
743 /**
744 * Creates and returns a CompletableFuture that is completed
745 * asynchronously using the {@link ForkJoinPool#commonPool()}
746 * when this and the other given CompletableFuture both
747 * complete. If this and/or the other CompletableFuture complete
748 * exceptionally, then the returned CompletableFuture also does
749 * so, with a CompletionException holding one of these exceptions as
750 * its cause.
751 *
752 * @param other the other CompletableFuture
753 * @param action the action to perform before completing the
754 * returned CompletableFuture
755 * @return the new CompletableFuture
756 */
757 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
758 Runnable action) {
759 return andRunnable(other, action, ForkJoinPool.commonPool());
760 }
761
762 /**
763 * Creates and returns a CompletableFuture that is completed
764 * asynchronously using the given executor
765 * when this and the other given CompletableFuture both
766 * complete. If this and/or the other CompletableFuture complete
767 * exceptionally, then the returned CompletableFuture also does
768 * so, with a CompletionException holding one of these exceptions as
769 * its cause.
770 *
771 * @param other the other CompletableFuture
772 * @param action the action to perform before completing the
773 * returned CompletableFuture
774 * @param executor the executor to use for asynchronous execution
775 * @return the new CompletableFuture
776 */
777 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
778 Runnable action,
779 Executor executor) {
780 if (executor == null) throw new NullPointerException();
781 return andRunnable(other, action, executor);
782 }
783
784 /**
785 * Creates and returns a CompletableFuture that is completed with
786 * the result of the given function of either this or the other
787 * given CompletableFuture's results when either complete. If
788 * this and/or the other CompletableFuture complete exceptionally,
789 * then the returned CompletableFuture may also do so, with a
790 * CompletionException holding one of these exceptions as its cause.
791 * No guarantees are made about which result or exception is used
792 * in the returned CompletableFuture.
793 *
794 * @param other the other CompletableFuture
795 * @param fn the function to use to compute the value of
796 * the returned CompletableFuture
797 * @return the new CompletableFuture
798 */
799 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
800 Function<? super T, U> fn) {
801 return orFunction(other, fn, null);
802 }
803
804 /**
805 * Creates and returns a CompletableFuture that is completed
806 * asynchronously using the {@link ForkJoinPool#commonPool()} with
807 * the result of the given function of either this or the other
808 * given CompletableFuture's results when either complete. If
809 * this and/or the other CompletableFuture complete exceptionally,
810 * then the returned CompletableFuture may also do so, with a
811 * CompletionException holding one of these exceptions as its cause.
812 * No guarantees are made about which result or exception is used
813 * in the returned CompletableFuture.
814 *
815 * @param other the other CompletableFuture
816 * @param fn the function to use to compute the value of
817 * the returned CompletableFuture
818 * @return the new CompletableFuture
819 */
820 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
821 Function<? super T, U> fn) {
822 return orFunction(other, fn, ForkJoinPool.commonPool());
823 }
824
825 /**
826 * Creates and returns a CompletableFuture that is completed
827 * asynchronously using the given executor with the result of the
828 * given function of either this or the other given
829 * CompletableFuture's results when either complete. If this
830 * and/or the other CompletableFuture complete exceptionally, then
831 * the returned CompletableFuture may also do so, with a
832 * CompletionException holding one of these exceptions as its cause.
833 * No guarantees are made about which result or exception is used
834 * in the returned CompletableFuture.
835 *
836 * @param other the other CompletableFuture
837 * @param fn the function to use to compute the value of
838 * the returned CompletableFuture
839 * @param executor the executor to use for asynchronous execution
840 * @return the new CompletableFuture
841 */
842 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
843 Function<? super T, U> fn,
844 Executor executor) {
845 if (executor == null) throw new NullPointerException();
846 return orFunction(other, fn, executor);
847 }
848
849 /**
850 * Creates and returns a CompletableFuture that is completed after
851 * performing the given action with the result of either this or the
852 * other given CompletableFuture's result, when either complete.
853 * If this and/or the other CompletableFuture complete
854 * exceptionally, then the returned CompletableFuture may also do
855 * so, with a CompletionException holding one of these exceptions as
856 * its cause. No guarantees are made about which exception is
857 * used in the returned CompletableFuture.
858 *
859 * @param other the other CompletableFuture
860 * @param block the action to perform before completing the
861 * returned CompletableFuture
862 * @return the new CompletableFuture
863 */
864 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
865 Block<? super T> block) {
866 return orBlock(other, block, null);
867 }
868
869 /**
870 * Creates and returns a CompletableFuture that is completed
871 * asynchronously using the {@link ForkJoinPool#commonPool()},
872 * performing the given action with the result of either this or
873 * the other given CompletableFuture's result, when either
874 * complete. If this and/or the other CompletableFuture complete
875 * exceptionally, then the returned CompletableFuture may also do
876 * so, with a CompletionException holding one of these exceptions as
877 * its cause. No guarantees are made about which exception is
878 * used in the returned CompletableFuture.
879 *
880 * @param other the other CompletableFuture
881 * @param block the action to perform before completing the
882 * returned CompletableFuture
883 * @return the new CompletableFuture
884 */
885 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
886 Block<? super T> block) {
887 return orBlock(other, block, ForkJoinPool.commonPool());
888 }
889
890 /**
891 * Creates and returns a CompletableFuture that is completed
892 * asynchronously using the given executor,
893 * performing the given action with the result of either this or
894 * the other given CompletableFuture's result, when either
895 * complete. If this and/or the other CompletableFuture complete
896 * exceptionally, then the returned CompletableFuture may also do
897 * so, with a CompletionException holding one of these exceptions as
898 * its cause. No guarantees are made about which exception is
899 * used in the returned CompletableFuture.
900 *
901 * @param other the other CompletableFuture
902 * @param block the action to perform before completing the
903 * returned CompletableFuture
904 * @param executor the executor to use for asynchronous execution
905 * @return the new CompletableFuture
906 */
907 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
908 Block<? super T> block,
909 Executor executor) {
910 if (executor == null) throw new NullPointerException();
911 return orBlock(other, block, executor);
912 }
913
914 /**
915 * Creates and returns a CompletableFuture that is completed
916 * after this or the other given CompletableFuture complete. If
917 * this and/or the other CompletableFuture complete exceptionally,
918 * then the returned CompletableFuture may also do so, with a
919 * CompletionException holding one of these exceptions as its cause.
920 * No guarantees are made about which exception is used in the
921 * returned CompletableFuture.
922 *
923 * @param other the other CompletableFuture
924 * @param action the action to perform before completing the
925 * returned CompletableFuture
926 * @return the new CompletableFuture
927 */
928 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
929 Runnable action) {
930 return orRunnable(other, action, null);
931 }
932
933 /**
934 * Creates and returns a CompletableFuture that is completed
935 * asynchronously using the {@link ForkJoinPool#commonPool()}
936 * after this or the other given CompletableFuture complete. If
937 * this and/or the other CompletableFuture complete exceptionally,
938 * then the returned CompletableFuture may also do so, with a
939 * CompletionException holding one of these exceptions as its cause.
940 * No guarantees are made about which exception is used in the
941 * returned CompletableFuture.
942 *
943 * @param other the other CompletableFuture
944 * @param action the action to perform before completing the
945 * returned CompletableFuture
946 * @return the new CompletableFuture
947 */
948 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
949 Runnable action) {
950 return orRunnable(other, action, ForkJoinPool.commonPool());
951 }
952
953 /**
954 * Creates and returns a CompletableFuture that is completed
955 * asynchronously using the given executor after this or the other
956 * given CompletableFuture complete. If this and/or the other
957 * CompletableFuture complete exceptionally, then the returned
958 * CompletableFuture may also do so, with a CompletionException
959 * holding one of these exceptions as its cause. No guarantees are
960 * made about which exception is used in the returned
961 * CompletableFuture.
962 *
963 * @param other the other CompletableFuture
964 * @param action the action to perform before completing the
965 * returned CompletableFuture
966 * @param executor the executor to use for asynchronous execution
967 * @return the new CompletableFuture
968 */
969 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
970 Runnable action,
971 Executor executor) {
972 if (executor == null) throw new NullPointerException();
973 return orRunnable(other, action, executor);
974 }
975
976 /**
977 * Returns a CompletableFuture (or an equivalent one) produced by
978 * the given function of the result of this CompletableFuture when
979 * completed. If this CompletableFuture completes exceptionally,
980 * then the returned CompletableFuture also does so, with a
981 * CompletionException holding this exception as its cause.
982 *
983 * @param fn the function returning a new CompletableFuture.
984 * @return the CompletableFuture, that {@code isDone()} upon
985 * return if completed by the given function, or an exception
986 * occurs.
987 */
988 public <U> CompletableFuture<U> thenCompose(Function<? super T,
989 CompletableFuture<U>> fn) {
990 if (fn == null) throw new NullPointerException();
991 CompletableFuture<U> dst = null;
992 ThenCompose<T,U> d = null;
993 Object r;
994 if ((r = result) == null) {
995 dst = new CompletableFuture<U>();
996 CompletionNode p = new CompletionNode
997 (d = new ThenCompose<T,U>(this, fn, dst));
998 while ((r = result) == null) {
999 if (UNSAFE.compareAndSwapObject
1000 (this, COMPLETIONS, p.next = completions, p))
1001 break;
1002 }
1003 }
1004 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1005 T t; Throwable ex;
1006 if (r instanceof AltResult) {
1007 ex = ((AltResult)r).ex;
1008 t = null;
1009 }
1010 else {
1011 ex = null;
1012 t = castResult(r);
1013 }
1014 if (ex == null) {
1015 try {
1016 dst = fn.apply(t);
1017 } catch (Throwable rex) {
1018 ex = rex;
1019 }
1020 }
1021 if (dst == null) {
1022 dst = new CompletableFuture<U>();
1023 if (ex == null)
1024 ex = new NullPointerException();
1025 }
1026 if (ex != null)
1027 dst.internalComplete(null, ex);
1028 }
1029 helpPostComplete();
1030 dst.helpPostComplete();
1031 return dst;
1032 }
1033
1034 /**
1035 * Creates and returns a CompletableFuture that is completed with
1036 * the result of the given function of the exception triggering
1037 * this CompletableFuture's completion when it completes
1038 * exceptionally; Otherwise, if this CompletableFuture completes
1039 * normally, then the returned CompletableFuture also completes
1040 * normally with the same value.
1041 *
1042 * @param fn the function to use to compute the value of the
1043 * returned CompletableFuture if this CompletableFuture completed
1044 * exceptionally
1045 * @return the new CompletableFuture
1046 */
1047 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
1048 if (fn == null) throw new NullPointerException();
1049 CompletableFuture<T> dst = new CompletableFuture<T>();
1050 ExceptionAction<T> d = null;
1051 Object r;
1052 if ((r = result) == null) {
1053 CompletionNode p =
1054 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
1055 while ((r = result) == null) {
1056 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1057 p.next = completions, p))
1058 break;
1059 }
1060 }
1061 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1062 T t = null; Throwable ex, dx = null;
1063 if (r instanceof AltResult) {
1064 if ((ex = ((AltResult)r).ex) != null) {
1065 try {
1066 t = fn.apply(ex);
1067 } catch (Throwable rex) {
1068 dx = rex;
1069 }
1070 }
1071 }
1072 else
1073 t = castResult(r);
1074 dst.internalComplete(t, dx);
1075 }
1076 helpPostComplete();
1077 return dst;
1078 }
1079
1080 /**
1081 * Creates and returns a CompletableFuture that is completed with
1082 * the result of the given function of the result and exception of
1083 * this CompletableFuture's completion when it completes. The
1084 * given function is invoked with the result (or {@code null} if
1085 * none) and the exception (or {@code null} if none) of this
1086 * CompletableFuture when complete.
1087 *
1088 * @param fn the function to use to compute the value of the
1089 * returned CompletableFuture
1090
1091 * @return the new CompletableFuture
1092 */
1093 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
1094 if (fn == null) throw new NullPointerException();
1095 CompletableFuture<U> dst = new CompletableFuture<U>();
1096 ThenHandle<T,U> d = null;
1097 Object r;
1098 if ((r = result) == null) {
1099 CompletionNode p =
1100 new CompletionNode(d = new ThenHandle<T,U>(this, fn, dst));
1101 while ((r = result) == null) {
1102 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1103 p.next = completions, p))
1104 break;
1105 }
1106 }
1107 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1108 T t; Throwable ex;
1109 if (r instanceof AltResult) {
1110 ex = ((AltResult)r).ex;
1111 t = null;
1112 }
1113 else {
1114 ex = null;
1115 t = castResult(r);
1116 }
1117 U u = null; Throwable dx = null;
1118 try {
1119 u = fn.apply(t, ex);
1120 } catch (Throwable rex) {
1121 dx = rex;
1122 }
1123 dst.internalComplete(u, dx);
1124 }
1125 helpPostComplete();
1126 return dst;
1127 }
1128
1129 /**
1130 * Attempts to complete this CompletableFuture with
1131 * a {@link CancellationException}.
1132 *
1133 * @param mayInterruptIfRunning this value has no effect in this
1134 * implementation because interrupts are not used to control
1135 * processing.
1136 *
1137 * @return {@code true} if this task is now cancelled
1138 */
1139 public boolean cancel(boolean mayInterruptIfRunning) {
1140 Object r;
1141 while ((r = result) == null) {
1142 r = new AltResult(new CancellationException());
1143 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
1144 postComplete();
1145 return true;
1146 }
1147 }
1148 return ((r instanceof AltResult) &&
1149 (((AltResult)r).ex instanceof CancellationException));
1150 }
1151
1152 /**
1153 * Returns {@code true} if this CompletableFuture was cancelled
1154 * before it completed normally.
1155 *
1156 * @return {@code true} if this CompletableFuture was cancelled
1157 * before it completed normally
1158 */
1159 public boolean isCancelled() {
1160 Object r;
1161 return ((r = result) != null &&
1162 (r instanceof AltResult) &&
1163 (((AltResult)r).ex instanceof CancellationException));
1164 }
1165
1166 /**
1167 * Forcibly sets or resets the value subsequently returned by
1168 * method get() and related methods, whether or not already
1169 * completed. This method is designed for use only in error
1170 * recovery actions, and even in such situations may result in
1171 * ongoing dependent completions using established versus
1172 * overwritten values.
1173 *
1174 * @param value the completion value
1175 */
1176 public void obtrudeValue(T value) {
1177 result = (value == null) ? NIL : value;
1178 postComplete();
1179 }
1180
1181 /* ------------- waiting for completions -------------- */
1182
1183 /**
1184 * Heuristic spin value for waitingGet() before blocking on
1185 * multiprocessors
1186 */
1187 static final int WAITING_GET_SPINS = 256;
1188
1189 /**
1190 * Returns raw result after waiting, or null if interruptible and
1191 * interrupted.
1192 */
1193 private Object waitingGet(boolean interruptible) {
1194 WaitNode q = null;
1195 boolean queued = false, interrupted = false;
1196 int h = 0, spins = 0;
1197 for (Object r;;) {
1198 if ((r = result) != null) {
1199 Throwable ex;
1200 if (q != null) // suppress unpark
1201 q.thread = null;
1202 postComplete(); // help release others
1203 if (interrupted)
1204 Thread.currentThread().interrupt();
1205 return r;
1206 }
1207 else if (h == 0) {
1208 h = ThreadLocalRandom.current().nextInt();
1209 if (Runtime.getRuntime().availableProcessors() > 1)
1210 spins = WAITING_GET_SPINS;
1211 }
1212 else if (spins > 0) {
1213 h ^= h << 1; // xorshift
1214 h ^= h >>> 3;
1215 if ((h ^= h << 10) >= 0)
1216 --spins;
1217 }
1218 else if (q == null)
1219 q = new WaitNode();
1220 else if (!queued)
1221 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1222 q.next = waiters, q);
1223 else if (Thread.interrupted()) {
1224 if (interruptible) {
1225 removeWaiter(q);
1226 return null;
1227 }
1228 interrupted = true;
1229 }
1230 else if (q.thread == null)
1231 q.thread = Thread.currentThread();
1232 else
1233 LockSupport.park(this);
1234 }
1235 }
1236
1237 /**
1238 * Awaits completion or aborts on interrupt or timeout.
1239 *
1240 * @param nanos time to wait
1241 * @return raw result
1242 */
1243 private Object timedAwaitDone(long nanos)
1244 throws InterruptedException, TimeoutException {
1245 final long deadline = System.nanoTime() + nanos;
1246 WaitNode q = null;
1247 boolean queued = false;
1248 for (Object r;;) {
1249 if (Thread.interrupted()) {
1250 removeWaiter(q);
1251 throw new InterruptedException();
1252 }
1253 else if ((r = result) != null) {
1254 if (q != null)
1255 q.thread = null;
1256 postComplete();
1257 return r;
1258 }
1259 else if (q == null)
1260 q = new WaitNode();
1261 else if (!queued)
1262 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1263 q.next = waiters, q);
1264 else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1265 removeWaiter(q);
1266 throw new TimeoutException();
1267 }
1268 else if (q.thread == null)
1269 q.thread = Thread.currentThread();
1270 else
1271 LockSupport.parkNanos(this, nanos);
1272 }
1273 }
1274
1275 /**
1276 * Tries to unlink a timed-out or interrupted wait node to avoid
1277 * accumulating garbage. Internal nodes are simply unspliced
1278 * without CAS since it is harmless if they are traversed anyway
1279 * by releasers. To avoid effects of unsplicing from already
1280 * removed nodes, the list is retraversed in case of an apparent
1281 * race. This is slow when there are a lot of nodes, but we don't
1282 * expect lists to be long enough to outweigh higher-overhead
1283 * schemes.
1284 */
1285 private void removeWaiter(WaitNode node) {
1286 if (node != null) {
1287 node.thread = null;
1288 retry:
1289 for (;;) { // restart on removeWaiter race
1290 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
1291 s = q.next;
1292 if (q.thread != null)
1293 pred = q;
1294 else if (pred != null) {
1295 pred.next = s;
1296 if (pred.thread == null) // check for race
1297 continue retry;
1298 }
1299 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
1300 continue retry;
1301 }
1302 break;
1303 }
1304 }
1305 }
1306
1307 /* ------------- Async tasks -------------- */
1308
1309 /** Base class can act as either FJ or plain Runnable */
1310 static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1311 public final Void getRawResult() { return null; }
1312 public final void setRawResult(Void v) { }
1313 public final void run() { exec(); }
1314 }
1315
1316 static final class AsyncRunnable extends Async {
1317 final Runnable fn;
1318 final CompletableFuture<Void> dst;
1319 AsyncRunnable(Runnable fn, CompletableFuture<Void> dst) {
1320 this.fn = fn; this.dst = dst;
1321 }
1322 public final boolean exec() {
1323 CompletableFuture<Void> d; Throwable ex;
1324 if ((d = this.dst) != null) {
1325 try {
1326 fn.run();
1327 ex = null;
1328 } catch (Throwable rex) {
1329 ex = rex;
1330 }
1331 d.internalComplete(null, ex);
1332 }
1333 return true;
1334 }
1335 private static final long serialVersionUID = 5232453952276885070L;
1336 }
1337
1338 static final class AsyncSupplier<U> extends Async {
1339 final Supplier<U> fn;
1340 final CompletableFuture<U> dst;
1341 AsyncSupplier(Supplier<U> fn, CompletableFuture<U> dst) {
1342 this.fn = fn; this.dst = dst;
1343 }
1344 public final boolean exec() {
1345 CompletableFuture<U> d; U u; Throwable ex;
1346 if ((d = this.dst) != null) {
1347 try {
1348 u = fn.get();
1349 ex = null;
1350 } catch (Throwable rex) {
1351 ex = rex;
1352 u = null;
1353 }
1354 d.internalComplete(u, ex);
1355 }
1356 return true;
1357 }
1358 private static final long serialVersionUID = 5232453952276885070L;
1359 }
1360
1361 static final class AsyncFunction<T,U> extends Async {
1362 Function<? super T,? extends U> fn;
1363 T arg;
1364 final CompletableFuture<U> dst;
1365 AsyncFunction(T arg, Function<? super T,? extends U> fn,
1366 CompletableFuture<U> dst) {
1367 this.arg = arg; this.fn = fn; this.dst = dst;
1368 }
1369 public final boolean exec() {
1370 CompletableFuture<U> d; U u; Throwable ex;
1371 if ((d = this.dst) != null) {
1372 try {
1373 u = fn.apply(arg);
1374 ex = null;
1375 } catch (Throwable rex) {
1376 ex = rex;
1377 u = null;
1378 }
1379 d.internalComplete(u, ex);
1380 }
1381 return true;
1382 }
1383 private static final long serialVersionUID = 5232453952276885070L;
1384 }
1385
1386 static final class AsyncBiFunction<T,U,V> extends Async {
1387 final BiFunction<? super T,? super U,? extends V> fn;
1388 final T arg1;
1389 final U arg2;
1390 final CompletableFuture<V> dst;
1391 AsyncBiFunction(T arg1, U arg2,
1392 BiFunction<? super T,? super U,? extends V> fn,
1393 CompletableFuture<V> dst) {
1394 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1395 }
1396 public final boolean exec() {
1397 CompletableFuture<V> d; V v; Throwable ex;
1398 if ((d = this.dst) != null) {
1399 try {
1400 v = fn.apply(arg1, arg2);
1401 ex = null;
1402 } catch (Throwable rex) {
1403 ex = rex;
1404 v = null;
1405 }
1406 d.internalComplete(v, ex);
1407 }
1408 return true;
1409 }
1410 private static final long serialVersionUID = 5232453952276885070L;
1411 }
1412
1413 static final class AsyncBlock<T> extends Async {
1414 Block<? super T> fn;
1415 T arg;
1416 final CompletableFuture<Void> dst;
1417 AsyncBlock(T arg, Block<? super T> fn,
1418 CompletableFuture<Void> dst) {
1419 this.arg = arg; this.fn = fn; this.dst = dst;
1420 }
1421 public final boolean exec() {
1422 CompletableFuture<Void> d; Throwable ex;
1423 if ((d = this.dst) != null) {
1424 try {
1425 fn.accept(arg);
1426 ex = null;
1427 } catch (Throwable rex) {
1428 ex = rex;
1429 }
1430 d.internalComplete(null, ex);
1431 }
1432 return true;
1433 }
1434 private static final long serialVersionUID = 5232453952276885070L;
1435 }
1436
1437 static final class AsyncBiBlock<T,U> extends Async {
1438 final BiBlock<? super T,? super U> fn;
1439 final T arg1;
1440 final U arg2;
1441 final CompletableFuture<Void> dst;
1442 AsyncBiBlock(T arg1, U arg2,
1443 BiBlock<? super T,? super U> fn,
1444 CompletableFuture<Void> dst) {
1445 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1446 }
1447 public final boolean exec() {
1448 CompletableFuture<Void> d; Throwable ex;
1449 if ((d = this.dst) != null) {
1450 try {
1451 fn.accept(arg1, arg2);
1452 ex = null;
1453 } catch (Throwable rex) {
1454 ex = rex;
1455 }
1456 d.internalComplete(null, ex);
1457 }
1458 return true;
1459 }
1460 private static final long serialVersionUID = 5232453952276885070L;
1461 }
1462
1463 /* ------------- Completions -------------- */
1464
1465 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1466 static abstract class Completion extends AtomicInteger implements Runnable {
1467 }
1468
1469 static final class ThenFunction<T,U> extends Completion {
1470 final CompletableFuture<? extends T> src;
1471 final Function<? super T,? extends U> fn;
1472 final CompletableFuture<U> dst;
1473 final Executor executor;
1474 ThenFunction(CompletableFuture<? extends T> src,
1475 final Function<? super T,? extends U> fn,
1476 final CompletableFuture<U> dst, Executor executor) {
1477 this.src = src; this.fn = fn; this.dst = dst;
1478 this.executor = executor;
1479 }
1480 public final void run() {
1481 CompletableFuture<? extends T> a;
1482 Function<? super T,? extends U> fn;
1483 CompletableFuture<U> dst;
1484 Object r; T t; Throwable ex;
1485 if ((dst = this.dst) != null &&
1486 (fn = this.fn) != null &&
1487 (a = this.src) != null &&
1488 (r = a.result) != null &&
1489 compareAndSet(0, 1)) {
1490 if (r instanceof AltResult) {
1491 ex = ((AltResult)r).ex;
1492 t = null;
1493 }
1494 else {
1495 ex = null;
1496 t = a.castResult(r);
1497 }
1498 Executor e = executor;
1499 U u = null;
1500 if (ex == null) {
1501 try {
1502 if (e != null)
1503 e.execute(new AsyncFunction<T,U>(t, fn, dst));
1504 else
1505 u = fn.apply(t);
1506 } catch (Throwable rex) {
1507 ex = rex;
1508 }
1509 }
1510 if (e == null || ex != null)
1511 dst.internalComplete(u, ex);
1512 }
1513 }
1514 private static final long serialVersionUID = 5232453952276885070L;
1515 }
1516
1517 static final class ThenBlock<T> extends Completion {
1518 final CompletableFuture<? extends T> src;
1519 final Block<? super T> fn;
1520 final CompletableFuture<Void> dst;
1521 final Executor executor;
1522 ThenBlock(CompletableFuture<? extends T> src,
1523 final Block<? super T> fn,
1524 final CompletableFuture<Void> dst, Executor executor) {
1525 this.src = src; this.fn = fn; this.dst = dst;
1526 this.executor = executor;
1527 }
1528 public final void run() {
1529 CompletableFuture<? extends T> a;
1530 Block<? super T> fn;
1531 CompletableFuture<Void> dst;
1532 Object r; T t; Throwable ex;
1533 if ((dst = this.dst) != null &&
1534 (fn = this.fn) != null &&
1535 (a = this.src) != null &&
1536 (r = a.result) != null &&
1537 compareAndSet(0, 1)) {
1538 if (r instanceof AltResult) {
1539 ex = ((AltResult)r).ex;
1540 t = null;
1541 }
1542 else {
1543 ex = null;
1544 t = a.castResult(r);
1545 }
1546 Executor e = executor;
1547 if (ex == null) {
1548 try {
1549 if (e != null)
1550 e.execute(new AsyncBlock<T>(t, fn, dst));
1551 else
1552 fn.accept(t);
1553 } catch (Throwable rex) {
1554 ex = rex;
1555 }
1556 }
1557 if (e == null || ex != null)
1558 dst.internalComplete(null, ex);
1559 }
1560 }
1561 private static final long serialVersionUID = 5232453952276885070L;
1562 }
1563
1564 static final class ThenRunnable<T> extends Completion {
1565 final CompletableFuture<? extends T> src;
1566 final Runnable fn;
1567 final CompletableFuture<Void> dst;
1568 final Executor executor;
1569 ThenRunnable(CompletableFuture<? extends T> src,
1570 Runnable fn,
1571 CompletableFuture<Void> dst,
1572 Executor executor) {
1573 this.src = src; this.fn = fn; this.dst = dst;
1574 this.executor = executor;
1575 }
1576 public final void run() {
1577 CompletableFuture<? extends T> a;
1578 Runnable fn;
1579 CompletableFuture<Void> dst;
1580 Object r; Throwable ex;
1581 if ((dst = this.dst) != null &&
1582 (fn = this.fn) != null &&
1583 (a = this.src) != null &&
1584 (r = a.result) != null &&
1585 compareAndSet(0, 1)) {
1586 if (r instanceof AltResult)
1587 ex = ((AltResult)r).ex;
1588 else
1589 ex = null;
1590 Executor e = executor;
1591 if (ex == null) {
1592 try {
1593 if (e != null)
1594 e.execute(new AsyncRunnable(fn, dst));
1595 else
1596 fn.run();
1597 } catch (Throwable rex) {
1598 ex = rex;
1599 }
1600 }
1601 if (e == null || ex != null)
1602 dst.internalComplete(null, ex);
1603 }
1604 }
1605 private static final long serialVersionUID = 5232453952276885070L;
1606 }
1607
1608 static final class AndFunction<T,U,V> extends Completion {
1609 final CompletableFuture<? extends T> src;
1610 final CompletableFuture<? extends U> snd;
1611 final BiFunction<? super T,? super U,? extends V> fn;
1612 final CompletableFuture<V> dst;
1613 final Executor executor;
1614 AndFunction(CompletableFuture<? extends T> src,
1615 CompletableFuture<? extends U> snd,
1616 BiFunction<? super T,? super U,? extends V> fn,
1617 CompletableFuture<V> dst, Executor executor) {
1618 this.src = src; this.snd = snd;
1619 this.fn = fn; this.dst = dst;
1620 this.executor = executor;
1621 }
1622 public final void run() {
1623 Object r, s; T t; U u; Throwable ex;
1624 CompletableFuture<? extends T> a;
1625 CompletableFuture<? extends U> b;
1626 BiFunction<? super T,? super U,? extends V> fn;
1627 CompletableFuture<V> dst;
1628 if ((dst = this.dst) != null &&
1629 (fn = this.fn) != null &&
1630 (a = this.src) != null &&
1631 (r = a.result) != null &&
1632 (b = this.snd) != null &&
1633 (s = b.result) != null &&
1634 compareAndSet(0, 1)) {
1635 if (r instanceof AltResult) {
1636 ex = ((AltResult)r).ex;
1637 t = null;
1638 }
1639 else {
1640 ex = null;
1641 t = a.castResult(r);
1642 }
1643 if (ex != null)
1644 u = null;
1645 else if (s instanceof AltResult) {
1646 ex = ((AltResult)s).ex;
1647 u = null;
1648 }
1649 else
1650 u = b.castResult(s);
1651 Executor e = executor;
1652 V v = null;
1653 if (ex == null) {
1654 try {
1655 if (e != null)
1656 e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1657 else
1658 v = fn.apply(t, u);
1659 } catch (Throwable rex) {
1660 ex = rex;
1661 }
1662 }
1663 if (e == null || ex != null)
1664 dst.internalComplete(v, ex);
1665 }
1666 }
1667 private static final long serialVersionUID = 5232453952276885070L;
1668 }
1669
1670 static final class AndBlock<T,U> extends Completion {
1671 final CompletableFuture<? extends T> src;
1672 final CompletableFuture<? extends U> snd;
1673 final BiBlock<? super T,? super U> fn;
1674 final CompletableFuture<Void> dst;
1675 final Executor executor;
1676 AndBlock(CompletableFuture<? extends T> src,
1677 CompletableFuture<? extends U> snd,
1678 BiBlock<? super T,? super U> fn,
1679 CompletableFuture<Void> dst, Executor executor) {
1680 this.src = src; this.snd = snd;
1681 this.fn = fn; this.dst = dst;
1682 this.executor = executor;
1683 }
1684 public final void run() {
1685 Object r, s; T t; U u; Throwable ex;
1686 CompletableFuture<? extends T> a;
1687 CompletableFuture<? extends U> b;
1688 BiBlock<? super T,? super U> fn;
1689 CompletableFuture<Void> dst;
1690 if ((dst = this.dst) != null &&
1691 (fn = this.fn) != null &&
1692 (a = this.src) != null &&
1693 (r = a.result) != null &&
1694 (b = this.snd) != null &&
1695 (s = b.result) != null &&
1696 compareAndSet(0, 1)) {
1697 if (r instanceof AltResult) {
1698 ex = ((AltResult)r).ex;
1699 t = null;
1700 }
1701 else {
1702 ex = null;
1703 t = a.castResult(r);
1704 }
1705 if (ex != null)
1706 u = null;
1707 else if (s instanceof AltResult) {
1708 ex = ((AltResult)s).ex;
1709 u = null;
1710 }
1711 else
1712 u = b.castResult(s);
1713 Executor e = executor;
1714 if (ex == null) {
1715 try {
1716 if (e != null)
1717 e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1718 else
1719 fn.accept(t, u);
1720 } catch (Throwable rex) {
1721 ex = rex;
1722 }
1723 }
1724 if (e == null || ex != null)
1725 dst.internalComplete(null, ex);
1726 }
1727 }
1728 private static final long serialVersionUID = 5232453952276885070L;
1729 }
1730
1731 static final class AndRunnable<T> extends Completion {
1732 final CompletableFuture<? extends T> src;
1733 final CompletableFuture<?> snd;
1734 final Runnable fn;
1735 final CompletableFuture<Void> dst;
1736 final Executor executor;
1737 AndRunnable(CompletableFuture<? extends T> src,
1738 CompletableFuture<?> snd,
1739 Runnable fn,
1740 CompletableFuture<Void> dst, Executor executor) {
1741 this.src = src; this.snd = snd;
1742 this.fn = fn; this.dst = dst;
1743 this.executor = executor;
1744 }
1745 public final void run() {
1746 Object r, s; Throwable ex;
1747 final CompletableFuture<? extends T> a;
1748 final CompletableFuture<?> b;
1749 final Runnable fn;
1750 final CompletableFuture<Void> dst;
1751 if ((dst = this.dst) != null &&
1752 (fn = this.fn) != null &&
1753 (a = this.src) != null &&
1754 (r = a.result) != null &&
1755 (b = this.snd) != null &&
1756 (s = b.result) != null &&
1757 compareAndSet(0, 1)) {
1758 if (r instanceof AltResult)
1759 ex = ((AltResult)r).ex;
1760 else
1761 ex = null;
1762 if (ex == null && (s instanceof AltResult))
1763 ex = ((AltResult)s).ex;
1764 Executor e = executor;
1765 if (ex == null) {
1766 try {
1767 if (e != null)
1768 e.execute(new AsyncRunnable(fn, dst));
1769 else
1770 fn.run();
1771 } catch (Throwable rex) {
1772 ex = rex;
1773 }
1774 }
1775 if (e == null || ex != null)
1776 dst.internalComplete(null, ex);
1777 }
1778 }
1779 private static final long serialVersionUID = 5232453952276885070L;
1780 }
1781
1782 static final class OrFunction<T,U> extends Completion {
1783 final CompletableFuture<? extends T> src;
1784 final CompletableFuture<? extends T> snd;
1785 final Function<? super T,? extends U> fn;
1786 final CompletableFuture<U> dst;
1787 final Executor executor;
1788 OrFunction(CompletableFuture<? extends T> src,
1789 CompletableFuture<? extends T> snd,
1790 Function<? super T,? extends U> fn,
1791 CompletableFuture<U> dst, Executor executor) {
1792 this.src = src; this.snd = snd;
1793 this.fn = fn; this.dst = dst;
1794 this.executor = executor;
1795 }
1796 public final void run() {
1797 Object r; T t; Throwable ex;
1798 CompletableFuture<? extends T> a;
1799 CompletableFuture<? extends T> b;
1800 Function<? super T,? extends U> fn;
1801 CompletableFuture<U> dst;
1802 if ((dst = this.dst) != null &&
1803 (fn = this.fn) != null &&
1804 (((a = this.src) != null && (r = a.result) != null) ||
1805 ((b = this.snd) != null && (r = b.result) != null)) &&
1806 compareAndSet(0, 1)) {
1807 if (r instanceof AltResult) {
1808 ex = ((AltResult)r).ex;
1809 t = null;
1810 }
1811 else {
1812 ex = null;
1813 t = a.castResult(r);
1814 }
1815 Executor e = executor;
1816 U u = null;
1817 if (ex == null) {
1818 try {
1819 if (e != null)
1820 e.execute(new AsyncFunction<T,U>(t, fn, dst));
1821 else
1822 u = fn.apply(t);
1823 } catch (Throwable rex) {
1824 ex = rex;
1825 }
1826 }
1827 if (e == null || ex != null)
1828 dst.internalComplete(u, ex);
1829 }
1830 }
1831 private static final long serialVersionUID = 5232453952276885070L;
1832 }
1833
1834 static final class OrBlock<T> extends Completion {
1835 final CompletableFuture<? extends T> src;
1836 final CompletableFuture<? extends T> snd;
1837 final Block<? super T> fn;
1838 final CompletableFuture<Void> dst;
1839 final Executor executor;
1840 OrBlock(CompletableFuture<? extends T> src,
1841 CompletableFuture<? extends T> snd,
1842 Block<? super T> fn,
1843 CompletableFuture<Void> dst, Executor executor) {
1844 this.src = src; this.snd = snd;
1845 this.fn = fn; this.dst = dst;
1846 this.executor = executor;
1847 }
1848 public final void run() {
1849 Object r; T t; Throwable ex;
1850 CompletableFuture<? extends T> a;
1851 CompletableFuture<? extends T> b;
1852 Block<? super T> fn;
1853 CompletableFuture<Void> dst;
1854 if ((dst = this.dst) != null &&
1855 (fn = this.fn) != null &&
1856 (((a = this.src) != null && (r = a.result) != null) ||
1857 ((b = this.snd) != null && (r = b.result) != null)) &&
1858 compareAndSet(0, 1)) {
1859 if (r instanceof AltResult) {
1860 ex = ((AltResult)r).ex;
1861 t = null;
1862 }
1863 else {
1864 ex = null;
1865 t = a.castResult(r);
1866 }
1867 Executor e = executor;
1868 if (ex == null) {
1869 try {
1870 if (e != null)
1871 e.execute(new AsyncBlock<T>(t, fn, dst));
1872 else
1873 fn.accept(t);
1874 } catch (Throwable rex) {
1875 ex = rex;
1876 }
1877 }
1878 if (e == null || ex != null)
1879 dst.internalComplete(null, ex);
1880 }
1881 }
1882 private static final long serialVersionUID = 5232453952276885070L;
1883 }
1884
1885 static final class OrRunnable<T> extends Completion {
1886 final CompletableFuture<? extends T> src;
1887 final CompletableFuture<?> snd;
1888 final Runnable fn;
1889 final CompletableFuture<Void> dst;
1890 final Executor executor;
1891 OrRunnable(CompletableFuture<? extends T> src,
1892 CompletableFuture<?> snd,
1893 Runnable fn,
1894 CompletableFuture<Void> dst, Executor executor) {
1895 this.src = src; this.snd = snd;
1896 this.fn = fn; this.dst = dst;
1897 this.executor = executor;
1898 }
1899 public final void run() {
1900 Object r; Throwable ex;
1901 CompletableFuture<? extends T> a;
1902 final CompletableFuture<?> b;
1903 final Runnable fn;
1904 final CompletableFuture<Void> dst;
1905 if ((dst = this.dst) != null &&
1906 (fn = this.fn) != null &&
1907 (((a = this.src) != null && (r = a.result) != null) ||
1908 ((b = this.snd) != null && (r = b.result) != null)) &&
1909 compareAndSet(0, 1)) {
1910 if (r instanceof AltResult)
1911 ex = ((AltResult)r).ex;
1912 else
1913 ex = null;
1914 Executor e = executor;
1915 if (ex == null) {
1916 try {
1917 if (e != null)
1918 e.execute(new AsyncRunnable(fn, dst));
1919 else
1920 fn.run();
1921 } catch (Throwable rex) {
1922 ex = rex;
1923 }
1924 }
1925 if (e == null || ex != null)
1926 dst.internalComplete(null, ex);
1927 }
1928 }
1929 private static final long serialVersionUID = 5232453952276885070L;
1930 }
1931
1932 static final class ExceptionAction<T> extends Completion {
1933 final CompletableFuture<? extends T> src;
1934 final Function<? super Throwable, ? extends T> fn;
1935 final CompletableFuture<T> dst;
1936 ExceptionAction(CompletableFuture<? extends T> src,
1937 Function<? super Throwable, ? extends T> fn,
1938 CompletableFuture<T> dst) {
1939 this.src = src; this.fn = fn; this.dst = dst;
1940 }
1941 public final void run() {
1942 CompletableFuture<? extends T> a;
1943 Function<? super Throwable, ? extends T> fn;
1944 CompletableFuture<T> dst;
1945 Object r; T t = null; Throwable ex, dx = null;
1946 if ((dst = this.dst) != null &&
1947 (fn = this.fn) != null &&
1948 (a = this.src) != null &&
1949 (r = a.result) != null &&
1950 compareAndSet(0, 1)) {
1951 if ((r instanceof AltResult) &&
1952 (ex = ((AltResult)r).ex) != null) {
1953 try {
1954 t = fn.apply(ex);
1955 } catch (Throwable rex) {
1956 dx = rex;
1957 }
1958 }
1959 else
1960 t = a.castResult(r);
1961 dst.internalComplete(t, dx);
1962 }
1963 }
1964 private static final long serialVersionUID = 5232453952276885070L;
1965 }
1966
1967 static final class ThenCopy<T> extends Completion {
1968 final CompletableFuture<? extends T> src;
1969 final CompletableFuture<T> dst;
1970 ThenCopy(CompletableFuture<? extends T> src,
1971 CompletableFuture<T> dst) {
1972 this.src = src; this.dst = dst;
1973 }
1974 public final void run() {
1975 CompletableFuture<? extends T> a;
1976 CompletableFuture<T> dst;
1977 Object r; Object t; Throwable ex;
1978 if ((dst = this.dst) != null &&
1979 (a = this.src) != null &&
1980 (r = a.result) != null &&
1981 compareAndSet(0, 1)) {
1982 if (r instanceof AltResult) {
1983 ex = ((AltResult)r).ex;
1984 t = null;
1985 }
1986 else {
1987 ex = null;
1988 t = r;
1989 }
1990 dst.internalComplete(t, ex);
1991 }
1992 }
1993 private static final long serialVersionUID = 5232453952276885070L;
1994 }
1995
1996 static final class ThenHandle<T,U> extends Completion {
1997 final CompletableFuture<? extends T> src;
1998 final BiFunction<? super T, Throwable, ? extends U> fn;
1999 final CompletableFuture<U> dst;
2000 ThenHandle(CompletableFuture<? extends T> src,
2001 BiFunction<? super T, Throwable, ? extends U> fn,
2002 final CompletableFuture<U> dst) {
2003 this.src = src; this.fn = fn; this.dst = dst;
2004 }
2005 public final void run() {
2006 CompletableFuture<? extends T> a;
2007 BiFunction<? super T, Throwable, ? extends U> fn;
2008 CompletableFuture<U> dst;
2009 Object r; T t; Throwable ex;
2010 if ((dst = this.dst) != null &&
2011 (fn = this.fn) != null &&
2012 (a = this.src) != null &&
2013 (r = a.result) != null &&
2014 compareAndSet(0, 1)) {
2015 if (r instanceof AltResult) {
2016 ex = ((AltResult)r).ex;
2017 t = null;
2018 }
2019 else {
2020 ex = null;
2021 t = a.castResult(r);
2022 }
2023 U u = null; Throwable dx = null;
2024 try {
2025 u = fn.apply(t, ex);
2026 } catch (Throwable rex) {
2027 dx = rex;
2028 }
2029 dst.internalComplete(u, dx);
2030 }
2031 }
2032 private static final long serialVersionUID = 5232453952276885070L;
2033 }
2034
2035 static final class ThenCompose<T,U> extends Completion {
2036 final CompletableFuture<? extends T> src;
2037 final Function<? super T, CompletableFuture<U>> fn;
2038 final CompletableFuture<U> dst;
2039 ThenCompose(CompletableFuture<? extends T> src,
2040 Function<? super T, CompletableFuture<U>> fn,
2041 final CompletableFuture<U> dst) {
2042 this.src = src; this.fn = fn; this.dst = dst;
2043 }
2044 public final void run() {
2045 CompletableFuture<? extends T> a;
2046 Function<? super T, CompletableFuture<U>> fn;
2047 CompletableFuture<U> dst;
2048 Object r; T t; Throwable ex;
2049 if ((dst = this.dst) != null &&
2050 (fn = this.fn) != null &&
2051 (a = this.src) != null &&
2052 (r = a.result) != null &&
2053 compareAndSet(0, 1)) {
2054 if (r instanceof AltResult) {
2055 ex = ((AltResult)r).ex;
2056 t = null;
2057 }
2058 else {
2059 ex = null;
2060 t = a.castResult(r);
2061 }
2062 CompletableFuture<U> c = null;
2063 U u = null;
2064 boolean complete = false;
2065 if (ex == null) {
2066 try {
2067 c = fn.apply(t);
2068 } catch (Throwable rex) {
2069 ex = rex;
2070 }
2071 }
2072 if (ex != null || c == null) {
2073 if (ex == null)
2074 ex = new NullPointerException();
2075 }
2076 else {
2077 ThenCopy<U> d = null;
2078 Object s;
2079 if ((s = c.result) == null) {
2080 CompletionNode p = new CompletionNode
2081 (d = new ThenCopy<U>(c, dst));
2082 while ((s = c.result) == null) {
2083 if (UNSAFE.compareAndSwapObject
2084 (c, COMPLETIONS, p.next = c.completions, p))
2085 break;
2086 }
2087 }
2088 if (s != null && (d == null || d.compareAndSet(0, 1))) {
2089 complete = true;
2090 if (s instanceof AltResult) {
2091 ex = ((AltResult)s).ex; // no rewrap
2092 u = null;
2093 }
2094 else
2095 u = c.castResult(s);
2096 }
2097 }
2098 if (complete || ex != null)
2099 dst.internalComplete(u, ex);
2100 if (c != null)
2101 c.helpPostComplete();
2102 }
2103 }
2104 private static final long serialVersionUID = 5232453952276885070L;
2105 }
2106
2107 /* ------------- then/and/or implementations -------------- */
2108
2109 private <U> CompletableFuture<U> thenFunction
2110 (Function<? super T,? extends U> fn,
2111 Executor e) {
2112 if (fn == null) throw new NullPointerException();
2113 CompletableFuture<U> dst = new CompletableFuture<U>();
2114 ThenFunction<T,U> d = null;
2115 Object r;
2116 if ((r = result) == null) {
2117 CompletionNode p = new CompletionNode
2118 (d = new ThenFunction<T,U>(this, fn, dst, e));
2119 while ((r = result) == null) {
2120 if (UNSAFE.compareAndSwapObject
2121 (this, COMPLETIONS, p.next = completions, p))
2122 break;
2123 }
2124 }
2125 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2126 T t; Throwable ex;
2127 if (r instanceof AltResult) {
2128 ex = ((AltResult)r).ex;
2129 t = null;
2130 }
2131 else {
2132 ex = null;
2133 t = castResult(r);
2134 }
2135 U u = null;
2136 if (ex == null) {
2137 try {
2138 if (e != null)
2139 e.execute(new AsyncFunction<T,U>(t, fn, dst));
2140 else
2141 u = fn.apply(t);
2142 } catch (Throwable rex) {
2143 ex = rex;
2144 }
2145 }
2146 if (e == null || ex != null)
2147 dst.internalComplete(u, ex);
2148 }
2149 helpPostComplete();
2150 return dst;
2151 }
2152
2153 private CompletableFuture<Void> thenBlock
2154 (Block<? super T> fn,
2155 Executor e) {
2156 if (fn == null) throw new NullPointerException();
2157 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2158 ThenBlock<T> d = null;
2159 Object r;
2160 if ((r = result) == null) {
2161 CompletionNode p = new CompletionNode
2162 (d = new ThenBlock<T>(this, fn, dst, e));
2163 while ((r = result) == null) {
2164 if (UNSAFE.compareAndSwapObject
2165 (this, COMPLETIONS, p.next = completions, p))
2166 break;
2167 }
2168 }
2169 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2170 T t; Throwable ex;
2171 if (r instanceof AltResult) {
2172 ex = ((AltResult)r).ex;
2173 t = null;
2174 }
2175 else {
2176 ex = null;
2177 t = castResult(r);
2178 }
2179 if (ex == null) {
2180 try {
2181 if (e != null)
2182 e.execute(new AsyncBlock<T>(t, fn, dst));
2183 else
2184 fn.accept(t);
2185 } catch (Throwable rex) {
2186 ex = rex;
2187 }
2188 }
2189 if (e == null || ex != null)
2190 dst.internalComplete(null, ex);
2191 }
2192 helpPostComplete();
2193 return dst;
2194 }
2195
2196 private CompletableFuture<Void> thenRunnable
2197 (Runnable action,
2198 Executor e) {
2199 if (action == null) throw new NullPointerException();
2200 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2201 ThenRunnable<T> d = null;
2202 Object r;
2203 if ((r = result) == null) {
2204 CompletionNode p = new CompletionNode
2205 (d = new ThenRunnable<T>(this, action, dst, e));
2206 while ((r = result) == null) {
2207 if (UNSAFE.compareAndSwapObject
2208 (this, COMPLETIONS, p.next = completions, p))
2209 break;
2210 }
2211 }
2212 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2213 Throwable ex;
2214 if (r instanceof AltResult)
2215 ex = ((AltResult)r).ex;
2216 else
2217 ex = null;
2218 if (ex == null) {
2219 try {
2220 if (e != null)
2221 e.execute(new AsyncRunnable(action, dst));
2222 else
2223 action.run();
2224 } catch (Throwable rex) {
2225 ex = rex;
2226 }
2227 }
2228 if (e == null || ex != null)
2229 dst.internalComplete(null, ex);
2230 }
2231 helpPostComplete();
2232 return dst;
2233 }
2234
2235 private <U,V> CompletableFuture<V> andFunction
2236 (CompletableFuture<? extends U> other,
2237 BiFunction<? super T,? super U,? extends V> fn,
2238 Executor e) {
2239 if (other == null || fn == null) throw new NullPointerException();
2240 CompletableFuture<V> dst = new CompletableFuture<V>();
2241 AndFunction<T,U,V> d = null;
2242 Object r, s = null;
2243 if ((r = result) == null || (s = other.result) == null) {
2244 d = new AndFunction<T,U,V>(this, other, fn, dst, e);
2245 CompletionNode q = null, p = new CompletionNode(d);
2246 while ((r == null && (r = result) == null) ||
2247 (s == null && (s = other.result) == null)) {
2248 if (q != null) {
2249 if (s != null ||
2250 UNSAFE.compareAndSwapObject
2251 (other, COMPLETIONS, q.next = other.completions, q))
2252 break;
2253 }
2254 else if (r != null ||
2255 UNSAFE.compareAndSwapObject
2256 (this, COMPLETIONS, p.next = completions, p)) {
2257 if (s != null)
2258 break;
2259 q = new CompletionNode(d);
2260 }
2261 }
2262 }
2263 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2264 T t; U u; Throwable ex;
2265 if (r instanceof AltResult) {
2266 ex = ((AltResult)r).ex;
2267 t = null;
2268 }
2269 else {
2270 ex = null;
2271 t = castResult(r);
2272 }
2273 if (ex != null)
2274 u = null;
2275 else if (s instanceof AltResult) {
2276 ex = ((AltResult)s).ex;
2277 u = null;
2278 }
2279 else
2280 u = other.castResult(s);
2281 V v = null;
2282 if (ex == null) {
2283 try {
2284 if (e != null)
2285 e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
2286 else
2287 v = fn.apply(t, u);
2288 } catch (Throwable rex) {
2289 ex = rex;
2290 }
2291 }
2292 if (e == null || ex != null)
2293 dst.internalComplete(v, ex);
2294 }
2295 helpPostComplete();
2296 other.helpPostComplete();
2297 return dst;
2298 }
2299
2300 private <U> CompletableFuture<Void> andBlock
2301 (CompletableFuture<? extends U> other,
2302 BiBlock<? super T,? super U> fn,
2303 Executor e) {
2304 if (other == null || fn == null) throw new NullPointerException();
2305 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2306 AndBlock<T,U> d = null;
2307 Object r, s = null;
2308 if ((r = result) == null || (s = other.result) == null) {
2309 d = new AndBlock<T,U>(this, other, fn, dst, e);
2310 CompletionNode q = null, p = new CompletionNode(d);
2311 while ((r == null && (r = result) == null) ||
2312 (s == null && (s = other.result) == null)) {
2313 if (q != null) {
2314 if (s != null ||
2315 UNSAFE.compareAndSwapObject
2316 (other, COMPLETIONS, q.next = other.completions, q))
2317 break;
2318 }
2319 else if (r != null ||
2320 UNSAFE.compareAndSwapObject
2321 (this, COMPLETIONS, p.next = completions, p)) {
2322 if (s != null)
2323 break;
2324 q = new CompletionNode(d);
2325 }
2326 }
2327 }
2328 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2329 T t; U u; Throwable ex;
2330 if (r instanceof AltResult) {
2331 ex = ((AltResult)r).ex;
2332 t = null;
2333 }
2334 else {
2335 ex = null;
2336 t = castResult(r);
2337 }
2338 if (ex != null)
2339 u = null;
2340 else if (s instanceof AltResult) {
2341 ex = ((AltResult)s).ex;
2342 u = null;
2343 }
2344 else
2345 u = other.castResult(s);
2346 if (ex == null) {
2347 try {
2348 if (e != null)
2349 e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
2350 else
2351 fn.accept(t, u);
2352 } catch (Throwable rex) {
2353 ex = rex;
2354 }
2355 }
2356 if (e == null || ex != null)
2357 dst.internalComplete(null, ex);
2358 }
2359 helpPostComplete();
2360 other.helpPostComplete();
2361 return dst;
2362 }
2363
2364 private CompletableFuture<Void> andRunnable
2365 (CompletableFuture<?> other,
2366 Runnable action,
2367 Executor e) {
2368 if (other == null || action == null) throw new NullPointerException();
2369 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2370 AndRunnable<T> d = null;
2371 Object r, s = null;
2372 if ((r = result) == null || (s = other.result) == null) {
2373 d = new AndRunnable<T>(this, other, action, dst, e);
2374 CompletionNode q = null, p = new CompletionNode(d);
2375 while ((r == null && (r = result) == null) ||
2376 (s == null && (s = other.result) == null)) {
2377 if (q != null) {
2378 if (s != null ||
2379 UNSAFE.compareAndSwapObject
2380 (other, COMPLETIONS, q.next = other.completions, q))
2381 break;
2382 }
2383 else if (r != null ||
2384 UNSAFE.compareAndSwapObject
2385 (this, COMPLETIONS, p.next = completions, p)) {
2386 if (s != null)
2387 break;
2388 q = new CompletionNode(d);
2389 }
2390 }
2391 }
2392 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2393 Throwable ex;
2394 if (r instanceof AltResult)
2395 ex = ((AltResult)r).ex;
2396 else
2397 ex = null;
2398 if (ex == null && (s instanceof AltResult))
2399 ex = ((AltResult)s).ex;
2400 if (ex == null) {
2401 try {
2402 if (e != null)
2403 e.execute(new AsyncRunnable(action, dst));
2404 else
2405 action.run();
2406 } catch (Throwable rex) {
2407 ex = rex;
2408 }
2409 }
2410 if (e == null || ex != null)
2411 dst.internalComplete(null, ex);
2412 }
2413 helpPostComplete();
2414 other.helpPostComplete();
2415 return dst;
2416 }
2417
2418 private <U> CompletableFuture<U> orFunction
2419 (CompletableFuture<? extends T> other,
2420 Function<? super T, U> fn,
2421 Executor e) {
2422 if (other == null || fn == null) throw new NullPointerException();
2423 CompletableFuture<U> dst = new CompletableFuture<U>();
2424 OrFunction<T,U> d = null;
2425 Object r;
2426 if ((r = result) == null && (r = other.result) == null) {
2427 d = new OrFunction<T,U>(this, other, fn, dst, e);
2428 CompletionNode q = null, p = new CompletionNode(d);
2429 while ((r = result) == null && (r = other.result) == null) {
2430 if (q != null) {
2431 if (UNSAFE.compareAndSwapObject
2432 (other, COMPLETIONS, q.next = other.completions, q))
2433 break;
2434 }
2435 else if (UNSAFE.compareAndSwapObject
2436 (this, COMPLETIONS, p.next = completions, p))
2437 q = new CompletionNode(d);
2438 }
2439 }
2440 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2441 T t; Throwable ex;
2442 if (r instanceof AltResult) {
2443 ex = ((AltResult)r).ex;
2444 t = null;
2445 }
2446 else {
2447 ex = null;
2448 t = castResult(r);
2449 }
2450 U u = null;
2451 if (ex == null) {
2452 try {
2453 if (e != null)
2454 e.execute(new AsyncFunction<T,U>(t, fn, dst));
2455 else
2456 u = fn.apply(t);
2457 } catch (Throwable rex) {
2458 ex = rex;
2459 }
2460 }
2461 if (e == null || ex != null)
2462 dst.internalComplete(u, ex);
2463 }
2464 helpPostComplete();
2465 other.helpPostComplete();
2466 return dst;
2467 }
2468
2469 private CompletableFuture<Void> orBlock
2470 (CompletableFuture<? extends T> other,
2471 Block<? super T> fn,
2472 Executor e) {
2473 if (other == null || fn == null) throw new NullPointerException();
2474 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2475 OrBlock<T> d = null;
2476 Object r;
2477 if ((r = result) == null && (r = other.result) == null) {
2478 d = new OrBlock<T>(this, other, fn, dst, e);
2479 CompletionNode q = null, p = new CompletionNode(d);
2480 while ((r = result) == null && (r = other.result) == null) {
2481 if (q != null) {
2482 if (UNSAFE.compareAndSwapObject
2483 (other, COMPLETIONS, q.next = other.completions, q))
2484 break;
2485 }
2486 else if (UNSAFE.compareAndSwapObject
2487 (this, COMPLETIONS, p.next = completions, p))
2488 q = new CompletionNode(d);
2489 }
2490 }
2491 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2492 T t; Throwable ex;
2493 if (r instanceof AltResult) {
2494 ex = ((AltResult)r).ex;
2495 t = null;
2496 }
2497 else {
2498 ex = null;
2499 t = castResult(r);
2500 }
2501 if (ex == null) {
2502 try {
2503 if (e != null)
2504 e.execute(new AsyncBlock<T>(t, fn, dst));
2505 else
2506 fn.accept(t);
2507 } catch (Throwable rex) {
2508 ex = rex;
2509 }
2510 }
2511 if (e == null || ex != null)
2512 dst.internalComplete(null, ex);
2513 }
2514 helpPostComplete();
2515 other.helpPostComplete();
2516 return dst;
2517 }
2518
2519 private CompletableFuture<Void> orRunnable
2520 (CompletableFuture<?> other,
2521 Runnable action,
2522 Executor e) {
2523 if (other == null || action == null) throw new NullPointerException();
2524 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2525 OrRunnable<T> d = null;
2526 Object r;
2527 if ((r = result) == null && (r = other.result) == null) {
2528 d = new OrRunnable<T>(this, other, action, dst, e);
2529 CompletionNode q = null, p = new CompletionNode(d);
2530 while ((r = result) == null && (r = other.result) == null) {
2531 if (q != null) {
2532 if (UNSAFE.compareAndSwapObject
2533 (other, COMPLETIONS, q.next = other.completions, q))
2534 break;
2535 }
2536 else if (UNSAFE.compareAndSwapObject
2537 (this, COMPLETIONS, p.next = completions, p))
2538 q = new CompletionNode(d);
2539 }
2540 }
2541 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2542 Throwable ex;
2543 if (r instanceof AltResult)
2544 ex = ((AltResult)r).ex;
2545 else
2546 ex = null;
2547 if (ex == null) {
2548 try {
2549 if (e != null)
2550 e.execute(new AsyncRunnable(action, dst));
2551 else
2552 action.run();
2553 } catch (Throwable rex) {
2554 ex = rex;
2555 }
2556 }
2557 if (e == null || ex != null)
2558 dst.internalComplete(null, ex);
2559 }
2560 helpPostComplete();
2561 other.helpPostComplete();
2562 return dst;
2563 }
2564
// Unsafe mechanics: the Unsafe instance plus the offsets of the
// "result", "waiters" and "completions" fields of this class.
// COMPLETIONS is the offset handed to compareAndSwapObject when a
// completion node is linked onto a future's completions list.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    sun.misc.Unsafe unsafe;
    long resultOffset;
    long waitersOffset;
    long completionsOffset;
    try {
        unsafe = sun.misc.Unsafe.getUnsafe();
        Class<?> self = CompletableFuture.class;
        resultOffset = unsafe.objectFieldOffset
            (self.getDeclaredField("result"));
        waitersOffset = unsafe.objectFieldOffset
            (self.getDeclaredField("waiters"));
        completionsOffset = unsafe.objectFieldOffset
            (self.getDeclaredField("completions"));
    } catch (Exception e) {
        // Any lookup failure aborts class initialization.
        throw new Error(e);
    }
    UNSAFE = unsafe;
    RESULT = resultOffset;
    WAITERS = waitersOffset;
    COMPLETIONS = completionsOffset;
}
2584 }