ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.19
Committed: Sun Dec 30 22:59:10 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.18: +323 -236 lines
Log Message:
Use CompletionExceptions

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Block;
10 import java.util.function.BiBlock;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.locks.LockSupport;
25
26 /**
27 * A {@link Future} that may be explicitly completed (setting its
28 * value and status), and may include dependent functions and actions
29 * that trigger upon its completion.
30 *
31 * <p>Similar methods are available for Functions, Blocks, and
32 * Runnables, depending on whether actions require arguments and/or
33 * produce results. Functions and actions supplied for dependent
34 * completions (mainly using methods with prefix {@code then}) may be
35 * performed by the thread that completes the current
36 * CompletableFuture, or by any other caller of these methods. There
37 * are no guarantees about the order of processing completions unless
38 * constrained by these methods.
39 *
40 * <p>When two or more threads attempt to {@link #complete} or {@link
41 * #completeExceptionally} a CompletableFuture, only one of them
42 * succeeds.
43 *
44 * <p> Upon exceptional completion, or when a completion entails
45 * computation of a function or action, and it terminates abruptly
46 * with an (unchecked) exception or error, then further completions
47 * act as {@code completeExceptionally} with a {@link
48 * CompletionException} holding that exception as its cause. If a
49 * CompletableFuture completes exceptionally, and is not followed by a
50 * {@link #exceptionally} or {@link #handle} completion, then all of
51 * its dependents (and their dependents) also complete exceptionally
52 * with CompletionExceptions holding the ultimate cause. For
53 * compatibility with {@link Future}, in case of a CompletionException,
54 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw a
55 * {@link ExecutionException} with the same cause as would be held in
56 * the corresponding CompletionException.
57 *
58 * <p>CompletableFutures themselves do not execute asynchronously.
59 * However, the {@code async} methods provide commonly useful ways to
60 * commence asynchronous processing, using either a given {@link
61 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
62 * function or action that will result in the completion of a new
63 * CompletableFuture.
64 *
65 * @author Doug Lea
66 * @since 1.8
67 */
68 public class CompletableFuture<T> implements Future<T> {
69 /*
70 * Quick overview (more to come):
71 *
72 * 1. Non-nullness of field result indicates done. An AltResult is
73 * used to box null as a result, as well as to hold exceptions.
74 *
75 * 2. Waiters are held in a Treiber stack similar to the one used
76 * in FutureTask.
77 *
78 * 3. Completions are also kept in a list/stack, and pulled off
79 * and run when completion is triggered. Because post-processing
80 * may race with direct calls, completions extend AtomicInteger so
81 * callers can claim the action via compareAndSet(0, 1).
82 */
83
84 static final class AltResult {
85 final Throwable ex; // null only for NIL
86 AltResult(Throwable ex) { this.ex = ex; }
87 }
88
89 static final AltResult NIL = new AltResult(null);
90
91 /**
92 * Simple linked list nodes to record waiting threads in a Treiber
93 * stack. See other classes such as Phaser and SynchronousQueue
94 * for more detailed explanation.
95 */
96 static final class WaitNode {
97 volatile Thread thread;
98 volatile WaitNode next;
99 }
100
101 /**
102 * Simple linked list nodes to record completions, used in
103 * basically the same way as WaitNodes.
104 */
105 static final class CompletionNode {
106 final Completion completion;
107 volatile CompletionNode next;
108 CompletionNode(Completion completion) { this.completion = completion; }
109 }
110
111 volatile Object result; // either the result or boxed AltResult
112 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
113 volatile CompletionNode completions; // list (Treiber stack) of completions
114
115 /**
116 * Creates a new incomplete CompletableFuture.
117 */
118 public CompletableFuture() {
119 }
120
121 /**
122 * Asynchronously executes in the {@link
123 * ForkJoinPool#commonPool()}, a task that completes the returned
124 * CompletableFuture with the result of the given Supplier.
125 *
126 * @param supplier a function returning the value to be used
127 * to complete the returned CompletableFuture
128 * @return the CompletableFuture
129 */
130 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
131 if (supplier == null) throw new NullPointerException();
132 CompletableFuture<U> f = new CompletableFuture<U>();
133 ForkJoinPool.commonPool().
134 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
135 return f;
136 }
137
138 /**
139 * Asynchronously executes using the given executor, a task that
140 * completes the returned CompletableFuture with the result of the
141 * given Supplier.
142 *
143 * @param supplier a function returning the value to be used
144 * to complete the returned CompletableFuture
145 * @param executor the executor to use for asynchronous execution
146 * @return the CompletableFuture
147 */
148 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
149 Executor executor) {
150 if (executor == null || supplier == null)
151 throw new NullPointerException();
152 CompletableFuture<U> f = new CompletableFuture<U>();
153 executor.execute(new AsyncSupplier<U>(supplier, f));
154 return f;
155 }
156
157 /**
158 * Asynchronously executes in the {@link
159 * ForkJoinPool#commonPool()} a task that runs the given action,
160 * and then completes the returned CompletableFuture.
161 *
162 * @param runnable the action to run before completing the
163 * returned CompletableFuture
164 * @return the CompletableFuture
165 */
166 public static CompletableFuture<Void> runAsync(Runnable runnable) {
167 if (runnable == null) throw new NullPointerException();
168 CompletableFuture<Void> f = new CompletableFuture<Void>();
169 ForkJoinPool.commonPool().
170 execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
171 return f;
172 }
173
174 /**
175 * Asynchronously executes using the given executor, a task that
176 * runs the given action, and then completes the returned
177 * CompletableFuture.
178 *
179 * @param runnable the action to run before completing the
180 * returned CompletableFuture
181 * @param executor the executor to use for asynchronous execution
182 * @return the CompletableFuture
183 */
184 public static CompletableFuture<Void> runAsync(Runnable runnable,
185 Executor executor) {
186 if (executor == null || runnable == null)
187 throw new NullPointerException();
188 CompletableFuture<Void> f = new CompletableFuture<Void>();
189 executor.execute(new AsyncRunnable(runnable, f));
190 return f;
191 }
192
193 /**
194 * Returns {@code true} if completed in any fashion: normally,
195 * exceptionally, or via cancellation.
196 *
197 * @return {@code true} if completed
198 */
199 public boolean isDone() {
200 return result != null;
201 }
202
203 /**
204 * Waits if necessary for the computation to complete, and then
205 * retrieves its result.
206 *
207 * @return the computed result
208 * @throws CancellationException if the computation was cancelled
209 * @throws ExecutionException if the computation threw an
210 * exception
211 * @throws InterruptedException if the current thread was interrupted
212 * while waiting
213 */
214 public T get() throws InterruptedException, ExecutionException {
215 Object r; Throwable ex;
216 if ((r = result) == null && (r = waitingGet(true)) == null)
217 throw new InterruptedException();
218 if (r instanceof AltResult) {
219 if ((ex = ((AltResult)r).ex) != null) {
220 if (ex instanceof CancellationException)
221 throw (CancellationException)ex;
222 if (ex instanceof CompletionException) {
223 Throwable cause = ex.getCause();
224 if (cause != null)
225 ex = cause;
226 }
227 throw new ExecutionException(ex);
228 }
229 return null;
230 }
231 return (T)r;
232 }
233
234 /**
235 * Returns the result value when complete, or throws an
236 * (unchecked) exception if completed exceptionally. To better
237 * conform with the use of common functional forms, if a
238 * computation involved in the completion of this
239 * CompletableFuture threw an exception, this method throws an
240 * (unchecked) {@link CompletionException} with the
241 * underlying exception as its cause. Additionally, this method
242 * does not abort on interrupt.
243 *
244 * @return the result value
245 * @throws CancellationException if the computation was cancelled
246 * @throws CompletionException if the computation threw an
247 * exception
248 */
249 public T getValue() {
250 Object r; Throwable ex;
251 if ((r = result) == null)
252 r = waitingGet(false);
253 if (r instanceof AltResult) {
254 if ((ex = ((AltResult)r).ex) != null) {
255 if (ex instanceof CancellationException)
256 throw (CancellationException)ex;
257 if (ex instanceof CompletionException)
258 throw (CompletionException)ex;
259 throw new CompletionException(ex);
260 }
261 return null;
262 }
263 return (T)r;
264 }
265
266 /**
267 * Returns the result value (or throws any encountered exception)
268 * if completed, else returns the given valueIfAbsent.
269 *
270 * @param valueIfAbsent the value to return if not completed
271 * @return the result value, if completed, else the given valueIfAbsent
272 * @throws CancellationException if the computation was cancelled
273 * @throws CompletionException if the computation threw an
274 * exception
275 */
276 public T getNow(T valueIfAbsent) {
277 Object r; Throwable ex;
278 if ((r = result) == null)
279 return valueIfAbsent;
280 if (r instanceof AltResult) {
281 if ((ex = ((AltResult)r).ex) != null) {
282 if (ex instanceof CancellationException)
283 throw (CancellationException)ex;
284 if (ex instanceof CompletionException)
285 throw (CompletionException)ex;
286 throw new CompletionException(ex);
287 }
288 return null;
289 }
290 return (T)r;
291 }
292
293 /**
294 * Waits if necessary for at most the given time for completion,
295 * and then retrieves its result, if available.
296 *
297 * @param timeout the maximum time to wait
298 * @param unit the time unit of the timeout argument
299 * @return the computed result
300 * @throws CancellationException if the computation was cancelled
301 * @throws ExecutionException if the computation threw an
302 * exception
303 * @throws InterruptedException if the current thread was interrupted
304 * while waiting
305 * @throws TimeoutException if the wait timed out
306 */
307 public T get(long timeout, TimeUnit unit)
308 throws InterruptedException, ExecutionException, TimeoutException {
309 Object r; Throwable ex;
310 long nanos = unit.toNanos(timeout);
311 if (Thread.interrupted())
312 throw new InterruptedException();
313 if ((r = result) == null)
314 r = timedAwaitDone(nanos);
315 if (r instanceof AltResult) {
316 if ((ex = ((AltResult)r).ex) != null) {
317 if (ex instanceof CancellationException)
318 throw (CancellationException)ex;
319 if (ex instanceof CompletionException) {
320 Throwable cause = ex.getCause();
321 if (cause != null)
322 ex = cause;
323 }
324 throw new ExecutionException(ex);
325 }
326 return null;
327 }
328 return (T)r;
329 }
330
331 /**
332 * If not already completed, sets the value returned by {@link
333 * #get()} and related methods to the given value.
334 *
335 * @param value the result value
336 * @return true if this invocation caused this CompletableFuture
337 * to transition to a completed state, else false
338 */
339 public boolean complete(T value) {
340 if (result == null &&
341 UNSAFE.compareAndSwapObject(this, RESULT, null,
342 (value == null) ? NIL : value)) {
343 postComplete();
344 return true;
345 }
346 return false;
347 }
348
349 /**
350 * Internal version of complete; CASes in an existing result or
351 * AltResult.
352 */
353 private void propagateCompletion(Object r) {
354 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r))
355 postComplete();
356 }
357
358 /**
359 * If not already completed, causes invocations of {@link #get()}
360 * and related methods to throw the given exception.
361 *
362 * @param ex the exception
363 * @return true if this invocation caused this CompletableFuture
364 * to transition to a completed state, else false
365 */
366 public boolean completeExceptionally(Throwable ex) {
367 if (ex == null) throw new NullPointerException();
368 if (result == null) {
369 AltResult r = new AltResult(ex);
370 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
371 postComplete();
372 return true;
373 }
374 }
375 return false;
376 }
377
378 /**
379 * Internal version of completeExceptionally that avoids creating
380 * chains of CompletionExceptions.
381 */
382 private void propagateException(Throwable ex) {
383 if (result == null) {
384 AltResult r = new AltResult
385 ((ex instanceof CompletionException) ? ex :
386 new CompletionException(ex));
387 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r))
388 postComplete();
389 }
390 }
391
392 /**
393 * Creates and returns a CompletableFuture that is completed with
394 * the result of the given function of this CompletableFuture.
395 * If this CompletableFuture completes exceptionally,
396 * then the returned CompletableFuture also does so,
397 * with a CompletionException holding this exception as
398 * its cause.
399 *
400 * @param fn the function to use to compute the value of
401 * the returned CompletableFuture
402 * @return the new CompletableFuture
403 */
404 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
405 return thenFunction(fn, null);
406 }
407
408 /**
409 * Creates and returns a CompletableFuture that is asynchronously
410 * completed using the {@link ForkJoinPool#commonPool()} with the
411 * result of the given function of this CompletableFuture. If
412 * this CompletableFuture completes exceptionally, then the
413 * returned CompletableFuture also does so, with a
414 * CompletionException holding this exception as its cause.
415 *
416 * @param fn the function to use to compute the value of
417 * the returned CompletableFuture
418 * @return the new CompletableFuture
419 */
420 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
421 return thenFunction(fn, ForkJoinPool.commonPool());
422 }
423
424 /**
425 * Creates and returns a CompletableFuture that is asynchronously
426 * completed using the given executor with the result of the given
427 * function of this CompletableFuture. If this CompletableFuture
428 * completes exceptionally, then the returned CompletableFuture
429 * also does so, with a CompletionException holding this exception as
430 * its cause.
431 *
432 * @param fn the function to use to compute the value of
433 * the returned CompletableFuture
434 * @param executor the executor to use for asynchronous execution
435 * @return the new CompletableFuture
436 */
437 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
438 Executor executor) {
439 if (executor == null) throw new NullPointerException();
440 return thenFunction(fn, executor);
441 }
442
443 /**
444 * Creates and returns a CompletableFuture that is completed after
445 * performing the given action with this CompletableFuture's
446 * result when it completes. If this CompletableFuture
447 * completes exceptionally, then the returned CompletableFuture
448 * also does so, with a CompletionException holding this exception as
449 * its cause.
450 *
451 * @param block the action to perform before completing the
452 * returned CompletableFuture
453 * @return the new CompletableFuture
454 */
455 public CompletableFuture<Void> thenAccept(Block<? super T> block) {
456 return thenBlock(block, null);
457 }
458
459 /**
460 * Creates and returns a CompletableFuture that is asynchronously
461 * completed using the {@link ForkJoinPool#commonPool()} after
462 * performing the given action with this CompletableFuture's result
463 * when it completes. If this CompletableFuture completes
464 * exceptionally, then the returned CompletableFuture also does so,
465 * with a CompletionException holding this exception as its cause.
466 *
467 * @param block the action to perform before completing the
468 * returned CompletableFuture
469 * @return the new CompletableFuture
470 */
471 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
472 return thenBlock(block, ForkJoinPool.commonPool());
473 }
474
475 /**
476 * Creates and returns a CompletableFuture that is asynchronously
477 * completed using the given executor after performing the given
478 * action with this CompletableFuture's result when it completes.
479 * If this CompletableFuture completes exceptionally, then the returned
480 * CompletableFuture also does so, with a CompletionException holding
481 * this exception as its cause.
482 *
483 * @param block the action to perform before completing the
484 * returned CompletableFuture
485 * @param executor the executor to use for asynchronous execution
486 * @return the new CompletableFuture
487 */
488 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
489 Executor executor) {
490 if (executor == null) throw new NullPointerException();
491 return thenBlock(block, executor);
492 }
493
494 /**
495 * Creates and returns a CompletableFuture that is completed after
496 * performing the given action when this CompletableFuture
497 * completes. If this CompletableFuture completes exceptionally,
498 * then the returned CompletableFuture also does so, with a
499 * CompletionException holding this exception as its cause.
500 *
501 * @param action the action to perform before completing the
502 * returned CompletableFuture
503 * @return the new CompletableFuture
504 */
505 public CompletableFuture<Void> thenRun(Runnable action) {
506 return thenRunnable(action, null);
507 }
508
509 /**
510 * Creates and returns a CompletableFuture that is asynchronously
511 * completed using the {@link ForkJoinPool#commonPool()} after
512 * performing the given action when this CompletableFuture
513 * completes. If this CompletableFuture completes exceptionally,
514 * then the returned CompletableFuture also does so, with a
515 * CompletionException holding this exception as its cause.
516 *
517 * @param action the action to perform before completing the
518 * returned CompletableFuture
519 * @return the new CompletableFuture
520 */
521 public CompletableFuture<Void> thenRunAsync(Runnable action) {
522 return thenRunnable(action, ForkJoinPool.commonPool());
523 }
524
525 /**
526 * Creates and returns a CompletableFuture that is asynchronously
527 * completed using the given executor after performing the given
528 * action when this CompletableFuture completes. If this
529 * CompletableFuture completes exceptionally, then the returned
530 * CompletableFuture also does so, with a CompletionException holding
531 * this exception as its cause.
532 *
533 * @param action the action to perform before completing the
534 * returned CompletableFuture
535 * @param executor the executor to use for asynchronous execution
536 * @return the new CompletableFuture
537 */
538 public CompletableFuture<Void> thenRunAsync(Runnable action,
539 Executor executor) {
540 if (executor == null) throw new NullPointerException();
541 return thenRunnable(action, executor);
542 }
543
544 /**
545 * Creates and returns a CompletableFuture that is completed with
546 * the result of the given function of this and the other given
547 * CompletableFuture's results when both complete. If this or
548 * the other CompletableFuture complete exceptionally, then the
549 * returned CompletableFuture also does so, with a
550 * CompletionException holding the exception as its cause.
551 *
552 * @param other the other CompletableFuture
553 * @param fn the function to use to compute the value of
554 * the returned CompletableFuture
555 * @return the new CompletableFuture
556 */
557 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
558 BiFunction<? super T,? super U,? extends V> fn) {
559 return andFunction(other, fn, null);
560 }
561
562 /**
563 * Creates and returns a CompletableFuture that is asynchronously
564 * completed using the {@link ForkJoinPool#commonPool()} with
565 * the result of the given function of this and the other given
566 * CompletableFuture's results when both complete. If this or
567 * the other CompletableFuture complete exceptionally, then the
568 * returned CompletableFuture also does so, with a
569 * CompletionException holding the exception as its cause.
570 *
571 * @param other the other CompletableFuture
572 * @param fn the function to use to compute the value of
573 * the returned CompletableFuture
574 * @return the new CompletableFuture
575 */
576 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
577 BiFunction<? super T,? super U,? extends V> fn) {
578 return andFunction(other, fn, ForkJoinPool.commonPool());
579 }
580
581 /**
582 * Creates and returns a CompletableFuture that is
583 * asynchronously completed using the given executor with the
584 * result of the given function of this and the other given
585 * CompletableFuture's results when both complete. If this or
586 * the other CompletableFuture complete exceptionally, then the
587 * returned CompletableFuture also does so, with a
588 * CompletionException holding the exception as its cause.
589 *
590 * @param other the other CompletableFuture
591 * @param fn the function to use to compute the value of
592 * the returned CompletableFuture
593 * @param executor the executor to use for asynchronous execution
594 * @return the new CompletableFuture
595 */
596
597 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
598 BiFunction<? super T,? super U,? extends V> fn,
599 Executor executor) {
600 if (executor == null) throw new NullPointerException();
601 return andFunction(other, fn, executor);
602 }
603
604 /**
605 * Creates and returns a CompletableFuture that is completed after
606 * performing the given action with the results of this and the
607 * other given CompletableFuture when both complete. If this and/or
608 * the other CompletableFuture complete exceptionally, then the
609 * returned CompletableFuture also does so, with a CompletionException
610 * holding one of these exceptions as its cause.
611 *
612 * @param other the other CompletableFuture
613 * @param block the action to perform before completing the
614 * returned CompletableFuture
615 * @return the new CompletableFuture
616 */
617 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
618 BiBlock<? super T, ? super U> block) {
619 return andBlock(other, block, null);
620 }
621
622 /**
623 * Creates and returns a CompletableFuture that is completed
624 * asynchronously using the {@link ForkJoinPool#commonPool()} after
625 * performing the given action with the results of this and the
626 * other given CompletableFuture when both complete. If this and/or
627 * the other CompletableFuture complete exceptionally, then the
628 * returned CompletableFuture also does so, with a CompletionException
629 * holding one of these exceptions as its cause.
630 *
631 * @param other the other CompletableFuture
632 * @param block the action to perform before completing the
633 * returned CompletableFuture
634 * @return the new CompletableFuture
635 */
636 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
637 BiBlock<? super T, ? super U> block) {
638 return andBlock(other, block, ForkJoinPool.commonPool());
639 }
640
641 /**
642 * Creates and returns a CompletableFuture that is completed
643 * asynchronously using the given executor after performing the
644 * given action with the results of this and the other given
645 * CompletableFuture when both complete. If this and/or the other
646 * CompletableFuture complete exceptionally, then the returned
647 * CompletableFuture also does so, with a CompletionException holding
648 * one of these exceptions as its cause.
649 *
650 * @param other the other CompletableFuture
651 * @param block the action to perform before completing the
652 * returned CompletableFuture
653 * @param executor the executor to use for asynchronous execution
654 * @return the new CompletableFuture
655 */
656 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
657 BiBlock<? super T, ? super U> block,
658 Executor executor) {
659 if (executor == null) throw new NullPointerException();
660 return andBlock(other, block, executor);
661 }
662
663 /**
664 * Creates and returns a CompletableFuture that is completed
665 * when this and the other given CompletableFuture both
666 * complete. If this and/or the other CompletableFuture complete
667 * exceptionally, then the returned CompletableFuture also does
668 * so, with a CompletionException holding one of these exceptions as
669 * its cause.
670 *
671 * @param other the other CompletableFuture
672 * @param action the action to perform before completing the
673 * returned CompletableFuture
674 * @return the new CompletableFuture
675 */
676 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
677 Runnable action) {
678 return andRunnable(other, action, null);
679 }
680
681 /**
682 * Creates and returns a CompletableFuture that is completed
683 * asynchronously using the {@link ForkJoinPool#commonPool()}
684 * when this and the other given CompletableFuture both
685 * complete. If this and/or the other CompletableFuture complete
686 * exceptionally, then the returned CompletableFuture also does
687 * so, with a CompletionException holding one of these exceptions as
688 * its cause.
689 *
690 * @param other the other CompletableFuture
691 * @param action the action to perform before completing the
692 * returned CompletableFuture
693 * @return the new CompletableFuture
694 */
695 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
696 Runnable action) {
697 return andRunnable(other, action, ForkJoinPool.commonPool());
698 }
699
700 /**
701 * Creates and returns a CompletableFuture that is completed
702 * asynchronously using the given executor
703 * when this and the other given CompletableFuture both
704 * complete. If this and/or the other CompletableFuture complete
705 * exceptionally, then the returned CompletableFuture also does
706 * so, with a CompletionException holding one of these exceptions as
707 * its cause.
708 *
709 * @param other the other CompletableFuture
710 * @param action the action to perform before completing the
711 * returned CompletableFuture
712 * @param executor the executor to use for asynchronous execution
713 * @return the new CompletableFuture
714 */
715 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
716 Runnable action,
717 Executor executor) {
718 if (executor == null) throw new NullPointerException();
719 return andRunnable(other, action, executor);
720 }
721
722 /**
723 * Creates and returns a CompletableFuture that is completed with
724 * the result of the given function of either this or the other
725 * given CompletableFuture's results when either complete. If
726 * this and/or the other CompletableFuture complete exceptionally,
727 * then the returned CompletableFuture may also do so, with a
728 * CompletionException holding one of these exceptions as its cause.
729 * No guarantees are made about which result or exception is used
730 * in the returned CompletableFuture.
731 *
732 * @param other the other CompletableFuture
733 * @param fn the function to use to compute the value of
734 * the returned CompletableFuture
735 * @return the new CompletableFuture
736 */
737 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
738 Function<? super T, U> fn) {
739 return orFunction(other, fn, null);
740 }
741
742 /**
743 * Creates and returns a CompletableFuture that is completed
744 * asynchronously using the {@link ForkJoinPool#commonPool()} with
745 * the result of the given function of either this or the other
746 * given CompletableFuture's results when either complete. If
747 * this and/or the other CompletableFuture complete exceptionally,
748 * then the returned CompletableFuture may also do so, with a
749 * CompletionException holding one of these exceptions as its cause.
750 * No guarantees are made about which result or exception is used
751 * in the returned CompletableFuture.
752 *
753 * @param other the other CompletableFuture
754 * @param fn the function to use to compute the value of
755 * the returned CompletableFuture
756 * @return the new CompletableFuture
757 */
758 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
759 Function<? super T, U> fn) {
760 return orFunction(other, fn, ForkJoinPool.commonPool());
761 }
762
763 /**
764 * Creates and returns a CompletableFuture that is completed
765 * asynchronously using the given executor with the result of the
766 * given function of either this or the other given
767 * CompletableFuture's results when either complete. If this
768 * and/or the other CompletableFuture complete exceptionally, then
769 * the returned CompletableFuture may also do so, with a
770 * CompletionException holding one of these exceptions as its cause.
771 * No guarantees are made about which result or exception is used
772 * in the returned CompletableFuture.
773 *
774 * @param other the other CompletableFuture
775 * @param fn the function to use to compute the value of
776 * the returned CompletableFuture
777 * @param executor the executor to use for asynchronous execution
778 * @return the new CompletableFuture
779 */
780 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
781 Function<? super T, U> fn,
782 Executor executor) {
783 if (executor == null) throw new NullPointerException();
784 return orFunction(other, fn, executor);
785 }
786
787 /**
788 * Creates and returns a CompletableFuture that is completed after
789 * performing the given action with the result of either this or the
790 * other given CompletableFuture's result, when either complete.
791 * If this and/or the other CompletableFuture complete
792 * exceptionally, then the returned CompletableFuture may also do
793 * so, with a CompletionException holding one of these exceptions as
794 * its cause. No guarantees are made about which exception is
795 * used in the returned CompletableFuture.
796 *
797 * @param other the other CompletableFuture
798 * @param block the action to perform before completing the
799 * returned CompletableFuture
800 * @return the new CompletableFuture
801 */
802 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
803 Block<? super T> block) {
804 return orBlock(other, block, null);
805 }
806
807 /**
808 * Creates and returns a CompletableFuture that is completed
809 * asynchronously using the {@link ForkJoinPool#commonPool()},
810 * performing the given action with the result of either this or
811 * the other given CompletableFuture's result, when either
812 * complete. If this and/or the other CompletableFuture complete
813 * exceptionally, then the returned CompletableFuture may also do
814 * so, with a CompletionException holding one of these exceptions as
815 * its cause. No guarantees are made about which exception is
816 * used in the returned CompletableFuture.
817 *
818 * @param other the other CompletableFuture
819 * @param block the action to perform before completing the
820 * returned CompletableFuture
821 * @return the new CompletableFuture
822 */
823 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
824 Block<? super T> block) {
825 return orBlock(other, block, ForkJoinPool.commonPool());
826 }
827
828 /**
829 * Creates and returns a CompletableFuture that is completed
830 * asynchronously using the given executor,
831 * performing the given action with the result of either this or
832 * the other given CompletableFuture's result, when either
833 * complete. If this and/or the other CompletableFuture complete
834 * exceptionally, then the returned CompletableFuture may also do
835 * so, with a CompletionException holding one of these exceptions as
836 * its cause. No guarantees are made about which exception is
837 * used in the returned CompletableFuture.
838 *
839 * @param other the other CompletableFuture
840 * @param block the action to perform before completing the
841 * returned CompletableFuture
842 * @param executor the executor to use for asynchronous execution
843 * @return the new CompletableFuture
844 */
845 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
846 Block<? super T> block,
847 Executor executor) {
848 if (executor == null) throw new NullPointerException();
849 return orBlock(other, block, executor);
850 }
851
852 /**
853 * Creates and returns a CompletableFuture that is completed
854 * after this or the other given CompletableFuture complete. If
855 * this and/or the other CompletableFuture complete exceptionally,
856 * then the returned CompletableFuture may also do so, with a
857 * CompletionException holding one of these exceptions as its cause.
858 * No guarantees are made about which exception is used in the
859 * returned CompletableFuture.
860 *
861 * @param other the other CompletableFuture
862 * @param action the action to perform before completing the
863 * returned CompletableFuture
864 * @return the new CompletableFuture
865 */
866 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
867 Runnable action) {
868 return orRunnable(other, action, null);
869 }
870
871 /**
872 * Creates and returns a CompletableFuture that is completed
873 * asynchronously using the {@link ForkJoinPool#commonPool()}
874 * after this or the other given CompletableFuture complete. If
875 * this and/or the other CompletableFuture complete exceptionally,
876 * then the returned CompletableFuture may also do so, with a
877 * CompletionException holding one of these exceptions as its cause.
878 * No guarantees are made about which exception is used in the
879 * returned CompletableFuture.
880 *
881 * @param other the other CompletableFuture
882 * @param action the action to perform before completing the
883 * returned CompletableFuture
884 * @return the new CompletableFuture
885 */
886 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
887 Runnable action) {
888 return orRunnable(other, action, ForkJoinPool.commonPool());
889 }
890
891 /**
892 * Creates and returns a CompletableFuture that is completed
893 * asynchronously using the given executor after this or the other
894 * given CompletableFuture complete. If this and/or the other
895 * CompletableFuture complete exceptionally, then the returned
896 * CompletableFuture may also do so, with a CompletionException
897 * holding one of these exceptions as its cause. No guarantees are
898 * made about which exception is used in the returned
899 * CompletableFuture.
900 *
901 * @param other the other CompletableFuture
902 * @param action the action to perform before completing the
903 * returned CompletableFuture
904 * @param executor the executor to use for asynchronous execution
905 * @return the new CompletableFuture
906 */
907 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
908 Runnable action,
909 Executor executor) {
910 if (executor == null) throw new NullPointerException();
911 return orRunnable(other, action, executor);
912 }
913
914 /**
915 * Returns a CompletableFuture equal or equivalent to that
916 * produced by the given function of the result of this
917 * CompletableFuture when completed. If this CompletableFuture
918 * completes exceptionally, then the returned CompletableFuture
919 * also does so, with a CompletionException holding this exception as
920 * its cause.
921 *
922 * @param fn the function returning a new CompletableFuture.
923 * @return the CompletableFuture, that {@code isDone()} upon
924 * return if completed by the given function, or an exception
925 * occurs.
926 */
927 public <U> CompletableFuture<U> thenCompose(Function<? super T,
928 CompletableFuture<U>> fn) {
929 if (fn == null) throw new NullPointerException();
930 CompletableFuture<U> dst = null;
931 ThenCompose<T,U> d = null;
932 Object r;
933 if ((r = result) == null) {
934 dst = new CompletableFuture<U>();
935 CompletionNode p = new CompletionNode
936 (d = new ThenCompose<T,U>(this, fn, dst));
937 while ((r = result) == null) {
938 if (UNSAFE.compareAndSwapObject
939 (this, COMPLETIONS, p.next = completions, p))
940 break;
941 }
942 }
943 if (r != null && (d == null || d.compareAndSet(0, 1))) {
944 T t; Throwable ex = null;
945 if (r instanceof AltResult) {
946 ex = ((AltResult)r).ex;
947 t = null;
948 }
949 else
950 t = (T) r;
951 if (ex == null) {
952 try {
953 dst = fn.apply(t);
954 } catch (Throwable rex) {
955 ex = rex;
956 }
957 }
958 if (dst == null) {
959 dst = new CompletableFuture<U>();
960 if (ex == null)
961 ex = new NullPointerException();
962 }
963 if (ex != null)
964 dst.propagateException(ex);
965 }
966 if (r != null || result != null)
967 postComplete();
968 return dst;
969 }
970
971 /**
972 * Creates and returns a CompletableFuture that is completed with
973 * the result of the given function of the exception triggering
974 * this CompletableFuture's completion when it completes
975 * exceptionally; Otherwise, if this CompletableFuture completes
976 * normally, then the returned CompletableFuture also completes
977 * normally with the same value.
978 *
979 * @param fn the function to use to compute the value of the
980 * returned CompletableFuture if this CompletableFuture completed
981 * exceptionally
982 * @return the new CompletableFuture
983 */
984 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
985 if (fn == null) throw new NullPointerException();
986 CompletableFuture<T> dst = new CompletableFuture<T>();
987 ExceptionAction<T> d = null;
988 Object r;
989 if ((r = result) == null) {
990 CompletionNode p =
991 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
992 while ((r = result) == null) {
993 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
994 p.next = completions, p))
995 break;
996 }
997 }
998 if (r != null && (d == null || d.compareAndSet(0, 1))) {
999 T t; Throwable ex = null;
1000 if (r instanceof AltResult) {
1001 if ((ex = ((AltResult)r).ex) != null) {
1002 try {
1003 dst.complete(fn.apply(ex));
1004 } catch (Throwable rex) {
1005 dst.completeExceptionally(rex);
1006 }
1007 }
1008 t = null;
1009 }
1010 else
1011 t = (T) r;
1012 if (ex == null)
1013 dst.complete(t);
1014 }
1015 if (r != null || result != null)
1016 postComplete();
1017 return dst;
1018 }
1019
1020 /**
1021 * Creates and returns a CompletableFuture that is completed with
1022 * the result of the given function of the result and exception of
1023 * this CompletableFuture's completion when it completes. The
1024 * given function is invoked with the result (or {@code null} if
1025 * none) and the exception (or {@code null} if none) of this
1026 * CompletableFuture when complete.
1027 *
1028 * @param fn the function to use to compute the value of the
1029 * returned CompletableFuture
1030
1031 * @return the new CompletableFuture
1032 */
1033 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
1034 if (fn == null) throw new NullPointerException();
1035 CompletableFuture<U> dst = new CompletableFuture<U>();
1036 ThenHandle<T,U> d = null;
1037 Object r;
1038 if ((r = result) == null) {
1039 CompletionNode p =
1040 new CompletionNode(d = new ThenHandle<T,U>(this, fn, dst));
1041 while ((r = result) == null) {
1042 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1043 p.next = completions, p))
1044 break;
1045 }
1046 }
1047 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1048 T t; Throwable ex;
1049 if (r instanceof AltResult) {
1050 ex = ((AltResult)r).ex;
1051 t = null;
1052 }
1053 else {
1054 ex = null;
1055 t = (T) r;
1056 }
1057 try {
1058 dst.complete(fn.apply(t, ex));
1059 } catch (Throwable rex) {
1060 dst.completeExceptionally(rex);
1061 }
1062 }
1063 if (r != null || result != null)
1064 postComplete();
1065 return dst;
1066 }
1067
1068 /**
1069 * Attempts to complete this CompletableFuture with
1070 * a {@link CancellationException}.
1071 *
1072 * @param mayInterruptIfRunning this value has no effect in this
1073 * implementation because interrupts are not used to control
1074 * processing.
1075 *
1076 * @return {@code true} if this task is now cancelled
1077 */
1078 public boolean cancel(boolean mayInterruptIfRunning) {
1079 Object r;
1080 while ((r = result) == null) {
1081 r = new AltResult(new CancellationException());
1082 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
1083 postComplete();
1084 return true;
1085 }
1086 }
1087 return ((r instanceof AltResult) &&
1088 (((AltResult)r).ex instanceof CancellationException));
1089 }
1090
1091 /**
1092 * Returns {@code true} if this CompletableFuture was cancelled
1093 * before it completed normally.
1094 *
1095 * @return {@code true} if this CompletableFuture was cancelled
1096 * before it completed normally
1097 */
1098 public boolean isCancelled() {
1099 Object r;
1100 return ((r = result) != null &&
1101 (r instanceof AltResult) &&
1102 (((AltResult)r).ex instanceof CancellationException));
1103 }
1104
1105 /**
1106 * Forcibly sets or resets the value subsequently returned by
1107 * method get() and related methods, whether or not already
1108 * completed. This method is designed for use only in error
1109 * recovery actions, and even in such situations may result in
1110 * ongoing dependent completions using established versus
1111 * overwritten values.
1112 *
1113 * @param value the completion value
1114 */
1115 public void obtrudeValue(T value) {
1116 result = (value == null) ? NIL : value;
1117 postComplete();
1118 }
1119
1120 /**
1121 * Removes and signals all waiting threads and runs all completions.
1122 */
1123 private void postComplete() {
1124 WaitNode q; Thread t;
1125 while ((q = waiters) != null) {
1126 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
1127 (t = q.thread) != null) {
1128 q.thread = null;
1129 LockSupport.unpark(t);
1130 }
1131 }
1132
1133 CompletionNode h; Completion c;
1134 while ((h = completions) != null) {
1135 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
1136 (c = h.completion) != null)
1137 c.run();
1138 }
1139 }
1140
1141 /* ------------- waiting for completions -------------- */
1142
1143 /**
1144 * Heuristic spin value for waitingGet() before blocking on
1145 * multiprocessors
1146 */
1147 static final int WAITING_GET_SPINS = 256;
1148
1149 /**
1150 * Returns raw result after waiting, or null if interruptible and
1151 * interrupted.
1152 */
1153 private Object waitingGet(boolean interruptible) {
1154 WaitNode q = null;
1155 boolean queued = false, interrupted = false;
1156 int h = 0, spins = 0;
1157 for (Object r;;) {
1158 if ((r = result) != null) {
1159 Throwable ex;
1160 if (q != null) // suppress unpark
1161 q.thread = null;
1162 postComplete(); // help release others
1163 if (interrupted)
1164 Thread.currentThread().interrupt();
1165 return r;
1166 }
1167 else if (h == 0) {
1168 h = ThreadLocalRandom.current().nextInt();
1169 if (Runtime.getRuntime().availableProcessors() > 1)
1170 spins = WAITING_GET_SPINS;
1171 }
1172 else if (spins > 0) {
1173 h ^= h << 1; // xorshift
1174 h ^= h >>> 3;
1175 if ((h ^= h << 10) >= 0)
1176 --spins;
1177 }
1178 else if (q == null)
1179 q = new WaitNode();
1180 else if (!queued)
1181 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1182 q.next = waiters, q);
1183 else if (Thread.interrupted()) {
1184 if (interruptible)
1185 return null;
1186 interrupted = true;
1187 }
1188 else if (q.thread == null)
1189 q.thread = Thread.currentThread();
1190 else
1191 LockSupport.park(this);
1192 }
1193 }
1194
1195 /**
1196 * Awaits completion or aborts on interrupt or timeout.
1197 *
1198 * @param nanos time to wait
1199 * @return raw result
1200 */
1201 private Object timedAwaitDone(long nanos)
1202 throws InterruptedException, TimeoutException {
1203 final long deadline = System.nanoTime() + nanos;
1204 WaitNode q = null;
1205 boolean queued = false;
1206 for (Object r;;) {
1207 if (Thread.interrupted()) {
1208 removeWaiter(q);
1209 throw new InterruptedException();
1210 }
1211 else if ((r = result) != null) {
1212 if (q != null)
1213 q.thread = null;
1214 postComplete();
1215 return r;
1216 }
1217 else if (q == null)
1218 q = new WaitNode();
1219 else if (!queued)
1220 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1221 q.next = waiters, q);
1222 else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1223 removeWaiter(q);
1224 throw new TimeoutException();
1225 }
1226 else if (q.thread == null)
1227 q.thread = Thread.currentThread();
1228 else
1229 LockSupport.parkNanos(this, nanos);
1230 }
1231 }
1232
1233 /**
1234 * Tries to unlink a timed-out or interrupted wait node to avoid
1235 * accumulating garbage. Internal nodes are simply unspliced
1236 * without CAS since it is harmless if they are traversed anyway
1237 * by releasers. To avoid effects of unsplicing from already
1238 * removed nodes, the list is retraversed in case of an apparent
1239 * race. This is slow when there are a lot of nodes, but we don't
1240 * expect lists to be long enough to outweigh higher-overhead
1241 * schemes.
1242 */
1243 private void removeWaiter(WaitNode node) {
1244 if (node != null) {
1245 node.thread = null;
1246 retry:
1247 for (;;) { // restart on removeWaiter race
1248 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
1249 s = q.next;
1250 if (q.thread != null)
1251 pred = q;
1252 else if (pred != null) {
1253 pred.next = s;
1254 if (pred.thread == null) // check for race
1255 continue retry;
1256 }
1257 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
1258 continue retry;
1259 }
1260 break;
1261 }
1262 }
1263 }
1264
1265 /* ------------- Async tasks -------------- */
1266
1267 /** Base class can act as either FJ or plain Runnable */
1268 static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1269 public final Void getRawResult() { return null; }
1270 public final void setRawResult(Void v) { }
1271 public final void run() { exec(); }
1272 }
1273
1274 static final class AsyncRunnable extends Async {
1275 final Runnable runnable;
1276 final CompletableFuture<Void> dst;
1277 AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
1278 this.runnable = runnable; this.dst = dst;
1279 }
1280 public final boolean exec() {
1281 Runnable fn;
1282 CompletableFuture<Void> d;
1283 if ((fn = this.runnable) == null || (d = this.dst) == null)
1284 throw new NullPointerException();
1285 try {
1286 fn.run();
1287 d.complete(null);
1288 } catch (Throwable ex) {
1289 d.completeExceptionally(ex);
1290 }
1291 return true;
1292 }
1293 private static final long serialVersionUID = 5232453952276885070L;
1294 }
1295
1296 static final class AsyncSupplier<U> extends Async {
1297 final Supplier<U> supplier;
1298 final CompletableFuture<U> dst;
1299 AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
1300 this.supplier = supplier; this.dst = dst;
1301 }
1302 public final boolean exec() {
1303 Supplier<U> fn;
1304 CompletableFuture<U> d;
1305 if ((fn = this.supplier) == null || (d = this.dst) == null)
1306 throw new NullPointerException();
1307 try {
1308 d.complete(fn.get());
1309 } catch (Throwable ex) {
1310 d.completeExceptionally(ex);
1311 }
1312 return true;
1313 }
1314 private static final long serialVersionUID = 5232453952276885070L;
1315 }
1316
1317 static final class AsyncFunction<T,U> extends Async {
1318 Function<? super T,? extends U> fn;
1319 T arg;
1320 final CompletableFuture<U> dst;
1321 AsyncFunction(T arg, Function<? super T,? extends U> fn,
1322 CompletableFuture<U> dst) {
1323 this.arg = arg; this.fn = fn; this.dst = dst;
1324 }
1325 public final boolean exec() {
1326 Function<? super T,? extends U> fn;
1327 CompletableFuture<U> d;
1328 if ((fn = this.fn) == null || (d = this.dst) == null)
1329 throw new NullPointerException();
1330 try {
1331 d.complete(fn.apply(arg));
1332 } catch (Throwable ex) {
1333 d.completeExceptionally(ex);
1334 }
1335 return true;
1336 }
1337 private static final long serialVersionUID = 5232453952276885070L;
1338 }
1339
1340 static final class AsyncBiFunction<T,U,V> extends Async {
1341 final BiFunction<? super T,? super U,? extends V> fn;
1342 final T arg1;
1343 final U arg2;
1344 final CompletableFuture<V> dst;
1345 AsyncBiFunction(T arg1, U arg2,
1346 BiFunction<? super T,? super U,? extends V> fn,
1347 CompletableFuture<V> dst) {
1348 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1349 }
1350 public final boolean exec() {
1351 BiFunction<? super T,? super U,? extends V> fn;
1352 CompletableFuture<V> d;
1353 if ((fn = this.fn) == null || (d = this.dst) == null)
1354 throw new NullPointerException();
1355 try {
1356 d.complete(fn.apply(arg1, arg2));
1357 } catch (Throwable ex) {
1358 d.completeExceptionally(ex);
1359 }
1360 return true;
1361 }
1362 private static final long serialVersionUID = 5232453952276885070L;
1363 }
1364
1365 static final class AsyncBlock<T> extends Async {
1366 Block<? super T> fn;
1367 T arg;
1368 final CompletableFuture<Void> dst;
1369 AsyncBlock(T arg, Block<? super T> fn,
1370 CompletableFuture<Void> dst) {
1371 this.arg = arg; this.fn = fn; this.dst = dst;
1372 }
1373 public final boolean exec() {
1374 Block<? super T> fn;
1375 CompletableFuture<Void> d;
1376 if ((fn = this.fn) == null || (d = this.dst) == null)
1377 throw new NullPointerException();
1378 try {
1379 fn.accept(arg);
1380 d.complete(null);
1381 } catch (Throwable ex) {
1382 d.completeExceptionally(ex);
1383 }
1384 return true;
1385 }
1386 private static final long serialVersionUID = 5232453952276885070L;
1387 }
1388
1389 static final class AsyncBiBlock<T,U> extends Async {
1390 final BiBlock<? super T,? super U> fn;
1391 final T arg1;
1392 final U arg2;
1393 final CompletableFuture<Void> dst;
1394 AsyncBiBlock(T arg1, U arg2,
1395 BiBlock<? super T,? super U> fn,
1396 CompletableFuture<Void> dst) {
1397 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1398 }
1399 public final boolean exec() {
1400 BiBlock<? super T,? super U> fn;
1401 CompletableFuture<Void> d;
1402 if ((fn = this.fn) == null || (d = this.dst) == null)
1403 throw new NullPointerException();
1404 try {
1405 fn.accept(arg1, arg2);
1406 d.complete(null);
1407 } catch (Throwable ex) {
1408 d.completeExceptionally(ex);
1409 }
1410 return true;
1411 }
1412 private static final long serialVersionUID = 5232453952276885070L;
1413 }
1414
1415 /* ------------- Completions -------------- */
1416
1417 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1418 static abstract class Completion extends AtomicInteger implements Runnable {
1419 }
1420
1421 static final class ThenFunction<T,U> extends Completion {
1422 final CompletableFuture<? extends T> src;
1423 final Function<? super T,? extends U> fn;
1424 final CompletableFuture<U> dst;
1425 final Executor executor;
1426 ThenFunction(CompletableFuture<? extends T> src,
1427 final Function<? super T,? extends U> fn,
1428 final CompletableFuture<U> dst, Executor executor) {
1429 this.src = src; this.fn = fn; this.dst = dst;
1430 this.executor = executor;
1431 }
1432 public void run() {
1433 CompletableFuture<? extends T> a;
1434 Function<? super T,? extends U> fn;
1435 CompletableFuture<U> dst;
1436 Object r; T t; Throwable ex;
1437 if ((dst = this.dst) != null &&
1438 (fn = this.fn) != null &&
1439 (a = this.src) != null &&
1440 (r = a.result) != null &&
1441 compareAndSet(0, 1)) {
1442 if (r instanceof AltResult) {
1443 ex = ((AltResult)r).ex;
1444 t = null;
1445 }
1446 else {
1447 ex = null;
1448 t = (T) r;
1449 }
1450 if (ex == null) {
1451 try {
1452 if (executor != null)
1453 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1454 else
1455 dst.complete(fn.apply(t));
1456 } catch (Throwable rex) {
1457 ex = rex;
1458 }
1459 }
1460 if (ex != null)
1461 dst.propagateException(ex);
1462 }
1463 }
1464 }
1465
1466 static final class ThenBlock<T> extends Completion {
1467 final CompletableFuture<? extends T> src;
1468 final Block<? super T> fn;
1469 final CompletableFuture<Void> dst;
1470 final Executor executor;
1471 ThenBlock(CompletableFuture<? extends T> src,
1472 final Block<? super T> fn,
1473 final CompletableFuture<Void> dst, Executor executor) {
1474 this.src = src; this.fn = fn; this.dst = dst;
1475 this.executor = executor;
1476 }
1477 public void run() {
1478 CompletableFuture<? extends T> a;
1479 Block<? super T> fn;
1480 CompletableFuture<Void> dst;
1481 Object r; T t; Throwable ex;
1482 if ((dst = this.dst) != null &&
1483 (fn = this.fn) != null &&
1484 (a = this.src) != null &&
1485 (r = a.result) != null &&
1486 compareAndSet(0, 1)) {
1487 if (r instanceof AltResult) {
1488 ex = ((AltResult)r).ex;
1489 t = null;
1490 }
1491 else {
1492 ex = null;
1493 t = (T) r;
1494 }
1495 if (ex == null) {
1496 try {
1497 if (executor != null)
1498 executor.execute(new AsyncBlock<T>(t, fn, dst));
1499 else {
1500 fn.accept(t);
1501 dst.complete(null);
1502 }
1503 } catch (Throwable rex) {
1504 ex = rex;
1505 }
1506 }
1507 if (ex != null)
1508 dst.propagateException(ex);
1509 }
1510 }
1511 }
1512
1513 static final class ThenRunnable<T> extends Completion {
1514 final CompletableFuture<? extends T> src;
1515 final Runnable fn;
1516 final CompletableFuture<Void> dst;
1517 final Executor executor;
1518 ThenRunnable(CompletableFuture<? extends T> src,
1519 Runnable fn,
1520 CompletableFuture<Void> dst,
1521 Executor executor) {
1522 this.src = src; this.fn = fn; this.dst = dst;
1523 this.executor = executor;
1524 }
1525 public void run() {
1526 CompletableFuture<? extends T> a;
1527 Runnable fn;
1528 CompletableFuture<Void> dst;
1529 Object r; Throwable ex;
1530 if ((dst = this.dst) != null &&
1531 (fn = this.fn) != null &&
1532 (a = this.src) != null &&
1533 (r = a.result) != null &&
1534 compareAndSet(0, 1)) {
1535 if (r instanceof AltResult)
1536 ex = ((AltResult)r).ex;
1537 else
1538 ex = null;
1539 if (ex == null) {
1540 try {
1541 if (executor != null)
1542 executor.execute(new AsyncRunnable(fn, dst));
1543 else {
1544 fn.run();
1545 dst.complete(null);
1546 }
1547 } catch (Throwable rex) {
1548 ex = rex;
1549 }
1550 }
1551 if (ex != null)
1552 dst.propagateException(ex);
1553 }
1554 }
1555 }
1556
1557 static final class AndFunction<T,U,V> extends Completion {
1558 final CompletableFuture<? extends T> src;
1559 final CompletableFuture<? extends U> snd;
1560 final BiFunction<? super T,? super U,? extends V> fn;
1561 final CompletableFuture<V> dst;
1562 final Executor executor;
1563 AndFunction(CompletableFuture<? extends T> src,
1564 CompletableFuture<? extends U> snd,
1565 BiFunction<? super T,? super U,? extends V> fn,
1566 CompletableFuture<V> dst, Executor executor) {
1567 this.src = src; this.snd = snd;
1568 this.fn = fn; this.dst = dst;
1569 this.executor = executor;
1570 }
1571 public void run() {
1572 Object r, s; T t; U u; Throwable ex;
1573 CompletableFuture<? extends T> a;
1574 CompletableFuture<? extends U> b;
1575 BiFunction<? super T,? super U,? extends V> fn;
1576 CompletableFuture<V> dst;
1577 if ((dst = this.dst) != null &&
1578 (fn = this.fn) != null &&
1579 (a = this.src) != null &&
1580 (r = a.result) != null &&
1581 (b = this.snd) != null &&
1582 (s = b.result) != null &&
1583 compareAndSet(0, 1)) {
1584 if (r instanceof AltResult) {
1585 ex = ((AltResult)r).ex;
1586 t = null;
1587 }
1588 else {
1589 ex = null;
1590 t = (T) r;
1591 }
1592 if (ex != null)
1593 u = null;
1594 else if (s instanceof AltResult) {
1595 ex = ((AltResult)s).ex;
1596 u = null;
1597 }
1598 else
1599 u = (U) s;
1600 if (ex == null) {
1601 try {
1602 if (executor != null)
1603 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1604 else
1605 dst.complete(fn.apply(t, u));
1606 } catch (Throwable rex) {
1607 ex = rex;
1608 }
1609 }
1610 if (ex != null)
1611 dst.propagateException(ex);
1612 }
1613 }
1614 }
1615
1616 static final class AndBlock<T,U> extends Completion {
1617 final CompletableFuture<? extends T> src;
1618 final CompletableFuture<? extends U> snd;
1619 final BiBlock<? super T,? super U> fn;
1620 final CompletableFuture<Void> dst;
1621 final Executor executor;
1622 AndBlock(CompletableFuture<? extends T> src,
1623 CompletableFuture<? extends U> snd,
1624 BiBlock<? super T,? super U> fn,
1625 CompletableFuture<Void> dst, Executor executor) {
1626 this.src = src; this.snd = snd;
1627 this.fn = fn; this.dst = dst;
1628 this.executor = executor;
1629 }
1630 public void run() {
1631 Object r, s; T t; U u; Throwable ex;
1632 CompletableFuture<? extends T> a;
1633 CompletableFuture<? extends U> b;
1634 BiBlock<? super T,? super U> fn;
1635 CompletableFuture<Void> dst;
1636 if ((dst = this.dst) != null &&
1637 (fn = this.fn) != null &&
1638 (a = this.src) != null &&
1639 (r = a.result) != null &&
1640 (b = this.snd) != null &&
1641 (s = b.result) != null &&
1642 compareAndSet(0, 1)) {
1643 if (r instanceof AltResult) {
1644 ex = ((AltResult)r).ex;
1645 t = null;
1646 }
1647 else {
1648 ex = null;
1649 t = (T) r;
1650 }
1651 if (ex != null)
1652 u = null;
1653 else if (s instanceof AltResult) {
1654 ex = ((AltResult)s).ex;
1655 u = null;
1656 }
1657 else
1658 u = (U) s;
1659 if (ex == null) {
1660 try {
1661 if (executor != null)
1662 executor.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1663 else {
1664 fn.accept(t, u);
1665 dst.complete(null);
1666 }
1667 } catch (Throwable rex) {
1668 ex = rex;
1669 }
1670 }
1671 if (ex != null)
1672 dst.propagateException(ex);
1673 }
1674 }
1675 }
1676
1677 static final class AndRunnable<T> extends Completion {
1678 final CompletableFuture<? extends T> src;
1679 final CompletableFuture<?> snd;
1680 final Runnable fn;
1681 final CompletableFuture<Void> dst;
1682 final Executor executor;
1683 AndRunnable(CompletableFuture<? extends T> src,
1684 CompletableFuture<?> snd,
1685 Runnable fn,
1686 CompletableFuture<Void> dst, Executor executor) {
1687 this.src = src; this.snd = snd;
1688 this.fn = fn; this.dst = dst;
1689 this.executor = executor;
1690 }
1691 public void run() {
1692 Object r, s; Throwable ex;
1693 final CompletableFuture<? extends T> a;
1694 final CompletableFuture<?> b;
1695 final Runnable fn;
1696 final CompletableFuture<Void> dst;
1697 if ((dst = this.dst) != null &&
1698 (fn = this.fn) != null &&
1699 (a = this.src) != null &&
1700 (r = a.result) != null &&
1701 (b = this.snd) != null &&
1702 (s = b.result) != null &&
1703 compareAndSet(0, 1)) {
1704 if (r instanceof AltResult)
1705 ex = ((AltResult)r).ex;
1706 else
1707 ex = null;
1708 if (ex == null && (s instanceof AltResult))
1709 ex = ((AltResult)s).ex;
1710 if (ex == null) {
1711 try {
1712 if (executor != null)
1713 executor.execute(new AsyncRunnable(fn, dst));
1714 else {
1715 fn.run();
1716 dst.complete(null);
1717 }
1718 } catch (Throwable rex) {
1719 ex = rex;
1720 }
1721 }
1722 if (ex != null)
1723 dst.propagateException(ex);
1724 }
1725 }
1726 }
1727
1728 static final class OrFunction<T,U> extends Completion {
1729 final CompletableFuture<? extends T> src;
1730 final CompletableFuture<? extends T> snd;
1731 final Function<? super T,? extends U> fn;
1732 final CompletableFuture<U> dst;
1733 final Executor executor;
1734 OrFunction(CompletableFuture<? extends T> src,
1735 CompletableFuture<? extends T> snd,
1736 Function<? super T,? extends U> fn,
1737 CompletableFuture<U> dst, Executor executor) {
1738 this.src = src; this.snd = snd;
1739 this.fn = fn; this.dst = dst;
1740 this.executor = executor;
1741 }
1742 public void run() {
1743 Object r; T t; Throwable ex;
1744 CompletableFuture<? extends T> a;
1745 CompletableFuture<? extends T> b;
1746 Function<? super T,? extends U> fn;
1747 CompletableFuture<U> dst;
1748 if ((dst = this.dst) != null &&
1749 (fn = this.fn) != null &&
1750 (((a = this.src) != null && (r = a.result) != null) ||
1751 ((b = this.snd) != null && (r = b.result) != null)) &&
1752 compareAndSet(0, 1)) {
1753 if (r instanceof AltResult) {
1754 ex = ((AltResult)r).ex;
1755 t = null;
1756 }
1757 else {
1758 ex = null;
1759 t = (T) r;
1760 }
1761 if (ex == null) {
1762 try {
1763 if (executor != null)
1764 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1765 else
1766 dst.complete(fn.apply(t));
1767 } catch (Throwable rex) {
1768 ex = rex;
1769 }
1770 }
1771 if (ex != null)
1772 dst.propagateException(ex);
1773 }
1774 }
1775 }
1776
1777 static final class OrBlock<T> extends Completion {
1778 final CompletableFuture<? extends T> src;
1779 final CompletableFuture<? extends T> snd;
1780 final Block<? super T> fn;
1781 final CompletableFuture<Void> dst;
1782 final Executor executor;
1783 OrBlock(CompletableFuture<? extends T> src,
1784 CompletableFuture<? extends T> snd,
1785 Block<? super T> fn,
1786 CompletableFuture<Void> dst, Executor executor) {
1787 this.src = src; this.snd = snd;
1788 this.fn = fn; this.dst = dst;
1789 this.executor = executor;
1790 }
1791 public void run() {
1792 Object r; T t; Throwable ex;
1793 CompletableFuture<? extends T> a;
1794 CompletableFuture<? extends T> b;
1795 Block<? super T> fn;
1796 CompletableFuture<Void> dst;
1797 if ((dst = this.dst) != null &&
1798 (fn = this.fn) != null &&
1799 (((a = this.src) != null && (r = a.result) != null) ||
1800 ((b = this.snd) != null && (r = b.result) != null)) &&
1801 compareAndSet(0, 1)) {
1802 if (r instanceof AltResult) {
1803 ex = ((AltResult)r).ex;
1804 t = null;
1805 }
1806 else {
1807 ex = null;
1808 t = (T) r;
1809 }
1810 if (ex == null) {
1811 try {
1812 if (executor != null)
1813 executor.execute(new AsyncBlock<T>(t, fn, dst));
1814 else {
1815 fn.accept(t);
1816 dst.complete(null);
1817 }
1818 } catch (Throwable rex) {
1819 ex = rex;
1820 }
1821 }
1822 if (ex != null)
1823 dst.propagateException(ex);
1824 }
1825 }
1826 }
1827
1828 static final class OrRunnable<T> extends Completion {
1829 final CompletableFuture<? extends T> src;
1830 final CompletableFuture<?> snd;
1831 final Runnable fn;
1832 final CompletableFuture<Void> dst;
1833 final Executor executor;
1834 OrRunnable(CompletableFuture<? extends T> src,
1835 CompletableFuture<?> snd,
1836 Runnable fn,
1837 CompletableFuture<Void> dst, Executor executor) {
1838 this.src = src; this.snd = snd;
1839 this.fn = fn; this.dst = dst;
1840 this.executor = executor;
1841 }
1842 public void run() {
1843 Object r; Throwable ex;
1844 CompletableFuture<? extends T> a;
1845 final CompletableFuture<?> b;
1846 final Runnable fn;
1847 final CompletableFuture<Void> dst;
1848 if ((dst = this.dst) != null &&
1849 (fn = this.fn) != null &&
1850 (((a = this.src) != null && (r = a.result) != null) ||
1851 ((b = this.snd) != null && (r = b.result) != null)) &&
1852 compareAndSet(0, 1)) {
1853 if (r instanceof AltResult)
1854 ex = ((AltResult)r).ex;
1855 else
1856 ex = null;
1857 if (ex == null) {
1858 try {
1859 if (executor != null)
1860 executor.execute(new AsyncRunnable(fn, dst));
1861 else {
1862 fn.run();
1863 dst.complete(null);
1864 }
1865 } catch (Throwable rex) {
1866 ex = rex;
1867 }
1868 }
1869 if (ex != null)
1870 dst.propagateException(ex);
1871 }
1872 }
1873 }
1874
1875 static final class ExceptionAction<T> extends Completion {
1876 final CompletableFuture<? extends T> src;
1877 final Function<? super Throwable, ? extends T> fn;
1878 final CompletableFuture<T> dst;
1879 ExceptionAction(CompletableFuture<? extends T> src,
1880 Function<? super Throwable, ? extends T> fn,
1881 CompletableFuture<T> dst) {
1882 this.src = src; this.fn = fn; this.dst = dst;
1883 }
1884 public void run() {
1885 CompletableFuture<? extends T> a;
1886 Function<? super Throwable, ? extends T> fn;
1887 CompletableFuture<T> dst;
1888 Object r; T t; Throwable ex;
1889 if ((dst = this.dst) != null &&
1890 (fn = this.fn) != null &&
1891 (a = this.src) != null &&
1892 (r = a.result) != null &&
1893 compareAndSet(0, 1)) {
1894 if (r instanceof AltResult) {
1895 if ((ex = ((AltResult)r).ex) != null) {
1896 try {
1897 dst.complete(fn.apply(ex));
1898 } catch (Throwable rex) {
1899 dst.completeExceptionally(rex);
1900 }
1901 return;
1902 }
1903 t = null;
1904 }
1905 else
1906 t = (T) r;
1907 dst.complete(t);
1908 }
1909 }
1910 }
1911
1912 static final class ThenCopy<T> extends Completion {
1913 final CompletableFuture<? extends T> src;
1914 final CompletableFuture<T> dst;
1915 ThenCopy(CompletableFuture<? extends T> src,
1916 CompletableFuture<T> dst) {
1917 this.src = src; this.dst = dst;
1918 }
1919 public void run() {
1920 CompletableFuture<? extends T> a;
1921 CompletableFuture<T> dst;
1922 Object r; T t; Throwable ex;
1923 if ((dst = this.dst) != null &&
1924 (a = this.src) != null &&
1925 (r = a.result) != null &&
1926 compareAndSet(0, 1)) {
1927 dst.propagateCompletion(r);
1928 }
1929 }
1930 }
1931
1932 static final class ThenHandle<T,U> extends Completion {
1933 final CompletableFuture<? extends T> src;
1934 final BiFunction<? super T, Throwable, ? extends U> fn;
1935 final CompletableFuture<U> dst;
1936 ThenHandle(CompletableFuture<? extends T> src,
1937 BiFunction<? super T, Throwable, ? extends U> fn,
1938 final CompletableFuture<U> dst) {
1939 this.src = src; this.fn = fn; this.dst = dst;
1940 }
1941 public void run() {
1942 CompletableFuture<? extends T> a;
1943 BiFunction<? super T, Throwable, ? extends U> fn;
1944 CompletableFuture<U> dst;
1945 Object r; T t; Throwable ex;
1946 if ((dst = this.dst) != null &&
1947 (fn = this.fn) != null &&
1948 (a = this.src) != null &&
1949 (r = a.result) != null &&
1950 compareAndSet(0, 1)) {
1951 if (r instanceof AltResult) {
1952 ex = ((AltResult)r).ex;
1953 t = null;
1954 }
1955 else {
1956 ex = null;
1957 t = (T) r;
1958 }
1959 try {
1960 dst.complete(fn.apply(t, ex));
1961 } catch (Throwable rex) {
1962 dst.completeExceptionally(rex);
1963 }
1964 }
1965 }
1966 }
1967
/**
 * Completion for a compose step: once {@code src} has a result, applies
 * {@code fn} to it to obtain a second future {@code c}, and relays the
 * outcome of {@code c} to {@code dst}.
 */
