ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.24
Committed: Mon Dec 31 17:50:56 2012 UTC (11 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.23: +1 -1 lines
Log Message:
double trouble

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Block;
10 import java.util.function.BiBlock;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.atomic.AtomicInteger;
24 import java.util.concurrent.locks.LockSupport;
25
26 /**
27 * A {@link Future} that may be explicitly completed (setting its
28 * value and status), and may include dependent functions and actions
29 * that trigger upon its completion.
30 *
31 * <p>Similar methods are available for Functions, Blocks, and
32 * Runnables, depending on whether actions require arguments and/or
33 * produce results. Functions and actions supplied for dependent
34 * completions (mainly using methods with prefix {@code then}) may be
35 * performed by the thread that completes the current
36 * CompletableFuture, or by any other caller of these methods. There
37 * are no guarantees about the order of processing completions unless
38 * constrained by these methods.
39 *
40 * <p>When two or more threads attempt to {@link #complete} or {@link
41 * #completeExceptionally} a CompletableFuture, only one of them
42 * succeeds.
43 *
44 * <p>Upon exceptional completion, or when a completion entails
45 * computation of a function or action, and it terminates abruptly
46 * with an (unchecked) exception or error, then further completions
47 * act as {@code completeExceptionally} with a {@link
48 * CompletionException} holding that exception as its cause. If a
49 * CompletableFuture completes exceptionally, and is not followed by a
50 * {@link #exceptionally} or {@link #handle} completion, then all of
51 * its dependents (and their dependents) also complete exceptionally
52 * with CompletionExceptions holding the ultimate cause. In case of a
53 * CompletionException, methods {@link #get()} and {@link #get(long,
54 * TimeUnit)} throw a {@link ExecutionException} with the same cause
55 * as would be held in the corresponding CompletionException. However,
56 * in these cases, methods {@link #join()} and {@link #getNow} throw
57 * the CompletionException, which simplifies usage especially within
58 * other completion functions.
59 *
60 * <p>CompletableFutures themselves do not execute asynchronously.
61 * However, the {@code async} methods provide commonly useful ways to
62 * commence asynchronous processing, using either a given {@link
63 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
64 * function or action that will result in the completion of a new
65 * CompletableFuture.
66 *
67 * @author Doug Lea
68 * @since 1.8
69 */
70 public class CompletableFuture<T> implements Future<T> {
71 /*
72 * Overview:
73 *
74 * 1. Non-nullness of field result (set via CAS) indicates
75 * done. An AltResult is used to box null as a result, as well as
76 * to hold exceptions. Using a single field makes completion fast
77 * and simple to detect and trigger, at the expense of a lot of
78 * encoding and decoding that infiltrates many methods. One minor
79 * simplification relies on the (static) NIL (to box null results)
80 * being the only AltResult with a null exception field, so we
81 * don't usually need explicit comparisons with NIL.
82 *
83 * 2. Waiters are held in a Treiber stack similar to the one used
84 * in FutureTask, Phaser, and SynchronousQueue. See their
85 * internal documentation for details.
86 *
87 * 3. Completions are also kept in a list/stack, and pulled off
88 * and run when completion is triggered. (We could in fact use the
89 * same stack as for waiters, but would give up the potential
90 * parallelism obtained because woken waiters help release/run
91 * others (see method postComplete).) Because post-processing may
92 * race with direct calls, completions extend AtomicInteger so
93 * callers can claim the action via compareAndSet(0, 1). The
94 * Completion.run methods are all written in a boringly similar
95 * uniform way (that sometimes includes unnecessary-looking
96 * checks, kept to maintain uniformity). There are enough
97 * dimensions upon which they differ that factoring to use common
98 * code isn't worthwhile.
99 *
100 * 4. The exported then/and/or methods do support a bit of
101 * factoring (see thenFunction, andBlock, etc). They must cope
102 * with the intrinsic races surrounding addition of a dependent
103 * action versus performing the action directly because the task
104 * is already complete. For example, a CF may not be complete
105 * upon entry, so a dependent completion is added, but by the time
106 * it is added, the target CF is complete, so must be directly
107 * executed. This is all done while avoiding unnecessary object
108 * construction in safe-bypass cases.
109 */
110
111 // preliminary class definitions
112
113 static final class AltResult {
114 final Throwable ex; // null only for NIL
115 AltResult(Throwable ex) { this.ex = ex; }
116 }
117
118 static final AltResult NIL = new AltResult(null);
119
120 /**
121 * Simple linked list nodes to record waiting threads in a Treiber
122 * stack. See other classes such as Phaser and SynchronousQueue
123 * for more detailed explanation.
124 */
125 static final class WaitNode {
126 volatile Thread thread;
127 volatile WaitNode next;
128 }
129
130 /**
131 * Simple linked list nodes to record completions, used in
132 * basically the same way as WaitNodes. (We separate nodes from
133 * the Completions themselves mainly because for the And and Or
134 * methods, the same Completion object resides in two lists.)
135 */
136 static final class CompletionNode {
137 final Completion completion;
138 volatile CompletionNode next;
139 CompletionNode(Completion completion) { this.completion = completion; }
140 }
141
142
143 // Fields
144
145 volatile Object result; // Either the result or boxed AltResult
146 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
147 volatile CompletionNode completions; // list (Treiber stack) of completions
148
149 // Basic utilities for triggering and processing completions
150 // (The Completion and Async classes and internal methods that
151 // use them are defined after the public methods.)
152
153 /**
154 * Removes and signals all waiting threads and runs all completions.
155 */
156 final void postComplete() {
157 WaitNode q; Thread t;
158 while ((q = waiters) != null) {
159 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
160 (t = q.thread) != null) {
161 q.thread = null;
162 LockSupport.unpark(t);
163 }
164 }
165
166 CompletionNode h; Completion c;
167 while ((h = completions) != null) {
168 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
169 (c = h.completion) != null)
170 c.run();
171 }
172 }
173
174 /**
175 * Triggers completion with the encoding of the given arguments:
176 * if the exception is non-null, encodes it as a wrapped
177 * CompletionException unless it is one already. Otherwise uses
178 * the given result, boxed as NIL if null.
179 */
180 final void internalComplete(Object v, Throwable ex) {
181 if (result == null)
182 UNSAFE.compareAndSwapObject
183 (this, RESULT, null,
184 (ex == null) ? (v == null) ? NIL : v :
185 new AltResult((ex instanceof CompletionException) ? ex :
186 new CompletionException(ex)));
187 postComplete(); // help out even if not triggered
188 }
189
190 /**
191 * If triggered, help release and/or process completions
192 */
193 final void helpPostComplete() {
194 if (result != null)
195 postComplete();
196 }
197
198 // public methods
199
200 /**
201 * Creates a new incomplete CompletableFuture.
202 */
203 public CompletableFuture() {
204 }
205
206 /**
207 * Asynchronously executes in the {@link
208 * ForkJoinPool#commonPool()}, a task that completes the returned
209 * CompletableFuture with the result of the given Supplier.
210 *
211 * @param supplier a function returning the value to be used
212 * to complete the returned CompletableFuture
213 * @return the CompletableFuture
214 */
215 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
216 if (supplier == null) throw new NullPointerException();
217 CompletableFuture<U> f = new CompletableFuture<U>();
218 ForkJoinPool.commonPool().
219 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
220 return f;
221 }
222
223 /**
224 * Asynchronously executes using the given executor, a task that
225 * completes the returned CompletableFuture with the result of the
226 * given Supplier.
227 *
228 * @param supplier a function returning the value to be used
229 * to complete the returned CompletableFuture
230 * @param executor the executor to use for asynchronous execution
231 * @return the CompletableFuture
232 */
233 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
234 Executor executor) {
235 if (executor == null || supplier == null)
236 throw new NullPointerException();
237 CompletableFuture<U> f = new CompletableFuture<U>();
238 executor.execute(new AsyncSupplier<U>(supplier, f));
239 return f;
240 }
241
242 /**
243 * Asynchronously executes in the {@link
244 * ForkJoinPool#commonPool()} a task that runs the given action,
245 * and then completes the returned CompletableFuture.
246 *
247 * @param runnable the action to run before completing the
248 * returned CompletableFuture
249 * @return the CompletableFuture
250 */
251 public static CompletableFuture<Void> runAsync(Runnable runnable) {
252 if (runnable == null) throw new NullPointerException();
253 CompletableFuture<Void> f = new CompletableFuture<Void>();
254 ForkJoinPool.commonPool().
255 execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
256 return f;
257 }
258
259 /**
260 * Asynchronously executes using the given executor, a task that
261 * runs the given action, and then completes the returned
262 * CompletableFuture.
263 *
264 * @param runnable the action to run before completing the
265 * returned CompletableFuture
266 * @param executor the executor to use for asynchronous execution
267 * @return the CompletableFuture
268 */
269 public static CompletableFuture<Void> runAsync(Runnable runnable,
270 Executor executor) {
271 if (executor == null || runnable == null)
272 throw new NullPointerException();
273 CompletableFuture<Void> f = new CompletableFuture<Void>();
274 executor.execute(new AsyncRunnable(runnable, f));
275 return f;
276 }
277
278 /**
279 * Returns {@code true} if completed in any fashion: normally,
280 * exceptionally, or via cancellation.
281 *
282 * @return {@code true} if completed
283 */
284 public boolean isDone() {
285 return result != null;
286 }
287
288 /**
289 * Waits if necessary for the computation to complete, and then
290 * retrieves its result.
291 *
292 * @return the computed result
293 * @throws CancellationException if the computation was cancelled
294 * @throws ExecutionException if the computation threw an
295 * exception
296 * @throws InterruptedException if the current thread was interrupted
297 * while waiting
298 */
299 @SuppressWarnings("unchecked") public T get() throws InterruptedException, ExecutionException {
300 Object r; Throwable ex, cause;
301 if ((r = result) == null && (r = waitingGet(true)) == null)
302 throw new InterruptedException();
303 if (r instanceof AltResult) {
304 if ((ex = ((AltResult)r).ex) != null) {
305 if (ex instanceof CancellationException)
306 throw (CancellationException)ex;
307 if ((ex instanceof CompletionException) &&
308 (cause = ex.getCause()) != null)
309 ex = cause;
310 throw new ExecutionException(ex);
311 }
312 return null;
313 }
314 return (T)r;
315 }
316
317 /**
318 * Waits if necessary for at most the given time for completion,
319 * and then retrieves its result, if available.
320 *
321 * @param timeout the maximum time to wait
322 * @param unit the time unit of the timeout argument
323 * @return the computed result
324 * @throws CancellationException if the computation was cancelled
325 * @throws ExecutionException if the computation threw an
326 * exception
327 * @throws InterruptedException if the current thread was interrupted
328 * while waiting
329 * @throws TimeoutException if the wait timed out
330 */
331 @SuppressWarnings("unchecked") public T get(long timeout, TimeUnit unit)
332 throws InterruptedException, ExecutionException, TimeoutException {
333 Object r; Throwable ex, cause;
334 long nanos = unit.toNanos(timeout);
335 if (Thread.interrupted())
336 throw new InterruptedException();
337 if ((r = result) == null)
338 r = timedAwaitDone(nanos);
339 if (r instanceof AltResult) {
340 if ((ex = ((AltResult)r).ex) != null) {
341 if (ex instanceof CancellationException)
342 throw (CancellationException)ex;
343 if ((ex instanceof CompletionException) &&
344 (cause = ex.getCause()) != null)
345 ex = cause;
346 throw new ExecutionException(ex);
347 }
348 return null;
349 }
350 return (T)r;
351 }
352
353 /**
354 * Returns the result value when complete, or throws an
355 * (unchecked) exception if completed exceptionally. To better
356 * conform with the use of common functional forms, if a
357 * computation involved in the completion of this
358 * CompletableFuture threw an exception, this method throws an
359 * (unchecked) {@link CompletionException} with the underlying
360 * exception as its cause.
361 *
362 * @return the result value
363 * @throws CancellationException if the computation was cancelled
364 * @throws CompletionException if a completion computation threw
365 * an exception
366 */
367 @SuppressWarnings("unchecked") public T join() {
368 Object r; Throwable ex;
369 if ((r = result) == null)
370 r = waitingGet(false);
371 if (r instanceof AltResult) {
372 if ((ex = ((AltResult)r).ex) != null) {
373 if (ex instanceof CancellationException)
374 throw (CancellationException)ex;
375 if (ex instanceof CompletionException)
376 throw (CompletionException)ex;
377 throw new CompletionException(ex);
378 }
379 return null;
380 }
381 return (T)r;
382 }
383
384 /**
385 * Returns the result value (or throws any encountered exception)
386 * if completed, else returns the given valueIfAbsent.
387 *
388 * @param valueIfAbsent the value to return if not completed
389 * @return the result value, if completed, else the given valueIfAbsent
390 * @throws CancellationException if the computation was cancelled
391 * @throws CompletionException if a completion computation threw
392 * an exception
393 */
394 @SuppressWarnings("unchecked") public T getNow(T valueIfAbsent) {
395 Object r; Throwable ex;
396 if ((r = result) == null)
397 return valueIfAbsent;
398 if (r instanceof AltResult) {
399 if ((ex = ((AltResult)r).ex) != null) {
400 if (ex instanceof CancellationException)
401 throw (CancellationException)ex;
402 if (ex instanceof CompletionException)
403 throw (CompletionException)ex;
404 throw new CompletionException(ex);
405 }
406 return null;
407 }
408 return (T)r;
409 }
410
411 /**
412 * If not already completed, sets the value returned by {@link
413 * #get()} and related methods to the given value.
414 *
415 * @param value the result value
416 * @return true if this invocation caused this CompletableFuture
417 * to transition to a completed state, else false
418 */
419 public boolean complete(T value) {
420 boolean triggered =
421 result == null &&
422 UNSAFE.compareAndSwapObject(this, RESULT, null,
423 value == null ? NIL : value);
424 postComplete();
425 return triggered;
426 }
427
428 /**
429 * If not already completed, causes invocations of {@link #get()}
430 * and related methods to throw the given exception.
431 *
432 * @param ex the exception
433 * @return true if this invocation caused this CompletableFuture
434 * to transition to a completed state, else false
435 */
436 public boolean completeExceptionally(Throwable ex) {
437 if (ex == null) throw new NullPointerException();
438 boolean triggered =
439 result == null &&
440 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
441 postComplete();
442 return triggered;
443 }
444
445 /**
446 * Creates and returns a CompletableFuture that is completed with
447 * the result of the given function of this CompletableFuture.
448 * If this CompletableFuture completes exceptionally,
449 * then the returned CompletableFuture also does so,
450 * with a CompletionException holding this exception as
451 * its cause.
452 *
453 * @param fn the function to use to compute the value of
454 * the returned CompletableFuture
455 * @return the new CompletableFuture
456 */
457 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
458 return thenFunction(fn, null);
459 }
460
461 /**
462 * Creates and returns a CompletableFuture that is asynchronously
463 * completed using the {@link ForkJoinPool#commonPool()} with the
464 * result of the given function of this CompletableFuture. If
465 * this CompletableFuture completes exceptionally, then the
466 * returned CompletableFuture also does so, with a
467 * CompletionException holding this exception as its cause.
468 *
469 * @param fn the function to use to compute the value of
470 * the returned CompletableFuture
471 * @return the new CompletableFuture
472 */
473 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
474 return thenFunction(fn, ForkJoinPool.commonPool());
475 }
476
477 /**
478 * Creates and returns a CompletableFuture that is asynchronously
479 * completed using the given executor with the result of the given
480 * function of this CompletableFuture. If this CompletableFuture
481 * completes exceptionally, then the returned CompletableFuture
482 * also does so, with a CompletionException holding this exception as
483 * its cause.
484 *
485 * @param fn the function to use to compute the value of
486 * the returned CompletableFuture
487 * @param executor the executor to use for asynchronous execution
488 * @return the new CompletableFuture
489 */
490 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
491 Executor executor) {
492 if (executor == null) throw new NullPointerException();
493 return thenFunction(fn, executor);
494 }
495
496 /**
497 * Creates and returns a CompletableFuture that is completed after
498 * performing the given action with this CompletableFuture's
499 * result when it completes. If this CompletableFuture
500 * completes exceptionally, then the returned CompletableFuture
501 * also does so, with a CompletionException holding this exception as
502 * its cause.
503 *
504 * @param block the action to perform before completing the
505 * returned CompletableFuture
506 * @return the new CompletableFuture
507 */
508 public CompletableFuture<Void> thenAccept(Block<? super T> block) {
509 return thenBlock(block, null);
510 }
511
512 /**
513 * Creates and returns a CompletableFuture that is asynchronously
514 * completed using the {@link ForkJoinPool#commonPool()} with this
515 * CompletableFuture's result when it completes. If this
516 * CompletableFuture completes exceptionally, then the returned
517 * CompletableFuture also does so, with a CompletionException holding
518 * this exception as its cause.
519 *
520 * @param block the action to perform before completing the
521 * returned CompletableFuture
522 * @return the new CompletableFuture
523 */
524 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
525 return thenBlock(block, ForkJoinPool.commonPool());
526 }
527
528 /**
529 * Creates and returns a CompletableFuture that is asynchronously
530 * completed using the given executor with this
531 * CompletableFuture's result when it completes. If this
532 * CompletableFuture completes exceptionally, then the returned
533 * CompletableFuture also does so, with a CompletionException holding
534 * this exception as its cause.
535 *
536 * @param block the action to perform before completing the
537 * returned CompletableFuture
538 * @param executor the executor to use for asynchronous execution
539 * @return the new CompletableFuture
540 */
541 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
542 Executor executor) {
543 if (executor == null) throw new NullPointerException();
544 return thenBlock(block, executor);
545 }
546
547 /**
548 * Creates and returns a CompletableFuture that is completed after
549 * performing the given action when this CompletableFuture
550 * completes. If this CompletableFuture completes exceptionally,
551 * then the returned CompletableFuture also does so, with a
552 * CompletionException holding this exception as its cause.
553 *
554 * @param action the action to perform before completing the
555 * returned CompletableFuture
556 * @return the new CompletableFuture
557 */
558 public CompletableFuture<Void> thenRun(Runnable action) {
559 return thenRunnable(action, null);
560 }
561
562 /**
563 * Creates and returns a CompletableFuture that is asynchronously
564 * completed using the {@link ForkJoinPool#commonPool()} after
565 * performing the given action when this CompletableFuture
566 * completes. If this CompletableFuture completes exceptionally,
567 * then the returned CompletableFuture also does so, with a
568 * CompletionException holding this exception as its cause.
569 *
570 * @param action the action to perform before completing the
571 * returned CompletableFuture
572 * @return the new CompletableFuture
573 */
574 public CompletableFuture<Void> thenRunAsync(Runnable action) {
575 return thenRunnable(action, ForkJoinPool.commonPool());
576 }
577
578 /**
579 * Creates and returns a CompletableFuture that is asynchronously
580 * completed using the given executor after performing the given
581 * action when this CompletableFuture completes. If this
582 * CompletableFuture completes exceptionally, then the returned
583 * CompletableFuture also does so, with a CompletionException holding
584 * this exception as its cause.
585 *
586 * @param action the action to perform before completing the
587 * returned CompletableFuture
588 * @param executor the executor to use for asynchronous execution
589 * @return the new CompletableFuture
590 */
591 public CompletableFuture<Void> thenRunAsync(Runnable action,
592 Executor executor) {
593 if (executor == null) throw new NullPointerException();
594 return thenRunnable(action, executor);
595 }
596
597 /**
598 * Creates and returns a CompletableFuture that is completed with
599 * the result of the given function of this and the other given
600 * CompletableFuture's results when both complete. If this or
601 * the other CompletableFuture complete exceptionally, then the
602 * returned CompletableFuture also does so, with a
603 * CompletionException holding the exception as its cause.
604 *
605 * @param other the other CompletableFuture
606 * @param fn the function to use to compute the value of
607 * the returned CompletableFuture
608 * @return the new CompletableFuture
609 */
610 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
611 BiFunction<? super T,? super U,? extends V> fn) {
612 return andFunction(other, fn, null);
613 }
614
615 /**
616 * Creates and returns a CompletableFuture that is asynchronously
617 * completed using the {@link ForkJoinPool#commonPool()} with
618 * the result of the given function of this and the other given
619 * CompletableFuture's results when both complete. If this or
620 * the other CompletableFuture complete exceptionally, then the
621 * returned CompletableFuture also does so, with a
622 * CompletionException holding the exception as its cause.
623 *
624 * @param other the other CompletableFuture
625 * @param fn the function to use to compute the value of
626 * the returned CompletableFuture
627 * @return the new CompletableFuture
628 */
629 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
630 BiFunction<? super T,? super U,? extends V> fn) {
631 return andFunction(other, fn, ForkJoinPool.commonPool());
632 }
633
634 /**
635 * Creates and returns a CompletableFuture that is
636 * asynchronously completed using the given executor with the
637 * result of the given function of this and the other given
638 * CompletableFuture's results when both complete. If this or
639 * the other CompletableFuture complete exceptionally, then the
640 * returned CompletableFuture also does so, with a
641 * CompletionException holding the exception as its cause.
642 *
643 * @param other the other CompletableFuture
644 * @param fn the function to use to compute the value of
645 * the returned CompletableFuture
646 * @param executor the executor to use for asynchronous execution
647 * @return the new CompletableFuture
648 */
649
650 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
651 BiFunction<? super T,? super U,? extends V> fn,
652 Executor executor) {
653 if (executor == null) throw new NullPointerException();
654 return andFunction(other, fn, executor);
655 }
656
657 /**
658 * Creates and returns a CompletableFuture that is completed with
659 * the results of this and the other given CompletableFuture if
660 * both complete. If this and/or the other CompletableFuture
661 * complete exceptionally, then the returned CompletableFuture
662 * also does so, with a CompletionException holding one of these
663 * exceptions as its cause.
664 *
665 * @param other the other CompletableFuture
666 * @param block the action to perform before completing the
667 * returned CompletableFuture
668 * @return the new CompletableFuture
669 */
670 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
671 BiBlock<? super T, ? super U> block) {
672 return andBlock(other, block, null);
673 }
674
675 /**
676 * Creates and returns a CompletableFuture that is completed
677 * asynchronously using the {@link ForkJoinPool#commonPool()} with
678 * the results of this and the other given CompletableFuture when
679 * both complete. If this and/or the other CompletableFuture
680 * complete exceptionally, then the returned CompletableFuture
681 * also does so, with a CompletionException holding one of these
682 * exceptions as its cause.
683 *
684 * @param other the other CompletableFuture
685 * @param block the action to perform before completing the
686 * returned CompletableFuture
687 * @return the new CompletableFuture
688 */
689 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
690 BiBlock<? super T, ? super U> block) {
691 return andBlock(other, block, ForkJoinPool.commonPool());
692 }
693
694 /**
695 * Creates and returns a CompletableFuture that is completed
696 * asynchronously using the given executor with the results of
697 * this and the other given CompletableFuture when both complete.
698 * If this and/or the other CompletableFuture complete
699 * exceptionally, then the returned CompletableFuture also does
700 * so, with a CompletionException holding one of these exceptions as
701 * its cause.
702 *
703 * @param other the other CompletableFuture
704 * @param block the action to perform before completing the
705 * returned CompletableFuture
706 * @param executor the executor to use for asynchronous execution
707 * @return the new CompletableFuture
708 */
709 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
710 BiBlock<? super T, ? super U> block,
711 Executor executor) {
712 if (executor == null) throw new NullPointerException();
713 return andBlock(other, block, executor);
714 }
715
716 /**
717 * Creates and returns a CompletableFuture that is completed
718 * when this and the other given CompletableFuture both
719 * complete. If this and/or the other CompletableFuture complete
720 * exceptionally, then the returned CompletableFuture also does
721 * so, with a CompletionException holding one of these exceptions as
722 * its cause.
723 *
724 * @param other the other CompletableFuture
725 * @param action the action to perform before completing the
726 * returned CompletableFuture
727 * @return the new CompletableFuture
728 */
729 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
730 Runnable action) {
731 return andRunnable(other, action, null);
732 }
733
734 /**
735 * Creates and returns a CompletableFuture that is completed
736 * asynchronously using the {@link ForkJoinPool#commonPool()}
737 * when this and the other given CompletableFuture both
738 * complete. If this and/or the other CompletableFuture complete
739 * exceptionally, then the returned CompletableFuture also does
740 * so, with a CompletionException holding one of these exceptions as
741 * its cause.
742 *
743 * @param other the other CompletableFuture
744 * @param action the action to perform before completing the
745 * returned CompletableFuture
746 * @return the new CompletableFuture
747 */
748 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
749 Runnable action) {
750 return andRunnable(other, action, ForkJoinPool.commonPool());
751 }
752
753 /**
754 * Creates and returns a CompletableFuture that is completed
755 * asynchronously using the given executor
756 * when this and the other given CompletableFuture both
757 * complete. If this and/or the other CompletableFuture complete
758 * exceptionally, then the returned CompletableFuture also does
759 * so, with a CompletionException holding one of these exceptions as
760 * its cause.
761 *
762 * @param other the other CompletableFuture
763 * @param action the action to perform before completing the
764 * returned CompletableFuture
765 * @param executor the executor to use for asynchronous execution
766 * @return the new CompletableFuture
767 */
768 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
769 Runnable action,
770 Executor executor) {
771 if (executor == null) throw new NullPointerException();
772 return andRunnable(other, action, executor);
773 }
774
775 /**
776 * Creates and returns a CompletableFuture that is completed with
777 * the result of the given function of either this or the other
778 * given CompletableFuture's results when either complete. If
779 * this and/or the other CompletableFuture complete exceptionally,
780 * then the returned CompletableFuture may also do so, with a
781 * CompletionException holding one of these exceptions as its cause.
782 * No guarantees are made about which result or exception is used
783 * in the returned CompletableFuture.
784 *
785 * @param other the other CompletableFuture
786 * @param fn the function to use to compute the value of
787 * the returned CompletableFuture
788 * @return the new CompletableFuture
789 */
790 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
791 Function<? super T, U> fn) {
792 return orFunction(other, fn, null);
793 }
794
795 /**
796 * Creates and returns a CompletableFuture that is completed
797 * asynchronously using the {@link ForkJoinPool#commonPool()} with
798 * the result of the given function of either this or the other
799 * given CompletableFuture's results when either complete. If
800 * this and/or the other CompletableFuture complete exceptionally,
801 * then the returned CompletableFuture may also do so, with a
802 * CompletionException holding one of these exceptions as its cause.
803 * No guarantees are made about which result or exception is used
804 * in the returned CompletableFuture.
805 *
806 * @param other the other CompletableFuture
807 * @param fn the function to use to compute the value of
808 * the returned CompletableFuture
809 * @return the new CompletableFuture
810 */
811 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
812 Function<? super T, U> fn) {
813 return orFunction(other, fn, ForkJoinPool.commonPool());
814 }
815
816 /**
817 * Creates and returns a CompletableFuture that is completed
818 * asynchronously using the given executor with the result of the
819 * given function of either this or the other given
820 * CompletableFuture's results when either complete. If this
821 * and/or the other CompletableFuture complete exceptionally, then
822 * the returned CompletableFuture may also do so, with a
823 * CompletionException holding one of these exceptions as its cause.
824 * No guarantees are made about which result or exception is used
825 * in the returned CompletableFuture.
826 *
827 * @param other the other CompletableFuture
828 * @param fn the function to use to compute the value of
829 * the returned CompletableFuture
830 * @param executor the executor to use for asynchronous execution
831 * @return the new CompletableFuture
832 */
833 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
834 Function<? super T, U> fn,
835 Executor executor) {
836 if (executor == null) throw new NullPointerException();
837 return orFunction(other, fn, executor);
838 }
839
840 /**
841 * Creates and returns a CompletableFuture that is completed after
842 * performing the given action with the result of either this or the
843 * other given CompletableFuture's result, when either complete.
844 * If this and/or the other CompletableFuture complete
845 * exceptionally, then the returned CompletableFuture may also do
846 * so, with a CompletionException holding one of these exceptions as
847 * its cause. No guarantees are made about which exception is
848 * used in the returned CompletableFuture.
849 *
850 * @param other the other CompletableFuture
851 * @param block the action to perform before completing the
852 * returned CompletableFuture
853 * @return the new CompletableFuture
854 */
855 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
856 Block<? super T> block) {
857 return orBlock(other, block, null);
858 }
859
860 /**
861 * Creates and returns a CompletableFuture that is completed
862 * asynchronously using the {@link ForkJoinPool#commonPool()},
863 * performing the given action with the result of either this or
864 * the other given CompletableFuture's result, when either
865 * complete. If this and/or the other CompletableFuture complete
866 * exceptionally, then the returned CompletableFuture may also do
867 * so, with a CompletionException holding one of these exceptions as
868 * its cause. No guarantees are made about which exception is
869 * used in the returned CompletableFuture.
870 *
871 * @param other the other CompletableFuture
872 * @param block the action to perform before completing the
873 * returned CompletableFuture
874 * @return the new CompletableFuture
875 */
876 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
877 Block<? super T> block) {
878 return orBlock(other, block, ForkJoinPool.commonPool());
879 }
880
881 /**
882 * Creates and returns a CompletableFuture that is completed
883 * asynchronously using the given executor,
884 * performing the given action with the result of either this or
885 * the other given CompletableFuture's result, when either
886 * complete. If this and/or the other CompletableFuture complete
887 * exceptionally, then the returned CompletableFuture may also do
888 * so, with a CompletionException holding one of these exceptions as
889 * its cause. No guarantees are made about which exception is
890 * used in the returned CompletableFuture.
891 *
892 * @param other the other CompletableFuture
893 * @param block the action to perform before completing the
894 * returned CompletableFuture
895 * @param executor the executor to use for asynchronous execution
896 * @return the new CompletableFuture
897 */
898 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
899 Block<? super T> block,
900 Executor executor) {
901 if (executor == null) throw new NullPointerException();
902 return orBlock(other, block, executor);
903 }
904
905 /**
906 * Creates and returns a CompletableFuture that is completed
907 * after this or the other given CompletableFuture complete. If
908 * this and/or the other CompletableFuture complete exceptionally,
909 * then the returned CompletableFuture may also do so, with a
910 * CompletionException holding one of these exceptions as its cause.
911 * No guarantees are made about which exception is used in the
912 * returned CompletableFuture.
913 *
914 * @param other the other CompletableFuture
915 * @param action the action to perform before completing the
916 * returned CompletableFuture
917 * @return the new CompletableFuture
918 */
919 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
920 Runnable action) {
921 return orRunnable(other, action, null);
922 }
923
924 /**
925 * Creates and returns a CompletableFuture that is completed
926 * asynchronously using the {@link ForkJoinPool#commonPool()}
927 * after this or the other given CompletableFuture complete. If
928 * this and/or the other CompletableFuture complete exceptionally,
929 * then the returned CompletableFuture may also do so, with a
930 * CompletionException holding one of these exceptions as its cause.
931 * No guarantees are made about which exception is used in the
932 * returned CompletableFuture.
933 *
934 * @param other the other CompletableFuture
935 * @param action the action to perform before completing the
936 * returned CompletableFuture
937 * @return the new CompletableFuture
938 */
939 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
940 Runnable action) {
941 return orRunnable(other, action, ForkJoinPool.commonPool());
942 }
943
944 /**
945 * Creates and returns a CompletableFuture that is completed
946 * asynchronously using the given executor after this or the other
947 * given CompletableFuture complete. If this and/or the other
948 * CompletableFuture complete exceptionally, then the returned
949 * CompletableFuture may also do so, with a CompletionException
950 * holding one of these exceptions as its cause. No guarantees are
951 * made about which exception is used in the returned
952 * CompletableFuture.
953 *
954 * @param other the other CompletableFuture
955 * @param action the action to perform before completing the
956 * returned CompletableFuture
957 * @param executor the executor to use for asynchronous execution
958 * @return the new CompletableFuture
959 */
960 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
961 Runnable action,
962 Executor executor) {
963 if (executor == null) throw new NullPointerException();
964 return orRunnable(other, action, executor);
965 }
966
967 /**
968 * Returns a CompletableFuture (or an equivalent one) produced by
969 * the given function of the result of this CompletableFuture when
970 * completed. If this CompletableFuture completes exceptionally,
971 * then the returned CompletableFuture also does so, with a
972 * CompletionException holding this exception as its cause.
973 *
974 * @param fn the function returning a new CompletableFuture.
975 * @return the CompletableFuture, that {@code isDone()} upon
976 * return if completed by the given function, or an exception
977 * occurs.
978 */
979 @SuppressWarnings("unchecked") public <U> CompletableFuture<U> thenCompose(Function<? super T,
980 CompletableFuture<U>> fn) {
// A null function is rejected before any state is touched.
981 if (fn == null) throw new NullPointerException();
// dst stays null on the already-completed path until fn supplies it;
// d stays null unless a ThenCompose completion had to be created.
982 CompletableFuture<U> dst = null;
983 ThenCompose<T,U> d = null;
984 Object r;
// Source not complete yet: allocate a placeholder dst and try to push a
// ThenCompose completion onto the completions list.
985 if ((r = result) == null) {
986 dst = new CompletableFuture<U>();
987 CompletionNode p = new CompletionNode
988 (d = new ThenCompose<T,U>(this, fn, dst));
// The loop ends either because the CAS push succeeded (r is still null)
// or because a result appeared first (r non-null, node never pushed).
989 while ((r = result) == null) {
990 if (UNSAFE.compareAndSwapObject
991 (this, COMPLETIONS, p.next = completions, p))
992 break;
993 }
994 }
// A result is available. Run the function here if no completion was
// created, or if this thread wins the 0 -> 1 claim on it.
// NOTE(review): ThenCompose.run() is not visible here; presumably it
// performs the same compareAndSet(0, 1) claim as the other Completion
// classes -- confirm.
995 if (r != null && (d == null || d.compareAndSet(0, 1))) {
996 T t; Throwable ex;
// An AltResult carries the exception (ex may be null); any other
// object is the plain value.
997 if (r instanceof AltResult) {
998 ex = ((AltResult)r).ex;
999 t = null;
1000 }
1001 else {
1002 ex = null;
1003 t = (T) r;
1004 }
// Normal completion: the future returned by fn replaces any placeholder
// dst allocated above. An exception thrown by fn is captured in ex.
1005 if (ex == null) {
1006 try {
1007 dst = fn.apply(t);
1008 } catch (Throwable rex) {
1009 ex = rex;
1010 }
1011 }
// No future to return (fn returned null, or no placeholder existed when
// the source failed or fn threw): create one. With no other exception
// recorded, fn must have returned null, which is reported as an NPE.
1012 if (dst == null) {
1013 dst = new CompletableFuture<U>();
1014 if (ex == null)
1015 ex = new NullPointerException();
1016 }
// Any recorded exception completes the returned future exceptionally.
1017 if (ex != null)
1018 dst.internalComplete(null, ex);
1019 }
// NOTE(review): helpPostComplete() is defined outside this chunk; by its
// name it helps run pending completions of this future and of dst --
// confirm against its definition.
1020 helpPostComplete();
1021 dst.helpPostComplete();
1022 return dst;
1023 }
1024
1025 /**
1026 * Creates and returns a CompletableFuture that is completed with
1027 * the result of the given function of the exception triggering
1028 * this CompletableFuture's completion when it completes
1029 * exceptionally; otherwise, if this CompletableFuture completes
1030 * normally, then the returned CompletableFuture also completes
1031 * normally with the same value.
1032 *
1033 * @param fn the function to use to compute the value of the
1034 * returned CompletableFuture if this CompletableFuture completed
1035 * exceptionally
1036 * @return the new CompletableFuture
1037 */
1038 @SuppressWarnings("unchecked") public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
1039 if (fn == null) throw new NullPointerException();
1040 CompletableFuture<T> dst = new CompletableFuture<T>();
1041 ExceptionAction<T> d = null;
1042 Object r;
1043 if ((r = result) == null) {
1044 CompletionNode p =
1045 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
1046 while ((r = result) == null) {
1047 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1048 p.next = completions, p))
1049 break;
1050 }
1051 }
1052 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1053 T t = null; Throwable ex, dx = null;
1054 if (r instanceof AltResult) {
1055 if ((ex = ((AltResult)r).ex) != null) {
1056 try {
1057 t = fn.apply(ex);
1058 } catch (Throwable rex) {
1059 dx = rex;
1060 }
1061 }
1062 }
1063 else
1064 t = (T) r;
1065 dst.internalComplete(t, dx);
1066 }
1067 helpPostComplete();
1068 return dst;
1069 }
1070
1071 /**
1072 * Creates and returns a CompletableFuture that is completed with
1073 * the result of the given function of the result and exception of
1074 * this CompletableFuture's completion when it completes. The
1075 * given function is invoked with the result (or {@code null} if
1076 * none) and the exception (or {@code null} if none) of this
1077 * CompletableFuture when complete.
1078 *
1079 * @param fn the function to use to compute the value of the
1080 * returned CompletableFuture
1081 *
1082 * @return the new CompletableFuture
1083 */
1084 @SuppressWarnings("unchecked") public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
1085 if (fn == null) throw new NullPointerException();
1086 CompletableFuture<U> dst = new CompletableFuture<U>();
1087 ThenHandle<T,U> d = null;
1088 Object r;
1089 if ((r = result) == null) {
1090 CompletionNode p =
1091 new CompletionNode(d = new ThenHandle<T,U>(this, fn, dst));
1092 while ((r = result) == null) {
1093 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1094 p.next = completions, p))
1095 break;
1096 }
1097 }
1098 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1099 T t; Throwable ex;
1100 if (r instanceof AltResult) {
1101 ex = ((AltResult)r).ex;
1102 t = null;
1103 }
1104 else {
1105 ex = null;
1106 t = (T) r;
1107 }
1108 U u = null; Throwable dx = null;
1109 try {
1110 u = fn.apply(t, ex);
1111 } catch (Throwable rex) {
1112 dx = rex;
1113 }
1114 dst.internalComplete(u, dx);
1115 }
1116 helpPostComplete();
1117 return dst;
1118 }
1119
1120 /**
1121 * Attempts to complete this CompletableFuture with
1122 * a {@link CancellationException}.
1123 *
1124 * @param mayInterruptIfRunning this value has no effect in this
1125 * implementation because interrupts are not used to control
1126 * processing.
1127 *
1128 * @return {@code true} if this task is now cancelled
1129 */
1130 public boolean cancel(boolean mayInterruptIfRunning) {
1131 Object r;
1132 while ((r = result) == null) {
1133 r = new AltResult(new CancellationException());
1134 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
1135 postComplete();
1136 return true;
1137 }
1138 }
1139 return ((r instanceof AltResult) &&
1140 (((AltResult)r).ex instanceof CancellationException));
1141 }
1142
1143 /**
1144 * Returns {@code true} if this CompletableFuture was cancelled
1145 * before it completed normally.
1146 *
1147 * @return {@code true} if this CompletableFuture was cancelled
1148 * before it completed normally
1149 */
1150 public boolean isCancelled() {
1151 Object r;
1152 return ((r = result) != null &&
1153 (r instanceof AltResult) &&
1154 (((AltResult)r).ex instanceof CancellationException));
1155 }
1156
1157 /**
1158 * Forcibly sets or resets the value subsequently returned by
1159 * method get() and related methods, whether or not already
1160 * completed. This method is designed for use only in error
1161 * recovery actions, and even in such situations may result in
1162 * ongoing dependent completions using established versus
1163 * overwritten values.
1164 *
1165 * @param value the completion value
1166 */
1167 public void obtrudeValue(T value) {
1168 result = (value == null) ? NIL : value;
1169 postComplete();
1170 }
1171
1172 /* ------------- waiting for completions -------------- */
1173
1174 /**
1175 * Initial value of the spin counter that waitingGet() runs down before
1176 * it allocates a wait node and parks. waitingGet() only uses it when
 * more than one processor is available; otherwise it does not spin.
1177 */
1178 static final int WAITING_GET_SPINS = 256;
1179
1180 /**
1181 * Returns raw result after waiting, or null if interruptible and
1182 * interrupted.
1183 */
1184 private Object waitingGet(boolean interruptible) {
1185 WaitNode q = null;
1186 boolean queued = false, interrupted = false;
1187 int h = 0, spins = 0;
1188 for (Object r;;) {
1189 if ((r = result) != null) {
1190 Throwable ex;
1191 if (q != null) // suppress unpark
1192 q.thread = null;
1193 postComplete(); // help release others
1194 if (interrupted)
1195 Thread.currentThread().interrupt();
1196 return r;
1197 }
1198 else if (h == 0) {
1199 h = ThreadLocalRandom.current().nextInt();
1200 if (Runtime.getRuntime().availableProcessors() > 1)
1201 spins = WAITING_GET_SPINS;
1202 }
1203 else if (spins > 0) {
1204 h ^= h << 1; // xorshift
1205 h ^= h >>> 3;
1206 if ((h ^= h << 10) >= 0)
1207 --spins;
1208 }
1209 else if (q == null)
1210 q = new WaitNode();
1211 else if (!queued)
1212 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1213 q.next = waiters, q);
1214 else if (Thread.interrupted()) {
1215 if (interruptible) {
1216 removeWaiter(q);
1217 return null;
1218 }
1219 interrupted = true;
1220 }
1221 else if (q.thread == null)
1222 q.thread = Thread.currentThread();
1223 else
1224 LockSupport.park(this);
1225 }
1226 }
1227
1228 /**
1229 * Awaits completion or aborts on interrupt or timeout.
1230 *
1231 * @param nanos time to wait
1232 * @return raw result
1233 */
1234 private Object timedAwaitDone(long nanos)
1235 throws InterruptedException, TimeoutException {
1236 final long deadline = System.nanoTime() + nanos;
1237 WaitNode q = null;
1238 boolean queued = false;
1239 for (Object r;;) {
1240 if (Thread.interrupted()) {
1241 removeWaiter(q);
1242 throw new InterruptedException();
1243 }
1244 else if ((r = result) != null) {
1245 if (q != null)
1246 q.thread = null;
1247 postComplete();
1248 return r;
1249 }
1250 else if (q == null)
1251 q = new WaitNode();
1252 else if (!queued)
1253 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1254 q.next = waiters, q);
1255 else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1256 removeWaiter(q);
1257 throw new TimeoutException();
1258 }
1259 else if (q.thread == null)
1260 q.thread = Thread.currentThread();
1261 else
1262 LockSupport.parkNanos(this, nanos);
1263 }
1264 }
1265
1266 /**
1267 * Tries to unlink a timed-out or interrupted wait node to avoid
1268 * accumulating garbage. Internal nodes are simply unspliced
1269 * without CAS since it is harmless if they are traversed anyway
1270 * by releasers. To avoid effects of unsplicing from already
1271 * removed nodes, the list is retraversed in case of an apparent
1272 * race. This is slow when there are a lot of nodes, but we don't
1273 * expect lists to be long enough to outweigh higher-overhead
1274 * schemes.
1275 */
1276 private void removeWaiter(WaitNode node) {
// A null node means the caller never allocated one; nothing to unlink.
1277 if (node != null) {
// Clearing the thread marks this node as dead; the scan below unlinks
// every node it finds whose thread is null, not just this one.
1278 node.thread = null;
1279 retry:
1280 for (;;) { // restart on removeWaiter race
// pred: last node seen in this scan with a non-null thread;
// q: node under inspection; s: q's successor, read before q is touched.
1281 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
1282 s = q.next;
1283 if (q.thread != null)
1284 pred = q;
1285 else if (pred != null) {
// Dead node behind a live one: bypass it with a plain write.
1286 pred.next = s;
// If pred was itself marked dead in the meantime, the write above may
// have gone into an already-removed node, so rescan from the head.
1287 if (pred.thread == null) // check for race
1288 continue retry;
1289 }
// Dead node with no live node before it in this scan: swing the
// waiters field from q to s with a CAS, rescanning if the CAS fails.
1290 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
1291 continue retry;
1292 }
1293 break;
1294 }
1295 }
1296 }
1297
1298 /* ------------- Async tasks -------------- */
1299
1300 /** Base class can act as either FJ or plain Runnable */
1301 static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1302 public final Void getRawResult() { return null; }
1303 public final void setRawResult(Void v) { }
1304 public final void run() { exec(); }
1305 }
1306
1307 static final class AsyncRunnable extends Async {
1308 final Runnable fn;
1309 final CompletableFuture<Void> dst;
1310 AsyncRunnable(Runnable fn, CompletableFuture<Void> dst) {
1311 this.fn = fn; this.dst = dst;
1312 }
1313 public final boolean exec() {
1314 CompletableFuture<Void> d; Throwable ex;
1315 if ((d = this.dst) != null) {
1316 try {
1317 fn.run();
1318 ex = null;
1319 } catch (Throwable rex) {
1320 ex = rex;
1321 }
1322 d.internalComplete(null, ex);
1323 }
1324 return true;
1325 }
1326 private static final long serialVersionUID = 5232453952276885070L;
1327 }
1328
1329 static final class AsyncSupplier<U> extends Async {
1330 final Supplier<U> fn;
1331 final CompletableFuture<U> dst;
1332 AsyncSupplier(Supplier<U> fn, CompletableFuture<U> dst) {
1333 this.fn = fn; this.dst = dst;
1334 }
1335 public final boolean exec() {
1336 CompletableFuture<U> d; U u; Throwable ex;
1337 if ((d = this.dst) != null) {
1338 try {
1339 u = fn.get();
1340 ex = null;
1341 } catch (Throwable rex) {
1342 ex = rex;
1343 u = null;
1344 }
1345 d.internalComplete(u, ex);
1346 }
1347 return true;
1348 }
1349 private static final long serialVersionUID = 5232453952276885070L;
1350 }
1351
1352 static final class AsyncFunction<T,U> extends Async {
1353 Function<? super T,? extends U> fn;
1354 T arg;
1355 final CompletableFuture<U> dst;
1356 AsyncFunction(T arg, Function<? super T,? extends U> fn,
1357 CompletableFuture<U> dst) {
1358 this.arg = arg; this.fn = fn; this.dst = dst;
1359 }
1360 public final boolean exec() {
1361 CompletableFuture<U> d; U u; Throwable ex;
1362 if ((d = this.dst) != null) {
1363 try {
1364 u = fn.apply(arg);
1365 ex = null;
1366 } catch (Throwable rex) {
1367 ex = rex;
1368 u = null;
1369 }
1370 d.internalComplete(u, ex);
1371 }
1372 return true;
1373 }
1374 private static final long serialVersionUID = 5232453952276885070L;
1375 }
1376
1377 static final class AsyncBiFunction<T,U,V> extends Async {
1378 final BiFunction<? super T,? super U,? extends V> fn;
1379 final T arg1;
1380 final U arg2;
1381 final CompletableFuture<V> dst;
1382 AsyncBiFunction(T arg1, U arg2,
1383 BiFunction<? super T,? super U,? extends V> fn,
1384 CompletableFuture<V> dst) {
1385 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1386 }
1387 public final boolean exec() {
1388 CompletableFuture<V> d; V v; Throwable ex;
1389 if ((d = this.dst) != null) {
1390 try {
1391 v = fn.apply(arg1, arg2);
1392 ex = null;
1393 } catch (Throwable rex) {
1394 ex = rex;
1395 v = null;
1396 }
1397 d.internalComplete(v, ex);
1398 }
1399 return true;
1400 }
1401 private static final long serialVersionUID = 5232453952276885070L;
1402 }
1403
1404 static final class AsyncBlock<T> extends Async {
1405 Block<? super T> fn;
1406 T arg;
1407 final CompletableFuture<Void> dst;
1408 AsyncBlock(T arg, Block<? super T> fn,
1409 CompletableFuture<Void> dst) {
1410 this.arg = arg; this.fn = fn; this.dst = dst;
1411 }
1412 public final boolean exec() {
1413 CompletableFuture<Void> d; Throwable ex;
1414 if ((d = this.dst) != null) {
1415 try {
1416 fn.accept(arg);
1417 ex = null;
1418 } catch (Throwable rex) {
1419 ex = rex;
1420 }
1421 d.internalComplete(null, ex);
1422 }
1423 return true;
1424 }
1425 private static final long serialVersionUID = 5232453952276885070L;
1426 }
1427
1428 static final class AsyncBiBlock<T,U> extends Async {
1429 final BiBlock<? super T,? super U> fn;
1430 final T arg1;
1431 final U arg2;
1432 final CompletableFuture<Void> dst;
1433 AsyncBiBlock(T arg1, U arg2,
1434 BiBlock<? super T,? super U> fn,
1435 CompletableFuture<Void> dst) {
1436 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1437 }
1438 public final boolean exec() {
1439 CompletableFuture<Void> d; Throwable ex;
1440 if ((d = this.dst) != null) {
1441 try {
1442 fn.accept(arg1, arg2);
1443 ex = null;
1444 } catch (Throwable rex) {
1445 ex = rex;
1446 }
1447 d.internalComplete(null, ex);
1448 }
1449 return true;
1450 }
1451 private static final long serialVersionUID = 5232453952276885070L;
1452 }
1453
1454 /* ------------- Completions -------------- */
1455
1456 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1457 static abstract class Completion extends AtomicInteger implements Runnable {
1458 }
1459
1460 static final class ThenFunction<T,U> extends Completion {
1461 final CompletableFuture<? extends T> src;
1462 final Function<? super T,? extends U> fn;
1463 final CompletableFuture<U> dst;
1464 final Executor executor;
1465 ThenFunction(CompletableFuture<? extends T> src,
1466 final Function<? super T,? extends U> fn,
1467 final CompletableFuture<U> dst, Executor executor) {
1468 this.src = src; this.fn = fn; this.dst = dst;
1469 this.executor = executor;
1470 }
1471 @SuppressWarnings("unchecked") public final void run() {
1472 CompletableFuture<? extends T> a;
1473 Function<? super T,? extends U> fn;
1474 CompletableFuture<U> dst;
1475 Object r; T t; Throwable ex;
1476 if ((dst = this.dst) != null &&
1477 (fn = this.fn) != null &&
1478 (a = this.src) != null &&
1479 (r = a.result) != null &&
1480 compareAndSet(0, 1)) {
1481 if (r instanceof AltResult) {
1482 ex = ((AltResult)r).ex;
1483 t = null;
1484 }
1485 else {
1486 ex = null;
1487 t = (T) r;
1488 }
1489 Executor e = executor;
1490 U u = null;
1491 if (ex == null) {
1492 try {
1493 if (e != null)
1494 e.execute(new AsyncFunction<T,U>(t, fn, dst));
1495 else
1496 u = fn.apply(t);
1497 } catch (Throwable rex) {
1498 ex = rex;
1499 }
1500 }
1501 if (e == null || ex != null)
1502 dst.internalComplete(u, ex);
1503 }
1504 }
1505 private static final long serialVersionUID = 5232453952276885070L;
1506 }
1507
1508 static final class ThenBlock<T> extends Completion {
1509 final CompletableFuture<? extends T> src;
1510 final Block<? super T> fn;
1511 final CompletableFuture<Void> dst;
1512 final Executor executor;
1513 ThenBlock(CompletableFuture<? extends T> src,
1514 final Block<? super T> fn,
1515 final CompletableFuture<Void> dst, Executor executor) {
1516 this.src = src; this.fn = fn; this.dst = dst;
1517 this.executor = executor;
1518 }
1519 @SuppressWarnings("unchecked") public final void run() {
1520 CompletableFuture<? extends T> a;
1521 Block<? super T> fn;
1522 CompletableFuture<Void> dst;
1523 Object r; T t; Throwable ex;
1524 if ((dst = this.dst) != null &&
1525 (fn = this.fn) != null &&
1526 (a = this.src) != null &&
1527 (r = a.result) != null &&
1528 compareAndSet(0, 1)) {
1529 if (r instanceof AltResult) {
1530 ex = ((AltResult)r).ex;
1531 t = null;
1532 }
1533 else {
1534 ex = null;
1535 t = (T) r;
1536 }
1537 Executor e = executor;
1538 if (ex == null) {
1539 try {
1540 if (e != null)
1541 e.execute(new AsyncBlock<T>(t, fn, dst));
1542 else
1543 fn.accept(t);
1544 } catch (Throwable rex) {
1545 ex = rex;
1546 }
1547 }
1548 if (e == null || ex != null)
1549 dst.internalComplete(null, ex);
1550 }
1551 }
1552 private static final long serialVersionUID = 5232453952276885070L;
1553 }
1554
1555 static final class ThenRunnable<T> extends Completion {
1556 final CompletableFuture<? extends T> src;
1557 final Runnable fn;
1558 final CompletableFuture<Void> dst;
1559 final Executor executor;
1560 ThenRunnable(CompletableFuture<? extends T> src,
1561 Runnable fn,
1562 CompletableFuture<Void> dst,
1563 Executor executor) {
1564 this.src = src; this.fn = fn; this.dst = dst;
1565 this.executor = executor;
1566 }
1567 @SuppressWarnings("unchecked") public final void run() {
1568 CompletableFuture<? extends T> a;
1569 Runnable fn;
1570 CompletableFuture<Void> dst;
1571 Object r; Throwable ex;
1572 if ((dst = this.dst) != null &&
1573 (fn = this.fn) != null &&
1574 (a = this.src) != null &&
1575 (r = a.result) != null &&
1576 compareAndSet(0, 1)) {
1577 if (r instanceof AltResult)
1578 ex = ((AltResult)r).ex;
1579 else
1580 ex = null;
1581 Executor e = executor;
1582 if (ex == null) {
1583 try {
1584 if (e != null)
1585 e.execute(new AsyncRunnable(fn, dst));
1586 else
1587 fn.run();
1588 } catch (Throwable rex) {
1589 ex = rex;
1590 }
1591 }
1592 if (e == null || ex != null)
1593 dst.internalComplete(null, ex);
1594 }
1595 }
1596 private static final long serialVersionUID = 5232453952276885070L;
1597 }
1598
1599 static final class AndFunction<T,U,V> extends Completion {
1600 final CompletableFuture<? extends T> src;
1601 final CompletableFuture<? extends U> snd;
1602 final BiFunction<? super T,? super U,? extends V> fn;
1603 final CompletableFuture<V> dst;
1604 final Executor executor;
1605 AndFunction(CompletableFuture<? extends T> src,
1606 CompletableFuture<? extends U> snd,
1607 BiFunction<? super T,? super U,? extends V> fn,
1608 CompletableFuture<V> dst, Executor executor) {
1609 this.src = src; this.snd = snd;
1610 this.fn = fn; this.dst = dst;
1611 this.executor = executor;
1612 }
1613 @SuppressWarnings("unchecked") public final void run() {
1614 Object r, s; T t; U u; Throwable ex;
1615 CompletableFuture<? extends T> a;
1616 CompletableFuture<? extends U> b;
1617 BiFunction<? super T,? super U,? extends V> fn;
1618 CompletableFuture<V> dst;
1619 if ((dst = this.dst) != null &&
1620 (fn = this.fn) != null &&
1621 (a = this.src) != null &&
1622 (r = a.result) != null &&
1623 (b = this.snd) != null &&
1624 (s = b.result) != null &&
1625 compareAndSet(0, 1)) {
1626 if (r instanceof AltResult) {
1627 ex = ((AltResult)r).ex;
1628 t = null;
1629 }
1630 else {
1631 ex = null;
1632 t = (T) r;
1633 }
1634 if (ex != null)
1635 u = null;
1636 else if (s instanceof AltResult) {
1637 ex = ((AltResult)s).ex;
1638 u = null;
1639 }
1640 else
1641 u = (U) s;
1642 Executor e = executor;
1643 V v = null;
1644 if (ex == null) {
1645 try {
1646 if (e != null)
1647 e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1648 else
1649 v = fn.apply(t, u);
1650 } catch (Throwable rex) {
1651 ex = rex;
1652 }
1653 }
1654 if (e == null || ex != null)
1655 dst.internalComplete(v, ex);
1656 }
1657 }
1658 private static final long serialVersionUID = 5232453952276885070L;
1659 }
1660
1661 static final class AndBlock<T,U> extends Completion {
1662 final CompletableFuture<? extends T> src;
1663 final CompletableFuture<? extends U> snd;
1664 final BiBlock<? super T,? super U> fn;
1665 final CompletableFuture<Void> dst;
1666 final Executor executor;
1667 AndBlock(CompletableFuture<? extends T> src,
1668 CompletableFuture<? extends U> snd,
1669 BiBlock<? super T,? super U> fn,
1670 CompletableFuture<Void> dst, Executor executor) {
1671 this.src = src; this.snd = snd;
1672 this.fn = fn; this.dst = dst;
1673 this.executor = executor;
1674 }
1675 @SuppressWarnings("unchecked") public final void run() {
1676 Object r, s; T t; U u; Throwable ex;
1677 CompletableFuture<? extends T> a;
1678 CompletableFuture<? extends U> b;
1679 BiBlock<? super T,? super U> fn;
1680 CompletableFuture<Void> dst;
1681 if ((dst = this.dst) != null &&
1682 (fn = this.fn) != null &&
1683 (a = this.src) != null &&
1684 (r = a.result) != null &&
1685 (b = this.snd) != null &&
1686 (s = b.result) != null &&
1687 compareAndSet(0, 1)) {
1688 if (r instanceof AltResult) {
1689 ex = ((AltResult)r).ex;
1690 t = null;
1691 }
1692 else {
1693 ex = null;
1694 t = (T) r;
1695 }
1696 if (ex != null)
1697 u = null;
1698 else if (s instanceof AltResult) {
1699 ex = ((AltResult)s).ex;
1700 u = null;
1701 }
1702 else
1703 u = (U) s;
1704 Executor e = executor;
1705 if (ex == null) {
1706 try {
1707 if (e != null)
1708 e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1709 else
1710 fn.accept(t, u);
1711 } catch (Throwable rex) {
1712 ex = rex;
1713 }
1714 }
1715 if (e == null || ex != null)
1716 dst.internalComplete(null, ex);
1717 }
1718 }
1719 private static final long serialVersionUID = 5232453952276885070L;
1720 }
1721
1722 static final class AndRunnable<T> extends Completion {
1723 final CompletableFuture<? extends T> src;
1724 final CompletableFuture<?> snd;
1725 final Runnable fn;
1726 final CompletableFuture<Void> dst;
1727 final Executor executor;
1728 AndRunnable(CompletableFuture<? extends T> src,
1729 CompletableFuture<?> snd,
1730 Runnable fn,
1731 CompletableFuture<Void> dst, Executor executor) {
1732 this.src = src; this.snd = snd;
1733 this.fn = fn; this.dst = dst;
1734 this.executor = executor;
1735 }
1736 @SuppressWarnings("unchecked") public final void run() {
1737 Object r, s; Throwable ex;
1738 final CompletableFuture<? extends T> a;
1739 final CompletableFuture<?> b;
1740 final Runnable fn;
1741 final CompletableFuture<Void> dst;
1742 if ((dst = this.dst) != null &&
1743 (fn = this.fn) != null &&
1744 (a = this.src) != null &&
1745 (r = a.result) != null &&
1746 (b = this.snd) != null &&
1747 (s = b.result) != null &&
1748 compareAndSet(0, 1)) {
1749 if (r instanceof AltResult)
1750 ex = ((AltResult)r).ex;
1751 else
1752 ex = null;
1753 if (ex == null && (s instanceof AltResult))
1754 ex = ((AltResult)s).ex;
1755 Executor e = executor;
1756 if (ex == null) {
1757 try {
1758 if (e != null)
1759 e.execute(new AsyncRunnable(fn, dst));
1760 else
1761 fn.run();
1762 } catch (Throwable rex) {
1763 ex = rex;
1764 }
1765 }
1766 if (e == null || ex != null)
1767 dst.internalComplete(null, ex);
1768 }
1769 }
1770 private static final long serialVersionUID = 5232453952276885070L;
1771 }
1772
1773 static final class OrFunction<T,U> extends Completion {
1774 final CompletableFuture<? extends T> src;
1775 final CompletableFuture<? extends T> snd;
1776 final Function<? super T,? extends U> fn;
1777 final CompletableFuture<U> dst;
1778 final Executor executor;
1779 OrFunction(CompletableFuture<? extends T> src,
1780 CompletableFuture<? extends T> snd,
1781 Function<? super T,? extends U> fn,
1782 CompletableFuture<U> dst, Executor executor) {
1783 this.src = src; this.snd = snd;
1784 this.fn = fn; this.dst = dst;
1785 this.executor = executor;
1786 }
1787 @SuppressWarnings("unchecked") public final void run() {
1788 Object r; T t; Throwable ex;
1789 CompletableFuture<? extends T> a;
1790 CompletableFuture<? extends T> b;
1791 Function<? super T,? extends U> fn;
1792 CompletableFuture<U> dst;
1793 if ((dst = this.dst) != null &&
1794 (fn = this.fn) != null &&
1795 (((a = this.src) != null && (r = a.result) != null) ||
1796 ((b = this.snd) != null && (r = b.result) != null)) &&
1797 compareAndSet(0, 1)) {
1798 if (r instanceof AltResult) {
1799 ex = ((AltResult)r).ex;
1800 t = null;
1801 }
1802 else {
1803 ex = null;
1804 t = (T) r;
1805 }
1806 Executor e = executor;
1807 U u = null;
1808 if (ex == null) {
1809 try {
1810 if (e != null)
1811 e.execute(new AsyncFunction<T,U>(t, fn, dst));
1812 else
1813 u = fn.apply(t);
1814 } catch (Throwable rex) {
1815 ex = rex;
1816 }
1817 }
1818 if (e == null || ex != null)
1819 dst.internalComplete(u, ex);
1820 }
1821 }
1822 private static final long serialVersionUID = 5232453952276885070L;
1823 }
1824
1825 static final class OrBlock<T> extends Completion {
1826 final CompletableFuture<? extends T> src;
1827 final CompletableFuture<? extends T> snd;
1828 final Block<? super T> fn;
1829 final CompletableFuture<Void> dst;
1830 final Executor executor;
1831 OrBlock(CompletableFuture<? extends T> src,
1832 CompletableFuture<? extends T> snd,
1833 Block<? super T> fn,
1834 CompletableFuture<Void> dst, Executor executor) {
1835 this.src = src; this.snd = snd;
1836 this.fn = fn; this.dst = dst;
1837 this.executor = executor;
1838 }
1839 @SuppressWarnings("unchecked") public final void run() {
1840 Object r; T t; Throwable ex;
1841 CompletableFuture<? extends T> a;
1842 CompletableFuture<? extends T> b;
1843 Block<? super T> fn;
1844 CompletableFuture<Void> dst;
1845 if ((dst = this.dst) != null &&
1846 (fn = this.fn) != null &&
1847 (((a = this.src) != null && (r = a.result) != null) ||
1848 ((b = this.snd) != null && (r = b.result) != null)) &&
1849 compareAndSet(0, 1)) {
1850 if (r instanceof AltResult) {
1851 ex = ((AltResult)r).ex;
1852 t = null;
1853 }
1854 else {
1855 ex = null;
1856 t = (T) r;
1857 }
1858 Executor e = executor;
1859 if (ex == null) {
1860 try {
1861 if (e != null)
1862 e.execute(new AsyncBlock<T>(t, fn, dst));
1863 else
1864 fn.accept(t);
1865 } catch (Throwable rex) {
1866 ex = rex;
1867 }
1868 }
1869 if (e == null || ex != null)
1870 dst.internalComplete(null, ex);
1871 }
1872 }
1873 private static final long serialVersionUID = 5232453952276885070L;
1874 }
1875
1876 static final class OrRunnable<T> extends Completion {
1877 final CompletableFuture<? extends T> src;
1878 final CompletableFuture<?> snd;
1879 final Runnable fn;
1880 final CompletableFuture<Void> dst;
1881 final Executor executor;
1882 OrRunnable(CompletableFuture<? extends T> src,
1883 CompletableFuture<?> snd,
1884 Runnable fn,
1885 CompletableFuture<Void> dst, Executor executor) {
1886 this.src = src; this.snd = snd;
1887 this.fn = fn; this.dst = dst;
1888 this.executor = executor;
1889 }
1890 @SuppressWarnings("unchecked") public final void run() {
1891 Object r; Throwable ex;
1892 CompletableFuture<? extends T> a;
1893 final CompletableFuture<?> b;
1894 final Runnable fn;
1895 final CompletableFuture<Void> dst;
1896 if ((dst = this.dst) != null &&
1897 (fn = this.fn) != null &&
1898 (((a = this.src) != null && (r = a.result) != null) ||
1899 ((b = this.snd) != null && (r = b.result) != null)) &&
1900 compareAndSet(0, 1)) {
1901 if (r instanceof AltResult)
1902 ex = ((AltResult)r).ex;
1903 else
1904 ex = null;
1905 Executor e = executor;
1906 if (ex == null) {
1907 try {
1908 if (e != null)
1909 e.execute(new AsyncRunnable(fn, dst));
1910 else
1911 fn.run();
1912 } catch (Throwable rex) {
1913 ex = rex;
1914 }
1915 }
1916 if (e == null || ex != null)
1917 dst.internalComplete(null, ex);
1918 }
1919 }
1920 private static final long serialVersionUID = 5232453952276885070L;
1921 }
1922
1923 static final class ExceptionAction<T> extends Completion {
1924 final CompletableFuture<? extends T> src;
1925 final Function<? super Throwable, ? extends T> fn;
1926 final CompletableFuture<T> dst;
1927 ExceptionAction(CompletableFuture<? extends T> src,
1928 Function<? super Throwable, ? extends T> fn,
1929 CompletableFuture<T> dst) {
1930 this.src = src; this.fn = fn; this.dst = dst;
1931 }
1932 @SuppressWarnings("unchecked") public final void run() {
1933 CompletableFuture<? extends T> a;
1934 Function<? super Throwable, ? extends T> fn;
1935 CompletableFuture<T> dst;
1936 Object r; T t = null; Throwable ex, dx = null;
1937 if ((dst = this.dst) != null &&
1938 (fn = this.fn) != null &&
1939 (a = this.src) != null &&
1940 (r = a.result) != null &&
1941 compareAndSet(0, 1)) {
1942 if ((r instanceof AltResult) &&
1943 (ex = ((AltResult)r).ex) != null) {
1944 try {
1945 t = fn.apply(ex);
1946 } catch (Throwable rex) {
1947 dx = rex;
1948 }
1949 }
1950 else
1951 t = (T) r;
1952 dst.internalComplete(t, dx);
1953 }
1954 }
1955 private static final long serialVersionUID = 5232453952276885070L;
1956 }
1957
1958 static final class ThenCopy<T> extends Completion {
1959 final CompletableFuture<? extends T> src;
1960 final CompletableFuture<T> dst;
1961 ThenCopy(CompletableFuture<? extends T> src,
1962 CompletableFuture<T> dst) {
1963 this.src = src; this.dst = dst;
1964 }
1965 @SuppressWarnings("unchecked") public final void run() {
1966 CompletableFuture<? extends T> a;
1967 CompletableFuture<T> dst;
1968 Object r; Object t; Throwable ex;
1969 if ((dst = this.dst) != null &&
1970 (a = this.src) != null &&
1971 (r = a.result) != null &&
1972 compareAndSet(0, 1)) {
1973 if (r instanceof AltResult) {
1974 ex = ((AltResult)r).ex;
1975 t = null;
1976 }
1977 else {
1978 ex = null;
1979 t = r;
1980 }
1981 dst.internalComplete(t, ex);
1982 }
1983 }
1984 private static final long serialVersionUID = 5232453952276885070L;
1985 }
1986
1987 static final class ThenHandle<T,U> extends Completion {
1988 final CompletableFuture<? extends T> src;
1989 final BiFunction<? super T, Throwable, ? extends U> fn;
1990 final CompletableFuture<U> dst;
1991 ThenHandle(CompletableFuture<? extends T> src,
1992 BiFunction<? super T, Throwable, ? extends U> fn,
1993 final CompletableFuture<U> dst) {
1994 this.src = src; this.fn = fn; this.dst = dst;
1995 }
1996 @SuppressWarnings("unchecked") public final void run() {
1997 CompletableFuture<? extends T> a;
1998 BiFunction<? super T, Throwable, ? extends U> fn;
1999 CompletableFuture<U> dst;
2000 Object r; T t; Throwable ex;
2001 if ((dst = this.dst) != null &&
2002 (fn = this.fn) != null &&
2003 (a = this.src) != null &&
2004 (r = a.result) != null &&
2005 compareAndSet(0, 1)) {
2006 if (r instanceof AltResult) {
2007 ex = ((AltResult)r).ex;
2008 t = null;
2009 }
2010 else {
2011 ex = null;
2012 t = (T) r;
2013 }
2014 U u = null; Throwable dx = null;
2015 try {
2016 u = fn.apply(t, ex);
2017 } catch (Throwable rex) {
2018 dx = rex;
2019 }
2020 dst.internalComplete(u, dx);
2021 }
2022 }
2023 private static final long serialVersionUID = 5232453952276885070L;
2024 }
2025
2026 static final class ThenCompose<T,U> extends Completion {
2027 final CompletableFuture<? extends T> src;
2028 final Function<? super T, CompletableFuture<U>> fn;
2029 final CompletableFuture<U> dst;
2030 ThenCompose(CompletableFuture<? extends T> src,
2031 Function<? super T, CompletableFuture<U>> fn,
2032 final CompletableFuture<U> dst) {
2033 this.src = src; this.fn = fn; this.dst = dst;
2034 }
2035 @SuppressWarnings("unchecked") public final void run() {
2036 CompletableFuture<? extends T> a;
2037 Function<? super T, CompletableFuture<U>> fn;
2038 CompletableFuture<U> dst;
2039 Object r; T t; Throwable ex;
2040 if ((dst = this.dst) != null &&
2041 (fn = this.fn) != null &&
2042 (a = this.src) != null &&
2043 (r = a.result) != null &&
2044 compareAndSet(0, 1)) {
2045 if (r instanceof AltResult) {
2046 ex = ((AltResult)r).ex;
2047 t = null;
2048 }
2049 else {
2050 ex = null;
2051 t = (T) r;
2052 }
2053 CompletableFuture<U> c = null;
2054 U u = null;
2055 boolean complete = false;
2056 if (ex == null) {
2057 try {
2058 c = fn.apply(t);
2059 } catch (Throwable rex) {
2060 ex = rex;
2061 }
2062 }
2063 if (ex != null || c == null) {
2064 if (ex == null)
2065 ex = new NullPointerException();
2066 }
2067 else {
2068 ThenCopy<U> d = null;
2069 Object s;
2070 if ((s = c.result) == null) {
2071 CompletionNode p = new CompletionNode
2072 (d = new ThenCopy<U>(c, dst));
2073 while ((s = c.result) == null) {
2074 if (UNSAFE.compareAndSwapObject
2075 (c, COMPLETIONS, p.next = c.completions, p))
2076 break;
2077 }
2078 }
2079 if (s != null && (d == null || d.compareAndSet(0, 1))) {
2080 complete = true;
2081 if (s instanceof AltResult) {
2082 ex = ((AltResult)s).ex; // no rewrap
2083 u = null;
2084 }
2085 else
2086 u = (U) s;
2087 }
2088 }
2089 if (complete || ex != null)
2090 dst.internalComplete(u, ex);
2091 if (c != null)
2092 c.helpPostComplete();
2093 }
2094 }
2095 private static final long serialVersionUID = 5232453952276885070L;
2096 }
2097
2098 /* ------------- then/and/or implementations -------------- */
2099
2100 @SuppressWarnings("unchecked") private <U> CompletableFuture<U> thenFunction
2101 (Function<? super T,? extends U> fn,
2102 Executor e) {
2103 if (fn == null) throw new NullPointerException();
2104 CompletableFuture<U> dst = new CompletableFuture<U>();
2105 ThenFunction<T,U> d = null;
2106 Object r;
2107 if ((r = result) == null) {
2108 CompletionNode p = new CompletionNode
2109 (d = new ThenFunction<T,U>(this, fn, dst, e));
2110 while ((r = result) == null) {
2111 if (UNSAFE.compareAndSwapObject
2112 (this, COMPLETIONS, p.next = completions, p))
2113 break;
2114 }
2115 }
2116 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2117 T t; Throwable ex;
2118 if (r instanceof AltResult) {
2119 ex = ((AltResult)r).ex;
2120 t = null;
2121 }
2122 else {
2123 ex = null;
2124 t = (T) r;
2125 }
2126 U u = null;
2127 if (ex == null) {
2128 try {
2129 if (e != null)
2130 e.execute(new AsyncFunction<T,U>(t, fn, dst));
2131 else
2132 u = fn.apply(t);
2133 } catch (Throwable rex) {
2134 ex = rex;
2135 }
2136 }
2137 if (e == null || ex != null)
2138 dst.internalComplete(u, ex);
2139 }
2140 helpPostComplete();
2141 return dst;
2142 }
2143
2144 @SuppressWarnings("unchecked") private CompletableFuture<Void> thenBlock
2145 (Block<? super T> fn,
2146 Executor e) {
2147 if (fn == null) throw new NullPointerException();
2148 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2149 ThenBlock<T> d = null;
2150 Object r;
2151 if ((r = result) == null) {
2152 CompletionNode p = new CompletionNode
2153 (d = new ThenBlock<T>(this, fn, dst, e));
2154 while ((r = result) == null) {
2155 if (UNSAFE.compareAndSwapObject
2156 (this, COMPLETIONS, p.next = completions, p))
2157 break;
2158 }
2159 }
2160 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2161 T t; Throwable ex;
2162 if (r instanceof AltResult) {
2163 ex = ((AltResult)r).ex;
2164 t = null;
2165 }
2166 else {
2167 ex = null;
2168 t = (T) r;
2169 }
2170 if (ex == null) {
2171 try {
2172 if (e != null)
2173 e.execute(new AsyncBlock<T>(t, fn, dst));
2174 else
2175 fn.accept(t);
2176 } catch (Throwable rex) {
2177 ex = rex;
2178 }
2179 }
2180 if (e == null || ex != null)
2181 dst.internalComplete(null, ex);
2182 }
2183 helpPostComplete();
2184 return dst;
2185 }
2186
2187 @SuppressWarnings("unchecked") private CompletableFuture<Void> thenRunnable
2188 (Runnable action,
2189 Executor e) {
2190 if (action == null) throw new NullPointerException();
2191 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2192 ThenRunnable<T> d = null;
2193 Object r;
2194 if ((r = result) == null) {
2195 CompletionNode p = new CompletionNode
2196 (d = new ThenRunnable<T>(this, action, dst, e));
2197 while ((r = result) == null) {
2198 if (UNSAFE.compareAndSwapObject
2199 (this, COMPLETIONS, p.next = completions, p))
2200 break;
2201 }
2202 }
2203 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2204 Throwable ex;
2205 if (r instanceof AltResult)
2206 ex = ((AltResult)r).ex;
2207 else
2208 ex = null;
2209 if (ex == null) {
2210 try {
2211 if (e != null)
2212 e.execute(new AsyncRunnable(action, dst));
2213 else
2214 action.run();
2215 } catch (Throwable rex) {
2216 ex = rex;
2217 }
2218 }
2219 if (e == null || ex != null)
2220 dst.internalComplete(null, ex);
2221 }
2222 helpPostComplete();
2223 return dst;
2224 }
2225
2226 @SuppressWarnings("unchecked") private <U,V> CompletableFuture<V> andFunction
2227 (CompletableFuture<? extends U> other,
2228 BiFunction<? super T,? super U,? extends V> fn,
2229 Executor e) {
2230 if (other == null || fn == null) throw new NullPointerException();
2231 CompletableFuture<V> dst = new CompletableFuture<V>();
2232 AndFunction<T,U,V> d = null;
2233 Object r, s = null;
2234 if ((r = result) == null || (s = other.result) == null) {
2235 d = new AndFunction<T,U,V>(this, other, fn, dst, e);
2236 CompletionNode q = null, p = new CompletionNode(d);
2237 while ((r == null && (r = result) == null) ||
2238 (s == null && (s = other.result) == null)) {
2239 if (q != null) {
2240 if (s != null ||
2241 UNSAFE.compareAndSwapObject
2242 (other, COMPLETIONS, q.next = other.completions, q))
2243 break;
2244 }
2245 else if (r != null ||
2246 UNSAFE.compareAndSwapObject
2247 (this, COMPLETIONS, p.next = completions, p)) {
2248 if (s != null)
2249 break;
2250 q = new CompletionNode(d);
2251 }
2252 }
2253 }
2254 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2255 T t; U u; Throwable ex;
2256 if (r instanceof AltResult) {
2257 ex = ((AltResult)r).ex;
2258 t = null;
2259 }
2260 else {
2261 ex = null;
2262 t = (T) r;
2263 }
2264 if (ex != null)
2265 u = null;
2266 else if (s instanceof AltResult) {
2267 ex = ((AltResult)s).ex;
2268 u = null;
2269 }
2270 else
2271 u = (U) s;
2272 V v = null;
2273 if (ex == null) {
2274 try {
2275 if (e != null)
2276 e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
2277 else
2278 v = fn.apply(t, u);
2279 } catch (Throwable rex) {
2280 ex = rex;
2281 }
2282 }
2283 if (e == null || ex != null)
2284 dst.internalComplete(v, ex);
2285 }
2286 helpPostComplete();
2287 other.helpPostComplete();
2288 return dst;
2289 }
2290
2291 @SuppressWarnings("unchecked") private <U> CompletableFuture<Void> andBlock
2292 (CompletableFuture<? extends U> other,
2293 BiBlock<? super T,? super U> fn,
2294 Executor e) {
2295 if (other == null || fn == null) throw new NullPointerException();
2296 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2297 AndBlock<T,U> d = null;
2298 Object r, s = null;
2299 if ((r = result) == null || (s = other.result) == null) {
2300 d = new AndBlock<T,U>(this, other, fn, dst, e);
2301 CompletionNode q = null, p = new CompletionNode(d);
2302 while ((r == null && (r = result) == null) ||
2303 (s == null && (s = other.result) == null)) {
2304 if (q != null) {
2305 if (s != null ||
2306 UNSAFE.compareAndSwapObject
2307 (other, COMPLETIONS, q.next = other.completions, q))
2308 break;
2309 }
2310 else if (r != null ||
2311 UNSAFE.compareAndSwapObject
2312 (this, COMPLETIONS, p.next = completions, p)) {
2313 if (s != null)
2314 break;
2315 q = new CompletionNode(d);
2316 }
2317 }
2318 }
2319 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2320 T t; U u; Throwable ex;
2321 if (r instanceof AltResult) {
2322 ex = ((AltResult)r).ex;
2323 t = null;
2324 }
2325 else {
2326 ex = null;
2327 t = (T) r;
2328 }
2329 if (ex != null)
2330 u = null;
2331 else if (s instanceof AltResult) {
2332 ex = ((AltResult)s).ex;
2333 u = null;
2334 }
2335 else
2336 u = (U) s;
2337 if (ex == null) {
2338 try {
2339 if (e != null)
2340 e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
2341 else
2342 fn.accept(t, u);
2343 } catch (Throwable rex) {
2344 ex = rex;
2345 }
2346 }
2347 if (e == null || ex != null)
2348 dst.internalComplete(null, ex);
2349 }
2350 helpPostComplete();
2351 other.helpPostComplete();
2352 return dst;
2353 }
2354
2355 @SuppressWarnings("unchecked") private CompletableFuture<Void> andRunnable
2356 (CompletableFuture<?> other,
2357 Runnable action,
2358 Executor e) {
2359 if (other == null || action == null) throw new NullPointerException();
2360 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2361 AndRunnable<T> d = null;
2362 Object r, s = null;
2363 if ((r = result) == null || (s = other.result) == null) {
2364 d = new AndRunnable<T>(this, other, action, dst, e);
2365 CompletionNode q = null, p = new CompletionNode(d);
2366 while ((r == null && (r = result) == null) ||
2367 (s == null && (s = other.result) == null)) {
2368 if (q != null) {
2369 if (s != null ||
2370 UNSAFE.compareAndSwapObject
2371 (other, COMPLETIONS, q.next = other.completions, q))
2372 break;
2373 }
2374 else if (r != null ||
2375 UNSAFE.compareAndSwapObject
2376 (this, COMPLETIONS, p.next = completions, p)) {
2377 if (s != null)
2378 break;
2379 q = new CompletionNode(d);
2380 }
2381 }
2382 }
2383 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2384 Throwable ex;
2385 if (r instanceof AltResult)
2386 ex = ((AltResult)r).ex;
2387 else
2388 ex = null;
2389 if (ex == null && (s instanceof AltResult))
2390 ex = ((AltResult)s).ex;
2391 if (ex == null) {
2392 try {
2393 if (e != null)
2394 e.execute(new AsyncRunnable(action, dst));
2395 else
2396 action.run();
2397 } catch (Throwable rex) {
2398 ex = rex;
2399 }
2400 }
2401 if (e == null || ex != null)
2402 dst.internalComplete(null, ex);
2403 }
2404 helpPostComplete();
2405 other.helpPostComplete();
2406 return dst;
2407 }
2408
2409 @SuppressWarnings("unchecked") private <U> CompletableFuture<U> orFunction
2410 (CompletableFuture<? extends T> other,
2411 Function<? super T, U> fn,
2412 Executor e) {
2413 if (other == null || fn == null) throw new NullPointerException();
2414 CompletableFuture<U> dst = new CompletableFuture<U>();
2415 OrFunction<T,U> d = null;
2416 Object r;
2417 if ((r = result) == null && (r = other.result) == null) {
2418 d = new OrFunction<T,U>(this, other, fn, dst, e);
2419 CompletionNode q = null, p = new CompletionNode(d);
2420 while ((r = result) == null && (r = other.result) == null) {
2421 if (q != null) {
2422 if (UNSAFE.compareAndSwapObject
2423 (other, COMPLETIONS, q.next = other.completions, q))
2424 break;
2425 }
2426 else if (UNSAFE.compareAndSwapObject
2427 (this, COMPLETIONS, p.next = completions, p))
2428 q = new CompletionNode(d);
2429 }
2430 }
2431 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2432 T t; Throwable ex;
2433 if (r instanceof AltResult) {
2434 ex = ((AltResult)r).ex;
2435 t = null;
2436 }
2437 else {
2438 ex = null;
2439 t = (T) r;
2440 }
2441 U u = null;
2442 if (ex == null) {
2443 try {
2444 if (e != null)
2445 e.execute(new AsyncFunction<T,U>(t, fn, dst));
2446 else
2447 u = fn.apply(t);
2448 } catch (Throwable rex) {
2449 ex = rex;
2450 }
2451 }
2452 if (e == null || ex != null)
2453 dst.internalComplete(u, ex);
2454 }
2455 helpPostComplete();
2456 other.helpPostComplete();
2457 return dst;
2458 }
2459
2460 @SuppressWarnings("unchecked") private CompletableFuture<Void> orBlock
2461 (CompletableFuture<? extends T> other,
2462 Block<? super T> fn,
2463 Executor e) {
2464 if (other == null || fn == null) throw new NullPointerException();
2465 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2466 OrBlock<T> d = null;
2467 Object r;
2468 if ((r = result) == null && (r = other.result) == null) {
2469 d = new OrBlock<T>(this, other, fn, dst, e);
2470 CompletionNode q = null, p = new CompletionNode(d);
2471 while ((r = result) == null && (r = other.result) == null) {
2472 if (q != null) {
2473 if (UNSAFE.compareAndSwapObject
2474 (other, COMPLETIONS, q.next = other.completions, q))
2475 break;
2476 }
2477 else if (UNSAFE.compareAndSwapObject
2478 (this, COMPLETIONS, p.next = completions, p))
2479 q = new CompletionNode(d);
2480 }
2481 }
2482 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2483 T t; Throwable ex;
2484 if (r instanceof AltResult) {
2485 ex = ((AltResult)r).ex;
2486 t = null;
2487 }
2488 else {
2489 ex = null;
2490 t = (T) r;
2491 }
2492 if (ex == null) {
2493 try {
2494 if (e != null)
2495 e.execute(new AsyncBlock<T>(t, fn, dst));
2496 else
2497 fn.accept(t);
2498 } catch (Throwable rex) {
2499 ex = rex;
2500 }
2501 }
2502 if (e == null || ex != null)
2503 dst.internalComplete(null, ex);
2504 }
2505 helpPostComplete();
2506 other.helpPostComplete();
2507 return dst;
2508 }
2509
2510 @SuppressWarnings("unchecked") private CompletableFuture<Void> orRunnable
2511 (CompletableFuture<?> other,
2512 Runnable action,
2513 Executor e) {
2514 if (other == null || action == null) throw new NullPointerException();
2515 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2516 OrRunnable<T> d = null;
2517 Object r;
2518 if ((r = result) == null && (r = other.result) == null) {
2519 d = new OrRunnable<T>(this, other, action, dst, e);
2520 CompletionNode q = null, p = new CompletionNode(d);
2521 while ((r = result) == null && (r = other.result) == null) {
2522 if (q != null) {
2523 if (UNSAFE.compareAndSwapObject
2524 (other, COMPLETIONS, q.next = other.completions, q))
2525 break;
2526 }
2527 else if (UNSAFE.compareAndSwapObject
2528 (this, COMPLETIONS, p.next = completions, p))
2529 q = new CompletionNode(d);
2530 }
2531 }
2532 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2533 Throwable ex;
2534 if (r instanceof AltResult)
2535 ex = ((AltResult)r).ex;
2536 else
2537 ex = null;
2538 if (ex == null) {
2539 try {
2540 if (e != null)
2541 e.execute(new AsyncRunnable(action, dst));
2542 else
2543 action.run();
2544 } catch (Throwable rex) {
2545 ex = rex;
2546 }
2547 }
2548 if (e == null || ex != null)
2549 dst.internalComplete(null, ex);
2550 }
2551 helpPostComplete();
2552 other.helpPostComplete();
2553 return dst;
2554 }
2555
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" used for compare-and-swap.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        final sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
        final Class<?> self = CompletableFuture.class;
        UNSAFE = unsafe;
        RESULT = unsafe.objectFieldOffset(self.getDeclaredField("result"));
        WAITERS = unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        COMPLETIONS =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
    } catch (Exception e) {
        // Any lookup failure aborts class initialization.
        throw new Error(e);
    }
}
2575 }