ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.17
Committed: Sun Dec 30 14:45:59 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.16: +366 -111 lines
Log Message:
Expand and regularize function-accepting methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Block;
10 import java.util.function.BiBlock;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>Similar methods are available for Functions, Blocks, and
31 * Runnables, depending on whether actions require arguments and/or
32 * produce results. Functions and actions supplied for dependent
33 * completions (mainly using methods with prefix {@code then}) may be
34 * performed by the thread that completes the current
35 * CompletableFuture, or by any other caller of these methods. There
36 * are no guarantees about the order of processing completions unless
37 * constrained by these methods.
38 *
39 * <p>When two or more threads attempt to {@link #complete} or {@link
40 * #completeExceptionally} a CompletableFuture, only one of them
41 * succeeds. Upon exceptional completion, or when a completion entails
42 * computation of a function or action, and it terminates abruptly
43 * with an exception, then further completions act as {@code
44 * completeExceptionally} with that exception.
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, the {@code async} methods provide commonly useful ways to
48 * commence asynchronous processing, using either a given {@link
49 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
50 * function or action that will result in the completion of a new
51 * CompletableFuture.
52 *
53 * @author Doug Lea
54 * @since 1.8
55 */
56 public class CompletableFuture<T> implements Future<T> {
57 /*
58 * Quick overview (more to come):
59 *
60 * 1. Non-nullness of field result indicates done. An AltResult is
61 * used to box null as a result, as well as to hold exceptions.
62 *
63 * 2. Waiters are held in a Treiber stack similar to the one used
64 * in FutureTask.
65 *
66 * 3. Completions are also kept in a list/stack, and pulled off
67 * and run when completion is triggered. Because post-processing
68 * may race with direct calls, completions extend AtomicInteger so
69 * callers can claim the action via compareAndSet(0, 1).
70 */
71
72 static final class AltResult {
73 final Throwable ex; // null only for NIL
74 AltResult(Throwable ex) { this.ex = ex; }
75 }
76
77 static final AltResult NIL = new AltResult(null);
78
79 /**
80 * Simple linked list nodes to record waiting threads in a Treiber
81 * stack. See other classes such as Phaser and SynchronousQueue
82 * for more detailed explanation.
83 */
84 static final class WaitNode {
85 volatile Thread thread;
86 volatile WaitNode next;
87 }
88
89 /**
90 * Simple linked list nodes to record completions, used in
91 * basically the same way as WaitNodes
92 */
93 static final class CompletionNode {
94 final Completion completion;
95 volatile CompletionNode next;
96 CompletionNode(Completion completion) { this.completion = completion; }
97 }
98
99
100 volatile Object result; // either the result or boxed AltResult
101 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
102 volatile CompletionNode completions; // list (Treiber stack) of completions
103
104 /**
105 * Creates a new incomplete CompletableFuture.
106 */
107 public CompletableFuture() {
108 }
109
110 /**
111 * Asynchronously executes in the {@link
112 * ForkJoinPool#commonPool()}, a task that completes the returned
113 * CompletableFuture with the result of the given Supplier.
114 *
115 * @param supplier a function returning the value to be used
116 * to complete the returned CompletableFuture
117 * @return the CompletableFuture
118 */
119 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
120 if (supplier == null) throw new NullPointerException();
121 CompletableFuture<U> f = new CompletableFuture<U>();
122 ForkJoinPool.commonPool().
123 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
124 return f;
125 }
126
127 /**
128 * Asynchronously executes using the given executor, a task that
129 * completes the returned CompletableFuture with the result of the
130 * given Supplier.
131 *
132 * @param supplier a function returning the value to be used
133 * to complete the returned CompletableFuture
134 * @param executor the executor to use for asynchronous execution
135 * @return the CompletableFuture
136 */
137 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
138 Executor executor) {
139 if (executor == null || supplier == null)
140 throw new NullPointerException();
141 CompletableFuture<U> f = new CompletableFuture<U>();
142 executor.execute(new AsyncSupplier<U>(supplier, f));
143 return f;
144 }
145
146 /**
147 * Asynchronously executes in the {@link
148 * ForkJoinPool#commonPool()} a task that runs the given action,
149 * and then completes the returned CompletableFuture.
150 *
151 * @param runnable the action to run before completing the
152 * returned CompletableFuture
153 * @return the CompletableFuture
154 */
155 public static CompletableFuture<Void> runAsync(Runnable runnable) {
156 if (runnable == null) throw new NullPointerException();
157 CompletableFuture<Void> f = new CompletableFuture<Void>();
158 ForkJoinPool.commonPool().
159 execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
160 return f;
161 }
162
163 /**
164 * Asynchronously executes using the given executor, a task that
165 * runs the given action, and then completes the returned
166 * CompletableFuture.
167 *
168 * @param runnable the action to run before completing the
169 * returned CompletableFuture
170 * @param executor the executor to use for asynchronous execution
171 * @return the CompletableFuture
172 */
173 public static CompletableFuture<Void> runAsync(Runnable runnable,
174 Executor executor) {
175 if (executor == null || runnable == null)
176 throw new NullPointerException();
177 CompletableFuture<Void> f = new CompletableFuture<Void>();
178 executor.execute(new AsyncRunnable(runnable, f));
179 return f;
180 }
181
182 /**
183 * Returns {@code true} if completed in any fashion: normally,
184 * exceptionally, or via cancellation.
185 *
186 * @return {@code true} if completed
187 */
188 public boolean isDone() {
189 return result != null;
190 }
191
192 /**
193 * Returns the result value when complete, or throws an
194 * (unchecked) exception if completed exceptionally. To better
195 * conform with the use of common functional forms, this method
196 * transforms any checked exception possible with {@link
197 * Future#get} into an (unchecked) {@link RuntimeException} with
198 * the underlying exception as its cause. (The checked exception
199 * convention is available using the timed form of get.)
200 *
201 * @return the result value
202 */
203 public T get() {
204 Object r; Throwable ex;
205 if ((r = result) == null)
206 return waitingGet();
207 if (r instanceof AltResult) {
208 if ((ex = ((AltResult)r).ex) != null) {
209 if (ex instanceof Error)
210 throw (Error)ex;
211 if (ex instanceof RuntimeException)
212 throw (RuntimeException)ex;
213 throw new RuntimeException(ex);
214 }
215 return null;
216 }
217 return (T)r;
218 }
219
220 /**
221 * Returns the result value (or throws any encountered exception)
222 * if completed, else returns the given valueIfAbsent.
223 *
224 * @param valueIfAbsent the value to return if not completed
225 * @return the result value, if completed, else the given valueIfAbsent
226 */
227 public T getNow(T valueIfAbsent) {
228 Object r; Throwable ex;
229 if ((r = result) == null)
230 return valueIfAbsent;
231 if (r instanceof AltResult) {
232 if ((ex = ((AltResult)r).ex) != null) {
233 if (ex instanceof Error)
234 throw (Error)ex;
235 if (ex instanceof RuntimeException)
236 throw (RuntimeException)ex;
237 throw new RuntimeException(ex);
238 }
239 return null;
240 }
241 return (T)r;
242 }
243
244 /**
245 * Waits if necessary for at most the given time for completion,
246 * and then retrieves its result, if available.
247 *
248 * @param timeout the maximum time to wait
249 * @param unit the time unit of the timeout argument
250 * @return the computed result
251 * @throws CancellationException if the computation was cancelled
252 * @throws ExecutionException if the computation threw an
253 * exception
254 * @throws InterruptedException if the current thread was interrupted
255 * while waiting
256 * @throws TimeoutException if the wait timed out
257 */
258 public T get(long timeout, TimeUnit unit)
259 throws InterruptedException, ExecutionException, TimeoutException {
260 Object r; Throwable ex;
261 long nanos = unit.toNanos(timeout);
262 if (Thread.interrupted())
263 throw new InterruptedException();
264 if ((r = result) == null)
265 r = timedAwaitDone(nanos);
266 if (r instanceof AltResult) {
267 if ((ex = ((AltResult)r).ex) != null) {
268 if (ex instanceof ExecutionException) // avoid re-wrap
269 throw (ExecutionException)ex;
270 throw new ExecutionException(ex);
271 }
272 return null;
273 }
274 return (T)r;
275 }
276
277 /**
278 * If not already completed, sets the value returned by {@link
279 * #get()} and related methods to the given value.
280 *
281 * @param value the result value
282 * @return true if this invocation caused this CompletableFuture
283 * to transition to a completed state, else false
284 */
285 public boolean complete(T value) {
286 if (result == null &&
287 UNSAFE.compareAndSwapObject(this, RESULT, null,
288 (value == null) ? NIL : value)) {
289 postComplete();
290 return true;
291 }
292 return false;
293 }
294
295 /**
296 * If not already completed, causes invocations of {@link #get()}
297 * and related methods to throw the given exception.
298 *
299 * @param ex the exception
300 * @return true if this invocation caused this CompletableFuture
301 * to transition to a completed state, else false
302 */
303 public boolean completeExceptionally(Throwable ex) {
304 if (ex == null) throw new NullPointerException();
305 if (result == null) {
306 Object r = new AltResult(ex);
307 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
308 postComplete();
309 return true;
310 }
311 }
312 return false;
313 }
314
315 /**
316 * Creates and returns a CompletableFuture that is completed with
317 * the result of the given function of this CompletableFuture.
318 * If this CompletableFuture completes exceptionally,
319 * then the returned CompletableFuture also does so,
320 * with a RuntimeException having this exception as
321 * its cause.
322 *
323 * @param fn the function to use to compute the value of
324 * the returned CompletableFuture
325 * @return the new CompletableFuture
326 */
327 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
328 return thenFunction(fn, null);
329 }
330
331 /**
332 * Creates and returns a CompletableFuture that is asynchronously
333 * completed using the {@link ForkJoinPool#commonPool()} with the
334 * result of the given function of this CompletableFuture. If
335 * this CompletableFuture completes exceptionally, then the
336 * returned CompletableFuture also does so, with a
337 * RuntimeException having this exception as its cause.
338 *
339 * @param fn the function to use to compute the value of
340 * the returned CompletableFuture
341 * @return the new CompletableFuture
342 */
343 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
344 return thenFunction(fn, ForkJoinPool.commonPool());
345 }
346
347 /**
348 * Creates and returns a CompletableFuture that is asynchronously
349 * completed using the given executor with the result of the given
350 * function of this CompletableFuture. If this CompletableFuture
351 * completes exceptionally, then the returned CompletableFuture
352 * also does so, with a RuntimeException having this exception as
353 * its cause.
354 *
355 * @param fn the function to use to compute the value of
356 * the returned CompletableFuture
357 * @param executor the executor to use for asynchronous execution
358 * @return the new CompletableFuture
359 */
360 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
361 Executor executor) {
362 if (executor == null) throw new NullPointerException();
363 return thenFunction(fn, executor);
364 }
365
366 /**
367 * Creates and returns a CompletableFuture that is completed after
368 * performing the given action with this CompletableFuture's
369 * result when it completes. If this CompletableFuture
370 * completes exceptionally, then the returned CompletableFuture
371 * also does so, with a RuntimeException having this exception as
372 * its cause.
373 *
374 * @param block the action to perform before completing the
375 * returned CompletableFuture
376 * @return the new CompletableFuture
377 */
378 public CompletableFuture<Void> thenAccept(Block<? super T> block) {
379 return thenBlock(block, null);
380 }
381
382 /**
383 * Creates and returns a CompletableFuture that is asynchronously
384 * completed using the {@link ForkJoinPool#commonPool()} with this
385 * CompletableFuture's result when it completes. If this
386 * CompletableFuture completes exceptionally, then the returned
387 * CompletableFuture also does so, with a RuntimeException having
388 * this exception as its cause.
389 *
390 * @param block the action to perform before completing the
391 * returned CompletableFuture
392 * @return the new CompletableFuture
393 */
394 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
395 return thenBlock(block, ForkJoinPool.commonPool());
396 }
397
398 /**
399 * Creates and returns a CompletableFuture that is asynchronously
400 * completed using the given executor with this
401 * CompletableFuture's result when it completes. If this
402 * CompletableFuture completes exceptionally, then the returned
403 * CompletableFuture also does so, with a RuntimeException having
404 * this exception as its cause.
405 *
406 * @param block the action to perform before completing the
407 * returned CompletableFuture
408 * @param executor the executor to use for asynchronous execution
409 * @return the new CompletableFuture
410 */
411 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
412 Executor executor) {
413 if (executor == null) throw new NullPointerException();
414 return thenBlock(block, executor);
415 }
416
417 /**
418 * Creates and returns a CompletableFuture that is completed after
419 * performing the given action when this CompletableFuture
420 * completes. If this CompletableFuture completes exceptionally,
421 * then the returned CompletableFuture also does so, with a
422 * RuntimeException having this exception as its cause.
423 *
424 * @param action the action to perform before completing the
425 * returned CompletableFuture
426 * @return the new CompletableFuture
427 */
428 public CompletableFuture<Void> thenRun(Runnable action) {
429 return thenRunnable(action, null);
430 }
431
432 /**
433 * Creates and returns a CompletableFuture that is asynchronously
434 * completed using the {@link ForkJoinPool#commonPool()} after
435 * performing the given action when this CompletableFuture
436 * completes. If this CompletableFuture completes exceptionally,
437 * then the returned CompletableFuture also does so, with a
438 * RuntimeException having this exception as its cause.
439 *
440 * @param action the action to perform before completing the
441 * returned CompletableFuture
442 * @return the new CompletableFuture
443 */
444 public CompletableFuture<Void> thenRunAsync(Runnable action) {
445 return thenRunnable(action, ForkJoinPool.commonPool());
446 }
447
448 /**
449 * Creates and returns a CompletableFuture that is asynchronously
450 * completed using the given executor after performing the given
451 * action when this CompletableFuture completes. If this
452 * CompletableFuture completes exceptionally, then the returned
453 * CompletableFuture also does so, with a RuntimeException having
454 * this exception as its cause.
455 *
456 * @param action the action to perform before completing the
457 * returned CompletableFuture
458 * @param executor the executor to use for asynchronous execution
459 * @return the new CompletableFuture
460 */
461 public CompletableFuture<Void> thenRunAsync(Runnable action,
462 Executor executor) {
463 if (executor == null) throw new NullPointerException();
464 return thenRunnable(action, executor);
465 }
466
467 /**
468 * Creates and returns a CompletableFuture that is completed with
469 * the result of the given function of this and the other given
470 * CompletableFuture's results when both complete. If this or
471 * the other CompletableFuture complete exceptionally, then the
472 * returned CompletableFuture also does so, with a
473 * RuntimeException having the exception as its cause.
474 *
475 * @param other the other CompletableFuture
476 * @param fn the function to use to compute the value of
477 * the returned CompletableFuture
478 * @return the new CompletableFuture
479 */
480 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
481 BiFunction<? super T,? super U,? extends V> fn) {
482 return andFunction(other, fn, null);
483 }
484
485 /**
486 * Creates and returns a CompletableFuture that is asynchronously
487 * completed using the {@link ForkJoinPool#commonPool()} with
488 * the result of the given function of this and the other given
489 * CompletableFuture's results when both complete. If this or
490 * the other CompletableFuture complete exceptionally, then the
491 * returned CompletableFuture also does so, with a
492 * RuntimeException having the exception as its cause.
493 *
494 * @param other the other CompletableFuture
495 * @param fn the function to use to compute the value of
496 * the returned CompletableFuture
497 * @return the new CompletableFuture
498 */
499 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
500 BiFunction<? super T,? super U,? extends V> fn) {
501 return andFunction(other, fn, ForkJoinPool.commonPool());
502 }
503
504 /**
505 * Creates and returns a CompletableFuture that is
506 * asynchronously completed using the given executor with the
507 * result of the given function of this and the other given
508 * CompletableFuture's results when both complete. If this or
509 * the other CompletableFuture complete exceptionally, then the
510 * returned CompletableFuture also does so, with a
511 * RuntimeException having the exception as its cause.
512 *
513 * @param other the other CompletableFuture
514 * @param fn the function to use to compute the value of
515 * the returned CompletableFuture
516 * @param executor the executor to use for asynchronous execution
517 * @return the new CompletableFuture
518 */
519
520 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
521 BiFunction<? super T,? super U,? extends V> fn,
522 Executor executor) {
523 if (executor == null) throw new NullPointerException();
524 return andFunction(other, fn, executor);
525 }
526
527 /**
528 * Creates and returns a CompletableFuture that is completed with
529 * the results of this and the other given CompletableFuture if
530 * both complete. If this and/or the other CompletableFuture
531 * complete exceptionally, then the returned CompletableFuture
532 * also does so, with a RuntimeException having one of these
533 * exceptions as its cause.
534 *
535 * @param other the other CompletableFuture
536 * @param block the action to perform before completing the
537 * returned CompletableFuture
538 * @return the new CompletableFuture
539 */
540 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
541 BiBlock<? super T, ? super U> block) {
542 return andBlock(other, block, null);
543 }
544
545 /**
546 * Creates and returns a CompletableFuture that is completed
547 * asynchronously using the {@link ForkJoinPool#commonPool()} with
548 * the results of this and the other given CompletableFuture when
549 * both complete. If this and/or the other CompletableFuture
550 * complete exceptionally, then the returned CompletableFuture
551 * also does so, with a RuntimeException having one of these
552 * exceptions as its cause.
553 *
554 * @param other the other CompletableFuture
555 * @param block the action to perform before completing the
556 * returned CompletableFuture
557 * @return the new CompletableFuture
558 */
559 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
560 BiBlock<? super T, ? super U> block) {
561 return andBlock(other, block, ForkJoinPool.commonPool());
562 }
563
564 /**
565 * Creates and returns a CompletableFuture that is completed
566 * asynchronously using the given executor with the results of
567 * this and the other given CompletableFuture when both complete.
568 * If this and/or the other CompletableFuture complete
569 * exceptionally, then the returned CompletableFuture also does
570 * so, with a RuntimeException having one of these exceptions as
571 * its cause.
572 *
573 * @param other the other CompletableFuture
574 * @param block the action to perform before completing the
575 * returned CompletableFuture
576 * @param executor the executor to use for asynchronous execution
577 * @return the new CompletableFuture
578 */
579 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
580 BiBlock<? super T, ? super U> block,
581 Executor executor) {
582 if (executor == null) throw new NullPointerException();
583 return andBlock(other, block, executor);
584 }
585
586 /**
587 * Creates and returns a CompletableFuture that is completed
588 * when this and the other given CompletableFuture both
589 * complete. If this and/or the other CompletableFuture complete
590 * exceptionally, then the returned CompletableFuture also does
591 * so, with a RuntimeException having one of these exceptions as
592 * its cause.
593 *
594 * @param other the other CompletableFuture
595 * @param action the action to perform before completing the
596 * returned CompletableFuture
597 * @return the new CompletableFuture
598 */
599 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
600 Runnable action) {
601 return andRunnable(other, action, null);
602 }
603
604 /**
605 * Creates and returns a CompletableFuture that is completed
606 * asynchronously using the {@link ForkJoinPool#commonPool()}
607 * when this and the other given CompletableFuture both
608 * complete. If this and/or the other CompletableFuture complete
609 * exceptionally, then the returned CompletableFuture also does
610 * so, with a RuntimeException having one of these exceptions as
611 * its cause.
612 *
613 * @param other the other CompletableFuture
614 * @param action the action to perform before completing the
615 * returned CompletableFuture
616 * @return the new CompletableFuture
617 */
618 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
619 Runnable action) {
620 return andRunnable(other, action, ForkJoinPool.commonPool());
621 }
622
623 /**
624 * Creates and returns a CompletableFuture that is completed
625 * asynchronously using the given executor
626 * when this and the other given CompletableFuture both
627 * complete. If this and/or the other CompletableFuture complete
628 * exceptionally, then the returned CompletableFuture also does
629 * so, with a RuntimeException having one of these exceptions as
630 * its cause.
631 *
632 * @param other the other CompletableFuture
633 * @param action the action to perform before completing the
634 * returned CompletableFuture
635 * @param executor the executor to use for asynchronous execution
636 * @return the new CompletableFuture
637 */
638 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
639 Runnable action,
640 Executor executor) {
641 if (executor == null) throw new NullPointerException();
642 return andRunnable(other, action, executor);
643 }
644
645 /**
646 * Creates and returns a CompletableFuture that is completed with
647 * the result of the given function of either this or the other
648 * given CompletableFuture's results when either complete. If
649 * this and/or the other CompletableFuture complete exceptionally,
650 * then the returned CompletableFuture may also do so, with a
651 * RuntimeException having one of these exceptions as its cause.
652 * No guarantees are made about which result or exception is used
653 * in the returned CompletableFuture.
654 *
655 * @param other the other CompletableFuture
656 * @param fn the function to use to compute the value of
657 * the returned CompletableFuture
658 * @return the new CompletableFuture
659 */
660 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
661 Function<? super T, U> fn) {
662 return orFunction(other, fn, null);
663 }
664
665 /**
666 * Creates and returns a CompletableFuture that is completed
667 * asynchronously using the {@link ForkJoinPool#commonPool()} with
668 * the result of the given function of either this or the other
669 * given CompletableFuture's results when either complete. If
670 * this and/or the other CompletableFuture complete exceptionally,
671 * then the returned CompletableFuture may also do so, with a
672 * RuntimeException having one of these exceptions as its cause.
673 * No guarantees are made about which result or exception is used
674 * in the returned CompletableFuture.
675 *
676 * @param other the other CompletableFuture
677 * @param fn the function to use to compute the value of
678 * the returned CompletableFuture
679 * @return the new CompletableFuture
680 */
681 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
682 Function<? super T, U> fn) {
683 return orFunction(other, fn, ForkJoinPool.commonPool());
684 }
685
686 /**
687 * Creates and returns a CompletableFuture that is completed
688 * asynchronously using the given executor with the result of the
689 * given function of either this or the other given
690 * CompletableFuture's results when either complete. If this
691 * and/or the other CompletableFuture complete exceptionally, then
692 * the returned CompletableFuture may also do so, with a
693 * RuntimeException having one of these exceptions as its cause.
694 * No guarantees are made about which result or exception is used
695 * in the returned CompletableFuture.
696 *
697 * @param other the other CompletableFuture
698 * @param fn the function to use to compute the value of
699 * the returned CompletableFuture
700 * @param executor the executor to use for asynchronous execution
701 * @return the new CompletableFuture
702 */
703 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
704 Function<? super T, U> fn,
705 Executor executor) {
706 if (executor == null) throw new NullPointerException();
707 return orFunction(other, fn, executor);
708 }
709
710 /**
711 * Creates and returns a CompletableFuture that is completed after
712 * performing the given action with the result of either this or the
713 * other given CompletableFuture's result, when either complete.
714 * If this and/or the other CompletableFuture complete
715 * exceptionally, then the returned CompletableFuture may also do
716 * so, with a RuntimeException having one of these exceptions as
717 * its cause. No guarantees are made about which exception is
718 * used in the returned CompletableFuture.
719 *
720 * @param other the other CompletableFuture
721 * @param block the action to perform before completing the
722 * returned CompletableFuture
723 * @return the new CompletableFuture
724 */
725 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
726 Block<? super T> block) {
727 return orBlock(other, block, null);
728 }
729
730 /**
731 * Creates and returns a CompletableFuture that is completed
732 * asynchronously using the {@link ForkJoinPool#commonPool()},
733 * performing the given action with the result of either this or
734 * the other given CompletableFuture's result, when either
735 * complete. If this and/or the other CompletableFuture complete
736 * exceptionally, then the returned CompletableFuture may also do
737 * so, with a RuntimeException having one of these exceptions as
738 * its cause. No guarantees are made about which exception is
739 * used in the returned CompletableFuture.
740 *
741 * @param other the other CompletableFuture
742 * @param block the action to perform before completing the
743 * returned CompletableFuture
744 * @return the new CompletableFuture
745 */
746 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
747 Block<? super T> block) {
748 return orBlock(other, block, ForkJoinPool.commonPool());
749 }
750
751 /**
752 * Creates and returns a CompletableFuture that is completed
753 * asynchronously using the given executor,
754 * performing the given action with the result of either this or
755 * the other given CompletableFuture's result, when either
756 * complete. If this and/or the other CompletableFuture complete
757 * exceptionally, then the returned CompletableFuture may also do
758 * so, with a RuntimeException having one of these exceptions as
759 * its cause. No guarantees are made about which exception is
760 * used in the returned CompletableFuture.
761 *
762 * @param other the other CompletableFuture
763 * @param block the action to perform before completing the
764 * returned CompletableFuture
765 * @param executor the executor to use for asynchronous execution
766 * @return the new CompletableFuture
767 */
768 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
769 Block<? super T> block,
770 Executor executor) {
771 if (executor == null) throw new NullPointerException();
772 return orBlock(other, block, executor);
773 }
774
775 /**
776 * Creates and returns a CompletableFuture that is completed
777 * after this or the other given CompletableFuture complete. If
778 * this and/or the other CompletableFuture complete exceptionally,
779 * then the returned CompletableFuture may also do so, with a
780 * RuntimeException having one of these exceptions as its cause.
781 * No guarantees are made about which exception is used in the
782 * returned CompletableFuture.
783 *
784 * @param other the other CompletableFuture
785 * @param action the action to perform before completing the
786 * returned CompletableFuture
787 * @return the new CompletableFuture
788 */
789 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
790 Runnable action) {
791 return orRunnable(other, action, null);
792 }
793
794 /**
795 * Creates and returns a CompletableFuture that is completed
796 * asynchronously using the {@link ForkJoinPool#commonPool()}
797 * after this or the other given CompletableFuture complete. If
798 * this and/or the other CompletableFuture complete exceptionally,
799 * then the returned CompletableFuture may also do so, with a
800 * RuntimeException having one of these exceptions as its cause.
801 * No guarantees are made about which exception is used in the
802 * returned CompletableFuture.
803 *
804 * @param other the other CompletableFuture
805 * @param action the action to perform before completing the
806 * returned CompletableFuture
807 * @return the new CompletableFuture
808 */
809 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
810 Runnable action) {
811 return orRunnable(other, action, ForkJoinPool.commonPool());
812 }
813
814 /**
815 * Creates and returns a CompletableFuture that is completed
816 * asynchronously using the given executor after this or the other
817 * given CompletableFuture complete. If this and/or the other
818 * CompletableFuture complete exceptionally, then the returned
819 * CompletableFuture may also do so, with a RuntimeException
820 * having one of these exceptions as its cause. No guarantees are
821 * made about which exception is used in the returned
822 * CompletableFuture.
823 *
824 * @param other the other CompletableFuture
825 * @param action the action to perform before completing the
826 * returned CompletableFuture
827 * @param executor the executor to use for asynchronous execution
828 * @return the new CompletableFuture
829 */
830 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
831 Runnable action,
832 Executor executor) {
833 if (executor == null) throw new NullPointerException();
834 return orRunnable(other, action, executor);
835 }
836
837 /**
838 * Returns a CompletableFuture equal or equivalent to that
839 * produced by the given function of the result of this
840 * CompletableFuture when completed. If this CompletableFuture
841 * completes exceptionally, then the returned CompletableFuture
842 * also does so, with a RuntimeException having this exception as
843 * its cause.
844 *
845 * @param fn the function returning a new CompletableFuture.
846 * @return the CompletableFuture, that {@code isDone()} upon
847 * return if completed by the given function, or an exception
848 * occurs.
849 */
850 public <U> CompletableFuture<U> thenCompose(Function<? super T,
851 CompletableFuture<U>> fn) {
852 if (fn == null) throw new NullPointerException();
853 CompletableFuture<U> dst = null;
854 ThenCompose<T,U> d = null;
855 Object r;
856 if ((r = result) == null) {
857 dst = new CompletableFuture<U>();
858 CompletionNode p = new CompletionNode
859 (d = new ThenCompose<T,U>(this, fn, dst));
860 while ((r = result) == null) {
861 if (UNSAFE.compareAndSwapObject
862 (this, COMPLETIONS, p.next = completions, p))
863 break;
864 }
865 }
866 if (r != null && (d == null || d.compareAndSet(0, 1))) {
867 T t; Throwable ex = null;
868 if (r instanceof AltResult) {
869 ex = ((AltResult)r).ex;
870 t = null;
871 }
872 else
873 t = (T) r;
874 if (ex == null) {
875 try {
876 dst = fn.apply(t);
877 } catch (Throwable rex) {
878 ex = rex;
879 }
880 }
881 if (dst == null) {
882 dst = new CompletableFuture<U>();
883 if (ex == null)
884 ex = new NullPointerException();
885 }
886 if (ex != null)
887 dst.completeExceptionally(new RuntimeException(ex));
888 }
889 if (r != null || result != null)
890 postComplete();
891 return dst;
892 }
893
894 /**
895 * Creates and returns a CompletableFuture that is completed with
896 * the result of the given function of the exception triggering
897 * this CompletableFuture's completion when it completes
898 * exceptionally; Otherwise, if this CompletableFuture completes
899 * normally, then the returned CompletableFuture also completes
900 * normally with the same value.
901 *
902 * @param fn the function to use to compute the value of the
903 * returned CompletableFuture if this CompletableFuture completed
904 * exceptionally
905 * @return the new CompletableFuture
906 */
907 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
908 if (fn == null) throw new NullPointerException();
909 CompletableFuture<T> dst = new CompletableFuture<T>();
910 ExceptionAction<T> d = null;
911 Object r;
912 if ((r = result) == null) {
913 CompletionNode p =
914 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
915 while ((r = result) == null) {
916 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
917 p.next = completions, p))
918 break;
919 }
920 }
921 if (r != null && (d == null || d.compareAndSet(0, 1))) {
922 T t; Throwable ex = null;
923 if (r instanceof AltResult) {
924 if ((ex = ((AltResult)r).ex) != null) {
925 try {
926 dst.complete(fn.apply(ex));
927 } catch (Throwable rex) {
928 dst.completeExceptionally(rex);
929 }
930 }
931 t = null;
932 }
933 else
934 t = (T) r;
935 if (ex == null)
936 dst.complete(t);
937 }
938 if (r != null || result != null)
939 postComplete();
940 return dst;
941 }
942
943 /**
944 * Creates and returns a CompletableFuture that is completed with
945 * the result of the given function of the result and exception of
946 * this CompletableFuture's completion when it completes. The
947 * given function is invoked with the result (or {@code null} if
948 * none) and the exception (or {@code null} if none) of this
949 * CompletableFuture when complete.
950 *
951 * @param fn the function to use to compute the value of the
952 * returned CompletableFuture
953
954 * @return the new CompletableFuture
955 */
956 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
957 if (fn == null) throw new NullPointerException();
958 CompletableFuture<U> dst = new CompletableFuture<U>();
959 ThenHandle<T,U> d = null;
960 Object r;
961 if ((r = result) == null) {
962 CompletionNode p =
963 new CompletionNode(d = new ThenHandle<T,U>(this, fn, dst));
964 while ((r = result) == null) {
965 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
966 p.next = completions, p))
967 break;
968 }
969 }
970 if (r != null && (d == null || d.compareAndSet(0, 1))) {
971 T t; Throwable ex;
972 if (r instanceof AltResult) {
973 ex = ((AltResult)r).ex;
974 t = null;
975 }
976 else {
977 ex = null;
978 t = (T) r;
979 }
980 try {
981 dst.complete(fn.apply(t, ex));
982 } catch (Throwable rex) {
983 dst.completeExceptionally(rex);
984 }
985 }
986 if (r != null || result != null)
987 postComplete();
988 return dst;
989 }
990
991 /**
992 * Attempts to complete this CompletableFuture with
993 * a {@link CancellationException}.
994 *
995 * @param mayInterruptIfRunning this value has no effect in this
996 * implementation because interrupts are not used to control
997 * processing.
998 *
999 * @return {@code true} if this task is now cancelled
1000 */
1001 public boolean cancel(boolean mayInterruptIfRunning) {
1002 Object r;
1003 while ((r = result) == null) {
1004 r = new AltResult(new CancellationException());
1005 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
1006 postComplete();
1007 return true;
1008 }
1009 }
1010 return ((r instanceof AltResult) &&
1011 (((AltResult)r).ex instanceof CancellationException));
1012 }
1013
1014 /**
1015 * Returns {@code true} if this CompletableFuture was cancelled
1016 * before it completed normally.
1017 *
1018 * @return {@code true} if this CompletableFuture was cancelled
1019 * before it completed normally
1020 */
1021 public boolean isCancelled() {
1022 Object r;
1023 return ((r = result) != null &&
1024 (r instanceof AltResult) &&
1025 (((AltResult)r).ex instanceof CancellationException));
1026 }
1027
1028 /**
1029 * Forcibly sets or resets the value subsequently returned by
1030 * method get() and related methods, whether or not already
1031 * completed. This method is designed for use only in error
1032 * recovery actions, and even in such situations may result in
1033 * ongoing dependent completions using established versus
1034 * overwritten values.
1035 *
1036 * @param value the completion value
1037 */
1038 public void obtrudeValue(T value) {
1039 result = (value == null) ? NIL : value;
1040 postComplete();
1041 }
1042
1043 /**
1044 * Removes and signals all waiting threads and runs all completions.
1045 */
1046 private void postComplete() {
1047 WaitNode q; Thread t;
1048 while ((q = waiters) != null) {
1049 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
1050 (t = q.thread) != null) {
1051 q.thread = null;
1052 LockSupport.unpark(t);
1053 }
1054 }
1055
1056 CompletionNode h; Completion c;
1057 while ((h = completions) != null) {
1058 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
1059 (c = h.completion) != null)
1060 c.run();
1061 }
1062 }
1063
1064 /* ------------- waiting for completions -------------- */
1065
1066 /**
1067 * Heuristic spin value for waitingGet() before blocking on
1068 * multiprocessors
1069 */
1070 static final int WAITING_GET_SPINS = 256;
1071
1072 /**
1073 * Returns result after waiting.
1074 */
1075 private T waitingGet() {
1076 WaitNode q = null;
1077 boolean queued = false, interrupted = false;
1078 int h = 0, spins = 0;
1079 for (Object r;;) {
1080 if ((r = result) != null) {
1081 Throwable ex;
1082 if (q != null) // suppress unpark
1083 q.thread = null;
1084 postComplete(); // help release others
1085 if (interrupted)
1086 Thread.currentThread().interrupt();
1087 if (r instanceof AltResult) {
1088 if ((ex = ((AltResult)r).ex) != null) {
1089 if (ex instanceof Error)
1090 throw (Error)ex;
1091 if (ex instanceof RuntimeException)
1092 throw (RuntimeException)ex;
1093 throw new RuntimeException(ex);
1094 }
1095 return null;
1096 }
1097 return (T)r;
1098 }
1099 else if (h == 0) {
1100 h = ThreadLocalRandom.current().nextInt();
1101 if (Runtime.getRuntime().availableProcessors() > 1)
1102 spins = WAITING_GET_SPINS;
1103 }
1104 else if (spins > 0) {
1105 h ^= h << 1; // xorshift
1106 h ^= h >>> 3;
1107 if ((h ^= h << 10) >= 0)
1108 --spins;
1109 }
1110 else if (q == null)
1111 q = new WaitNode();
1112 else if (!queued)
1113 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1114 q.next = waiters, q);
1115 else if (Thread.interrupted())
1116 interrupted = true;
1117 else if (q.thread == null)
1118 q.thread = Thread.currentThread();
1119 else
1120 LockSupport.park(this);
1121 }
1122 }
1123
1124 /**
1125 * Awaits completion or aborts on interrupt or timeout.
1126 *
1127 * @param nanos time to wait
1128 * @return raw result
1129 */
1130 private Object timedAwaitDone(long nanos)
1131 throws InterruptedException, TimeoutException {
1132 final long deadline = System.nanoTime() + nanos;
1133 WaitNode q = null;
1134 boolean queued = false;
1135 for (Object r;;) {
1136 if (Thread.interrupted()) {
1137 removeWaiter(q);
1138 throw new InterruptedException();
1139 }
1140 else if ((r = result) != null) {
1141 if (q != null)
1142 q.thread = null;
1143 postComplete();
1144 return r;
1145 }
1146 else if (q == null)
1147 q = new WaitNode();
1148 else if (!queued)
1149 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1150 q.next = waiters, q);
1151 else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1152 removeWaiter(q);
1153 throw new TimeoutException();
1154 }
1155 else if (q.thread == null)
1156 q.thread = Thread.currentThread();
1157 else
1158 LockSupport.parkNanos(this, nanos);
1159 }
1160 }
1161
1162 /**
1163 * Tries to unlink a timed-out or interrupted wait node to avoid
1164 * accumulating garbage. Internal nodes are simply unspliced
1165 * without CAS since it is harmless if they are traversed anyway
1166 * by releasers. To avoid effects of unsplicing from already
1167 * removed nodes, the list is retraversed in case of an apparent
1168 * race. This is slow when there are a lot of nodes, but we don't
1169 * expect lists to be long enough to outweigh higher-overhead
1170 * schemes.
1171 */
1172 private void removeWaiter(WaitNode node) {
1173 if (node != null) {
1174 node.thread = null;
1175 retry:
1176 for (;;) { // restart on removeWaiter race
1177 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
1178 s = q.next;
1179 if (q.thread != null)
1180 pred = q;
1181 else if (pred != null) {
1182 pred.next = s;
1183 if (pred.thread == null) // check for race
1184 continue retry;
1185 }
1186 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
1187 continue retry;
1188 }
1189 break;
1190 }
1191 }
1192 }
1193
1194 /* ------------- Async tasks -------------- */
1195
1196 /** Base class can act as either FJ or plain Runnable */
1197 static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1198 public final Void getRawResult() { return null; }
1199 public final void setRawResult(Void v) { }
1200 public final void run() { exec(); }
1201 }
1202
1203 static final class AsyncRunnable extends Async {
1204 final Runnable runnable;
1205 final CompletableFuture<Void> dst;
1206 AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
1207 this.runnable = runnable; this.dst = dst;
1208 }
1209 public final boolean exec() {
1210 Runnable fn;
1211 CompletableFuture<Void> d;
1212 if ((fn = this.runnable) == null || (d = this.dst) == null)
1213 throw new NullPointerException();
1214 try {
1215 fn.run();
1216 d.complete(null);
1217 } catch (Throwable ex) {
1218 d.completeExceptionally(ex);
1219 }
1220 return true;
1221 }
1222 private static final long serialVersionUID = 5232453952276885070L;
1223 }
1224
1225 static final class AsyncSupplier<U> extends Async {
1226 final Supplier<U> supplier;
1227 final CompletableFuture<U> dst;
1228 AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
1229 this.supplier = supplier; this.dst = dst;
1230 }
1231 public final boolean exec() {
1232 Supplier<U> fn;
1233 CompletableFuture<U> d;
1234 if ((fn = this.supplier) == null || (d = this.dst) == null)
1235 throw new NullPointerException();
1236 try {
1237 d.complete(fn.get());
1238 } catch (Throwable ex) {
1239 d.completeExceptionally(ex);
1240 }
1241 return true;
1242 }
1243 private static final long serialVersionUID = 5232453952276885070L;
1244 }
1245
1246 static final class AsyncFunction<T,U> extends Async {
1247 Function<? super T,? extends U> fn;
1248 T arg;
1249 final CompletableFuture<U> dst;
1250 AsyncFunction(T arg, Function<? super T,? extends U> fn,
1251 CompletableFuture<U> dst) {
1252 this.arg = arg; this.fn = fn; this.dst = dst;
1253 }
1254 public final boolean exec() {
1255 Function<? super T,? extends U> fn;
1256 CompletableFuture<U> d;
1257 if ((fn = this.fn) == null || (d = this.dst) == null)
1258 throw new NullPointerException();
1259 try {
1260 d.complete(fn.apply(arg));
1261 } catch (Throwable ex) {
1262 d.completeExceptionally(ex);
1263 }
1264 return true;
1265 }
1266 private static final long serialVersionUID = 5232453952276885070L;
1267 }
1268
1269 static final class AsyncBiFunction<T,U,V> extends Async {
1270 final BiFunction<? super T,? super U,? extends V> fn;
1271 final T arg1;
1272 final U arg2;
1273 final CompletableFuture<V> dst;
1274 AsyncBiFunction(T arg1, U arg2,
1275 BiFunction<? super T,? super U,? extends V> fn,
1276 CompletableFuture<V> dst) {
1277 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1278 }
1279 public final boolean exec() {
1280 BiFunction<? super T,? super U,? extends V> fn;
1281 CompletableFuture<V> d;
1282 if ((fn = this.fn) == null || (d = this.dst) == null)
1283 throw new NullPointerException();
1284 try {
1285 d.complete(fn.apply(arg1, arg2));
1286 } catch (Throwable ex) {
1287 d.completeExceptionally(ex);
1288 }
1289 return true;
1290 }
1291 private static final long serialVersionUID = 5232453952276885070L;
1292 }
1293
1294 static final class AsyncBlock<T> extends Async {
1295 Block<? super T> fn;
1296 T arg;
1297 final CompletableFuture<Void> dst;
1298 AsyncBlock(T arg, Block<? super T> fn,
1299 CompletableFuture<Void> dst) {
1300 this.arg = arg; this.fn = fn; this.dst = dst;
1301 }
1302 public final boolean exec() {
1303 Block<? super T> fn;
1304 CompletableFuture<Void> d;
1305 if ((fn = this.fn) == null || (d = this.dst) == null)
1306 throw new NullPointerException();
1307 try {
1308 fn.accept(arg);
1309 d.complete(null);
1310 } catch (Throwable ex) {
1311 d.completeExceptionally(ex);
1312 }
1313 return true;
1314 }
1315 private static final long serialVersionUID = 5232453952276885070L;
1316 }
1317
1318 static final class AsyncBiBlock<T,U> extends Async {
1319 final BiBlock<? super T,? super U> fn;
1320 final T arg1;
1321 final U arg2;
1322 final CompletableFuture<Void> dst;
1323 AsyncBiBlock(T arg1, U arg2,
1324 BiBlock<? super T,? super U> fn,
1325 CompletableFuture<Void> dst) {
1326 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1327 }
1328 public final boolean exec() {
1329 BiBlock<? super T,? super U> fn;
1330 CompletableFuture<Void> d;
1331 if ((fn = this.fn) == null || (d = this.dst) == null)
1332 throw new NullPointerException();
1333 try {
1334 fn.accept(arg1, arg2);
1335 d.complete(null);
1336 } catch (Throwable ex) {
1337 d.completeExceptionally(ex);
1338 }
1339 return true;
1340 }
1341 private static final long serialVersionUID = 5232453952276885070L;
1342 }
1343
1344 /* ------------- Completions -------------- */
1345
1346 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1347 static abstract class Completion extends AtomicInteger implements Runnable {
1348 }
1349
1350 static final class ThenFunction<T,U> extends Completion {
1351 final CompletableFuture<? extends T> src;
1352 final Function<? super T,? extends U> fn;
1353 final CompletableFuture<U> dst;
1354 final Executor executor;
1355 ThenFunction(CompletableFuture<? extends T> src,
1356 final Function<? super T,? extends U> fn,
1357 final CompletableFuture<U> dst, Executor executor) {
1358 this.src = src; this.fn = fn; this.dst = dst;
1359 this.executor = executor;
1360 }
1361 public void run() {
1362 CompletableFuture<? extends T> a;
1363 Function<? super T,? extends U> fn;
1364 CompletableFuture<U> dst;
1365 Object r; T t; Throwable ex;
1366 if ((dst = this.dst) != null &&
1367 (fn = this.fn) != null &&
1368 (a = this.src) != null &&
1369 (r = a.result) != null &&
1370 compareAndSet(0, 1)) {
1371 if (r instanceof AltResult) {
1372 if ((ex = ((AltResult)r).ex) != null)
1373 ex = new RuntimeException(ex);
1374 t = null;
1375 }
1376 else {
1377 ex = null;
1378 t = (T) r;
1379 }
1380 if (ex == null) {
1381 try {
1382 if (executor != null)
1383 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1384 else
1385 dst.complete(fn.apply(t));
1386 } catch (Throwable rex) {
1387 ex = rex;
1388 }
1389 }
1390 if (ex != null)
1391 dst.completeExceptionally(ex);
1392 }
1393 }
1394 }
1395
1396 static final class ThenBlock<T> extends Completion {
1397 final CompletableFuture<? extends T> src;
1398 final Block<? super T> fn;
1399 final CompletableFuture<Void> dst;
1400 final Executor executor;
1401 ThenBlock(CompletableFuture<? extends T> src,
1402 final Block<? super T> fn,
1403 final CompletableFuture<Void> dst, Executor executor) {
1404 this.src = src; this.fn = fn; this.dst = dst;
1405 this.executor = executor;
1406 }
1407 public void run() {
1408 CompletableFuture<? extends T> a;
1409 Block<? super T> fn;
1410 CompletableFuture<Void> dst;
1411 Object r; T t; Throwable ex;
1412 if ((dst = this.dst) != null &&
1413 (fn = this.fn) != null &&
1414 (a = this.src) != null &&
1415 (r = a.result) != null &&
1416 compareAndSet(0, 1)) {
1417 if (r instanceof AltResult) {
1418 if ((ex = ((AltResult)r).ex) != null)
1419 ex = new RuntimeException(ex);
1420 t = null;
1421 }
1422 else {
1423 ex = null;
1424 t = (T) r;
1425 }
1426 if (ex == null) {
1427 try {
1428 if (executor != null)
1429 executor.execute(new AsyncBlock<T>(t, fn, dst));
1430 else {
1431 fn.accept(t);
1432 dst.complete(null);
1433 }
1434 } catch (Throwable rex) {
1435 ex = rex;
1436 }
1437 }
1438 if (ex != null)
1439 dst.completeExceptionally(ex);
1440 }
1441 }
1442 }
1443
1444 static final class ThenRunnable<T> extends Completion {
1445 final CompletableFuture<? extends T> src;
1446 final Runnable fn;
1447 final CompletableFuture<Void> dst;
1448 final Executor executor;
1449 ThenRunnable(CompletableFuture<? extends T> src,
1450 Runnable fn,
1451 CompletableFuture<Void> dst,
1452 Executor executor) {
1453 this.src = src; this.fn = fn; this.dst = dst;
1454 this.executor = executor;
1455 }
1456 public void run() {
1457 CompletableFuture<? extends T> a;
1458 Runnable fn;
1459 CompletableFuture<Void> dst;
1460 Object r; Throwable ex;
1461 if ((dst = this.dst) != null &&
1462 (fn = this.fn) != null &&
1463 (a = this.src) != null &&
1464 (r = a.result) != null &&
1465 compareAndSet(0, 1)) {
1466 if (r instanceof AltResult) {
1467 if ((ex = ((AltResult)r).ex) != null)
1468 ex = new RuntimeException(ex);
1469 }
1470 else
1471 ex = null;
1472 if (ex == null) {
1473 try {
1474 if (executor != null)
1475 executor.execute(new AsyncRunnable(fn, dst));
1476 else {
1477 fn.run();
1478 dst.complete(null);
1479 }
1480 } catch (Throwable rex) {
1481 ex = rex;
1482 }
1483 }
1484 if (ex != null)
1485 dst.completeExceptionally(ex);
1486 }
1487 }
1488 }
1489
1490 static final class AndFunction<T,U,V> extends Completion {
1491 final CompletableFuture<? extends T> src;
1492 final CompletableFuture<? extends U> snd;
1493 final BiFunction<? super T,? super U,? extends V> fn;
1494 final CompletableFuture<V> dst;
1495 final Executor executor;
1496 AndFunction(CompletableFuture<? extends T> src,
1497 CompletableFuture<? extends U> snd,
1498 BiFunction<? super T,? super U,? extends V> fn,
1499 CompletableFuture<V> dst, Executor executor) {
1500 this.src = src; this.snd = snd;
1501 this.fn = fn; this.dst = dst;
1502 this.executor = executor;
1503 }
1504 public void run() {
1505 Object r, s; T t; U u; Throwable ex;
1506 CompletableFuture<? extends T> a;
1507 CompletableFuture<? extends U> b;
1508 BiFunction<? super T,? super U,? extends V> fn;
1509 CompletableFuture<V> dst;
1510 if ((dst = this.dst) != null &&
1511 (fn = this.fn) != null &&
1512 (a = this.src) != null &&
1513 (r = a.result) != null &&
1514 (b = this.snd) != null &&
1515 (s = b.result) != null &&
1516 compareAndSet(0, 1)) {
1517 if (r instanceof AltResult) {
1518 if ((ex = ((AltResult)r).ex) != null) {
1519 dst.completeExceptionally(new RuntimeException(ex));
1520 return;
1521 }
1522 t = null;
1523 }
1524 else
1525 t = (T) r;
1526 if (s instanceof AltResult) {
1527 if ((ex = ((AltResult)s).ex) != null) {
1528 dst.completeExceptionally(new RuntimeException(ex));
1529 return;
1530 }
1531 u = null;
1532 }
1533 else
1534 u = (U) s;
1535 try {
1536 if (executor != null)
1537 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1538 else
1539 dst.complete(fn.apply(t, u));
1540 } catch (Throwable rex) {
1541 dst.completeExceptionally(rex);
1542 }
1543 }
1544 }
1545 }
1546
1547 static final class AndBlock<T,U> extends Completion {
1548 final CompletableFuture<? extends T> src;
1549 final CompletableFuture<? extends U> snd;
1550 final BiBlock<? super T,? super U> fn;
1551 final CompletableFuture<Void> dst;
1552 final Executor executor;
1553 AndBlock(CompletableFuture<? extends T> src,
1554 CompletableFuture<? extends U> snd,
1555 BiBlock<? super T,? super U> fn,
1556 CompletableFuture<Void> dst, Executor executor) {
1557 this.src = src; this.snd = snd;
1558 this.fn = fn; this.dst = dst;
1559 this.executor = executor;
1560 }
1561 public void run() {
1562 Object r, s; T t; U u; Throwable ex;
1563 CompletableFuture<? extends T> a;
1564 CompletableFuture<? extends U> b;
1565 BiBlock<? super T,? super U> fn;
1566 CompletableFuture<Void> dst;
1567 if ((dst = this.dst) != null &&
1568 (fn = this.fn) != null &&
1569 (a = this.src) != null &&
1570 (r = a.result) != null &&
1571 (b = this.snd) != null &&
1572 (s = b.result) != null &&
1573 compareAndSet(0, 1)) {
1574 if (r instanceof AltResult) {
1575 if ((ex = ((AltResult)r).ex) != null) {
1576 dst.completeExceptionally(new RuntimeException(ex));
1577 return;
1578 }
1579 t = null;
1580 }
1581 else
1582 t = (T) r;
1583 if (s instanceof AltResult) {
1584 if ((ex = ((AltResult)s).ex) != null) {
1585 dst.completeExceptionally(new RuntimeException(ex));
1586 return;
1587 }
1588 u = null;
1589 }
1590 else
1591 u = (U) s;
1592 try {
1593 if (executor != null)
1594 executor.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1595 else {
1596 fn.accept(t, u);
1597 dst.complete(null);
1598 }
1599 } catch (Throwable rex) {
1600 dst.completeExceptionally(rex);
1601 }
1602 }
1603 }
1604 }
1605
1606 static final class AndRunnable<T> extends Completion {
1607 final CompletableFuture<? extends T> src;
1608 final CompletableFuture<?> snd;
1609 final Runnable fn;
1610 final CompletableFuture<Void> dst;
1611 final Executor executor;
1612 AndRunnable(CompletableFuture<? extends T> src,
1613 CompletableFuture<?> snd,
1614 Runnable fn,
1615 CompletableFuture<Void> dst, Executor executor) {
1616 this.src = src; this.snd = snd;
1617 this.fn = fn; this.dst = dst;
1618 this.executor = executor;
1619 }
1620 public void run() {
1621 Object r, s; Throwable ex;
1622 final CompletableFuture<? extends T> a;
1623 final CompletableFuture<?> b;
1624 final Runnable fn;
1625 final CompletableFuture<Void> dst;
1626 if ((dst = this.dst) != null &&
1627 (fn = this.fn) != null &&
1628 (a = this.src) != null &&
1629 (r = a.result) != null &&
1630 (b = this.snd) != null &&
1631 (s = b.result) != null &&
1632 compareAndSet(0, 1)) {
1633 if (r instanceof AltResult) {
1634 if ((ex = ((AltResult)r).ex) != null) {
1635 dst.completeExceptionally(new RuntimeException(ex));
1636 return;
1637 }
1638 }
1639 if (s instanceof AltResult) {
1640 if ((ex = ((AltResult)s).ex) != null) {
1641 dst.completeExceptionally(new RuntimeException(ex));
1642 return;
1643 }
1644 }
1645 try {
1646 if (executor != null)
1647 executor.execute(new AsyncRunnable(fn, dst));
1648 else {
1649 fn.run();
1650 dst.complete(null);
1651 }
1652 } catch (Throwable rex) {
1653 dst.completeExceptionally(rex);
1654 }
1655 }
1656 }
1657 }
1658
1659 static final class OrFunction<T,U> extends Completion {
1660 final CompletableFuture<? extends T> src;
1661 final CompletableFuture<? extends T> snd;
1662 final Function<? super T,? extends U> fn;
1663 final CompletableFuture<U> dst;
1664 final Executor executor;
1665 OrFunction(CompletableFuture<? extends T> src,
1666 CompletableFuture<? extends T> snd,
1667 Function<? super T,? extends U> fn,
1668 CompletableFuture<U> dst, Executor executor) {
1669 this.src = src; this.snd = snd;
1670 this.fn = fn; this.dst = dst;
1671 this.executor = executor;
1672 }
1673 public void run() {
1674 Object r; T t; Throwable ex;
1675 CompletableFuture<? extends T> a;
1676 CompletableFuture<? extends T> b;
1677 Function<? super T,? extends U> fn;
1678 CompletableFuture<U> dst;
1679 if ((dst = this.dst) != null &&
1680 (fn = this.fn) != null &&
1681 (((a = this.src) != null && (r = a.result) != null) ||
1682 ((b = this.snd) != null && (r = b.result) != null)) &&
1683 compareAndSet(0, 1)) {
1684 if (r instanceof AltResult) {
1685 if ((ex = ((AltResult)r).ex) != null) {
1686 dst.completeExceptionally(new RuntimeException(ex));
1687 return;
1688 }
1689 t = null;
1690 }
1691 else
1692 t = (T) r;
1693 try {
1694 if (executor != null)
1695 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1696 else
1697 dst.complete(fn.apply(t));
1698 } catch (Throwable rex) {
1699 dst.completeExceptionally(rex);
1700 }
1701 }
1702 }
1703 }
1704
1705 static final class OrBlock<T> extends Completion {
1706 final CompletableFuture<? extends T> src;
1707 final CompletableFuture<? extends T> snd;
1708 final Block<? super T> fn;
1709 final CompletableFuture<Void> dst;
1710 final Executor executor;
1711 OrBlock(CompletableFuture<? extends T> src,
1712 CompletableFuture<? extends T> snd,
1713 Block<? super T> fn,
1714 CompletableFuture<Void> dst, Executor executor) {
1715 this.src = src; this.snd = snd;
1716 this.fn = fn; this.dst = dst;
1717 this.executor = executor;
1718 }
1719 public void run() {
1720 Object r; T t; Throwable ex;
1721 CompletableFuture<? extends T> a;
1722 CompletableFuture<? extends T> b;
1723 Block<? super T> fn;
1724 CompletableFuture<Void> dst;
1725 if ((dst = this.dst) != null &&
1726 (fn = this.fn) != null &&
1727 (((a = this.src) != null && (r = a.result) != null) ||
1728 ((b = this.snd) != null && (r = b.result) != null)) &&
1729 compareAndSet(0, 1)) {
1730 if (r instanceof AltResult) {
1731 if ((ex = ((AltResult)r).ex) != null) {
1732 dst.completeExceptionally(new RuntimeException(ex));
1733 return;
1734 }
1735 t = null;
1736 }
1737 else
1738 t = (T) r;
1739 try {
1740 if (executor != null)
1741 executor.execute(new AsyncBlock<T>(t, fn, dst));
1742 else {
1743 fn.accept(t);
1744 dst.complete(null);
1745 }
1746 } catch (Throwable rex) {
1747 dst.completeExceptionally(rex);
1748 }
1749 }
1750 }
1751 }
1752
1753 static final class OrRunnable<T> extends Completion {
1754 final CompletableFuture<? extends T> src;
1755 final CompletableFuture<?> snd;
1756 final Runnable fn;
1757 final CompletableFuture<Void> dst;
1758 final Executor executor;
1759 OrRunnable(CompletableFuture<? extends T> src,
1760 CompletableFuture<?> snd,
1761 Runnable fn,
1762 CompletableFuture<Void> dst, Executor executor) {
1763 this.src = src; this.snd = snd;
1764 this.fn = fn; this.dst = dst;
1765 this.executor = executor;
1766 }
1767 public void run() {
1768 Object r; Throwable ex;
1769 CompletableFuture<? extends T> a;
1770 final CompletableFuture<?> b;
1771 final Runnable fn;
1772 final CompletableFuture<Void> dst;
1773 if ((dst = this.dst) != null &&
1774 (fn = this.fn) != null &&
1775 (((a = this.src) != null && (r = a.result) != null) ||
1776 ((b = this.snd) != null && (r = b.result) != null)) &&
1777 compareAndSet(0, 1)) {
1778 if ((r instanceof AltResult) &&
1779 (ex = ((AltResult)r).ex) != null) {
1780 dst.completeExceptionally(new RuntimeException(ex));
1781 }
1782 else {
1783 try {
1784 if (executor != null)
1785 executor.execute(new AsyncRunnable(fn, dst));
1786 else {
1787 fn.run();
1788 dst.complete(null);
1789 }
1790 } catch (Throwable rex) {
1791 dst.completeExceptionally(rex);
1792 }
1793 }
1794 }
1795 }
1796 }
1797
1798 static final class ExceptionAction<T> extends Completion {
1799 final CompletableFuture<? extends T> src;
1800 final Function<? super Throwable, ? extends T> fn;
1801 final CompletableFuture<T> dst;
1802 ExceptionAction(CompletableFuture<? extends T> src,
1803 Function<? super Throwable, ? extends T> fn,
1804 CompletableFuture<T> dst) {
1805 this.src = src; this.fn = fn; this.dst = dst;
1806 }
1807 public void run() {
1808 CompletableFuture<? extends T> a;
1809 Function<? super Throwable, ? extends T> fn;
1810 CompletableFuture<T> dst;
1811 Object r; T t; Throwable ex;
1812 if ((dst = this.dst) != null &&
1813 (fn = this.fn) != null &&
1814 (a = this.src) != null &&
1815 (r = a.result) != null &&
1816 compareAndSet(0, 1)) {
1817 if (r instanceof AltResult) {
1818 if ((ex = ((AltResult)r).ex) != null) {
1819 try {
1820 dst.complete(fn.apply(ex));
1821 } catch (Throwable rex) {
1822 dst.completeExceptionally(rex);
1823 }
1824 return;
1825 }
1826 t = null;
1827 }
1828 else
1829 t = (T) r;
1830 dst.complete(t);
1831 }
1832 }
1833 }
1834
1835 static final class ThenCopy<T> extends Completion {
1836 final CompletableFuture<? extends T> src;
1837 final CompletableFuture<T> dst;
1838 ThenCopy(CompletableFuture<? extends T> src,
1839 CompletableFuture<T> dst) {
1840 this.src = src; this.dst = dst;
1841 }
1842 public void run() {
1843 CompletableFuture<? extends T> a;
1844 CompletableFuture<T> dst;
1845 Object r; T t; Throwable ex;
1846 if ((dst = this.dst) != null &&
1847 (a = this.src) != null &&
1848 (r = a.result) != null &&
1849 compareAndSet(0, 1)) {
1850 if (r instanceof AltResult) {
1851 if ((ex = ((AltResult)r).ex) != null) {
1852 dst.completeExceptionally(new RuntimeException(ex));
1853 return;
1854 }
1855 t = null;
1856 }
1857 else
1858 t = (T) r;
1859 dst.complete(t);
1860 }
1861 }
1862 }
1863
1864 static final class ThenHandle<T,U> extends Completion {
1865 final CompletableFuture<? extends T> src;
1866 final BiFunction<? super T, Throwable, ? extends U> fn;
1867 final CompletableFuture<U> dst;
1868 ThenHandle(CompletableFuture<? extends T> src,
1869 BiFunction<? super T, Throwable, ? extends U> fn,
1870 final CompletableFuture<U> dst) {
1871 this.src = src; this.fn = fn; this.dst = dst;
1872 }
1873 public void run() {
1874 CompletableFuture<? extends T> a;
1875 BiFunction<? super T, Throwable, ? extends U> fn;
1876 CompletableFuture<U> dst;
1877 Object r; T t; Throwable ex;
1878 if ((dst = this.dst) != null &&
1879 (fn = this.fn) != null &&
1880 (a = this.src) != null &&
1881 (r = a.result) != null &&
1882 compareAndSet(0, 1)) {
1883 if (r instanceof AltResult) {
1884 ex = ((AltResult)r).ex;
1885 t = null;
1886 }
1887 else {
1888 ex = null;
1889 t = (T) r;
1890 }
1891 try {
1892 dst.complete(fn.apply(t, ex));
1893 } catch (Throwable rex) {
1894 dst.completeExceptionally(rex);
1895 }
1896 }
1897 }
1898 }
1899
1900 static final class ThenCompose<T,U> extends Completion {
1901 final CompletableFuture<? extends T> src;
1902 final Function<? super T, CompletableFuture<U>> fn;
1903 final CompletableFuture<U> dst;
1904 ThenCompose(CompletableFuture<? extends T> src,
1905 Function<? super T, CompletableFuture<U>> fn,
1906 final CompletableFuture<U> dst) {
1907 this.src = src; this.fn = fn; this.dst = dst;
1908 }
1909 public void run() {
1910 CompletableFuture<? extends T> a;
1911 Function<? super T, CompletableFuture<U>> fn;
1912 CompletableFuture<U> dst;
1913 Object r; T t; Throwable ex;
1914 if ((dst = this.dst) != null &&
1915 (fn = this.fn) != null &&
1916 (a = this.src) != null &&
1917 (r = a.result) != null &&
1918 compareAndSet(0, 1)) {
1919 if (r instanceof AltResult) {
1920 ex = ((AltResult)r).ex;
1921 t = null;
1922 }
1923 else {
1924 ex = null;
1925 t = (T) r;
1926 }
1927 CompletableFuture<U> c = null;
1928 if (ex == null) {
1929 try {
1930 c = fn.apply(t);
1931 } catch (Throwable rex) {
1932 ex = rex;
1933 }
1934 }
1935 if (ex != null || c == null) {
1936 if (ex == null)
1937 ex = new NullPointerException();
1938 dst.completeExceptionally(new RuntimeException(ex));
1939 return;
1940 }
1941 ThenCopy<U> d = null;
1942 Object s;
1943 if ((s = c.result) == null) {
1944 CompletionNode p = new CompletionNode
1945 (d = new ThenCopy<U>(c, dst));
1946 while ((s = c.result) == null) {
1947 if (UNSAFE.compareAndSwapObject
1948 (c, COMPLETIONS, p.next = c.completions, p))
1949 break;
1950 }
1951 if ((s = c.result) == null || !d.compareAndSet(0, 1))
1952 return;
1953 }
1954 U u;
1955 if (s instanceof AltResult) {
1956 if ((ex = ((AltResult)s).ex) != null)
1957 dst.completeExceptionally(new RuntimeException(ex));
1958 u = null;
1959 }
1960 else
1961 u = (U) s;
1962 if (ex == null) {
1963 try {
1964 dst.complete(u);
1965 } catch (Throwable rex) {
1966 dst.completeExceptionally(rex);
1967 }
1968 }
1969 c.postComplete();
1970 }
1971 }
1972 }
1973
1974 /* ------------- then/and/or implementations -------------- */
1975
1976 private <U> CompletableFuture<U> thenFunction(Function<? super T,? extends U> fn,
1977 Executor executor) {
1978
1979 if (fn == null) throw new NullPointerException();
1980 CompletableFuture<U> dst = new CompletableFuture<U>();
1981 ThenFunction<T,U> d = null;
1982 Object r;
1983 if ((r = result) == null) {
1984 CompletionNode p = new CompletionNode
1985 (d = new ThenFunction<T,U>(this, fn, dst, executor));
1986 while ((r = result) == null) {
1987 if (UNSAFE.compareAndSwapObject
1988 (this, COMPLETIONS, p.next = completions, p))
1989 break;
1990 }
1991 }
1992 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1993 T t; Throwable ex = null;
1994 if (r instanceof AltResult) {
1995 if ((ex = ((AltResult)r).ex) != null)
1996 dst.completeExceptionally(new RuntimeException(ex));
1997 t = null;
1998 }
1999 else
2000 t = (T) r;
2001 if (ex == null) {
2002 try {
2003 if (executor != null)
2004 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
2005 else
2006 dst.complete(fn.apply(t));
2007 } catch (Throwable rex) {
2008 dst.completeExceptionally(rex);
2009 }
2010 }
2011 }
2012 if (r != null || result != null)
2013 postComplete();
2014 return dst;
2015 }
2016
2017 private CompletableFuture<Void> thenBlock(Block<? super T> fn,
2018 Executor executor) {
2019
2020 if (fn == null) throw new NullPointerException();
2021 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2022 ThenBlock<T> d = null;
2023 Object r;
2024 if ((r = result) == null) {
2025 CompletionNode p = new CompletionNode
2026 (d = new ThenBlock<T>(this, fn, dst, executor));
2027 while ((r = result) == null) {
2028 if (UNSAFE.compareAndSwapObject
2029 (this, COMPLETIONS, p.next = completions, p))
2030 break;
2031 }
2032 }
2033 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2034 T t; Throwable ex = null;
2035 if (r instanceof AltResult) {
2036 if ((ex = ((AltResult)r).ex) != null)
2037 dst.completeExceptionally(new RuntimeException(ex));
2038 t = null;
2039 }
2040 else
2041 t = (T) r;
2042 if (ex == null) {
2043 try {
2044 if (executor != null)
2045 executor.execute(new AsyncBlock<T>(t, fn, dst));
2046 else {
2047 fn.accept(t);
2048 dst.complete(null);
2049 }
2050 } catch (Throwable rex) {
2051 dst.completeExceptionally(rex);
2052 }
2053 }
2054 }
2055 if (r != null || result != null)
2056 postComplete();
2057 return dst;
2058 }
2059
2060 private CompletableFuture<Void> thenRunnable(Runnable action,
2061 Executor executor) {
2062 if (action == null) throw new NullPointerException();
2063 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2064 ThenRunnable<T> d = null;
2065 Object r;
2066 if ((r = result) == null) {
2067 CompletionNode p = new CompletionNode
2068 (d = new ThenRunnable<T>(this, action, dst, executor));
2069 while ((r = result) == null) {
2070 if (UNSAFE.compareAndSwapObject
2071 (this, COMPLETIONS, p.next = completions, p))
2072 break;
2073 }
2074 }
2075 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2076 Throwable ex = null;
2077 if (r instanceof AltResult) {
2078 if ((ex = ((AltResult)r).ex) != null)
2079 dst.completeExceptionally(new RuntimeException(ex));
2080 }
2081 if (ex == null) {
2082 try {
2083 if (executor != null)
2084 executor.execute(new AsyncRunnable(action, dst));
2085 else {
2086 action.run();
2087 dst.complete(null);
2088 }
2089 } catch (Throwable rex) {
2090 dst.completeExceptionally(rex);
2091 }
2092 }
2093 }
2094 if (r != null || result != null)
2095 postComplete();
2096 return dst;
2097 }
2098
2099 private <U,V> CompletableFuture<V> andFunction(CompletableFuture<? extends U> other,
2100 BiFunction<? super T,? super U,? extends V> fn,
2101 Executor executor) {
2102 if (other == null || fn == null) throw new NullPointerException();
2103 CompletableFuture<V> dst = new CompletableFuture<V>();
2104 AndFunction<T,U,V> d = null;
2105 Object r, s = null;
2106 if ((r = result) == null || (s = other.result) == null) {
2107 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
2108 CompletionNode q = null, p = new CompletionNode(d);
2109 while ((r == null && (r = result) == null) ||
2110 (s == null && (s = other.result) == null)) {
2111 if (q != null) {
2112 if (s != null ||
2113 UNSAFE.compareAndSwapObject
2114 (other, COMPLETIONS, q.next = other.completions, q))
2115 break;
2116 }
2117 else if (r != null ||
2118 UNSAFE.compareAndSwapObject
2119 (this, COMPLETIONS, p.next = completions, p)) {
2120 if (s != null)
2121 break;
2122 q = new CompletionNode(d);
2123 }
2124 }
2125 }
2126 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2127 T t; U u; Throwable ex = null;
2128 if (r instanceof AltResult) {
2129 if ((ex = ((AltResult)r).ex) != null)
2130 dst.completeExceptionally(new RuntimeException(ex));
2131 t = null;
2132 }
2133 else
2134 t = (T) r;
2135 if (ex != null)
2136 u = null;
2137 else if (s instanceof AltResult) {
2138 if ((ex = ((AltResult)s).ex) != null)
2139 dst.completeExceptionally(new RuntimeException(ex));
2140 u = null;
2141 }
2142 else
2143 u = (U) s;
2144 if (ex == null) {
2145 try {
2146 if (executor != null)
2147 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
2148 else
2149 dst.complete(fn.apply(t, u));
2150 } catch (Throwable rex) {
2151 dst.completeExceptionally(rex);
2152 }
2153 }
2154 }
2155 if (r != null || result != null)
2156 postComplete();
2157 if (s != null || other.result != null)
2158 other.postComplete();
2159 return dst;
2160 }
2161
2162 private <U> CompletableFuture<Void> andBlock(CompletableFuture<? extends U> other,
2163 BiBlock<? super T,? super U> fn,
2164 Executor executor) {
2165 if (other == null || fn == null) throw new NullPointerException();
2166 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2167 AndBlock<T,U> d = null;
2168 Object r, s = null;
2169 if ((r = result) == null || (s = other.result) == null) {
2170 d = new AndBlock<T,U>(this, other, fn, dst, executor);
2171 CompletionNode q = null, p = new CompletionNode(d);
2172 while ((r == null && (r = result) == null) ||
2173 (s == null && (s = other.result) == null)) {
2174 if (q != null) {
2175 if (s != null ||
2176 UNSAFE.compareAndSwapObject
2177 (other, COMPLETIONS, q.next = other.completions, q))
2178 break;
2179 }
2180 else if (r != null ||
2181 UNSAFE.compareAndSwapObject
2182 (this, COMPLETIONS, p.next = completions, p)) {
2183 if (s != null)
2184 break;
2185 q = new CompletionNode(d);
2186 }
2187 }
2188 }
2189 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2190 T t; U u; Throwable ex = null;
2191 if (r instanceof AltResult) {
2192 if ((ex = ((AltResult)r).ex) != null)
2193 dst.completeExceptionally(new RuntimeException(ex));
2194 t = null;
2195 }
2196 else
2197 t = (T) r;
2198 if (ex != null)
2199 u = null;
2200 else if (s instanceof AltResult) {
2201 if ((ex = ((AltResult)s).ex) != null)
2202 dst.completeExceptionally(new RuntimeException(ex));
2203 u = null;
2204 }
2205 else
2206 u = (U) s;
2207 if (ex == null) {
2208 try {
2209 if (executor != null)
2210 executor.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
2211 else {
2212 fn.accept(t, u);
2213 dst.complete(null);
2214 }
2215 } catch (Throwable rex) {
2216 dst.completeExceptionally(rex);
2217 }
2218 }
2219 }
2220 if (r != null || result != null)
2221 postComplete();
2222 if (s != null || other.result != null)
2223 other.postComplete();
2224 return dst;
2225 }
2226
2227 private CompletableFuture<Void> andRunnable(CompletableFuture<?> other,
2228 Runnable action,
2229 Executor executor) {
2230 if (other == null || action == null) throw new NullPointerException();
2231 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2232 AndRunnable<T> d = null;
2233 Object r, s = null;
2234 if ((r = result) == null || (s = other.result) == null) {
2235 d = new AndRunnable<T>(this, other, action, dst, executor);
2236 CompletionNode q = null, p = new CompletionNode(d);
2237 while ((r == null && (r = result) == null) ||
2238 (s == null && (s = other.result) == null)) {
2239 if (q != null) {
2240 if (s != null ||
2241 UNSAFE.compareAndSwapObject
2242 (other, COMPLETIONS, q.next = other.completions, q))
2243 break;
2244 }
2245 else if (r != null ||
2246 UNSAFE.compareAndSwapObject
2247 (this, COMPLETIONS, p.next = completions, p)) {
2248 if (s != null)
2249 break;
2250 q = new CompletionNode(d);
2251 }
2252 }
2253 }
2254 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2255 Throwable ex = null;
2256 if ((r instanceof AltResult) &&
2257 (ex = ((AltResult)r).ex) != null)
2258 dst.completeExceptionally(new RuntimeException(ex));
2259 else if ((s instanceof AltResult) &&
2260 (ex = ((AltResult)s).ex) != null)
2261 dst.completeExceptionally(new RuntimeException(ex));
2262 else {
2263 try {
2264 if (executor != null)
2265 executor.execute(new AsyncRunnable(action, dst));
2266 else {
2267 action.run();
2268 dst.complete(null);
2269 }
2270 } catch (Throwable rex) {
2271 dst.completeExceptionally(rex);
2272 }
2273 }
2274 }
2275 if (r != null || result != null)
2276 postComplete();
2277 if (s != null || other.result != null)
2278 other.postComplete();
2279 return dst;
2280 }
2281
2282 private <U> CompletableFuture<U> orFunction(CompletableFuture<? extends T> other,
2283 Function<? super T, U> fn,
2284 Executor executor) {
2285 if (other == null || fn == null) throw new NullPointerException();
2286 CompletableFuture<U> dst = new CompletableFuture<U>();
2287 OrFunction<T,U> d = null;
2288 Object r;
2289 if ((r = result) == null && (r = other.result) == null) {
2290 d = new OrFunction<T,U>(this, other, fn, dst, executor);
2291 CompletionNode q = null, p = new CompletionNode(d);
2292 while ((r = result) == null && (r = other.result) == null) {
2293 if (q != null) {
2294 if (UNSAFE.compareAndSwapObject
2295 (other, COMPLETIONS, q.next = other.completions, q))
2296 break;
2297 }
2298 else if (UNSAFE.compareAndSwapObject
2299 (this, COMPLETIONS, p.next = completions, p))
2300 q = new CompletionNode(d);
2301 }
2302 }
2303 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2304 T t; Throwable ex = null;
2305 if (r instanceof AltResult) {
2306 if ((ex = ((AltResult)r).ex) != null)
2307 dst.completeExceptionally(new RuntimeException(ex));
2308 t = null;
2309 }
2310 else
2311 t = (T) r;
2312 if (ex == null) {
2313 try {
2314 if (executor != null)
2315 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
2316 else
2317 dst.complete(fn.apply(t));
2318 } catch (Throwable rex) {
2319 dst.completeExceptionally(rex);
2320 }
2321 }
2322 }
2323 if (result != null)
2324 postComplete();
2325 if (other.result != null)
2326 other.postComplete();
2327 return dst;
2328 }
2329
2330 private CompletableFuture<Void> orBlock(CompletableFuture<? extends T> other,
2331 Block<? super T> fn,
2332 Executor executor) {
2333 if (other == null || fn == null) throw new NullPointerException();
2334 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2335 OrBlock<T> d = null;
2336 Object r;
2337 if ((r = result) == null && (r = other.result) == null) {
2338 d = new OrBlock<T>(this, other, fn, dst, executor);
2339 CompletionNode q = null, p = new CompletionNode(d);
2340 while ((r = result) == null && (r = other.result) == null) {
2341 if (q != null) {
2342 if (UNSAFE.compareAndSwapObject
2343 (other, COMPLETIONS, q.next = other.completions, q))
2344 break;
2345 }
2346 else if (UNSAFE.compareAndSwapObject
2347 (this, COMPLETIONS, p.next = completions, p))
2348 q = new CompletionNode(d);
2349 }
2350 }
2351 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2352 T t; Throwable ex = null;
2353 if (r instanceof AltResult) {
2354 if ((ex = ((AltResult)r).ex) != null)
2355 dst.completeExceptionally(new RuntimeException(ex));
2356 t = null;
2357 }
2358 else
2359 t = (T) r;
2360 if (ex == null) {
2361 try {
2362 if (executor != null)
2363 executor.execute(new AsyncBlock<T>(t, fn, dst));
2364 else {
2365 fn.accept(t);
2366 dst.complete(null);
2367 }
2368 } catch (Throwable rex) {
2369 dst.completeExceptionally(rex);
2370 }
2371 }
2372 }
2373 if (result != null)
2374 postComplete();
2375 if (other.result != null)
2376 other.postComplete();
2377 return dst;
2378 }
2379
2380 private CompletableFuture<Void> orRunnable(CompletableFuture<?> other,
2381 Runnable action,
2382 Executor executor) {
2383 if (other == null || action == null) throw new NullPointerException();
2384 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2385 OrRunnable<T> d = null;
2386 Object r;
2387 if ((r = result) == null && (r = other.result) == null) {
2388 d = new OrRunnable<T>(this, other, action, dst, executor);
2389 CompletionNode q = null, p = new CompletionNode(d);
2390 while ((r = result) == null && (r = other.result) == null) {
2391 if (q != null) {
2392 if (UNSAFE.compareAndSwapObject
2393 (other, COMPLETIONS, q.next = other.completions, q))
2394 break;
2395 }
2396 else if (UNSAFE.compareAndSwapObject
2397 (this, COMPLETIONS, p.next = completions, p))
2398 q = new CompletionNode(d);
2399 }
2400 }
2401 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2402 Throwable ex = null;
2403 if ((r instanceof AltResult) &&
2404 (ex = ((AltResult)r).ex) != null)
2405 dst.completeExceptionally(new RuntimeException(ex));
2406 else {
2407 try {
2408 if (executor != null)
2409 executor.execute(new AsyncRunnable(action, dst));
2410 else {
2411 action.run();
2412 dst.complete(null);
2413 }
2414 } catch (Throwable rex) {
2415 dst.completeExceptionally(rex);
2416 }
2417 }
2418 }
2419 if (result != null)
2420 postComplete();
2421 if (other.result != null)
2422 other.postComplete();
2423 return dst;
2424 }
2425
2426 // Unsafe mechanics
2427 private static final sun.misc.Unsafe UNSAFE;
2428 private static final long RESULT;
2429 private static final long WAITERS;
2430 private static final long COMPLETIONS;
2431 static {
2432 try {
2433 UNSAFE = sun.misc.Unsafe.getUnsafe();
2434 Class<?> k = CompletableFuture.class;
2435 RESULT = UNSAFE.objectFieldOffset
2436 (k.getDeclaredField("result"));
2437 WAITERS = UNSAFE.objectFieldOffset
2438 (k.getDeclaredField("waiters"));
2439 COMPLETIONS = UNSAFE.objectFieldOffset
2440 (k.getDeclaredField("completions"));
2441 } catch (Exception e) {
2442 throw new Error(e);
2443 }
2444 }
2445
2446 }