1968 static final class ThenCompose<T,U> extends Completion {
1969 final CompletableFuture<? extends T> src;
1970 final Function<? super T, CompletableFuture<U>> fn;
1971 final CompletableFuture<U> dst;
1972 ThenCompose(CompletableFuture<? extends T> src,
1973 Function<? super T, CompletableFuture<U>> fn,
1974 final CompletableFuture<U> dst) {
1975 this.src = src; this.fn = fn; this.dst = dst;
1976 }
/**
 * Runs the compose step. Nothing happens unless all fields are set,
 * {@code src} has a result, and this call flips the completion's
 * state from 0 to 1.
 */
1977 public void run() {
1978 CompletableFuture<? extends T> a;
1979 Function<? super T, CompletableFuture<U>> fn;
1980 CompletableFuture<U> dst;
1981 Object r; T t; Throwable ex;
1982 if ((dst = this.dst) != null &&
1983 (fn = this.fn) != null &&
1984 (a = this.src) != null &&
1985 (r = a.result) != null &&
1986 compareAndSet(0, 1)) {
// Decode src's result: an AltResult supplies the exception (and a
// null value); any other object is the value itself.
1987 if (r instanceof AltResult) {
1988 ex = ((AltResult)r).ex;
1989 t = null;
1990 }
1991 else {
1992 ex = null;
1993 t = (T) r;
1994 }
1995 CompletableFuture<U> c = null;
// fn is applied only when src carried no exception; an exception
// thrown by fn becomes the failure to report.
1996 if (ex == null) {
1997 try {
1998 c = fn.apply(t);
1999 } catch (Throwable rex) {
2000 ex = rex;
2001 }
2002 }
// A null future returned by fn, with no exception so far, is
// reported as a NullPointerException.
2003 if (ex != null || c == null) {
2004 if (ex == null)
2005 ex = new NullPointerException();
2006 }
2007 else {
2008 ThenCopy<U> d = null;
2009 Object s;
// c has no result yet: wrap a ThenCopy(c, dst) in a node and retry
// the CAS onto c's completions list until it succeeds or c gets a
// result in the meantime.
2010 if ((s = c.result) == null) {
2011 CompletionNode p = new CompletionNode
2012 (d = new ThenCopy<U>(c, dst));
2013 while ((s = c.result) == null) {
2014 if (UNSAFE.compareAndSwapObject
2015 (c, COMPLETIONS, p.next = c.completions, p))
2016 break;
2017 }
2018 }
// Relay c's result from here if no ThenCopy was created, or if this
// thread wins the 0 -> 1 claim on the ThenCopy that was created.
2019 if (s != null && (d == null || d.compareAndSet(0, 1))) {
2020 U u;
2021 if (s instanceof AltResult) {
2022 ex = ((AltResult)s).ex; // no rewrap
2023 u = null;
2024 }
2025 else
2026 u = (U) s;
2027 if (ex == null)
2028 dst.complete(u);
2029 }
2030 }
// Any exception gathered above goes to dst.propagateException.
// Otherwise, if c has a result by now, c.postComplete() is called
// (presumably to run completions still queued on c, including a node
// pushed above -- confirm against postComplete).
2031 if (ex != null)
2032 dst.propagateException(ex);
2033 else if (c != null && c.result != null)
2034 c.postComplete();
2035 }
2036 }
2037 }
2038
2039 /* ------------- then/and/or implementations -------------- */
2040
2041 private <U> CompletableFuture<U> thenFunction(Function<? super T,? extends U> fn,
2042 Executor executor) {
2043 if (fn == null) throw new NullPointerException();
2044 CompletableFuture<U> dst = new CompletableFuture<U>();
2045 ThenFunction<T,U> d = null;
2046 Object r;
2047 if ((r = result) == null) {
2048 CompletionNode p = new CompletionNode
2049 (d = new ThenFunction<T,U>(this, fn, dst, executor));
2050 while ((r = result) == null) {
2051 if (UNSAFE.compareAndSwapObject
2052 (this, COMPLETIONS, p.next = completions, p))
2053 break;
2054 }
2055 }
2056 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2057 T t; Throwable ex;
2058 if (r instanceof AltResult) {
2059 ex = ((AltResult)r).ex;
2060 t = null;
2061 }
2062 else {
2063 ex = null;
2064 t = (T) r;
2065 }
2066 if (ex == null) {
2067 try {
2068 if (executor != null)
2069 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
2070 else
2071 dst.complete(fn.apply(t));
2072 } catch (Throwable rex) {
2073 ex = rex;
2074 }
2075 }
2076 if (ex != null)
2077 dst.propagateException(ex);
2078 }
2079 if (r != null || result != null)
2080 postComplete();
2081 return dst;
2082 }
2083
2084 private CompletableFuture<Void> thenBlock(Block<? super T> fn,
2085 Executor executor) {
2086 if (fn == null) throw new NullPointerException();
2087 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2088 ThenBlock<T> d = null;
2089 Object r;
2090 if ((r = result) == null) {
2091 CompletionNode p = new CompletionNode
2092 (d = new ThenBlock<T>(this, fn, dst, executor));
2093 while ((r = result) == null) {
2094 if (UNSAFE.compareAndSwapObject
2095 (this, COMPLETIONS, p.next = completions, p))
2096 break;
2097 }
2098 }
2099 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2100 T t; Throwable ex;
2101 if (r instanceof AltResult) {
2102 ex = ((AltResult)r).ex;
2103 t = null;
2104 }
2105 else {
2106 ex = null;
2107 t = (T) r;
2108 }
2109 if (ex == null) {
2110 try {
2111 if (executor != null)
2112 executor.execute(new AsyncBlock<T>(t, fn, dst));
2113 else {
2114 fn.accept(t);
2115 dst.complete(null);
2116 }
2117 } catch (Throwable rex) {
2118 ex = rex;
2119 }
2120 }
2121 if (ex != null)
2122 dst.propagateException(ex);
2123 }
2124 if (r != null || result != null)
2125 postComplete();
2126 return dst;
2127 }
2128
2129 private CompletableFuture<Void> thenRunnable(Runnable action,
2130 Executor executor) {
2131 if (action == null) throw new NullPointerException();
2132 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2133 ThenRunnable<T> d = null;
2134 Object r;
2135 if ((r = result) == null) {
2136 CompletionNode p = new CompletionNode
2137 (d = new ThenRunnable<T>(this, action, dst, executor));
2138 while ((r = result) == null) {
2139 if (UNSAFE.compareAndSwapObject
2140 (this, COMPLETIONS, p.next = completions, p))
2141 break;
2142 }
2143 }
2144 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2145 Throwable ex;
2146 if (r instanceof AltResult)
2147 ex = ((AltResult)r).ex;
2148 else
2149 ex = null;
2150 if (ex == null) {
2151 try {
2152 if (executor != null)
2153 executor.execute(new AsyncRunnable(action, dst));
2154 else {
2155 action.run();
2156 dst.complete(null);
2157 }
2158 } catch (Throwable rex) {
2159 ex = rex;
2160 }
2161 }
2162 if (ex != null)
2163 dst.propagateException(ex);
2164 }
2165 if (r != null || result != null)
2166 postComplete();
2167 return dst;
2168 }
2169
/**
 * Returns a new future for applying {@code fn} to the results of both
 * this future and {@code other}.
 *
 * <p>If a result is missing, one {@code AndFunction} completion is
 * created and registered (wrapped in separate nodes) on the
 * completions list of this future and/or {@code other}. If both
 * results are in hand, the function is handled here: applied directly
 * when {@code executor} is null, otherwise submitted to
 * {@code executor} as an {@code AsyncBiFunction}. An exception carried
 * by either result (this future's is checked first), or thrown while
 * applying or submitting, is passed to {@code dst.propagateException}.
 *
 * @param other the second source future
 * @param fn the function applied to the two results
 * @param executor the executor to submit to, or null to apply directly
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code fn} is null
 */
