ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.21
Committed: Mon Dec 31 16:59:29 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.20: +63 -55 lines
Log Message:
More consistent style

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Block;
10 import java.util.function.BiBlock;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.locks.LockSupport;
25
26 /**
27 * A {@link Future} that may be explicitly completed (setting its
28 * value and status), and may include dependent functions and actions
29 * that trigger upon its completion.
30 *
31 * <p>Similar methods are available for Functions, Blocks, and
32 * Runnables, depending on whether actions require arguments and/or
33 * produce results. Functions and actions supplied for dependent
34 * completions (mainly using methods with prefix {@code then}) may be
35 * performed by the thread that completes the current
36 * CompletableFuture, or by any other caller of these methods. There
37 * are no guarantees about the order of processing completions unless
38 * constrained by these methods.
39 *
40 * <p>When two or more threads attempt to {@link #complete} or {@link
41 * #completeExceptionally} a CompletableFuture, only one of them
42 * succeeds.
43 *
44 * <p> Upon exceptional completion, or when a completion entails
45 * computation of a function or action, and it terminates abruptly
46 * with an (unchecked) exception or error, then further completions
47 * act as {@code completeExceptionally} with a {@link
48 * CompletionException} holding that exception as its cause. If a
49 * CompletableFuture completes exceptionally, and is not followed by a
50 * {@link #exceptionally} or {@link #handle} completion, then all of
51 * its dependents (and their dependents) also complete exceptionally
52 * with CompletionExceptions holding the ultimate cause. In case of a
53 * CompletionException, methods {@link #get()} and {@link #get(long,
54 * TimeUnit)} throw a {@link ExecutionException} with the same cause
55 * as would be held in the corresponding CompletionException. However,
56 * in these cases, methods {@link #join()} and {@link #getNow} throw
57 * the CompletionException, which simplifies usage especially within
58 * other completion functions.
59 *
60 * <p>CompletableFutures themselves do not execute asynchronously.
61 * However, the {@code async} methods provide commonly useful ways to
62 * commence asynchronous processing, using either a given {@link
63 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
64 * function or action that will result in the completion of a new
65 * CompletableFuture.
66 *
67 * @author Doug Lea
68 * @since 1.8
69 */
70 public class CompletableFuture<T> implements Future<T> {
71 /*
72 * Overview:
73 *
74 * 1. Non-nullness of field result (set via CAS) indicates
75 * done. An AltResult is used to box null as a result, as well as
76 * to hold exceptions. Using a single field makes completion fast
77 * and simple to detect and trigger, at the expense of a lot of
78 * encoding and decoding that infiltrates many methods. One minor
79 * simplification relies on the (static) NIL (to box null results)
80 * being the only AltResult with a null exception field, so we
81 * don't usually need explicit comparisons with NIL.
82 *
83 * 2. Waiters are held in a Treiber stack similar to the one used
84 * in FutureTask, Phaser, and SynchronousQueue. See their
85 * internal documentation for details.
86 *
87 * 3. Completions are also kept in a list/stack, and pulled off
88 * and run when completion is triggered. (We could in fact use the
89 * the same stack as for waiters, but would give up the potential
90 * parallelism obtained because woken waiters help release/run
91 * others (see method postComplete). Because post-processing may
92 * race with direct calls, completions extend AtomicInteger so
93 * callers can claim the action via compareAndSet(0, 1). The
94 * Completion.run methods are all written a boringly similar
95 * uniform way (that sometimes includes unnecessary-looking
96 * checks, kept to maintain uniformity). There are enough
97 * dimensions upon which they differ that factoring to use common
98 * code isn't worthwhile.
99 *
100 * 4. The exported then/and/or methods do support a bit of
101 * factoring (see thenFunction, andBlock, etc). They must cope
102 * with the intrinsic races surrounding addition of a dependent
103 * action versus performing the action directly because the task
104 * is already complete. For example, a CF may not be complete
105 * upon entry, so a dependent completion is added, but by the time
106 * it is added, the target CF is complete, so must be directly
107 * executed. This is all done while avoiding unnecessary object
108 * construction in safe-bypass cases.
109 */
110
111 // preliminary class definitions
112
113 static final class AltResult {
114 final Throwable ex; // null only for NIL
115 AltResult(Throwable ex) { this.ex = ex; }
116 }
117
118 static final AltResult NIL = new AltResult(null);
119
120 /**
121 * Simple linked list nodes to record waiting threads in a Treiber
122 * stack. See other classes such as Phaser and SynchronousQueue
123 * for more detailed explanation.
124 */
125 static final class WaitNode {
126 volatile Thread thread;
127 volatile WaitNode next;
128 }
129
130 /**
131 * Simple linked list nodes to record completions, used in
132 * basically the same way as WaitNodes. (We separate nodes from
133 * the Completions themselves mainly because for the And and Or
134 * methods, the same Completion object resides in two lists.)
135 */
136 static final class CompletionNode {
137 final Completion completion;
138 volatile CompletionNode next;
139 CompletionNode(Completion completion) { this.completion = completion; }
140 }
141
142
143 // Fields
144
145 volatile Object result; // Either the result or boxed AltResult
146 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
147 volatile CompletionNode completions; // list (Treiber stack) of completions
148
149 // Basic utilities for triggering and processing completions
150 // (The Completion and Async classes and internal methods that
151 // use them are defined after the public methods.)
152
153 /**
154 * Removes and signals all waiting threads and runs all completions.
155 */
156 final void postComplete() {
157 WaitNode q; Thread t;
158 while ((q = waiters) != null) {
159 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
160 (t = q.thread) != null) {
161 q.thread = null;
162 LockSupport.unpark(t);
163 }
164 }
165
166 CompletionNode h; Completion c;
167 while ((h = completions) != null) {
168 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
169 (c = h.completion) != null)
170 c.run();
171 }
172 }
173
174 /**
175 * Triggers completion with the encoding of the given arguments:
176 * if the exception is non-null, encodes it as a wrapped
177 * CompletionException unless it is one already. Otherwise uses
178 * the given result, boxed as NIL if null.
179 */
180 final void internalComplete(Object v, Throwable ex) {
181 if (result == null)
182 UNSAFE.compareAndSwapObject
183 (this, RESULT, null,
184 (ex == null) ? (v == null) ? NIL : v :
185 new AltResult((ex instanceof CompletionException) ? ex :
186 new CompletionException(ex)));
187 postComplete(); // help out even if not triggered
188 }
189
190 /**
191 * If triggered, help release and/or process completions
192 */
193 final void helpPostComplete() {
194 if (result != null)
195 postComplete();
196 }
197
198 // public methods
199
200 /**
201 * Creates a new incomplete CompletableFuture.
202 */
203 public CompletableFuture() {
204 }
205
206 /**
207 * Asynchronously executes in the {@link
208 * ForkJoinPool#commonPool()}, a task that completes the returned
209 * CompletableFuture with the result of the given Supplier.
210 *
211 * @param supplier a function returning the value to be used
212 * to complete the returned CompletableFuture
213 * @return the CompletableFuture
214 */
215 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
216 if (supplier == null) throw new NullPointerException();
217 CompletableFuture<U> f = new CompletableFuture<U>();
218 ForkJoinPool.commonPool().
219 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
220 return f;
221 }
222
223 /**
224 * Asynchronously executes using the given executor, a task that
225 * completes the returned CompletableFuture with the result of the
226 * given Supplier.
227 *
228 * @param supplier a function returning the value to be used
229 * to complete the returned CompletableFuture
230 * @param executor the executor to use for asynchronous execution
231 * @return the CompletableFuture
232 */
233 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
234 Executor executor) {
235 if (executor == null || supplier == null)
236 throw new NullPointerException();
237 CompletableFuture<U> f = new CompletableFuture<U>();
238 executor.execute(new AsyncSupplier<U>(supplier, f));
239 return f;
240 }
241
242 /**
243 * Asynchronously executes in the {@link
244 * ForkJoinPool#commonPool()} a task that runs the given action,
245 * and then completes the returned CompletableFuture.
246 *
247 * @param runnable the action to run before completing the
248 * returned CompletableFuture
249 * @return the CompletableFuture
250 */
251 public static CompletableFuture<Void> runAsync(Runnable runnable) {
252 if (runnable == null) throw new NullPointerException();
253 CompletableFuture<Void> f = new CompletableFuture<Void>();
254 ForkJoinPool.commonPool().
255 execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
256 return f;
257 }
258
259 /**
260 * Asynchronously executes using the given executor, a task that
261 * runs the given action, and then completes the returned
262 * CompletableFuture.
263 *
264 * @param runnable the action to run before completing the
265 * returned CompletableFuture
266 * @param executor the executor to use for asynchronous execution
267 * @return the CompletableFuture
268 */
269 public static CompletableFuture<Void> runAsync(Runnable runnable,
270 Executor executor) {
271 if (executor == null || runnable == null)
272 throw new NullPointerException();
273 CompletableFuture<Void> f = new CompletableFuture<Void>();
274 executor.execute(new AsyncRunnable(runnable, f));
275 return f;
276 }
277
278 /**
279 * Returns {@code true} if completed in any fashion: normally,
280 * exceptionally, or via cancellation.
281 *
282 * @return {@code true} if completed
283 */
284 public boolean isDone() {
285 return result != null;
286 }
287
288 /**
289 * Waits if necessary for the computation to complete, and then
290 * retrieves its result.
291 *
292 * @return the computed result
293 * @throws CancellationException if the computation was cancelled
294 * @throws ExecutionException if the computation threw an
295 * exception
296 * @throws InterruptedException if the current thread was interrupted
297 * while waiting
298 */
299 @SuppressWarnings("unchecked") public T get() throws InterruptedException, ExecutionException {
300 Object r; Throwable ex, cause;
301 if ((r = result) == null && (r = waitingGet(true)) == null)
302 throw new InterruptedException();
303 if (r instanceof AltResult) {
304 if ((ex = ((AltResult)r).ex) != null) {
305 if (ex instanceof CancellationException)
306 throw (CancellationException)ex;
307 if ((ex instanceof CompletionException) &&
308 (cause = ex.getCause()) != null)
309 ex = cause;
310 throw new ExecutionException(ex);
311 }
312 return null;
313 }
314 return (T)r;
315 }
316
317 /**
318 * Waits if necessary for at most the given time for completion,
319 * and then retrieves its result, if available.
320 *
321 * @param timeout the maximum time to wait
322 * @param unit the time unit of the timeout argument
323 * @return the computed result
324 * @throws CancellationException if the computation was cancelled
325 * @throws ExecutionException if the computation threw an
326 * exception
327 * @throws InterruptedException if the current thread was interrupted
328 * while waiting
329 * @throws TimeoutException if the wait timed out
330 */
331 @SuppressWarnings("unchecked") public T get(long timeout, TimeUnit unit)
332 throws InterruptedException, ExecutionException, TimeoutException {
333 Object r; Throwable ex, cause;
334 long nanos = unit.toNanos(timeout);
335 if (Thread.interrupted())
336 throw new InterruptedException();
337 if ((r = result) == null)
338 r = timedAwaitDone(nanos);
339 if (r instanceof AltResult) {
340 if ((ex = ((AltResult)r).ex) != null) {
341 if (ex instanceof CancellationException)
342 throw (CancellationException)ex;
343 if ((ex instanceof CompletionException) &&
344 (cause = ex.getCause()) != null)
345 ex = cause;
346 throw new ExecutionException(ex);
347 }
348 return null;
349 }
350 return (T)r;
351 }
352
353 /**
354 * Returns the result value when complete, or throws an
355 * (unchecked) exception if completed exceptionally. To better
356 * conform with the use of common functional forms, if a
357 * computation involved in the completion of this
358 * CompletableFuture threw an exception, this method throws an
359 * (unchecked) {@link CompletionException} with the underlying
360 * exception as its cause.
361 *
362 * @return the result value
363 * @throws CancellationException if the computation was cancelled
364 * @throws CompletionException if a completion computation threw
365 * an exception
366 */
367 @SuppressWarnings("unchecked") public T join() {
368 Object r; Throwable ex;
369 if ((r = result) == null)
370 r = waitingGet(false);
371 if (r instanceof AltResult) {
372 if ((ex = ((AltResult)r).ex) != null) {
373 if (ex instanceof CancellationException)
374 throw (CancellationException)ex;
375 if (ex instanceof CompletionException)
376 throw (CompletionException)ex;
377 throw new CompletionException(ex);
378 }
379 return null;
380 }
381 return (T)r;
382 }
383
384 /**
385 * Returns the result value (or throws any encountered exception)
386 * if completed, else returns the given valueIfAbsent.
387 *
388 * @param valueIfAbsent the value to return if not completed
389 * @return the result value, if completed, else the given valueIfAbsent
390 * @throws CancellationException if the computation was cancelled
391 * @throws CompletionException if a completion computation threw
392 * an exception
393 */
394 @SuppressWarnings("unchecked") public T getNow(T valueIfAbsent) {
395 Object r; Throwable ex;
396 if ((r = result) == null)
397 return valueIfAbsent;
398 if (r instanceof AltResult) {
399 if ((ex = ((AltResult)r).ex) != null) {
400 if (ex instanceof CancellationException)
401 throw (CancellationException)ex;
402 if (ex instanceof CompletionException)
403 throw (CompletionException)ex;
404 throw new CompletionException(ex);
405 }
406 return null;
407 }
408 return (T)r;
409 }
410
411 /**
412 * If not already completed, sets the value returned by {@link
413 * #get()} and related methods to the given value.
414 *
415 * @param value the result value
416 * @return true if this invocation caused this CompletableFuture
417 * to transition to a completed state, else false
418 */
419 public boolean complete(T value) {
420 boolean triggered =
421 result == null &&
422 UNSAFE.compareAndSwapObject(this, RESULT, null,
423 value == null ? NIL : value);
424 postComplete();
425 return triggered;
426 }
427
428 /**
429 * If not already completed, causes invocations of {@link #get()}
430 * and related methods to throw the given exception.
431 *
432 * @param ex the exception
433 * @return true if this invocation caused this CompletableFuture
434 * to transition to a completed state, else false
435 */
436 public boolean completeExceptionally(Throwable ex) {
437 if (ex == null) throw new NullPointerException();
438 boolean triggered =
439 result == null &&
440 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
441 postComplete();
442 return triggered;
443 }
444
445 /**
446 * Creates and returns a CompletableFuture that is completed with
447 * the result of the given function of this CompletableFuture.
448 * If this CompletableFuture completes exceptionally,
449 * then the returned CompletableFuture also does so,
450 * with a CompletionException holding this exception as
451 * its cause.
452 *
453 * @param fn the function to use to compute the value of
454 * the returned CompletableFuture
455 * @return the new CompletableFuture
456 */
457 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
458 return thenFunction(fn, null);
459 }
460
461 /**
462 * Creates and returns a CompletableFuture that is asynchronously
463 * completed using the {@link ForkJoinPool#commonPool()} with the
464 * result of the given function of this CompletableFuture. If
465 * this CompletableFuture completes exceptionally, then the
466 * returned CompletableFuture also does so, with a
467 * CompletionException holding this exception as its cause.
468 *
469 * @param fn the function to use to compute the value of
470 * the returned CompletableFuture
471 * @return the new CompletableFuture
472 */
473 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
474 return thenFunction(fn, ForkJoinPool.commonPool());
475 }
476
477 /**
478 * Creates and returns a CompletableFuture that is asynchronously
479 * completed using the given executor with the result of the given
480 * function of this CompletableFuture. If this CompletableFuture
481 * completes exceptionally, then the returned CompletableFuture
482 * also does so, with a CompletionException holding this exception as
483 * its cause.
484 *
485 * @param fn the function to use to compute the value of
486 * the returned CompletableFuture
487 * @param executor the executor to use for asynchronous execution
488 * @return the new CompletableFuture
489 */
490 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
491 Executor executor) {
492 if (executor == null) throw new NullPointerException();
493 return thenFunction(fn, executor);
494 }
495
496 /**
497 * Creates and returns a CompletableFuture that is completed after
498 * performing the given action with this CompletableFuture's
499 * result when it completes. If this CompletableFuture
500 * completes exceptionally, then the returned CompletableFuture
501 * also does so, with a CompletionException holding this exception as
502 * its cause.
503 *
504 * @param block the action to perform before completing the
505 * returned CompletableFuture
506 * @return the new CompletableFuture
507 */
508 public CompletableFuture<Void> thenAccept(Block<? super T> block) {
509 return thenBlock(block, null);
510 }
511
512 /**
513 * Creates and returns a CompletableFuture that is asynchronously
514 * completed using the {@link ForkJoinPool#commonPool()} with this
515 * CompletableFuture's result when it completes. If this
516 * CompletableFuture completes exceptionally, then the returned
517 * CompletableFuture also does so, with a CompletionException holding
518 * this exception as its cause.
519 *
520 * @param block the action to perform before completing the
521 * returned CompletableFuture
522 * @return the new CompletableFuture
523 */
524 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
525 return thenBlock(block, ForkJoinPool.commonPool());
526 }
527
528 /**
529 * Creates and returns a CompletableFuture that is asynchronously
530 * completed using the given executor with this
531 * CompletableFuture's result when it completes. If this
532 * CompletableFuture completes exceptionally, then the returned
533 * CompletableFuture also does so, with a CompletionException holding
534 * this exception as its cause.
535 *
536 * @param block the action to perform before completing the
537 * returned CompletableFuture
538 * @param executor the executor to use for asynchronous execution
539 * @return the new CompletableFuture
540 */
541 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
542 Executor executor) {
543 if (executor == null) throw new NullPointerException();
544 return thenBlock(block, executor);
545 }
546
547 /**
548 * Creates and returns a CompletableFuture that is completed after
549 * performing the given action when this CompletableFuture
550 * completes. If this CompletableFuture completes exceptionally,
551 * then the returned CompletableFuture also does so, with a
552 * CompletionException holding this exception as its cause.
553 *
554 * @param action the action to perform before completing the
555 * returned CompletableFuture
556 * @return the new CompletableFuture
557 */
558 public CompletableFuture<Void> thenRun(Runnable action) {
559 return thenRunnable(action, null);
560 }
561
562 /**
563 * Creates and returns a CompletableFuture that is asynchronously
564 * completed using the {@link ForkJoinPool#commonPool()} after
565 * performing the given action when this CompletableFuture
566 * completes. If this CompletableFuture completes exceptionally,
567 * then the returned CompletableFuture also does so, with a
568 * CompletionException holding this exception as its cause.
569 *
570 * @param action the action to perform before completing the
571 * returned CompletableFuture
572 * @return the new CompletableFuture
573 */
574 public CompletableFuture<Void> thenRunAsync(Runnable action) {
575 return thenRunnable(action, ForkJoinPool.commonPool());
576 }
577
578 /**
579 * Creates and returns a CompletableFuture that is asynchronously
580 * completed using the given executor after performing the given
581 * action when this CompletableFuture completes. If this
582 * CompletableFuture completes exceptionally, then the returned
583 * CompletableFuture also does so, with a CompletionException holding
584 * this exception as its cause.
585 *
586 * @param action the action to perform before completing the
587 * returned CompletableFuture
588 * @param executor the executor to use for asynchronous execution
589 * @return the new CompletableFuture
590 */
591 public CompletableFuture<Void> thenRunAsync(Runnable action,
592 Executor executor) {
593 if (executor == null) throw new NullPointerException();
594 return thenRunnable(action, executor);
595 }
596
597 /**
598 * Creates and returns a CompletableFuture that is completed with
599 * the result of the given function of this and the other given
600 * CompletableFuture's results when both complete. If this or
601 * the other CompletableFuture complete exceptionally, then the
602 * returned CompletableFuture also does so, with a
603 * CompletionException holding the exception as its cause.
604 *
605 * @param other the other CompletableFuture
606 * @param fn the function to use to compute the value of
607 * the returned CompletableFuture
608 * @return the new CompletableFuture
609 */
610 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
611 BiFunction<? super T,? super U,? extends V> fn) {
612 return andFunction(other, fn, null);
613 }
614
615 /**
616 * Creates and returns a CompletableFuture that is asynchronously
617 * completed using the {@link ForkJoinPool#commonPool()} with
618 * the result of the given function of this and the other given
619 * CompletableFuture's results when both complete. If this or
620 * the other CompletableFuture complete exceptionally, then the
621 * returned CompletableFuture also does so, with a
622 * CompletionException holding the exception as its cause.
623 *
624 * @param other the other CompletableFuture
625 * @param fn the function to use to compute the value of
626 * the returned CompletableFuture
627 * @return the new CompletableFuture
628 */
629 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
630 BiFunction<? super T,? super U,? extends V> fn) {
631 return andFunction(other, fn, ForkJoinPool.commonPool());
632 }
633
634 /**
635 * Creates and returns a CompletableFuture that is
636 * asynchronously completed using the given executor with the
637 * result of the given function of this and the other given
638 * CompletableFuture's results when both complete. If this or
639 * the other CompletableFuture complete exceptionally, then the
640 * returned CompletableFuture also does so, with a
641 * CompletionException holding the exception as its cause.
642 *
643 * @param other the other CompletableFuture
644 * @param fn the function to use to compute the value of
645 * the returned CompletableFuture
646 * @param executor the executor to use for asynchronous execution
647 * @return the new CompletableFuture
648 */
649
650 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
651 BiFunction<? super T,? super U,? extends V> fn,
652 Executor executor) {
653 if (executor == null) throw new NullPointerException();
654 return andFunction(other, fn, executor);
655 }
656
657 /**
658 * Creates and returns a CompletableFuture that is completed with
659 * the results of this and the other given CompletableFuture if
660 * both complete. If this and/or the other CompletableFuture
661 * complete exceptionally, then the returned CompletableFuture
662 * also does so, with a CompletionException holding one of these
663 * exceptions as its cause.
664 *
665 * @param other the other CompletableFuture
666 * @param block the action to perform before completing the
667 * returned CompletableFuture
668 * @return the new CompletableFuture
669 */
670 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
671 BiBlock<? super T, ? super U> block) {
672 return andBlock(other, block, null);
673 }
674
675 /**
676 * Creates and returns a CompletableFuture that is completed
677 * asynchronously using the {@link ForkJoinPool#commonPool()} with
678 * the results of this and the other given CompletableFuture when
679 * both complete. If this and/or the other CompletableFuture
680 * complete exceptionally, then the returned CompletableFuture
681 * also does so, with a CompletionException holding one of these
682 * exceptions as its cause.
683 *
684 * @param other the other CompletableFuture
685 * @param block the action to perform before completing the
686 * returned CompletableFuture
687 * @return the new CompletableFuture
688 */
689 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
690 BiBlock<? super T, ? super U> block) {
691 return andBlock(other, block, ForkJoinPool.commonPool());
692 }
693
694 /**
695 * Creates and returns a CompletableFuture that is completed
696 * asynchronously using the given executor with the results of
697 * this and the other given CompletableFuture when both complete.
698 * If this and/or the other CompletableFuture complete
699 * exceptionally, then the returned CompletableFuture also does
700 * so, with a CompletionException holding one of these exceptions as
701 * its cause.
702 *
703 * @param other the other CompletableFuture
704 * @param block the action to perform before completing the
705 * returned CompletableFuture
706 * @param executor the executor to use for asynchronous execution
707 * @return the new CompletableFuture
708 */
709 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
710 BiBlock<? super T, ? super U> block,
711 Executor executor) {
712 if (executor == null) throw new NullPointerException();
713 return andBlock(other, block, executor);
714 }
715
716 /**
717 * Creates and returns a CompletableFuture that is completed
718 * when this and the other given CompletableFuture both
719 * complete. If this and/or the other CompletableFuture complete
720 * exceptionally, then the returned CompletableFuture also does
721 * so, with a CompletionException holding one of these exceptions as
722 * its cause.
723 *
724 * @param other the other CompletableFuture
725 * @param action the action to perform before completing the
726 * returned CompletableFuture
727 * @return the new CompletableFuture
728 */
729 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
730 Runnable action) {
731 return andRunnable(other, action, null);
732 }
733
734 /**
735 * Creates and returns a CompletableFuture that is completed
736 * asynchronously using the {@link ForkJoinPool#commonPool()}
737 * when this and the other given CompletableFuture both
738 * complete. If this and/or the other CompletableFuture complete
739 * exceptionally, then the returned CompletableFuture also does
740 * so, with a CompletionException holding one of these exceptions as
741 * its cause.
742 *
743 * @param other the other CompletableFuture
744 * @param action the action to perform before completing the
745 * returned CompletableFuture
746 * @return the new CompletableFuture
747 */
748 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
749 Runnable action) {
750 return andRunnable(other, action, ForkJoinPool.commonPool());
751 }
752
753 /**
754 * Creates and returns a CompletableFuture that is completed
755 * asynchronously using the given executor
756 * when this and the other given CompletableFuture both
757 * complete. If this and/or the other CompletableFuture complete
758 * exceptionally, then the returned CompletableFuture also does
759 * so, with a CompletionException holding one of these exceptions as
760 * its cause.
761 *
762 * @param other the other CompletableFuture
763 * @param action the action to perform before completing the
764 * returned CompletableFuture
765 * @param executor the executor to use for asynchronous execution
766 * @return the new CompletableFuture
767 */
768 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
769 Runnable action,
770 Executor executor) {
771 if (executor == null) throw new NullPointerException();
772 return andRunnable(other, action, executor);
773 }
774
775 /**
776 * Creates and returns a CompletableFuture that is completed with
777 * the result of the given function of either this or the other
778 * given CompletableFuture's results when either complete. If
779 * this and/or the other CompletableFuture complete exceptionally,
780 * then the returned CompletableFuture may also do so, with a
781 * CompletionException holding one of these exceptions as its cause.
782 * No guarantees are made about which result or exception is used
783 * in the returned CompletableFuture.
784 *
785 * @param other the other CompletableFuture
786 * @param fn the function to use to compute the value of
787 * the returned CompletableFuture
788 * @return the new CompletableFuture
789 */
790 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
791 Function<? super T, U> fn) {
792 return orFunction(other, fn, null);
793 }
794
795 /**
796 * Creates and returns a CompletableFuture that is completed
797 * asynchronously using the {@link ForkJoinPool#commonPool()} with
798 * the result of the given function of either this or the other
799 * given CompletableFuture's results when either complete. If
800 * this and/or the other CompletableFuture complete exceptionally,
801 * then the returned CompletableFuture may also do so, with a
802 * CompletionException holding one of these exceptions as its cause.
803 * No guarantees are made about which result or exception is used
804 * in the returned CompletableFuture.
805 *
806 * @param other the other CompletableFuture
807 * @param fn the function to use to compute the value of
808 * the returned CompletableFuture
809 * @return the new CompletableFuture
810 */
811 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
812 Function<? super T, U> fn) {
813 return orFunction(other, fn, ForkJoinPool.commonPool());
814 }
815
816 /**
817 * Creates and returns a CompletableFuture that is completed
818 * asynchronously using the given executor with the result of the
819 * given function of either this or the other given
820 * CompletableFuture's results when either complete. If this
821 * and/or the other CompletableFuture complete exceptionally, then
822 * the returned CompletableFuture may also do so, with a
823 * CompletionException holding one of these exceptions as its cause.
824 * No guarantees are made about which result or exception is used
825 * in the returned CompletableFuture.
826 *
827 * @param other the other CompletableFuture
828 * @param fn the function to use to compute the value of
829 * the returned CompletableFuture
830 * @param executor the executor to use for asynchronous execution
831 * @return the new CompletableFuture
832 */
833 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
834 Function<? super T, U> fn,
835 Executor executor) {
836 if (executor == null) throw new NullPointerException();
837 return orFunction(other, fn, executor);
838 }
839
840 /**
841 * Creates and returns a CompletableFuture that is completed after
842 * performing the given action with the result of either this or the
843 * other given CompletableFuture's result, when either complete.
844 * If this and/or the other CompletableFuture complete
845 * exceptionally, then the returned CompletableFuture may also do
846 * so, with a CompletionException holding one of these exceptions as
847 * its cause. No guarantees are made about which exception is
848 * used in the returned CompletableFuture.
849 *
850 * @param other the other CompletableFuture
851 * @param block the action to perform before completing the
852 * returned CompletableFuture
853 * @return the new CompletableFuture
854 */
855 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
856 Block<? super T> block) {
857 return orBlock(other, block, null);
858 }
859
860 /**
861 * Creates and returns a CompletableFuture that is completed
862 * asynchronously using the {@link ForkJoinPool#commonPool()},
863 * performing the given action with the result of either this or
864 * the other given CompletableFuture's result, when either
865 * complete. If this and/or the other CompletableFuture complete
866 * exceptionally, then the returned CompletableFuture may also do
867 * so, with a CompletionException holding one of these exceptions as
868 * its cause. No guarantees are made about which exception is
869 * used in the returned CompletableFuture.
870 *
871 * @param other the other CompletableFuture
872 * @param block the action to perform before completing the
873 * returned CompletableFuture
874 * @return the new CompletableFuture
875 */
876 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
877 Block<? super T> block) {
878 return orBlock(other, block, ForkJoinPool.commonPool());
879 }
880
881 /**
882 * Creates and returns a CompletableFuture that is completed
883 * asynchronously using the given executor,
884 * performing the given action with the result of either this or
885 * the other given CompletableFuture's result, when either
886 * complete. If this and/or the other CompletableFuture complete
887 * exceptionally, then the returned CompletableFuture may also do
888 * so, with a CompletionException holding one of these exceptions as
889 * its cause. No guarantees are made about which exception is
890 * used in the returned CompletableFuture.
891 *
892 * @param other the other CompletableFuture
893 * @param block the action to perform before completing the
894 * returned CompletableFuture
895 * @param executor the executor to use for asynchronous execution
896 * @return the new CompletableFuture
897 */
898 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
899 Block<? super T> block,
900 Executor executor) {
901 if (executor == null) throw new NullPointerException();
902 return orBlock(other, block, executor);
903 }
904
905 /**
906 * Creates and returns a CompletableFuture that is completed
907 * after this or the other given CompletableFuture complete. If
908 * this and/or the other CompletableFuture complete exceptionally,
909 * then the returned CompletableFuture may also do so, with a
910 * CompletionException holding one of these exceptions as its cause.
911 * No guarantees are made about which exception is used in the
912 * returned CompletableFuture.
913 *
914 * @param other the other CompletableFuture
915 * @param action the action to perform before completing the
916 * returned CompletableFuture
917 * @return the new CompletableFuture
918 */
919 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
920 Runnable action) {
921 return orRunnable(other, action, null);
922 }
923
924 /**
925 * Creates and returns a CompletableFuture that is completed
926 * asynchronously using the {@link ForkJoinPool#commonPool()}
927 * after this or the other given CompletableFuture complete. If
928 * this and/or the other CompletableFuture complete exceptionally,
929 * then the returned CompletableFuture may also do so, with a
930 * CompletionException holding one of these exceptions as its cause.
931 * No guarantees are made about which exception is used in the
932 * returned CompletableFuture.
933 *
934 * @param other the other CompletableFuture
935 * @param action the action to perform before completing the
936 * returned CompletableFuture
937 * @return the new CompletableFuture
938 */
939 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
940 Runnable action) {
941 return orRunnable(other, action, ForkJoinPool.commonPool());
942 }
943
944 /**
945 * Creates and returns a CompletableFuture that is completed
946 * asynchronously using the given executor after this or the other
947 * given CompletableFuture complete. If this and/or the other
948 * CompletableFuture complete exceptionally, then the returned
949 * CompletableFuture may also do so, with a CompletionException
950 * holding one of these exceptions as its cause. No guarantees are
951 * made about which exception is used in the returned
952 * CompletableFuture.
953 *
954 * @param other the other CompletableFuture
955 * @param action the action to perform before completing the
956 * returned CompletableFuture
957 * @param executor the executor to use for asynchronous execution
958 * @return the new CompletableFuture
959 */
960 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
961 Runnable action,
962 Executor executor) {
963 if (executor == null) throw new NullPointerException();
964 return orRunnable(other, action, executor);
965 }
966
967 /**
968 * Returns a CompletableFuture (or an equivalent one) produced by
969 * the given function of the result of this CompletableFuture when
970 * completed. If this CompletableFuture completes exceptionally,
971 * then the returned CompletableFuture also does so, with a
972 * CompletionException holding this exception as its cause.
973 *
974 * @param fn the function returning a new CompletableFuture.
975 * @return the CompletableFuture, that {@code isDone()} upon
976 * return if completed by the given function, or an exception
977 * occurs.
978 */
979 @SuppressWarnings("unchecked") public <U> CompletableFuture<U> thenCompose(Function<? super T,
980 CompletableFuture<U>> fn) {
981 if (fn == null) throw new NullPointerException();
982 CompletableFuture<U> dst = null;
983 ThenCompose<T,U> d = null;
984 Object r;
985 if ((r = result) == null) {
986 dst = new CompletableFuture<U>();
987 CompletionNode p = new CompletionNode
988 (d = new ThenCompose<T,U>(this, fn, dst));
989 while ((r = result) == null) {
990 if (UNSAFE.compareAndSwapObject
991 (this, COMPLETIONS, p.next = completions, p))
992 break;
993 }
994 }
995 if (r != null && (d == null || d.compareAndSet(0, 1))) {
996 T t; Throwable ex = null;
997 if (r instanceof AltResult) {
998 ex = ((AltResult)r).ex;
999 t = null;
1000 }
1001 else
1002 t = (T) r;
1003 if (ex == null) {
1004 try {
1005 dst = fn.apply(t);
1006 } catch (Throwable rex) {
1007 ex = rex;
1008 }
1009 }
1010 if (dst == null) {
1011 dst = new CompletableFuture<U>();
1012 if (ex == null)
1013 ex = new NullPointerException();
1014 }
1015 if (ex != null)
1016 dst.internalComplete(null, ex);
1017 }
1018 helpPostComplete();
1019 dst.helpPostComplete();
1020 return dst;
1021 }
1022
1023 /**
1024 * Creates and returns a CompletableFuture that is completed with
1025 * the result of the given function of the exception triggering
1026 * this CompletableFuture's completion when it completes
1027 * exceptionally; Otherwise, if this CompletableFuture completes
1028 * normally, then the returned CompletableFuture also completes
1029 * normally with the same value.
1030 *
1031 * @param fn the function to use to compute the value of the
1032 * returned CompletableFuture if this CompletableFuture completed
1033 * exceptionally
1034 * @return the new CompletableFuture
1035 */
1036 @SuppressWarnings("unchecked") public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
1037 if (fn == null) throw new NullPointerException();
1038 CompletableFuture<T> dst = new CompletableFuture<T>();
1039 ExceptionAction<T> d = null;
1040 Object r;
1041 if ((r = result) == null) {
1042 CompletionNode p =
1043 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
1044 while ((r = result) == null) {
1045 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1046 p.next = completions, p))
1047 break;
1048 }
1049 }
1050 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1051 T t = null; Throwable ex, dx = null;
1052 if (r instanceof AltResult) {
1053 if ((ex = ((AltResult)r).ex) != null) {
1054 try {
1055 t = fn.apply(ex);
1056 } catch (Throwable rex) {
1057 dx = rex;
1058 }
1059 }
1060 }
1061 else
1062 t = (T) r;
1063 dst.internalComplete(t, dx);
1064 }
1065 helpPostComplete();
1066 return dst;
1067 }
1068
1069 /**
1070 * Creates and returns a CompletableFuture that is completed with
1071 * the result of the given function of the result and exception of
1072 * this CompletableFuture's completion when it completes. The
1073 * given function is invoked with the result (or {@code null} if
1074 * none) and the exception (or {@code null} if none) of this
1075 * CompletableFuture when complete.
1076 *
1077 * @param fn the function to use to compute the value of the
1078 * returned CompletableFuture
1079
1080 * @return the new CompletableFuture
1081 */
1082 @SuppressWarnings("unchecked") public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
1083 if (fn == null) throw new NullPointerException();
1084 CompletableFuture<U> dst = new CompletableFuture<U>();
1085 ThenHandle<T,U> d = null;
1086 Object r;
1087 if ((r = result) == null) {
1088 CompletionNode p =
1089 new CompletionNode(d = new ThenHandle<T,U>(this, fn, dst));
1090 while ((r = result) == null) {
1091 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1092 p.next = completions, p))
1093 break;
1094 }
1095 }
1096 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1097 T t; Throwable ex;
1098 if (r instanceof AltResult) {
1099 ex = ((AltResult)r).ex;
1100 t = null;
1101 }
1102 else {
1103 ex = null;
1104 t = (T) r;
1105 }
1106 U u = null; Throwable dx = null;
1107 try {
1108 u = fn.apply(t, ex);
1109 } catch (Throwable rex) {
1110 dx = rex;
1111 }
1112 dst.internalComplete(u, dx);
1113 }
1114 helpPostComplete();
1115 return dst;
1116 }
1117
1118 /**
1119 * Attempts to complete this CompletableFuture with
1120 * a {@link CancellationException}.
1121 *
1122 * @param mayInterruptIfRunning this value has no effect in this
1123 * implementation because interrupts are not used to control
1124 * processing.
1125 *
1126 * @return {@code true} if this task is now cancelled
1127 */
1128 public boolean cancel(boolean mayInterruptIfRunning) {
1129 Object r;
1130 while ((r = result) == null) {
1131 r = new AltResult(new CancellationException());
1132 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
1133 postComplete();
1134 return true;
1135 }
1136 }
1137 return ((r instanceof AltResult) &&
1138 (((AltResult)r).ex instanceof CancellationException));
1139 }
1140
1141 /**
1142 * Returns {@code true} if this CompletableFuture was cancelled
1143 * before it completed normally.
1144 *
1145 * @return {@code true} if this CompletableFuture was cancelled
1146 * before it completed normally
1147 */
1148 public boolean isCancelled() {
1149 Object r;
1150 return ((r = result) != null &&
1151 (r instanceof AltResult) &&
1152 (((AltResult)r).ex instanceof CancellationException));
1153 }
1154
1155 /**
1156 * Forcibly sets or resets the value subsequently returned by
1157 * method get() and related methods, whether or not already
1158 * completed. This method is designed for use only in error
1159 * recovery actions, and even in such situations may result in
1160 * ongoing dependent completions using established versus
1161 * overwritten values.
1162 *
1163 * @param value the completion value
1164 */
1165 public void obtrudeValue(T value) {
1166 result = (value == null) ? NIL : value;
1167 postComplete();
1168 }
1169
1170 /* ------------- waiting for completions -------------- */
1171
1172 /**
1173 * Heuristic spin value for waitingGet() before blocking on
1174 * multiprocessors
1175 */
1176 static final int WAITING_GET_SPINS = 256;
1177
1178 /**
1179 * Returns raw result after waiting, or null if interruptible and
1180 * interrupted.
1181 */
1182 private Object waitingGet(boolean interruptible) {
1183 WaitNode q = null;
1184 boolean queued = false, interrupted = false;
1185 int h = 0, spins = 0;
1186 for (Object r;;) {
1187 if ((r = result) != null) {
1188 Throwable ex;
1189 if (q != null) // suppress unpark
1190 q.thread = null;
1191 postComplete(); // help release others
1192 if (interrupted)
1193 Thread.currentThread().interrupt();
1194 return r;
1195 }
1196 else if (h == 0) {
1197 h = ThreadLocalRandom.current().nextInt();
1198 if (Runtime.getRuntime().availableProcessors() > 1)
1199 spins = WAITING_GET_SPINS;
1200 }
1201 else if (spins > 0) {
1202 h ^= h << 1; // xorshift
1203 h ^= h >>> 3;
1204 if ((h ^= h << 10) >= 0)
1205 --spins;
1206 }
1207 else if (q == null)
1208 q = new WaitNode();
1209 else if (!queued)
1210 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1211 q.next = waiters, q);
1212 else if (Thread.interrupted()) {
1213 if (interruptible) {
1214 removeWaiter(q);
1215 return null;
1216 }
1217 interrupted = true;
1218 }
1219 else if (q.thread == null)
1220 q.thread = Thread.currentThread();
1221 else
1222 LockSupport.park(this);
1223 }
1224 }
1225
1226 /**
1227 * Awaits completion or aborts on interrupt or timeout.
1228 *
1229 * @param nanos time to wait
1230 * @return raw result
1231 */
1232 private Object timedAwaitDone(long nanos)
1233 throws InterruptedException, TimeoutException {
1234 final long deadline = System.nanoTime() + nanos;
1235 WaitNode q = null;
1236 boolean queued = false;
1237 for (Object r;;) {
1238 if (Thread.interrupted()) {
1239 removeWaiter(q);
1240 throw new InterruptedException();
1241 }
1242 else if ((r = result) != null) {
1243 if (q != null)
1244 q.thread = null;
1245 postComplete();
1246 return r;
1247 }
1248 else if (q == null)
1249 q = new WaitNode();
1250 else if (!queued)
1251 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1252 q.next = waiters, q);
1253 else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1254 removeWaiter(q);
1255 throw new TimeoutException();
1256 }
1257 else if (q.thread == null)
1258 q.thread = Thread.currentThread();
1259 else
1260 LockSupport.parkNanos(this, nanos);
1261 }
1262 }
1263
1264 /**
1265 * Tries to unlink a timed-out or interrupted wait node to avoid
1266 * accumulating garbage. Internal nodes are simply unspliced
1267 * without CAS since it is harmless if they are traversed anyway
1268 * by releasers. To avoid effects of unsplicing from already
1269 * removed nodes, the list is retraversed in case of an apparent
1270 * race. This is slow when there are a lot of nodes, but we don't
1271 * expect lists to be long enough to outweigh higher-overhead
1272 * schemes.
1273 */
1274 private void removeWaiter(WaitNode node) {
1275 if (node != null) {
1276 node.thread = null;
1277 retry:
1278 for (;;) { // restart on removeWaiter race
1279 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
1280 s = q.next;
1281 if (q.thread != null)
1282 pred = q;
1283 else if (pred != null) {
1284 pred.next = s;
1285 if (pred.thread == null) // check for race
1286 continue retry;
1287 }
1288 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
1289 continue retry;
1290 }
1291 break;
1292 }
1293 }
1294 }
1295
1296 /* ------------- Async tasks -------------- */
1297
1298 /** Base class can act as either FJ or plain Runnable */
1299 static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1300 public final Void getRawResult() { return null; }
1301 public final void setRawResult(Void v) { }
1302 public final void run() { exec(); }
1303 }
1304
1305 static final class AsyncRunnable extends Async {
1306 final Runnable fn;
1307 final CompletableFuture<Void> dst;
1308 AsyncRunnable(Runnable fn, CompletableFuture<Void> dst) {
1309 this.fn = fn; this.dst = dst;
1310 }
1311 public final boolean exec() {
1312 CompletableFuture<Void> d;
1313 if ((d = this.dst) != null) {
1314 Throwable ex;
1315 try {
1316 fn.run();
1317 ex = null;
1318 } catch (Throwable rex) {
1319 ex = rex;
1320 }
1321 d.internalComplete(null, ex);
1322 }
1323 return true;
1324 }
1325 private static final long serialVersionUID = 5232453952276885070L;
1326 }
1327
1328 static final class AsyncSupplier<U> extends Async {
1329 final Supplier<U> fn;
1330 final CompletableFuture<U> dst;
1331 AsyncSupplier(Supplier<U> fn, CompletableFuture<U> dst) {
1332 this.fn = fn; this.dst = dst;
1333 }
1334 public final boolean exec() {
1335 CompletableFuture<U> d;
1336 if ((d = this.dst) != null) {
1337 U u; Throwable ex;
1338 try {
1339 u = fn.get();
1340 ex = null;
1341 } catch (Throwable rex) {
1342 ex = rex;
1343 u = null;
1344 }
1345 d.internalComplete(u, ex);
1346 }
1347 return true;
1348 }
1349 private static final long serialVersionUID = 5232453952276885070L;
1350 }
1351
1352 static final class AsyncFunction<T,U> extends Async {
1353 Function<? super T,? extends U> fn;
1354 T arg;
1355 final CompletableFuture<U> dst;
1356 AsyncFunction(T arg, Function<? super T,? extends U> fn,
1357 CompletableFuture<U> dst) {
1358 this.arg = arg; this.fn = fn; this.dst = dst;
1359 }
1360 public final boolean exec() {
1361 CompletableFuture<U> d; U u; Throwable ex;
1362 if ((d = this.dst) != null) {
1363 try {
1364 u = fn.apply(arg);
1365 ex = null;
1366 } catch (Throwable rex) {
1367 ex = rex;
1368 u = null;
1369 }
1370 d.internalComplete(u, ex);
1371 }
1372 return true;
1373 }
1374 private static final long serialVersionUID = 5232453952276885070L;
1375 }
1376
1377 static final class AsyncBiFunction<T,U,V> extends Async {
1378 final BiFunction<? super T,? super U,? extends V> fn;
1379 final T arg1;
1380 final U arg2;
1381 final CompletableFuture<V> dst;
1382 AsyncBiFunction(T arg1, U arg2,
1383 BiFunction<? super T,? super U,? extends V> fn,
1384 CompletableFuture<V> dst) {
1385 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1386 }
1387 public final boolean exec() {
1388 CompletableFuture<V> d; V v; Throwable ex;
1389 if ((d = this.dst) != null) {
1390 try {
1391 v = fn.apply(arg1, arg2);
1392 ex = null;
1393 } catch (Throwable rex) {
1394 ex = rex;
1395 v = null;
1396 }
1397 d.internalComplete(v, ex);
1398 }
1399 return true;
1400 }
1401 private static final long serialVersionUID = 5232453952276885070L;
1402 }
1403
1404 static final class AsyncBlock<T> extends Async {
1405 Block<? super T> fn;
1406 T arg;
1407 final CompletableFuture<Void> dst;
1408 AsyncBlock(T arg, Block<? super T> fn,
1409 CompletableFuture<Void> dst) {
1410 this.arg = arg; this.fn = fn; this.dst = dst;
1411 }
1412 public final boolean exec() {
1413 CompletableFuture<Void> d; Throwable ex;
1414 if ((d = this.dst) != null) {
1415 try {
1416 fn.accept(arg);
1417 ex = null;
1418 } catch (Throwable rex) {
1419 ex = rex;
1420 }
1421 d.internalComplete(null, ex);
1422 }
1423 return true;
1424 }
1425 private static final long serialVersionUID = 5232453952276885070L;
1426 }
1427
1428 static final class AsyncBiBlock<T,U> extends Async {
1429 final BiBlock<? super T,? super U> fn;
1430 final T arg1;
1431 final U arg2;
1432 final CompletableFuture<Void> dst;
1433 AsyncBiBlock(T arg1, U arg2,
1434 BiBlock<? super T,? super U> fn,
1435 CompletableFuture<Void> dst) {
1436 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1437 }
1438 public final boolean exec() {
1439 CompletableFuture<Void> d; Throwable ex;
1440 if ((d = this.dst) != null) {
1441 try {
1442 fn.accept(arg1, arg2);
1443 ex = null;
1444 } catch (Throwable rex) {
1445 ex = rex;
1446 }
1447 d.internalComplete(null, ex);
1448 }
1449 return true;
1450 }
1451 private static final long serialVersionUID = 5232453952276885070L;
1452 }
1453
1454 /* ------------- Completions -------------- */
1455
1456 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1457 static abstract class Completion extends AtomicInteger implements Runnable {
1458 }
1459
1460 static final class ThenFunction<T,U> extends Completion {
1461 final CompletableFuture<? extends T> src;
1462 final Function<? super T,? extends U> fn;
1463 final CompletableFuture<U> dst;
1464 final Executor executor;
1465 ThenFunction(CompletableFuture<? extends T> src,
1466 final Function<? super T,? extends U> fn,
1467 final CompletableFuture<U> dst, Executor executor) {
1468 this.src = src; this.fn = fn; this.dst = dst;
1469 this.executor = executor;
1470 }
1471 @SuppressWarnings("unchecked") public final void run() {
1472 CompletableFuture<? extends T> a;
1473 Function<? super T,? extends U> fn;
1474 CompletableFuture<U> dst;
1475 Object r; T t; Throwable ex;
1476 if ((dst = this.dst) != null &&
1477 (fn = this.fn) != null &&
1478 (a = this.src) != null &&
1479 (r = a.result) != null &&
1480 compareAndSet(0, 1)) {
1481 if (r instanceof AltResult) {
1482 ex = ((AltResult)r).ex;
1483 t = null;
1484 }
1485 else {
1486 ex = null;
1487 t = (T) r;
1488 }
1489 Executor e = executor;
1490 U u = null;
1491 if (ex == null) {
1492 try {
1493 if (e != null)
1494 e.execute(new AsyncFunction<T,U>(t, fn, dst));
1495 else
1496 u = fn.apply(t);
1497 } catch (Throwable rex) {
1498 ex = rex;
1499 }
1500 }
1501 if (e == null || ex != null)
1502 dst.internalComplete(u, ex);
1503 }
1504 }
1505 private static final long serialVersionUID = 5232453952276885070L;
1506 }
1507
1508 static final class ThenBlock<T> extends Completion {
1509 final CompletableFuture<? extends T> src;
1510 final Block<? super T> fn;
1511 final CompletableFuture<Void> dst;
1512 final Executor executor;
1513 ThenBlock(CompletableFuture<? extends T> src,
1514 final Block<? super T> fn,
1515 final CompletableFuture<Void> dst, Executor executor) {
1516 this.src = src; this.fn = fn; this.dst = dst;
1517 this.executor = executor;
1518 }
1519 @SuppressWarnings("unchecked") public final void run() {
1520 CompletableFuture<? extends T> a;
1521 Block<? super T> fn;
1522 CompletableFuture<Void> dst;
1523 Object r; T t; Throwable ex;
1524 if ((dst = this.dst) != null &&
1525 (fn = this.fn) != null &&
1526 (a = this.src) != null &&
1527 (r = a.result) != null &&
1528 compareAndSet(0, 1)) {
1529 if (r instanceof AltResult) {
1530 ex = ((AltResult)r).ex;
1531 t = null;
1532 }
1533 else {
1534 ex = null;
1535 t = (T) r;
1536 }
1537 Executor e = executor;
1538 if (ex == null) {
1539 try {
1540 if (e != null)
1541 e.execute(new AsyncBlock<T>(t, fn, dst));
1542 else
1543 fn.accept(t);
1544 } catch (Throwable rex) {
1545 ex = rex;
1546 }
1547 }
1548 if (e == null || ex != null)
1549 dst.internalComplete(null, ex);
1550 }
1551 }
1552 private static final long serialVersionUID = 5232453952276885070L;
1553 }
1554
1555 static final class ThenRunnable<T> extends Completion {
1556 final CompletableFuture<? extends T> src;
1557 final Runnable fn;
1558 final CompletableFuture<Void> dst;
1559 final Executor executor;
1560 ThenRunnable(CompletableFuture<? extends T> src,
1561 Runnable fn,
1562 CompletableFuture<Void> dst,
1563 Executor executor) {
1564 this.src = src; this.fn = fn; this.dst = dst;
1565 this.executor = executor;
1566 }
1567 @SuppressWarnings("unchecked") public final void run() {
1568 CompletableFuture<? extends T> a;
1569 Runnable fn;
1570 CompletableFuture<Void> dst;
1571 Object r; Throwable ex;
1572 if ((dst = this.dst) != null &&
1573 (fn = this.fn) != null &&
1574 (a = this.src) != null &&
1575 (r = a.result) != null &&
1576 compareAndSet(0, 1)) {
1577 if (r instanceof AltResult)
1578 ex = ((AltResult)r).ex;
1579 else
1580 ex = null;
1581 Executor e = executor;
1582 if (ex == null) {
1583 try {
1584 if (e != null)
1585 e.execute(new AsyncRunnable(fn, dst));
1586 else
1587 fn.run();
1588 } catch (Throwable rex) {
1589 ex = rex;
1590 }
1591 }
1592 if (e == null || ex != null)
1593 dst.internalComplete(null, ex);
1594 }
1595 }
1596 private static final long serialVersionUID = 5232453952276885070L;
1597 }
1598
1599 static final class AndFunction<T,U,V> extends Completion {
1600 final CompletableFuture<? extends T> src;
1601 final CompletableFuture<? extends U> snd;
1602 final BiFunction<? super T,? super U,? extends V> fn;
1603 final CompletableFuture<V> dst;
1604 final Executor executor;
1605 AndFunction(CompletableFuture<? extends T> src,
1606 CompletableFuture<? extends U> snd,
1607 BiFunction<? super T,? super U,? extends V> fn,
1608 CompletableFuture<V> dst, Executor executor) {
1609 this.src = src; this.snd = snd;
1610 this.fn = fn; this.dst = dst;
1611 this.executor = executor;
1612 }
1613 @SuppressWarnings("unchecked") public final void run() {
1614 Object r, s; T t; U u; Throwable ex;
1615 CompletableFuture<? extends T> a;
1616 CompletableFuture<? extends U> b;
1617 BiFunction<? super T,? super U,? extends V> fn;
1618 CompletableFuture<V> dst;
1619 if ((dst = this.dst) != null &&
1620 (fn = this.fn) != null &&
1621 (a = this.src) != null &&
1622 (r = a.result) != null &&
1623 (b = this.snd) != null &&
1624 (s = b.result) != null &&
1625 compareAndSet(0, 1)) {
1626 if (r instanceof AltResult) {
1627 ex = ((AltResult)r).ex;
1628 t = null;
1629 }
1630 else {
1631 ex = null;
1632 t = (T) r;
1633 }
1634 if (ex != null)
1635 u = null;
1636 else if (s instanceof AltResult) {
1637 ex = ((AltResult)s).ex;
1638 u = null;
1639 }
1640 else
1641 u = (U) s;
1642 Executor e = executor;
1643 V v = null;
1644 if (ex == null) {
1645 try {
1646 if (e != null)
1647 e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1648 else
1649 v = fn.apply(t, u);
1650 } catch (Throwable rex) {
1651 ex = rex;
1652 }
1653 }
1654 if (e == null || ex != null)
1655 dst.internalComplete(v, ex);
1656 }
1657 }
1658 private static final long serialVersionUID = 5232453952276885070L;
1659 }
1660
1661 static final class AndBlock<T,U> extends Completion {
1662 final CompletableFuture<? extends T> src;
1663 final CompletableFuture<? extends U> snd;
1664 final BiBlock<? super T,? super U> fn;
1665 final CompletableFuture<Void> dst;
1666 final Executor executor;
1667 AndBlock(CompletableFuture<? extends T> src,
1668 CompletableFuture<? extends U> snd,
1669 BiBlock<? super T,? super U> fn,
1670 CompletableFuture<Void> dst, Executor executor) {
1671 this.src = src; this.snd = snd;
1672 this.fn = fn; this.dst = dst;
1673 this.executor = executor;
1674 }
1675 @SuppressWarnings("unchecked") public final void run() {
1676 Object r, s; T t; U u; Throwable ex;
1677 CompletableFuture<? extends T> a;
1678 CompletableFuture<? extends U> b;
1679 BiBlock<? super T,? super U> fn;
1680 CompletableFuture<Void> dst;
1681 if ((dst = this.dst) != null &&
1682 (fn = this.fn) != null &&
1683 (a = this.src) != null &&
1684 (r = a.result) != null &&
1685 (b = this.snd) != null &&
1686 (s = b.result) != null &&
1687 compareAndSet(0, 1)) {
1688 if (r instanceof AltResult) {
1689 ex = ((AltResult)r).ex;
1690 t = null;
1691 }
1692 else {
1693 ex = null;
1694 t = (T) r;
1695 }
1696 if (ex != null)
1697 u = null;
1698 else if (s instanceof AltResult) {
1699 ex = ((AltResult)s).ex;
1700 u = null;
1701 }
1702 else
1703 u = (U) s;
1704 Executor e = executor;
1705 if (ex == null) {
1706 try {
1707 if (e != null)
1708 e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1709 else
1710 fn.accept(t, u);
1711 } catch (Throwable rex) {
1712 ex = rex;
1713 }
1714 }
1715 if (e == null || ex != null)
1716 dst.internalComplete(null, ex);
1717 }
1718 }
1719 private static final long serialVersionUID = 5232453952276885070L;
1720 }
1721
1722 static final class AndRunnable<T> extends Completion {
1723 final CompletableFuture<? extends T> src;
1724 final CompletableFuture<?> snd;
1725 final Runnable fn;
1726 final CompletableFuture<Void> dst;
1727 final Executor executor;
1728 AndRunnable(CompletableFuture<? extends T> src,
1729 CompletableFuture<?> snd,
1730 Runnable fn,
1731 CompletableFuture<Void> dst, Executor executor) {
1732 this.src = src; this.snd = snd;
1733 this.fn = fn; this.dst = dst;
1734 this.executor = executor;
1735 }
1736 @SuppressWarnings("unchecked") public final void run() {
1737 Object r, s; Throwable ex;
1738 final CompletableFuture<? extends T> a;
1739 final CompletableFuture<?> b;
1740 final Runnable fn;
1741 final CompletableFuture<Void> dst;
1742 if ((dst = this.dst) != null &&
1743 (fn = this.fn) != null &&
1744 (a = this.src) != null &&
1745 (r = a.result) != null &&
1746 (b = this.snd) != null &&
1747 (s = b.result) != null &&
1748 compareAndSet(0, 1)) {
1749 if (r instanceof AltResult)
1750 ex = ((AltResult)r).ex;
1751 else
1752 ex = null;
1753 if (ex == null && (s instanceof AltResult))
1754 ex = ((AltResult)s).ex;
1755 Executor e = executor;
1756 if (ex == null) {
1757 try {
1758 if (e != null)
1759 e.execute(new AsyncRunnable(fn, dst));
1760 else
1761 fn.run();
1762 } catch (Throwable rex) {
1763 ex = rex;
1764 }
1765 }
1766 if (e == null || ex != null)
1767 dst.internalComplete(null, ex);
1768 }
1769 }
1770 private static final long serialVersionUID = 5232453952276885070L;
1771 }
1772
1773 static final class OrFunction<T,U> extends Completion {
1774 final CompletableFuture<? extends T> src;
1775 final CompletableFuture<? extends T> snd;
1776 final Function<? super T,? extends U> fn;
1777 final CompletableFuture<U> dst;
1778 final Executor executor;
1779 OrFunction(CompletableFuture<? extends T> src,
1780 CompletableFuture<? extends T> snd,
1781 Function<? super T,? extends U> fn,
1782 CompletableFuture<U> dst, Executor executor) {
1783 this.src = src; this.snd = snd;
1784 this.fn = fn; this.dst = dst;
1785 this.executor = executor;
1786 }
1787 @SuppressWarnings("unchecked") public final void run() {
1788 Object r; T t; Throwable ex;
1789 CompletableFuture<? extends T> a;
1790 CompletableFuture<? extends T> b;
1791 Function<? super T,? extends U> fn;
1792 CompletableFuture<U> dst;
1793 if ((dst = this.dst) != null &&
1794 (fn = this.fn) != null &&
1795 (((a = this.src) != null && (r = a.result) != null) ||
1796 ((b = this.snd) != null && (r = b.result) != null)) &&
1797 compareAndSet(0, 1)) {
1798 if (r instanceof AltResult) {
1799 ex = ((AltResult)r).ex;
1800 t = null;
1801 }
1802 else {
1803 ex = null;
1804 t = (T) r;
1805 }
1806 Executor e = executor;
1807 U u = null;
1808 if (ex == null) {
1809 try {
1810 if (e != null)
1811 e.execute(new AsyncFunction<T,U>(t, fn, dst));
1812 else
1813 u = fn.apply(t);
1814 } catch (Throwable rex) {
1815 ex = rex;
1816 }
1817 }
1818 if (e == null || ex != null)
1819 dst.internalComplete(u, ex);
1820 }
1821 }
1822 private static final long serialVersionUID = 5232453952276885070L;
1823 }
1824
1825 static final class OrBlock<T> extends Completion {
1826 final CompletableFuture<? extends T> src;
1827 final CompletableFuture<? extends T> snd;
1828 final Block<? super T> fn;
1829 final CompletableFuture<Void> dst;
1830 final Executor executor;
1831 OrBlock(CompletableFuture<? extends T> src,
1832 CompletableFuture<? extends T> snd,
1833 Block<? super T> fn,
1834 CompletableFuture<Void> dst, Executor executor) {
1835 this.src = src; this.snd = snd;
1836 this.fn = fn; this.dst = dst;
1837 this.executor = executor;
1838 }
1839 @SuppressWarnings("unchecked") public final void run() {
1840 Object r; T t; Throwable ex;
1841 CompletableFuture<? extends T> a;
1842 CompletableFuture<? extends T> b;
1843 Block<? super T> fn;
1844 CompletableFuture<Void> dst;
1845 if ((dst = this.dst) != null &&
1846 (fn = this.fn) != null &&
1847 (((a = this.src) != null && (r = a.result) != null) ||
1848 ((b = this.snd) != null && (r = b.result) != null)) &&
1849 compareAndSet(0, 1)) {
1850 if (r instanceof AltResult) {
1851 ex = ((AltResult)r).ex;
1852 t = null;
1853 }
1854 else {
1855 ex = null;
1856 t = (T) r;
1857 }
1858 Executor e = executor;
1859 if (ex == null) {
1860 try {
1861 if (e != null)
1862 e.execute(new AsyncBlock<T>(t, fn, dst));
1863 else
1864 fn.accept(t);
1865 } catch (Throwable rex) {
1866 ex = rex;
1867 }
1868 }
1869 if (e == null || ex != null)
1870 dst.internalComplete(null, ex);
1871 }
1872 }
1873 private static final long serialVersionUID = 5232453952276885070L;
1874 }
1875
1876 static final class OrRunnable<T> extends Completion {
1877 final CompletableFuture<? extends T> src;
1878 final CompletableFuture<?> snd;
1879 final Runnable fn;
1880 final CompletableFuture<Void> dst;
1881 final Executor executor;
1882 OrRunnable(CompletableFuture<? extends T> src,
1883 CompletableFuture<?> snd,
1884 Runnable fn,
1885 CompletableFuture<Void> dst, Executor executor) {
1886 this.src = src; this.snd = snd;
1887 this.fn = fn; this.dst = dst;
1888 this.executor = executor;
1889 }
1890 @SuppressWarnings("unchecked") public final void run() {
1891 Object r; Throwable ex;
1892 CompletableFuture<? extends T> a;
1893 final CompletableFuture<?> b;
1894 final Runnable fn;
1895 final CompletableFuture<Void> dst;
1896 if ((dst = this.dst) != null &&
1897 (fn = this.fn) != null &&
1898 (((a = this.src) != null && (r = a.result) != null) ||
1899 ((b = this.snd) != null && (r = b.result) != null)) &&
1900 compareAndSet(0, 1)) {
1901 if (r instanceof AltResult)
1902 ex = ((AltResult)r).ex;
1903 else
1904 ex = null;
1905 Executor e = executor;
1906 if (ex == null) {
1907 try {
1908 if (e != null)
1909 e.execute(new AsyncRunnable(fn, dst));
1910 else
1911 fn.run();
1912 } catch (Throwable rex) {
1913 ex = rex;
1914 }
1915 }
1916 if (e == null || ex != null)
1917 dst.internalComplete(null, ex);
1918 }
1919 }
1920 private static final long serialVersionUID = 5232453952276885070L;
1921 }
1922
1923 static final class ExceptionAction<T> extends Completion {
1924 final CompletableFuture<? extends T> src;
1925 final Function<? super Throwable, ? extends T> fn;
1926 final CompletableFuture<T> dst;
1927 ExceptionAction(CompletableFuture<? extends T> src,
1928 Function<? super Throwable, ? extends T> fn,
1929 CompletableFuture<T> dst) {
1930 this.src = src; this.fn = fn; this.dst = dst;
1931 }
1932 @SuppressWarnings("unchecked") public final void run() {
1933 CompletableFuture<? extends T> a;
1934 Function<? super Throwable, ? extends T> fn;
1935 CompletableFuture<T> dst;
1936 Object r; T t = null; Throwable ex, dx = null;
1937 if ((dst = this.dst) != null &&
1938 (fn = this.fn) != null &&
1939 (a = this.src) != null &&
1940 (r = a.result) != null &&
1941 compareAndSet(0, 1)) {
1942 if ((r instanceof AltResult) &&
1943 (ex = ((AltResult)r).ex) != null) {
1944 try {
1945 t = fn.apply(ex);
1946 } catch (Throwable rex) {
1947 dx = rex;
1948 }
1949 }
1950 else
1951 t = (T) r;
1952 dst.internalComplete(t, dx);
1953 }
1954 }
1955 private static final long serialVersionUID = 5232453952276885070L;
1956 }
1957
1958 static final class ThenCopy<T> extends Completion {
1959 final CompletableFuture<? extends T> src;
1960 final CompletableFuture<T> dst;
1961 ThenCopy(CompletableFuture<? extends T> src,
1962 CompletableFuture<T> dst) {
1963 this.src = src; this.dst = dst;
1964 }
1965 @SuppressWarnings("unchecked") public final void run() {
1966 CompletableFuture<? extends T> a;
1967 CompletableFuture<T> dst;
1968 Object r; Object t; Throwable ex;
1969 if ((dst = this.dst) != null &&
1970 (a = this.src) != null &&
1971 (r = a.result) != null &&
1972 compareAndSet(0, 1)) {
1973 if (r instanceof AltResult) {
1974 ex = ((AltResult)r).ex;
1975 t = null;
1976 }
1977 else {
1978 ex = null;
1979 t = r;
1980 }
1981 dst.internalComplete(t, ex);
1982 }
1983 }
1984 private static final long serialVersionUID = 5232453952276885070L;
1985 }
1986
1987 static final class ThenHandle<T,U> extends Completion {
1988 final CompletableFuture<? extends T> src;
1989 final BiFunction<? super T, Throwable, ? extends U> fn;
1990 final CompletableFuture<U> dst;
1991 ThenHandle(CompletableFuture<? extends T> src,
1992 BiFunction<? super T, Throwable, ? extends U> fn,
1993 final CompletableFuture<U> dst) {
1994 this.src = src; this.fn = fn; this.dst = dst;
1995 }
1996 @SuppressWarnings("unchecked") public final void run() {
1997 CompletableFuture<? extends T> a;
1998 BiFunction<? super T, Throwable, ? extends U> fn;
1999 CompletableFuture<U> dst;
2000 Object r; T t; Throwable ex;
2001 if ((dst = this.dst) != null &&
2002 (fn = this.fn) != null &&
2003 (a = this.src) != null &&
2004 (r = a.result) != null &&
2005 compareAndSet(0, 1)) {
2006 if (r instanceof AltResult) {
2007 ex = ((AltResult)r).ex;
2008 t = null;
2009 }
2010 else {
2011 ex = null;
2012 t = (T) r;
2013 }
2014 U u = null; Throwable dx = null;
2015 try {
2016 u = fn.apply(t, ex);
2017 } catch (Throwable rex) {
2018 dx = rex;
2019 }
2020 dst.internalComplete(u, dx);
2021 }
2022 }
2023 private static final long serialVersionUID = 5232453952276885070L;
2024 }
2025
2026 static final class ThenCompose<T,U> extends Completion {
2027 final CompletableFuture<? extends T> src;
2028 final Function<? super T, CompletableFuture<U>> fn;
2029 final CompletableFuture<U> dst;
2030 ThenCompose(CompletableFuture<? extends T> src,
2031 Function<? super T, CompletableFuture<U>> fn,
2032 final CompletableFuture<U> dst) {
2033 this.src = src; this.fn = fn; this.dst = dst;
2034 }
2035 @SuppressWarnings("unchecked") public final void run() {
2036 CompletableFuture<? extends T> a;
2037 Function<? super T, CompletableFuture<U>> fn;
2038 CompletableFuture<U> dst;
2039 Object r; T t; Throwable ex;
2040 if ((dst = this.dst) != null &&
2041 (fn = this.fn) != null &&
2042 (a = this.src) != null &&
2043 (r = a.result) != null &&
2044 compareAndSet(0, 1)) {
2045 if (r instanceof AltResult) {
2046 ex = ((AltResult)r).ex;
2047 t = null;
2048 }
2049 else {
2050 ex = null;
2051 t = (T) r;
2052 }
2053 CompletableFuture<U> c = null;
2054 U u = null;
2055 boolean complete = false;
2056 if (ex == null) {
2057 try {
2058 c = fn.apply(t);
2059 } catch (Throwable rex) {
2060 ex = rex;
2061 }
2062 }
2063 if (ex != null || c == null) {
2064 if (ex == null)
2065 ex = new NullPointerException();
2066 }
2067 else {
2068 ThenCopy<U> d = null;
2069 Object s;
2070 if ((s = c.result) == null) {
2071 CompletionNode p = new CompletionNode
2072 (d = new ThenCopy<U>(c, dst));
2073 while ((s = c.result) == null) {
2074 if (UNSAFE.compareAndSwapObject
2075 (c, COMPLETIONS, p.next = c.completions, p))
2076 break;
2077 }
2078 }
2079 if (s != null && (d == null || d.compareAndSet(0, 1))) {
2080 complete = true;
2081 if (s instanceof AltResult) {
2082 ex = ((AltResult)s).ex; // no rewrap
2083 u = null;
2084 }
2085 else
2086 u = (U) s;
2087 }
2088 }
2089 if (complete || ex != null)
2090 dst.internalComplete(u, ex);
2091 if (c != null)
2092 c.helpPostComplete();
2093 }
2094 }
2095 private static final long serialVersionUID = 5232453952276885070L;
2096 }
2097
2098 /* ------------- then/and/or implementations -------------- */
2099
2100 @SuppressWarnings("unchecked") private <U> CompletableFuture<U> thenFunction
2101 (Function<? super T,? extends U> fn,
2102 Executor executor) {
2103 if (fn == null) throw new NullPointerException();
2104 CompletableFuture<U> dst = new CompletableFuture<U>();
2105 ThenFunction<T,U> d = null;
2106 Object r;
2107 if ((r = result) == null) {
2108 CompletionNode p = new CompletionNode
2109 (d = new ThenFunction<T,U>(this, fn, dst, executor));
2110 while ((r = result) == null) {
2111 if (UNSAFE.compareAndSwapObject
2112 (this, COMPLETIONS, p.next = completions, p))
2113 break;
2114 }
2115 }
2116 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2117 T t; Throwable ex;
2118 if (r instanceof AltResult) {
2119 ex = ((AltResult)r).ex;
2120 t = null;
2121 }
2122 else {
2123 ex = null;
2124 t = (T) r;
2125 }
2126 Executor e = executor;
2127 U u = null;
2128 if (ex == null) {
2129 try {
2130 if (e != null)
2131 e.execute(new AsyncFunction<T,U>(t, fn, dst));
2132 else
2133 u = fn.apply(t);
2134 } catch (Throwable rex) {
2135 ex = rex;
2136 }
2137 }
2138 if (e == null || ex != null)
2139 dst.internalComplete(u, ex);
2140 }
2141 helpPostComplete();
2142 return dst;
2143 }
2144
2145 @SuppressWarnings("unchecked") private CompletableFuture<Void> thenBlock
2146 (Block<? super T> fn,
2147 Executor executor) {
2148 if (fn == null) throw new NullPointerException();
2149 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2150 ThenBlock<T> d = null;
2151 Object r;
2152 if ((r = result) == null) {
2153 CompletionNode p = new CompletionNode
2154 (d = new ThenBlock<T>(this, fn, dst, executor));
2155 while ((r = result) == null) {
2156 if (UNSAFE.compareAndSwapObject
2157 (this, COMPLETIONS, p.next = completions, p))
2158 break;
2159 }
2160 }
2161 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2162 T t; Throwable ex;
2163 if (r instanceof AltResult) {
2164 ex = ((AltResult)r).ex;
2165 t = null;
2166 }
2167 else {
2168 ex = null;
2169 t = (T) r;
2170 }
2171 Executor e = executor;
2172 if (ex == null) {
2173 try {
2174 if (e != null)
2175 e.execute(new AsyncBlock<T>(t, fn, dst));
2176 else
2177 fn.accept(t);
2178 } catch (Throwable rex) {
2179 ex = rex;
2180 }
2181 }
2182 if (e == null || ex != null)
2183 dst.internalComplete(null, ex);
2184 }
2185 helpPostComplete();
2186 return dst;
2187 }
2188
2189 @SuppressWarnings("unchecked") private CompletableFuture<Void> thenRunnable
2190 (Runnable action,
2191 Executor executor) {
2192 if (action == null) throw new NullPointerException();
2193 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2194 ThenRunnable<T> d = null;
2195 Object r;
2196 if ((r = result) == null) {
2197 CompletionNode p = new CompletionNode
2198 (d = new ThenRunnable<T>(this, action, dst, executor));
2199 while ((r = result) == null) {
2200 if (UNSAFE.compareAndSwapObject
2201 (this, COMPLETIONS, p.next = completions, p))
2202 break;
2203 }
2204 }
2205 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2206 Throwable ex;
2207 if (r instanceof AltResult)
2208 ex = ((AltResult)r).ex;
2209 else
2210 ex = null;
2211 Executor e = executor;
2212 if (ex == null) {
2213 try {
2214 if (e != null)
2215 e.execute(new AsyncRunnable(action, dst));
2216 else
2217 action.run();
2218 } catch (Throwable rex) {
2219 ex = rex;
2220 }
2221 }
2222 if (e == null || ex != null)
2223 dst.internalComplete(null, ex);
2224 }
2225 helpPostComplete();
2226 return dst;
2227 }
2228
2229 @SuppressWarnings("unchecked") private <U,V> CompletableFuture<V> andFunction
2230 (CompletableFuture<? extends U> other,
2231 BiFunction<? super T,? super U,? extends V> fn,
2232 Executor executor) {
2233 if (other == null || fn == null) throw new NullPointerException();
2234 CompletableFuture<V> dst = new CompletableFuture<V>();
2235 AndFunction<T,U,V> d = null;
2236 Object r, s = null;
2237 if ((r = result) == null || (s = other.result) == null) {
2238 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
2239 CompletionNode q = null, p = new CompletionNode(d);
2240 while ((r == null && (r = result) == null) ||
2241 (s == null && (s = other.result) == null)) {
2242 if (q != null) {
2243 if (s != null ||
2244 UNSAFE.compareAndSwapObject
2245 (other, COMPLETIONS, q.next = other.completions, q))
2246 break;
2247 }
2248 else if (r != null ||
2249 UNSAFE.compareAndSwapObject
2250 (this, COMPLETIONS, p.next = completions, p)) {
2251 if (s != null)
2252 break;
2253 q = new CompletionNode(d);
2254 }
2255 }
2256 }
2257 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2258 T t; U u; Throwable ex;
2259 if (r instanceof AltResult) {
2260 ex = ((AltResult)r).ex;
2261 t = null;
2262 }
2263 else {
2264 ex = null;
2265 t = (T) r;
2266 }
2267 if (ex != null)
2268 u = null;
2269 else if (s instanceof AltResult) {
2270 ex = ((AltResult)s).ex;
2271 u = null;
2272 }
2273 else
2274 u = (U) s;
2275 Executor e = executor;
2276 V v = null;
2277 if (ex == null) {
2278 try {
2279 if (e != null)
2280 e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
2281 else
2282 v = fn.apply(t, u);
2283 } catch (Throwable rex) {
2284 ex = rex;
2285 }
2286 }
2287 if (e == null || ex != null)
2288 dst.internalComplete(v, ex);
2289 }
2290 helpPostComplete();
2291 other.helpPostComplete();
2292 return dst;
2293 }
2294
2295 @SuppressWarnings("unchecked") private <U> CompletableFuture<Void> andBlock
2296 (CompletableFuture<? extends U> other,
2297 BiBlock<? super T,? super U> fn,
2298 Executor executor) {
2299 if (other == null || fn == null) throw new NullPointerException();
2300 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2301 AndBlock<T,U> d = null;
2302 Object r, s = null;
2303 if ((r = result) == null || (s = other.result) == null) {
2304 d = new AndBlock<T,U>(this, other, fn, dst, executor);
2305 CompletionNode q = null, p = new CompletionNode(d);
2306 while ((r == null && (r = result) == null) ||
2307 (s == null && (s = other.result) == null)) {
2308 if (q != null) {
2309 if (s != null ||
2310 UNSAFE.compareAndSwapObject
2311 (other, COMPLETIONS, q.next = other.completions, q))
2312 break;
2313 }
2314 else if (r != null ||
2315 UNSAFE.compareAndSwapObject
2316 (this, COMPLETIONS, p.next = completions, p)) {
2317 if (s != null)
2318 break;
2319 q = new CompletionNode(d);
2320 }
2321 }
2322 }
2323 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2324 T t; U u; Throwable ex;
2325 if (r instanceof AltResult) {
2326 ex = ((AltResult)r).ex;
2327 t = null;
2328 }
2329 else {
2330 ex = null;
2331 t = (T) r;
2332 }
2333 if (ex != null)
2334 u = null;
2335 else if (s instanceof AltResult) {
2336 ex = ((AltResult)s).ex;
2337 u = null;
2338 }
2339 else
2340 u = (U) s;
2341 Executor e = executor;
2342 if (ex == null) {
2343 try {
2344 if (e != null)
2345 e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
2346 else
2347 fn.accept(t, u);
2348 } catch (Throwable rex) {
2349 ex = rex;
2350 }
2351 }
2352 if (e == null || ex != null)
2353 dst.internalComplete(null, ex);
2354 }
2355 helpPostComplete();
2356 other.helpPostComplete();
2357 return dst;
2358 }
2359
2360 @SuppressWarnings("unchecked") private CompletableFuture<Void> andRunnable
2361 (CompletableFuture<?> other,
2362 Runnable action,
2363 Executor executor) {
2364 if (other == null || action == null) throw new NullPointerException();
2365 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2366 AndRunnable<T> d = null;
2367 Object r, s = null;
2368 if ((r = result) == null || (s = other.result) == null) {
2369 d = new AndRunnable<T>(this, other, action, dst, executor);
2370 CompletionNode q = null, p = new CompletionNode(d);
2371 while ((r == null && (r = result) == null) ||
2372 (s == null && (s = other.result) == null)) {
2373 if (q != null) {
2374 if (s != null ||
2375 UNSAFE.compareAndSwapObject
2376 (other, COMPLETIONS, q.next = other.completions, q))
2377 break;
2378 }
2379 else if (r != null ||
2380 UNSAFE.compareAndSwapObject
2381 (this, COMPLETIONS, p.next = completions, p)) {
2382 if (s != null)
2383 break;
2384 q = new CompletionNode(d);
2385 }
2386 }
2387 }
2388 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2389 Throwable ex;
2390 if (r instanceof AltResult)
2391 ex = ((AltResult)r).ex;
2392 else
2393 ex = null;
2394 if (ex == null && (s instanceof AltResult))
2395 ex = ((AltResult)s).ex;
2396 Executor e = executor;
2397 if (ex == null) {
2398 try {
2399 if (e != null)
2400 e.execute(new AsyncRunnable(action, dst));
2401 else
2402 action.run();
2403 } catch (Throwable rex) {
2404 ex = rex;
2405 }
2406 }
2407 if (e == null || ex != null)
2408 dst.internalComplete(null, ex);
2409 }
2410 helpPostComplete();
2411 other.helpPostComplete();
2412 return dst;
2413 }
2414
2415 @SuppressWarnings("unchecked") private <U> CompletableFuture<U> orFunction
2416 (CompletableFuture<? extends T> other,
2417 Function<? super T, U> fn,
2418 Executor executor) {
2419 if (other == null || fn == null) throw new NullPointerException();
2420 CompletableFuture<U> dst = new CompletableFuture<U>();
2421 OrFunction<T,U> d = null;
2422 Object r;
2423 if ((r = result) == null && (r = other.result) == null) {
2424 d = new OrFunction<T,U>(this, other, fn, dst, executor);
2425 CompletionNode q = null, p = new CompletionNode(d);
2426 while ((r = result) == null && (r = other.result) == null) {
2427 if (q != null) {
2428 if (UNSAFE.compareAndSwapObject
2429 (other, COMPLETIONS, q.next = other.completions, q))
2430 break;
2431 }
2432 else if (UNSAFE.compareAndSwapObject
2433 (this, COMPLETIONS, p.next = completions, p))
2434 q = new CompletionNode(d);
2435 }
2436 }
2437 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2438 T t; Throwable ex;
2439 if (r instanceof AltResult) {
2440 ex = ((AltResult)r).ex;
2441 t = null;
2442 }
2443 else {
2444 ex = null;
2445 t = (T) r;
2446 }
2447 Executor e = executor;
2448 U u = null;
2449 if (ex == null) {
2450 try {
2451 if (e != null)
2452 e.execute(new AsyncFunction<T,U>(t, fn, dst));
2453 else
2454 u = fn.apply(t);
2455 } catch (Throwable rex) {
2456 ex = rex;
2457 }
2458 }
2459 if (e == null || ex != null)
2460 dst.internalComplete(u, ex);
2461 }
2462 helpPostComplete();
2463 other.helpPostComplete();
2464 return dst;
2465 }
2466
2467 @SuppressWarnings("unchecked") private CompletableFuture<Void> orBlock
2468 (CompletableFuture<? extends T> other,
2469 Block<? super T> fn,
2470 Executor executor) {
2471 if (other == null || fn == null) throw new NullPointerException();
2472 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2473 OrBlock<T> d = null;
2474 Object r;
2475 if ((r = result) == null && (r = other.result) == null) {
2476 d = new OrBlock<T>(this, other, fn, dst, executor);
2477 CompletionNode q = null, p = new CompletionNode(d);
2478 while ((r = result) == null && (r = other.result) == null) {
2479 if (q != null) {
2480 if (UNSAFE.compareAndSwapObject
2481 (other, COMPLETIONS, q.next = other.completions, q))
2482 break;
2483 }
2484 else if (UNSAFE.compareAndSwapObject
2485 (this, COMPLETIONS, p.next = completions, p))
2486 q = new CompletionNode(d);
2487 }
2488 }
2489 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2490 T t; Throwable ex;
2491 if (r instanceof AltResult) {
2492 ex = ((AltResult)r).ex;
2493 t = null;
2494 }
2495 else {
2496 ex = null;
2497 t = (T) r;
2498 }
2499 Executor e = executor;
2500 if (ex == null) {
2501 try {
2502 if (e != null)
2503 e.execute(new AsyncBlock<T>(t, fn, dst));
2504 else
2505 fn.accept(t);
2506 } catch (Throwable rex) {
2507 ex = rex;
2508 }
2509 }
2510 if (e == null || ex != null)
2511 dst.internalComplete(null, ex);
2512 }
2513 helpPostComplete();
2514 other.helpPostComplete();
2515 return dst;
2516 }
2517
2518 @SuppressWarnings("unchecked") private CompletableFuture<Void> orRunnable
2519 (CompletableFuture<?> other,
2520 Runnable action,
2521 Executor executor) {
2522 if (other == null || action == null) throw new NullPointerException();
2523 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2524 OrRunnable<T> d = null;
2525 Object r;
2526 if ((r = result) == null && (r = other.result) == null) {
2527 d = new OrRunnable<T>(this, other, action, dst, executor);
2528 CompletionNode q = null, p = new CompletionNode(d);
2529 while ((r = result) == null && (r = other.result) == null) {
2530 if (q != null) {
2531 if (UNSAFE.compareAndSwapObject
2532 (other, COMPLETIONS, q.next = other.completions, q))
2533 break;
2534 }
2535 else if (UNSAFE.compareAndSwapObject
2536 (this, COMPLETIONS, p.next = completions, p))
2537 q = new CompletionNode(d);
2538 }
2539 }
2540 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2541 Throwable ex;
2542 if (r instanceof AltResult)
2543 ex = ((AltResult)r).ex;
2544 else
2545 ex = null;
2546 Executor e = executor;
2547 if (ex == null) {
2548 try {
2549 if (e != null)
2550 e.execute(new AsyncRunnable(action, dst));
2551 else
2552 action.run();
2553 } catch (Throwable rex) {
2554 ex = rex;
2555 }
2556 }
2557 if (e == null || ex != null)
2558 dst.internalComplete(null, ex);
2559 }
2560 helpPostComplete();
2561 other.helpPostComplete();
2562 return dst;
2563 }
2564
2565 // Unsafe mechanics
2566 private static final sun.misc.Unsafe UNSAFE;
2567 private static final long RESULT;
2568 private static final long WAITERS;
2569 private static final long COMPLETIONS;
2570 static {
2571 try {
2572 UNSAFE = sun.misc.Unsafe.getUnsafe();
2573 Class<?> k = CompletableFuture.class;
2574 RESULT = UNSAFE.objectFieldOffset
2575 (k.getDeclaredField("result"));
2576 WAITERS = UNSAFE.objectFieldOffset
2577 (k.getDeclaredField("waiters"));
2578 COMPLETIONS = UNSAFE.objectFieldOffset
2579 (k.getDeclaredField("completions"));
2580 } catch (Exception e) {
2581 throw new Error(e);
2582 }
2583 }
2584 }