2170 private <U,V> CompletableFuture<V> andFunction(CompletableFuture<? extends U> other,
2171 BiFunction<? super T,? super U,? extends V> fn,
2172 Executor executor) {
2173 if (other == null || fn == null) throw new NullPointerException();
2174 CompletableFuture<V> dst = new CompletableFuture<V>();
2175 AndFunction<T,U,V> d = null;
2176 Object r, s = null;
2177 if ((r = result) == null || (s = other.result) == null) {
2178 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
2179 CompletionNode q = null, p = new CompletionNode(d);
// Loop until both r and s are non-null or registration is finished.
// Each pass re-reads whichever result is still missing.
2180 while ((r == null && (r = result) == null) ||
2181 (s == null && (s = other.result) == null)) {
// Second phase (q != null): push q onto other's completions list,
// unless other's result has already been seen.
2182 if (q != null) {
2183 if (s != null ||
2184 UNSAFE.compareAndSwapObject
2185 (other, COMPLETIONS, q.next = other.completions, q))
2186 break;
2187 }
// First phase: push p onto this future's list (skipped when r is
// already known), then allocate q for the second phase unless s is
// already known.
2188 else if (r != null ||
2189 UNSAFE.compareAndSwapObject
2190 (this, COMPLETIONS, p.next = completions, p)) {
2191 if (s != null)
2192 break;
2193 q = new CompletionNode(d);
2194 }
2195 }
2196 }
// Act here only with both results in hand and, if a completion was
// created, only after winning its 0 -> 1 compareAndSet.
2197 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2198 T t; U u; Throwable ex;
// Decode r: an AltResult supplies the exception (value null).
2199 if (r instanceof AltResult) {
2200 ex = ((AltResult)r).ex;
2201 t = null;
2202 }
2203 else {
2204 ex = null;
2205 t = (T) r;
2206 }
// s is decoded only when r carried no exception.
2207 if (ex != null)
2208 u = null;
2209 else if (s instanceof AltResult) {
2210 ex = ((AltResult)s).ex;
2211 u = null;
2212 }
2213 else
2214 u = (U) s;
2215 if (ex == null) {
2216 try {
2217 if (executor != null)
2218 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
2219 else
2220 dst.complete(fn.apply(t, u));
2221 } catch (Throwable rex) {
2222 ex = rex;
2223 }
2224 }
2225 if (ex != null)
2226 dst.propagateException(ex);
2227 }
// Call postComplete() on each source that has a result by now
// (presumably to run completions queued on it -- confirm against
// postComplete).
2228 if (r != null || result != null)
2229 postComplete();
2230 if (s != null || other.result != null)
2231 other.postComplete();
2232 return dst;
2233 }
2234
/**
 * Returns a new future that is completed with null after {@code fn}
 * has accepted the results of both this future and {@code other}.
 *
 * <p>If a result is missing, one {@code AndBlock} completion is
 * created and registered (wrapped in separate nodes) on the
 * completions list of this future and/or {@code other}. If both
 * results are in hand, the block is handled here: invoked directly
 * when {@code executor} is null, otherwise submitted to
 * {@code executor} as an {@code AsyncBiBlock}. An exception carried by
 * either result (this future's is checked first), or thrown while
 * invoking or submitting, is passed to {@code dst.propagateException}.
 *
 * @param other the second source future
 * @param fn the block invoked with the two results
 * @param executor the executor to submit to, or null to invoke directly
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code fn} is null
 */
2235 private <U> CompletableFuture<Void> andBlock(CompletableFuture<? extends U> other,
2236 BiBlock<? super T,? super U> fn,
2237 Executor executor) {
2238 if (other == null || fn == null) throw new NullPointerException();
2239 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2240 AndBlock<T,U> d = null;
2241 Object r, s = null;
2242 if ((r = result) == null || (s = other.result) == null) {
2243 d = new AndBlock<T,U>(this, other, fn, dst, executor);
2244 CompletionNode q = null, p = new CompletionNode(d);
// Loop until both r and s are non-null or registration is finished.
// Each pass re-reads whichever result is still missing.
2245 while ((r == null && (r = result) == null) ||
2246 (s == null && (s = other.result) == null)) {
// Second phase (q != null): push q onto other's completions list,
// unless other's result has already been seen.
2247 if (q != null) {
2248 if (s != null ||
2249 UNSAFE.compareAndSwapObject
2250 (other, COMPLETIONS, q.next = other.completions, q))
2251 break;
2252 }
// First phase: push p onto this future's list (skipped when r is
// already known), then allocate q for the second phase unless s is
// already known.
2253 else if (r != null ||
2254 UNSAFE.compareAndSwapObject
2255 (this, COMPLETIONS, p.next = completions, p)) {
2256 if (s != null)
2257 break;
2258 q = new CompletionNode(d);
2259 }
2260 }
2261 }
// Act here only with both results in hand and, if a completion was
// created, only after winning its 0 -> 1 compareAndSet.
2262 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2263 T t; U u; Throwable ex;
// Decode r: an AltResult supplies the exception (value null).
2264 if (r instanceof AltResult) {
2265 ex = ((AltResult)r).ex;
2266 t = null;
2267 }
2268 else {
2269 ex = null;
2270 t = (T) r;
2271 }
// s is decoded only when r carried no exception.
2272 if (ex != null)
2273 u = null;
2274 else if (s instanceof AltResult) {
2275 ex = ((AltResult)s).ex;
2276 u = null;
2277 }
2278 else
2279 u = (U) s;
2280 if (ex == null) {
2281 try {
2282 if (executor != null)
2283 executor.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
2284 else {
2285 fn.accept(t, u);
2286 dst.complete(null);
2287 }
2288 } catch (Throwable rex) {
2289 ex = rex;
2290 }
2291 }
2292 if (ex != null)
2293 dst.propagateException(ex);
2294 }
// Call postComplete() on each source that has a result by now
// (presumably to run completions queued on it -- confirm against
// postComplete).
2295 if (r != null || result != null)
2296 postComplete();
2297 if (s != null || other.result != null)
2298 other.postComplete();
2299 return dst;
2300 }
2301
/**
 * Returns a new future that is completed with null after
 * {@code action} has run following the results of both this future
 * and {@code other}.
 *
 * <p>If a result is missing, one {@code AndRunnable} completion is
 * created and registered (wrapped in separate nodes) on the
 * completions list of this future and/or {@code other}. If both
 * results are in hand, the action is handled here: run directly when
 * {@code executor} is null, otherwise submitted to {@code executor}
 * as an {@code AsyncRunnable}. An exception carried by either result
 * (this future's is checked first), or thrown while running or
 * submitting, is passed to {@code dst.propagateException}.
 *
 * @param other the second source future
 * @param action the action to run
 * @param executor the executor to submit to, or null to run directly
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code action} is null
 */
2302 private CompletableFuture<Void> andRunnable(CompletableFuture<?> other,
2303 Runnable action,
2304 Executor executor) {
2305 if (other == null || action == null) throw new NullPointerException();
2306 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2307 AndRunnable<T> d = null;
2308 Object r, s = null;
2309 if ((r = result) == null || (s = other.result) == null) {
2310 d = new AndRunnable<T>(this, other, action, dst, executor);
2311 CompletionNode q = null, p = new CompletionNode(d);
// Loop until both r and s are non-null or registration is finished.
// Each pass re-reads whichever result is still missing.
2312 while ((r == null && (r = result) == null) ||
2313 (s == null && (s = other.result) == null)) {
// Second phase (q != null): push q onto other's completions list,
// unless other's result has already been seen.
2314 if (q != null) {
2315 if (s != null ||
2316 UNSAFE.compareAndSwapObject
2317 (other, COMPLETIONS, q.next = other.completions, q))
2318 break;
2319 }
// First phase: push p onto this future's list (skipped when r is
// already known), then allocate q for the second phase unless s is
// already known.
2320 else if (r != null ||
2321 UNSAFE.compareAndSwapObject
2322 (this, COMPLETIONS, p.next = completions, p)) {
2323 if (s != null)
2324 break;
2325 q = new CompletionNode(d);
2326 }
2327 }
2328 }
// Act here only with both results in hand and, if a completion was
// created, only after winning its 0 -> 1 compareAndSet.
2329 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2330 Throwable ex;
// Only the exceptions matter here; r's is checked before s's.
2331 if (r instanceof AltResult)
2332 ex = ((AltResult)r).ex;
2333 else
2334 ex = null;
2335 if (ex == null && (s instanceof AltResult))
2336 ex = ((AltResult)s).ex;
2337 if (ex == null) {
2338 try {
2339 if (executor != null)
2340 executor.execute(new AsyncRunnable(action, dst));
2341 else {
2342 action.run();
2343 dst.complete(null);
2344 }
2345 } catch (Throwable rex) {
2346 ex = rex;
2347 }
2348 }
2349 if (ex != null)
2350 dst.propagateException(ex);
2351 }
// Call postComplete() on each source that has a result by now
// (presumably to run completions queued on it -- confirm against
// postComplete).
2352 if (r != null || result != null)
2353 postComplete();
2354 if (s != null || other.result != null)
2355 other.postComplete();
2356 return dst;
2357 }
2358
/**
 * Returns a new future for applying {@code fn} to the result of
 * whichever of this future and {@code other} has one; this future's
 * result is checked first.
 *
 * <p>If neither has a result, one {@code OrFunction} completion is
 * created and registered (wrapped in separate nodes) first on this
 * future's completions list and then on {@code other}'s. If a result
 * is in hand, the function is handled here: applied directly when
 * {@code executor} is null, otherwise submitted to {@code executor}
 * as an {@code AsyncFunction}. An exception carried by the chosen
 * result, or thrown while applying or submitting, is passed to
 * {@code dst.propagateException}.
 *
 * @param other the alternative source future
 * @param fn the function applied to the chosen result
 * @param executor the executor to submit to, or null to apply directly
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code fn} is null
 */
2359 private <U> CompletableFuture<U> orFunction(CompletableFuture<? extends T> other,
2360 Function<? super T, U> fn,
2361 Executor executor) {
2362 if (other == null || fn == null) throw new NullPointerException();
2363 CompletableFuture<U> dst = new CompletableFuture<U>();
2364 OrFunction<T,U> d = null;
2365 Object r;
2366 if ((r = result) == null && (r = other.result) == null) {
2367 d = new OrFunction<T,U>(this, other, fn, dst, executor);
2368 CompletionNode q = null, p = new CompletionNode(d);
// Keep going only while neither source has a result. Both result
// fields are re-read on every pass.
2369 while ((r = result) == null && (r = other.result) == null) {
// Second phase (q != null): push q onto other's list, then stop.
2370 if (q != null) {
2371 if (UNSAFE.compareAndSwapObject
2372 (other, COMPLETIONS, q.next = other.completions, q))
2373 break;
2374 }
// First phase: push p onto this future's list; once that succeeds,
// allocate q for the second phase.
2375 else if (UNSAFE.compareAndSwapObject
2376 (this, COMPLETIONS, p.next = completions, p))
2377 q = new CompletionNode(d);
2378 }
2379 }
// Act here only with a result in hand and, if a completion was
// created, only after winning its 0 -> 1 compareAndSet.
2380 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2381 T t; Throwable ex;
// Decode r: an AltResult supplies the exception (value null).
2382 if (r instanceof AltResult) {
2383 ex = ((AltResult)r).ex;
2384 t = null;
2385 }
2386 else {
2387 ex = null;
2388 t = (T) r;
2389 }
2390 if (ex == null) {
2391 try {
2392 if (executor != null)
2393 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
2394 else
2395 dst.complete(fn.apply(t));
2396 } catch (Throwable rex) {
2397 ex = rex;
2398 }
2399 }
2400 if (ex != null)
2401 dst.propagateException(ex);
2402 }
// Call postComplete() on each source that has a result by now
// (presumably to run completions queued on it -- confirm against
// postComplete).
2403 if (result != null)
2404 postComplete();
2405 if (other.result != null)
2406 other.postComplete();
2407 return dst;
2408 }
2409
/**
 * Returns a new future that is completed with null after {@code fn}
 * has accepted the result of whichever of this future and
 * {@code other} has one; this future's result is checked first.
 *
 * <p>If neither has a result, one {@code OrBlock} completion is
 * created and registered (wrapped in separate nodes) first on this
 * future's completions list and then on {@code other}'s. If a result
 * is in hand, the block is handled here: invoked directly when
 * {@code executor} is null, otherwise submitted to {@code executor}
 * as an {@code AsyncBlock}. An exception carried by the chosen
 * result, or thrown while invoking or submitting, is passed to
 * {@code dst.propagateException}.
 *
 * @param other the alternative source future
 * @param fn the block invoked with the chosen result
 * @param executor the executor to submit to, or null to invoke directly
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code fn} is null
 */
2410 private CompletableFuture<Void> orBlock(CompletableFuture<? extends T> other,
2411 Block<? super T> fn,
2412 Executor executor) {
2413 if (other == null || fn == null) throw new NullPointerException();
2414 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2415 OrBlock<T> d = null;
2416 Object r;
2417 if ((r = result) == null && (r = other.result) == null) {
2418 d = new OrBlock<T>(this, other, fn, dst, executor);
2419 CompletionNode q = null, p = new CompletionNode(d);
// Keep going only while neither source has a result. Both result
// fields are re-read on every pass.
2420 while ((r = result) == null && (r = other.result) == null) {
// Second phase (q != null): push q onto other's list, then stop.
2421 if (q != null) {
2422 if (UNSAFE.compareAndSwapObject
2423 (other, COMPLETIONS, q.next = other.completions, q))
2424 break;
2425 }
// First phase: push p onto this future's list; once that succeeds,
// allocate q for the second phase.
2426 else if (UNSAFE.compareAndSwapObject
2427 (this, COMPLETIONS, p.next = completions, p))
2428 q = new CompletionNode(d);
2429 }
2430 }
// Act here only with a result in hand and, if a completion was
// created, only after winning its 0 -> 1 compareAndSet.
2431 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2432 T t; Throwable ex;
// Decode r: an AltResult supplies the exception (value null).
2433 if (r instanceof AltResult) {
2434 ex = ((AltResult)r).ex;
2435 t = null;
2436 }
2437 else {
2438 ex = null;
2439 t = (T) r;
2440 }
2441 if (ex == null) {
2442 try {
2443 if (executor != null)
2444 executor.execute(new AsyncBlock<T>(t, fn, dst));
2445 else {
2446 fn.accept(t);
2447 dst.complete(null);
2448 }
2449 } catch (Throwable rex) {
2450 ex = rex;
2451 }
2452 }
2453 if (ex != null)
2454 dst.propagateException(ex);
2455 }
// Call postComplete() on each source that has a result by now
// (presumably to run completions queued on it -- confirm against
// postComplete).
2456 if (result != null)
2457 postComplete();
2458 if (other.result != null)
2459 other.postComplete();
2460 return dst;
2461 }
2462
/**
 * Returns a new future that is completed with null after
 * {@code action} has run following a result from either this future
 * or {@code other}; this future's result is checked first.
 *
 * <p>If neither has a result, one {@code OrRunnable} completion is
 * created and registered (wrapped in separate nodes) first on this
 * future's completions list and then on {@code other}'s. If a result
 * is in hand, the action is handled here: run directly when
 * {@code executor} is null, otherwise submitted to {@code executor}
 * as an {@code AsyncRunnable}. An exception carried by the chosen
 * result, or thrown while running or submitting, is passed to
 * {@code dst.propagateException}.
 *
 * @param other the alternative source future
 * @param action the action to run
 * @param executor the executor to submit to, or null to run directly
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code action} is null
 */
2463 private CompletableFuture<Void> orRunnable(CompletableFuture<?> other,
2464 Runnable action,
2465 Executor executor) {
2466 if (other == null || action == null) throw new NullPointerException();
2467 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2468 OrRunnable<T> d = null;
2469 Object r;
2470 if ((r = result) == null && (r = other.result) == null) {
2471 d = new OrRunnable<T>(this, other, action, dst, executor);
2472 CompletionNode q = null, p = new CompletionNode(d);
// Keep going only while neither source has a result. Both result
// fields are re-read on every pass.
2473 while ((r = result) == null && (r = other.result) == null) {
// Second phase (q != null): push q onto other's list, then stop.
2474 if (q != null) {
2475 if (UNSAFE.compareAndSwapObject
2476 (other, COMPLETIONS, q.next = other.completions, q))
2477 break;
2478 }
// First phase: push p onto this future's list; once that succeeds,
// allocate q for the second phase.
2479 else if (UNSAFE.compareAndSwapObject
2480 (this, COMPLETIONS, p.next = completions, p))
2481 q = new CompletionNode(d);
2482 }
2483 }
// Act here only with a result in hand and, if a completion was
// created, only after winning the same 0 -> 1 compareAndSet that
// OrRunnable.run() performs before acting.
2484 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2485 Throwable ex;
// Only the exception matters; the value of r is not used.
2486 if (r instanceof AltResult)
2487 ex = ((AltResult)r).ex;
2488 else
2489 ex = null;
2490 if (ex == null) {
2491 try {
2492 if (executor != null)
2493 executor.execute(new AsyncRunnable(action, dst));
2494 else {
2495 action.run();
2496 dst.complete(null);
2497 }
2498 } catch (Throwable rex) {
2499 ex = rex;
2500 }
2501 }
2502 if (ex != null)
2503 dst.propagateException(ex);
2504 }
// Call postComplete() on each source that has a result by now
// (presumably to run completions queued on it -- confirm against
// postComplete).
2505 if (result != null)
2506 postComplete();
2507 if (other.result != null)
2508 other.postComplete();
2509 return dst;
2510 }
2511
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" within CompletableFuture,
// as passed to the UNSAFE.compareAndSwapObject calls in this class.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    sun.misc.Unsafe unsafe;
    long resultOffset;
    long waitersOffset;
    long completionsOffset;
    try {
        unsafe = sun.misc.Unsafe.getUnsafe();
        Class<?> self = CompletableFuture.class;
        resultOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("result"));
        waitersOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        completionsOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
    } catch (Exception e) {
        // Any failure to obtain Unsafe or look up a field is fatal
        // for class initialization.
        throw new Error(e);
    }
    UNSAFE = unsafe;
    RESULT = resultOffset;
    WAITERS = waitersOffset;
    COMPLETIONS = completionsOffset;
}
2531
2532 }