ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.14
Committed: Sun Dec 30 03:09:46 2012 UTC (11 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.13: +5 -5 lines
Log Message:
punctuation

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Block;
10 import java.util.function.BiBlock;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>Similar methods are available for Functions, Blocks, and
31 * Runnables, depending on whether actions require arguments and/or
32 * produce results. Functions and actions supplied for dependent
33 * completions (mainly using methods with prefix {@code then}) may be
34 * performed by the thread that completes the current
35 * CompletableFuture, or by any other caller of these methods. There
36 * are no guarantees about the order of processing completions unless
37 * constrained by method {@code then} and related methods.
38 *
39 * <p>When two or more threads attempt to {@link #complete} or {@link
40 * #completeExceptionally} a CompletableFuture, only one of them will
41 * succeed. Upon exceptional completion, or when a completion entails
42 * computation of a function or action, and it terminates abruptly
43 * with an exception, then further completions act as {@code
44 * completeExceptionally} with that exception.
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, the {@code async} methods provide commonly useful ways to
48 * commence asynchronous processing, using either a given {@link
49 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
50 * function or action that will result in the completion of a new
51 * CompletableFuture.
52 *
53 * @author Doug Lea
54 * @since 1.8
55 */
56 public class CompletableFuture<T> implements Future<T> {
57 /*
58 * Quick overview (more to come):
59 *
60 * 1. Non-nullness of field result indicates done. An AltResult is
61 * used to box null as a result, as well as to hold exceptions.
62 *
63 * 2. Waiters are held in a Treiber stack similar to the one used
64 * in FutureTask.
65 *
66 * 3. Completions are also kept in a list/stack, and pulled off
67 * and run when completion is triggered.
68 */
69
70 static final class AltResult {
71 final Throwable ex; // null only for NIL
72 AltResult(Throwable ex) { this.ex = ex; }
73 }
74
75 static final AltResult NIL = new AltResult(null);
76
77 /**
78 * Simple linked list nodes to record waiting threads in a Treiber
79 * stack. See other classes such as Phaser and SynchronousQueue
80 * for more detailed explanation.
81 */
82 static final class WaitNode {
83 volatile Thread thread;
84 volatile WaitNode next;
85 }
86
87 /**
88 * Simple linked list nodes to record completions, used in
89 * basically the same way as WaitNodes
90 */
91 static final class CompletionNode {
92 final Completion completion;
93 volatile CompletionNode next;
94 CompletionNode(Completion completion) { this.completion = completion; }
95 }
96
97
98 volatile Object result; // either the result or boxed AltResult
99 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
100 volatile CompletionNode completions; // list (Treiber stack) of completions
101
102 /**
103 * Creates a new incomplete CompletableFuture.
104 */
105 public CompletableFuture() {
106 }
107
108 /**
109 * Asynchronously executes in the {@link
110 * ForkJoinPool#commonPool()}, a task that completes the returned
111 * CompletableFuture with the result of the given Supplier.
112 *
113 * @param supplier a function returning the value to be used
114 * to complete the returned CompletableFuture
115 * @return the CompletableFuture
116 */
117 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
118 if (supplier == null) throw new NullPointerException();
119 CompletableFuture<U> f = new CompletableFuture<U>();
120 ForkJoinPool.commonPool().
121 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
122 return f;
123 }
124
125 /**
126 * Asynchronously executes using the given executor, a task that
127 * completes the returned CompletableFuture with the result of the
128 * given Supplier.
129 *
130 * @param supplier a function returning the value to be used
131 * to complete the returned CompletableFuture
132 * @param executor the executor to use for asynchronous execution
133 * @return the CompletableFuture
134 */
135 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
136 Executor executor) {
137 if (executor == null || supplier == null)
138 throw new NullPointerException();
139 CompletableFuture<U> f = new CompletableFuture<U>();
140 executor.execute(new AsyncSupplier<U>(supplier, f));
141 return f;
142 }
143
144 /**
145 * Asynchronously executes in the {@link
146 * ForkJoinPool#commonPool()} a task that runs the given action,
147 * and then completes the returned CompletableFuture.
148 *
149 * @param runnable the action to run before completing the
150 * returned CompletableFuture
151 * @return the CompletableFuture
152 */
153 public static CompletableFuture<Void> runAsync(Runnable runnable) {
154 if (runnable == null) throw new NullPointerException();
155 CompletableFuture<Void> f = new CompletableFuture<Void>();
156 ForkJoinPool.commonPool().
157 execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
158 return f;
159 }
160
161 /**
162 * Asynchronously executes using the given executor, a task that
163 * runs the given action, and then completes the returned
164 * CompletableFuture.
165 *
166 * @param runnable the action to run before completing the
167 * returned CompletableFuture
168 * @param executor the executor to use for asynchronous execution
169 * @return the CompletableFuture
170 */
171 public static CompletableFuture<Void> runAsync(Runnable runnable,
172 Executor executor) {
173 if (executor == null || runnable == null)
174 throw new NullPointerException();
175 CompletableFuture<Void> f = new CompletableFuture<Void>();
176 executor.execute(new AsyncRunnable(runnable, f));
177 return f;
178 }
179
180 /**
181 * Returns {@code true} if completed in any fashion: normally,
182 * exceptionally, or cancellation.
183 *
184 * @return {@code true} if completed
185 */
186 public boolean isDone() {
187 return result != null;
188 }
189
190 /**
191 * Returns the result value when complete, or throws an
192 * (unchecked) exception if completed exceptionally. To better
193 * conform with the use of common functional forms, this method
194 * transforms any checked exception possible with {@link
195 * Future#get} into an (unchecked) {@link RuntimeException} with
196 * the underlying exception as its cause. (The checked exception
197 * convention is available using the timed form of get.)
198 *
199 * @return the result value
200 */
201 public T get() {
202 Object r; Throwable ex;
203 if ((r = result) == null)
204 return waitingGet();
205 if (r instanceof AltResult) {
206 if ((ex = ((AltResult)r).ex) != null) {
207 if (ex instanceof Error)
208 throw (Error)ex;
209 if (ex instanceof RuntimeException)
210 throw (RuntimeException)ex;
211 throw new RuntimeException(ex);
212 }
213 return null;
214 }
215 return (T)r;
216 }
217
218 /**
219 * Returns the result value (or throws any encountered exception)
220 * if completed, else returns the given valueIfAbsent.
221 *
222 * @param valueIfAbsent the value to return if not completed
223 * @return the result value, if completed, else the given valueIfAbsent
224 */
225 public T getNow(T valueIfAbsent) {
226 Object r; Throwable ex;
227 if ((r = result) == null)
228 return valueIfAbsent;
229 if (r instanceof AltResult) {
230 if ((ex = ((AltResult)r).ex) != null) {
231 if (ex instanceof Error)
232 throw (Error)ex;
233 if (ex instanceof RuntimeException)
234 throw (RuntimeException)ex;
235 throw new RuntimeException(ex);
236 }
237 return null;
238 }
239 return (T)r;
240 }
241
242 /**
243 * Waits if necessary for at most the given time for completion,
244 * and then retrieves its result, if available.
245 *
246 * @param timeout the maximum time to wait
247 * @param unit the time unit of the timeout argument
248 * @return the computed result
249 * @throws CancellationException if the computation was cancelled
250 * @throws ExecutionException if the computation threw an
251 * exception
252 * @throws InterruptedException if the current thread was interrupted
253 * while waiting
254 * @throws TimeoutException if the wait timed out
255 */
256 public T get(long timeout, TimeUnit unit)
257 throws InterruptedException, ExecutionException, TimeoutException {
258 Object r; Throwable ex;
259 long nanos = unit.toNanos(timeout);
260 if (Thread.interrupted())
261 throw new InterruptedException();
262 if ((r = result) == null)
263 r = timedAwaitDone(nanos);
264 if (r instanceof AltResult) {
265 if ((ex = ((AltResult)r).ex) != null) {
266 if (ex instanceof ExecutionException) // avoid re-wrap
267 throw (ExecutionException)ex;
268 throw new ExecutionException(ex);
269 }
270 return null;
271 }
272 return (T)r;
273 }
274
275 /**
276 * If not already completed, sets the value returned by {@link
277 * #get()} and related methods to the given value.
278 *
279 * @param value the result value
280 * @return true if this invocation caused this CompletableFuture
281 * to transition to a completed state, else false
282 */
283 public boolean complete(T value) {
284 if (result == null &&
285 UNSAFE.compareAndSwapObject(this, RESULT, null,
286 (value == null) ? NIL : value)) {
287 postComplete();
288 return true;
289 }
290 return false;
291 }
292
293 /**
294 * If not already completed, causes invocations of {@link #get()}
295 * and related methods to throw the given exception.
296 *
297 * @param ex the exception
298 * @return true if this invocation caused this CompletableFuture
299 * to transition to a completed state, else false
300 */
301 public boolean completeExceptionally(Throwable ex) {
302 if (ex == null) throw new NullPointerException();
303 if (result == null) {
304 Object r = new AltResult(ex);
305 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
306 postComplete();
307 return true;
308 }
309 }
310 return false;
311 }
312
313 /**
314 * Creates and returns a CompletableFuture that is completed with
315 * the result of the given function of this CompletableFuture.
316 * If this CompletableFuture completes exceptionally,
317 * then the returned CompletableFuture also does so,
318 * with a RuntimeException having this exception as
319 * its cause.
320 *
321 * @param fn the function to use to compute the value of
322 * the returned CompletableFuture
323 * @return the new CompletableFuture
324 */
325 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
326 return thenFunction(fn, null);
327 }
328
329 /**
330 * Creates and returns a CompletableFuture that is asynchronously
331 * completed using the {@link ForkJoinPool#commonPool()} with the
332 * result of the given function of this CompletableFuture. If
333 * this CompletableFuture completes exceptionally, then the
334 * returned CompletableFuture also does so, with a
335 * RuntimeException having this exception as its cause.
336 *
337 * @param fn the function to use to compute the value of
338 * the returned CompletableFuture
339 * @return the new CompletableFuture
340 */
341 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
342 return thenFunction(fn, ForkJoinPool.commonPool());
343 }
344
345 /**
346 * Creates and returns a CompletableFuture that is asynchronously
347 * completed using the given executor with the result of the given
348 * function of this CompletableFuture. If this CompletableFuture
349 * completes exceptionally, then the returned CompletableFuture
350 * also does so, with a RuntimeException having this exception as
351 * its cause.
352 *
353 * @param fn the function to use to compute the value of
354 * the returned CompletableFuture
355 * @param executor the executor to use for asynchronous execution
356 * @return the new CompletableFuture
357 */
358 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
359 Executor executor) {
360 if (executor == null) throw new NullPointerException();
361 return thenFunction(fn, executor);
362 }
363
364 /**
365 * Creates and returns a CompletableFuture that is completed after
366 * performing the given action with this CompletableFuture's
367 * result when it completes. If this CompletableFuture
368 * completes exceptionally, then the returned CompletableFuture
369 * also does so, with a RuntimeException having this exception as
370 * its cause.
371 *
372 * @param block the action to perform before completing the
373 * returned CompletableFuture
374 * @return the new CompletableFuture
375 */
376 public CompletableFuture<Void> thenAccept(Block<? super T> block) {
377 return thenBlock(block, null);
378 }
379
380 /**
381 * Creates and returns a CompletableFuture that is asynchronously
382 * completed using the {@link ForkJoinPool#commonPool()} with this
383 * CompletableFuture's result when it completes. If this
384 * CompletableFuture completes exceptionally, then the returned
385 * CompletableFuture also does so, with a RuntimeException having
386 * this exception as its cause.
387 *
388 * @param block the action to perform before completing the
389 * returned CompletableFuture
390 * @return the new CompletableFuture
391 */
392 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
393 return thenBlock(block, ForkJoinPool.commonPool());
394 }
395
396 /**
397 * Creates and returns a CompletableFuture that is asynchronously
398 * completed using the given executor with this
399 * CompletableFuture's result when it completes. If this
400 * CompletableFuture completes exceptionally, then the returned
401 * CompletableFuture also does so, with a RuntimeException having
402 * this exception as its cause.
403 *
404 * @param block the action to perform before completing the
405 * returned CompletableFuture
406 * @param executor the executor to use for asynchronous execution
407 * @return the new CompletableFuture
408 */
409 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
410 Executor executor) {
411 if (executor == null) throw new NullPointerException();
412 return thenBlock(block, executor);
413 }
414
415 /**
416 * Creates and returns a CompletableFuture that is completed after
417 * performing the given action when this CompletableFuture
418 * completes. If this CompletableFuture completes exceptionally,
419 * then the returned CompletableFuture also does so, with a
420 * RuntimeException having this exception as its cause.
421 *
422 * @param action the action to perform before completing the
423 * returned CompletableFuture
424 * @return the new CompletableFuture
425 */
426 public CompletableFuture<Void> thenRun(Runnable action) {
427 return thenRunnable(action, null);
428 }
429
430 /**
431 * Creates and returns a CompletableFuture that is asynchronously
432 * completed using the {@link ForkJoinPool#commonPool()} after
433 * performing the given action when this CompletableFuture
434 * completes. If this CompletableFuture completes exceptionally,
435 * then the returned CompletableFuture also does so, with a
436 * RuntimeException having this exception as its cause.
437 *
438 * @param action the action to perform before completing the
439 * returned CompletableFuture
440 * @return the new CompletableFuture
441 */
442 public CompletableFuture<Void> thenRunAsync(Runnable action) {
443 return thenRunnable(action, ForkJoinPool.commonPool());
444 }
445
446 /**
447 * Creates and returns a CompletableFuture that is asynchronously
448 * completed using the given executor after performing the given
449 * action when this CompletableFuture completes. If this
450 * CompletableFuture completes exceptionally, then the returned
451 * CompletableFuture also does so, with a RuntimeException having
452 * this exception as its cause.
453 *
454 * @param action the action to perform before completing the
455 * returned CompletableFuture
456 * @param executor the executor to use for asynchronous execution
457 * @return the new CompletableFuture
458 */
459 public CompletableFuture<Void> thenRunAsync(Runnable action,
460 Executor executor) {
461 if (executor == null) throw new NullPointerException();
462 return thenRunnable(action, executor);
463 }
464
465 /**
466 * Creates and returns a CompletableFuture that is completed with
467 * the result of the given function of this and the other given
468 * CompletableFuture's results when both complete. If this or
469 * the other CompletableFuture complete exceptionally, then the
470 * returned CompletableFuture also does so, with a
471 * RuntimeException having the exception as its cause.
472 *
473 * @param other the other CompletableFuture
474 * @param fn the function to use to compute the value of
475 * the returned CompletableFuture
476 * @return the new CompletableFuture
477 */
478 public <U,V> CompletableFuture<V> thenApply(CompletableFuture<? extends U> other,
479 BiFunction<? super T,? super U,? extends V> fn) {
480 return andFunction(other, fn, null);
481 }
482
483 /**
484 * Creates and returns a CompletableFuture that is asynchronously
485 * completed using the {@link ForkJoinPool#commonPool()} with
486 * the result of the given function of this and the other given
487 * CompletableFuture's results when both complete. If this or
488 * the other CompletableFuture complete exceptionally, then the
489 * returned CompletableFuture also does so, with a
490 * RuntimeException having the exception as its cause.
491 *
492 * @param other the other CompletableFuture
493 * @param fn the function to use to compute the value of
494 * the returned CompletableFuture
495 * @return the new CompletableFuture
496 */
497 public <U,V> CompletableFuture<V> thenApplyAsync(CompletableFuture<? extends U> other,
498 BiFunction<? super T,? super U,? extends V> fn) {
499 return andFunction(other, fn, ForkJoinPool.commonPool());
500 }
501
502 /**
503 * Creates and returns a CompletableFuture that is
504 * asynchronously completed using the given executor with the
505 * result of the given function of this and the other given
506 * CompletableFuture's results when both complete. If this or
507 * the other CompletableFuture complete exceptionally, then the
508 * returned CompletableFuture also does so, with a
509 * RuntimeException having the exception as its cause.
510 *
511 * @param other the other CompletableFuture
512 * @param fn the function to use to compute the value of
513 * the returned CompletableFuture
514 * @param executor the executor to use for asynchronous execution
515 * @return the new CompletableFuture
516 */
517
518 public <U,V> CompletableFuture<V> thenApplyAsync(CompletableFuture<? extends U> other,
519 BiFunction<? super T,? super U,? extends V> fn,
520 Executor executor) {
521 if (executor == null) throw new NullPointerException();
522 return andFunction(other, fn, executor);
523 }
524
525 /**
526 * Creates and returns a CompletableFuture that is completed with
527 * the results of this and the other given CompletableFuture if
528 * both complete. If this and/or the other CompletableFuture
529 * complete exceptionally, then the returned CompletableFuture
530 * also does so, with a RuntimeException having the one of the
531 * exceptions as its cause.
532 *
533 * @param other the other CompletableFuture
534 * @param block the action to perform before completing the
535 * returned CompletableFuture
536 * @return the new CompletableFuture
537 */
538 public <U> CompletableFuture<Void> thenAccept(CompletableFuture<? extends U> other,
539 BiBlock<? super T, ? super U> block) {
540 return andBlock(other, block, null);
541 }
542
543 /**
544 * Creates and returns a CompletableFuture that is completed
545 * asynchronously using the {@link ForkJoinPool#commonPool()} with
546 * the results of this and the other given CompletableFuture when
547 * both complete. If this and/or the other CompletableFuture
548 * complete exceptionally, then the returned CompletableFuture
549 * also does so, with a RuntimeException having the one of the
550 * exceptions as its cause.
551 *
552 * @param other the other CompletableFuture
553 * @param block the action to perform before completing the
554 * returned CompletableFuture
555 * @return the new CompletableFuture
556 */
557 public <U> CompletableFuture<Void> thenAcceptAsync(CompletableFuture<? extends U> other,
558 BiBlock<? super T, ? super U> block) {
559 return andBlock(other, block, ForkJoinPool.commonPool());
560 }
561
562 /**
563 * Creates and returns a CompletableFuture that is completed
564 * asynchronously using the given executor with the results of
565 * this and the other given CompletableFuture when both complete.
566 * If this and/or the other CompletableFuture complete
567 * exceptionally, then the returned CompletableFuture also does
568 * so, with a RuntimeException having the one of the exceptions as
569 * its cause.
570 *
571 * @param other the other CompletableFuture
572 * @param block the action to perform before completing the
573 * returned CompletableFuture
574 * @param executor the executor to use for asynchronous execution
575 * @return the new CompletableFuture
576 */
577 public <U> CompletableFuture<Void> thenAcceptAsync(CompletableFuture<? extends U> other,
578 BiBlock<? super T, ? super U> block,
579 Executor executor) {
580 if (executor == null) throw new NullPointerException();
581 return andBlock(other, block, executor);
582 }
583
584 /**
585 * Creates and returns a CompletableFuture that is completed
586 * when this and the other given CompletableFuture both
587 * complete. If this and/or the other CompletableFuture complete
588 * exceptionally, then the returned CompletableFuture also does
589 * so, with a RuntimeException having the one of the exceptions as
590 * its cause.
591 *
592 * @param other the other CompletableFuture
593 * @param action the action to perform before completing the
594 * returned CompletableFuture
595 * @return the new CompletableFuture
596 */
597 public CompletableFuture<Void> thenRun(CompletableFuture<?> other,
598 Runnable action) {
599 return andRunnable(other, action, null);
600 }
601
602 /**
603 * Creates and returns a CompletableFuture that is completed
604 * asynchronously using the {@link ForkJoinPool#commonPool()}
605 * when this and the other given CompletableFuture both
606 * complete. If this and/or the other CompletableFuture complete
607 * exceptionally, then the returned CompletableFuture also does
608 * so, with a RuntimeException having the one of the exceptions as
609 * its cause.
610 *
611 * @param other the other CompletableFuture
612 * @param action the action to perform before completing the
613 * returned CompletableFuture
614 * @return the new CompletableFuture
615 */
616 public CompletableFuture<Void> thenRunAsync(CompletableFuture<?> other,
617 Runnable action) {
618 return andRunnable(other, action, ForkJoinPool.commonPool());
619 }
620
621 /**
622 * Creates and returns a CompletableFuture that is completed
623 * asynchronously using the given executor
624 * when this and the other given CompletableFuture both
625 * complete. If this and/or the other CompletableFuture complete
626 * exceptionally, then the returned CompletableFuture also does
627 * so, with a RuntimeException having the one of the exceptions as
628 * its cause.
629 *
630 * @param other the other CompletableFuture
631 * @param action the action to perform before completing the
632 * returned CompletableFuture
633 * @param executor the executor to use for asynchronous execution
634 * @return the new CompletableFuture
635 */
636 public CompletableFuture<Void> thenRunAsync(CompletableFuture<?> other,
637 Runnable action,
638 Executor executor) {
639 if (executor == null) throw new NullPointerException();
640 return andRunnable(other, action, executor);
641 }
642
643 /**
644 * Creates and returns a CompletableFuture that is completed with
645 * the result of the given function of either this or the other
646 * given CompletableFuture's results when either complete. If
647 * this and/or the other CompletableFuture complete exceptionally,
648 * then the returned CompletableFuture may also do so, with a
649 * RuntimeException having one of these exceptions as its cause.
650 * No guarantees are made about which result or exception is used
651 * in the returned CompletableFuture.
652 *
653 * @param other the other CompletableFuture
654 * @param fn the function to use to compute the value of
655 * the returned CompletableFuture
656 * @return the new CompletableFuture
657 */
658 public <U> CompletableFuture<U> orApply(CompletableFuture<? extends T> other,
659 Function<? super T, U> fn) {
660 return orFunction(other, fn, null);
661 }
662
663 /**
664 * Creates and returns a CompletableFuture that is completed
665 * asynchronously using the {@link ForkJoinPool#commonPool()} with
666 * the result of the given function of either this or the other
667 * given CompletableFuture's results when either complete. If
668 * this and/or the other CompletableFuture complete exceptionally,
669 * then the returned CompletableFuture may also do so, with a
670 * RuntimeException having one of these exceptions as its cause.
671 * No guarantees are made about which result or exception is used
672 * in the returned CompletableFuture.
673 *
674 * @param other the other CompletableFuture
675 * @param fn the function to use to compute the value of
676 * the returned CompletableFuture
677 * @return the new CompletableFuture
678 */
679 public <U> CompletableFuture<U> orApplyAsync(CompletableFuture<? extends T> other,
680 Function<? super T, U> fn) {
681 return orFunction(other, fn, ForkJoinPool.commonPool());
682 }
683
684 /**
685 * Creates and returns a CompletableFuture that is completed
686 * asynchronously using the given executor with the result of the
687 * given function of either this or the other given
688 * CompletableFuture's results when either complete. If this
689 * and/or the other CompletableFuture complete exceptionally, then
690 * the returned CompletableFuture may also do so, with a
691 * RuntimeException having one of these exceptions as its cause.
692 * No guarantees are made about which result or exception is used
693 * in the returned CompletableFuture.
694 *
695 * @param other the other CompletableFuture
696 * @param fn the function to use to compute the value of
697 * the returned CompletableFuture
698 * @param executor the executor to use for asynchronous execution
699 * @return the new CompletableFuture
700 */
701 public <U> CompletableFuture<U> orApplyAsync(CompletableFuture<? extends T> other,
702 Function<? super T, U> fn,
703 Executor executor) {
704 if (executor == null) throw new NullPointerException();
705 return orFunction(other, fn, executor);
706 }
707
708 /**
709 * Creates and returns a CompletableFuture that is completed after
710 * performing the given action with the result of either this or the
711 * other given CompletableFuture's result, when either complete.
712 * If this and/or the other CompletableFuture complete
713 * exceptionally, then the returned CompletableFuture may also do
714 * so, with a RuntimeException having one of these exceptions as
715 * its cause. No guarantees are made about which exception is
716 * used in the returned CompletableFuture.
717 *
718 * @param other the other CompletableFuture
719 * @param block the action to perform before completing the
720 * returned CompletableFuture
721 * @return the new CompletableFuture
722 */
723 public CompletableFuture<Void> orAccept(CompletableFuture<? extends T> other,
724 Block<? super T> block) {
725 return orBlock(other, block, null);
726 }
727
728 /**
729 * Creates and returns a CompletableFuture that is completed
730 * asynchronously using the {@link ForkJoinPool#commonPool()},
731 * performing the given action with the result of either this or
732 * the other given CompletableFuture's result, when either
733 * complete. If this and/or the other CompletableFuture complete
734 * exceptionally, then the returned CompletableFuture may also do
735 * so, with a RuntimeException having one of these exceptions as
736 * its cause. No guarantees are made about which exception is
737 * used in the returned CompletableFuture.
738 *
739 * @param other the other CompletableFuture
740 * @param block the action to perform before completing the
741 * returned CompletableFuture
742 * @return the new CompletableFuture
743 */
744 public CompletableFuture<Void> orAcceptAsync(CompletableFuture<? extends T> other,
745 Block<? super T> block) {
746 return orBlock(other, block, ForkJoinPool.commonPool());
747 }
748
749 /**
750 * Creates and returns a CompletableFuture that is completed
751 * asynchronously using the given executor,
752 * performing the given action with the result of either this or
753 * the other given CompletableFuture's result, when either
754 * complete. If this and/or the other CompletableFuture complete
755 * exceptionally, then the returned CompletableFuture may also do
756 * so, with a RuntimeException having one of these exceptions as
757 * its cause. No guarantees are made about which exception is
758 * used in the returned CompletableFuture.
759 *
760 * @param other the other CompletableFuture
761 * @param block the action to perform before completing the
762 * returned CompletableFuture
763 * @param executor the executor to use for asynchronous execution
764 * @return the new CompletableFuture
765 */
766 public CompletableFuture<Void> orRunAsync(CompletableFuture<? extends T> other,
767 Block<? super T> block,
768 Executor executor) {
769 if (executor == null) throw new NullPointerException();
770 return orBlock(other, block, executor);
771 }
772
773 /**
774 * Creates and returns a CompletableFuture that is completed
775 * after this or the other given CompletableFuture complete. If
776 * this and/or the other CompletableFuture complete exceptionally,
777 * then the returned CompletableFuture may also do so, with a
778 * RuntimeException having one of these exceptions as its cause.
779 * No guarantees are made about which exception is used in the
780 * returned CompletableFuture.
781 *
782 * @param other the other CompletableFuture
783 * @param action the action to perform before completing the
784 * returned CompletableFuture
785 * @return the new CompletableFuture
786 */
787 public CompletableFuture<Void> orRun(CompletableFuture<?> other,
788 Runnable action) {
789 return orRunnable(other, action, null);
790 }
791
792 /**
793 * Creates and returns a CompletableFuture that is completed
794 * asynchronously using the {@link ForkJoinPool#commonPool()}
795 * after this or the other given CompletableFuture complete. If
796 * this and/or the other CompletableFuture complete exceptionally,
797 * then the returned CompletableFuture may also do so, with a
798 * RuntimeException having one of these exceptions as its cause.
799 * No guarantees are made about which exception is used in the
800 * returned CompletableFuture.
801 *
802 * @param other the other CompletableFuture
803 * @param action the action to perform before completing the
804 * returned CompletableFuture
805 * @return the new CompletableFuture
806 */
807 public CompletableFuture<Void> orRunAsync(CompletableFuture<?> other,
808 Runnable action) {
809 return orRunnable(other, action, ForkJoinPool.commonPool());
810 }
811
812 /**
813 * Creates and returns a CompletableFuture that is completed
814 * asynchronously using the given executor after this or the other
815 * given CompletableFuture complete. If this and/or the other
816 * CompletableFuture complete exceptionally, then the returned
817 * CompletableFuture may also do so, with a RuntimeException
818 * having one of these exceptions as its cause. No guarantees are
819 * made about which exception is used in the returned
820 * CompletableFuture.
821 *
822 * @param other the other CompletableFuture
823 * @param action the action to perform before completing the
824 * returned CompletableFuture
825 * @param executor the executor to use for asynchronous execution
826 * @return the new CompletableFuture
827 */
828 public CompletableFuture<Void> orRunAsync(CompletableFuture<?> other,
829 Runnable action,
830 Executor executor) {
831 if (executor == null) throw new NullPointerException();
832 return orRunnable(other, action, executor);
833 }
834
835 /**
836 * Creates and returns a CompletableFuture that is completed with
837 * the result of the given function of the exception triggering
838 * this CompletableFuture's completion when it completes
839 * exceptionally; Otherwise, if this CompletableFuture completes
840 * normally, then the returned CompletableFuture also completes
841 * normally with the same value.
842 *
843 * @param fn the function to use to compute the value of the
844 * returned CompletableFuture if this CompletableFuture completed
845 * exceptionally
846 * @return the new CompletableFuture
847 */
848 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
849 if (fn == null) throw new NullPointerException();
850 CompletableFuture<T> dst = new CompletableFuture<T>();
851 ExceptionAction<T> d = null;
852 Object r;
853 if ((r = result) == null) {
854 CompletionNode p =
855 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
856 while ((r = result) == null) {
857 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
858 p.next = completions, p))
859 break;
860 }
861 }
862 if (r != null && (d == null || d.compareAndSet(0, 1))) {
863 T t; Throwable ex = null;
864 if (r instanceof AltResult) {
865 if ((ex = ((AltResult)r).ex) != null) {
866 try {
867 dst.complete(fn.apply(ex));
868 } catch (Throwable rex) {
869 dst.completeExceptionally(rex);
870 }
871 }
872 t = null;
873 }
874 else
875 t = (T) r;
876 if (ex == null)
877 dst.complete(t);
878 }
879 if (r != null)
880 postComplete();
881 return dst;
882 }
883
884 /**
885 * Attempts to complete this CompletableFuture with
886 * a {@link CancellationException}.
887 *
888 * @param mayInterruptIfRunning this value has no effect in this
889 * implementation because interrupts are not used to control
890 * processing.
891 *
892 * @return {@code true} if this task is now cancelled
893 */
894 public boolean cancel(boolean mayInterruptIfRunning) {
895 Object r;
896 while ((r = result) == null) {
897 r = new AltResult(new CancellationException());
898 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
899 postComplete();
900 return true;
901 }
902 }
903 return ((r instanceof AltResult) &&
904 (((AltResult)r).ex instanceof CancellationException));
905 }
906
907 /**
908 * Returns {@code true} if this CompletableFuture was cancelled
909 * before it completed normally.
910 *
911 * @return {@code true} if this CompletableFuture was cancelled
912 * before it completed normally
913 */
914 public boolean isCancelled() {
915 Object r;
916 return ((r = result) != null &&
917 (r instanceof AltResult) &&
918 (((AltResult)r).ex instanceof CancellationException));
919 }
920
921 /**
922 * Forcibly sets or resets the value subsequently returned by
923 * method get() and related methods, whether or not already
924 * completed. This method is designed for use only in error
925 * recovery actions, and even in such situations may result in
926 * ongoing dependent completions using established versus
927 * overwritten values.
928 *
929 * @param value the completion value
930 */
931 public void obtrudeValue(T value) {
932 result = (value == null) ? NIL : value;
933 postComplete();
934 }
935
936 /**
937 * Removes and signals all waiting threads and runs all completions.
938 */
939 private void postComplete() {
940 WaitNode q; Thread t;
941 while ((q = waiters) != null) {
942 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
943 (t = q.thread) != null) {
944 q.thread = null;
945 LockSupport.unpark(t);
946 }
947 }
948
949 CompletionNode h; Completion c;
950 while ((h = completions) != null) {
951 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
952 (c = h.completion) != null)
953 c.run();
954 }
955 }
956
957 /* ------------- waiting for completions -------------- */
958
959 /**
960 * Heuristic spin value for waitingGet() before blocking on
961 * multiprocessors
962 */
963 static final int WAITING_GET_SPINS = 256;
964
965 /**
966 * Returns result after waiting.
967 */
968 private T waitingGet() {
969 WaitNode q = null;
970 boolean queued = false, interrupted = false;
971 int h = 0, spins = 0;
972 for (Object r;;) {
973 if ((r = result) != null) {
974 Throwable ex;
975 if (q != null) // suppress unpark
976 q.thread = null;
977 postComplete(); // help release others
978 if (interrupted)
979 Thread.currentThread().interrupt();
980 if (r instanceof AltResult) {
981 if ((ex = ((AltResult)r).ex) != null) {
982 if (ex instanceof Error)
983 throw (Error)ex;
984 if (ex instanceof RuntimeException)
985 throw (RuntimeException)ex;
986 throw new RuntimeException(ex);
987 }
988 return null;
989 }
990 return (T)r;
991 }
992 else if (h == 0) {
993 h = ThreadLocalRandom.current().nextInt();
994 if (Runtime.getRuntime().availableProcessors() > 1)
995 spins = WAITING_GET_SPINS;
996 }
997 else if (spins > 0) {
998 h ^= h << 1; // xorshift
999 h ^= h >>> 3;
1000 if ((h ^= h << 10) >= 0)
1001 --spins;
1002 }
1003 else if (q == null)
1004 q = new WaitNode();
1005 else if (!queued)
1006 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1007 q.next = waiters, q);
1008 else if (Thread.interrupted())
1009 interrupted = true;
1010 else if (q.thread == null)
1011 q.thread = Thread.currentThread();
1012 else
1013 LockSupport.park(this);
1014 }
1015 }
1016
1017 /**
1018 * Awaits completion or aborts on interrupt or timeout.
1019 *
1020 * @param nanos time to wait
1021 * @return raw result
1022 */
1023 private Object timedAwaitDone(long nanos)
1024 throws InterruptedException, TimeoutException {
1025 final long deadline = System.nanoTime() + nanos;
1026 WaitNode q = null;
1027 boolean queued = false;
1028 for (Object r;;) {
1029 if (Thread.interrupted()) {
1030 removeWaiter(q);
1031 throw new InterruptedException();
1032 }
1033 else if ((r = result) != null) {
1034 if (q != null)
1035 q.thread = null;
1036 postComplete();
1037 return r;
1038 }
1039 else if (q == null)
1040 q = new WaitNode();
1041 else if (!queued)
1042 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1043 q.next = waiters, q);
1044 else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1045 removeWaiter(q);
1046 throw new TimeoutException();
1047 }
1048 else if (q.thread == null)
1049 q.thread = Thread.currentThread();
1050 else
1051 LockSupport.parkNanos(this, nanos);
1052 }
1053 }
1054
/**
 * Tries to unlink a timed-out or interrupted wait node to avoid
 * accumulating garbage. Internal nodes are simply unspliced
 * without CAS since it is harmless if they are traversed anyway
 * by releasers. To avoid effects of unsplicing from already
 * removed nodes, the list is retraversed in case of an apparent
 * race. This is slow when there are a lot of nodes, but we don't
 * expect lists to be long enough to outweigh higher-overhead
 * schemes.
 *
 * @param node the wait node to remove; a null argument is ignored
 */
private void removeWaiter(WaitNode node) {
    if (node != null) {
        // A null thread marks the node as removable; the traversal
        // below unlinks every node found in that state.
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;               // live node: becomes the predecessor
                else if (pred != null) {
                    pred.next = s;          // unsplice interior node without CAS
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                // Removable node at the head: swing the head with CAS.
                else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
                    continue retry;
            }
            break;
        }
    }
}
1086
1087 /* ------------- Async tasks -------------- */
1088
1089 /** Base class can act as either FJ or plain Runnable */
1090 static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1091 public final Void getRawResult() { return null; }
1092 public final void setRawResult(Void v) { }
1093 public final void run() { exec(); }
1094 }
1095
1096 static final class AsyncRunnable extends Async {
1097 final Runnable runnable;
1098 final CompletableFuture<Void> dst;
1099 AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
1100 this.runnable = runnable; this.dst = dst;
1101 }
1102 public final boolean exec() {
1103 Runnable fn;
1104 CompletableFuture<Void> d;
1105 if ((fn = this.runnable) == null || (d = this.dst) == null)
1106 throw new NullPointerException();
1107 try {
1108 fn.run();
1109 d.complete(null);
1110 } catch (Throwable ex) {
1111 d.completeExceptionally(ex);
1112 }
1113 return true;
1114 }
1115 private static final long serialVersionUID = 5232453952276885070L;
1116 }
1117
1118 static final class AsyncSupplier<U> extends Async {
1119 final Supplier<U> supplier;
1120 final CompletableFuture<U> dst;
1121 AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
1122 this.supplier = supplier; this.dst = dst;
1123 }
1124 public final boolean exec() {
1125 Supplier<U> fn;
1126 CompletableFuture<U> d;
1127 if ((fn = this.supplier) == null || (d = this.dst) == null)
1128 throw new NullPointerException();
1129 try {
1130 d.complete(fn.get());
1131 } catch (Throwable ex) {
1132 d.completeExceptionally(ex);
1133 }
1134 return true;
1135 }
1136 private static final long serialVersionUID = 5232453952276885070L;
1137 }
1138
1139 static final class AsyncFunction<T,U> extends Async {
1140 Function<? super T,? extends U> fn;
1141 T arg;
1142 final CompletableFuture<U> dst;
1143 AsyncFunction(T arg, Function<? super T,? extends U> fn,
1144 CompletableFuture<U> dst) {
1145 this.arg = arg; this.fn = fn; this.dst = dst;
1146 }
1147 public final boolean exec() {
1148 Function<? super T,? extends U> fn;
1149 CompletableFuture<U> d;
1150 if ((fn = this.fn) == null || (d = this.dst) == null)
1151 throw new NullPointerException();
1152 try {
1153 d.complete(fn.apply(arg));
1154 } catch (Throwable ex) {
1155 d.completeExceptionally(ex);
1156 }
1157 return true;
1158 }
1159 private static final long serialVersionUID = 5232453952276885070L;
1160 }
1161
1162 static final class AsyncBiFunction<T,U,V> extends Async {
1163 final BiFunction<? super T,? super U,? extends V> fn;
1164 final T arg1;
1165 final U arg2;
1166 final CompletableFuture<V> dst;
1167 AsyncBiFunction(T arg1, U arg2,
1168 BiFunction<? super T,? super U,? extends V> fn,
1169 CompletableFuture<V> dst) {
1170 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1171 }
1172 public final boolean exec() {
1173 BiFunction<? super T,? super U,? extends V> fn;
1174 CompletableFuture<V> d;
1175 if ((fn = this.fn) == null || (d = this.dst) == null)
1176 throw new NullPointerException();
1177 try {
1178 d.complete(fn.apply(arg1, arg2));
1179 } catch (Throwable ex) {
1180 d.completeExceptionally(ex);
1181 }
1182 return true;
1183 }
1184 private static final long serialVersionUID = 5232453952276885070L;
1185 }
1186
1187 static final class AsyncBlock<T> extends Async {
1188 Block<? super T> fn;
1189 T arg;
1190 final CompletableFuture<Void> dst;
1191 AsyncBlock(T arg, Block<? super T> fn,
1192 CompletableFuture<Void> dst) {
1193 this.arg = arg; this.fn = fn; this.dst = dst;
1194 }
1195 public final boolean exec() {
1196 Block<? super T> fn;
1197 CompletableFuture<Void> d;
1198 if ((fn = this.fn) == null || (d = this.dst) == null)
1199 throw new NullPointerException();
1200 try {
1201 fn.accept(arg);
1202 d.complete(null);
1203 } catch (Throwable ex) {
1204 d.completeExceptionally(ex);
1205 }
1206 return true;
1207 }
1208 private static final long serialVersionUID = 5232453952276885070L;
1209 }
1210
1211 static final class AsyncBiBlock<T,U> extends Async {
1212 final BiBlock<? super T,? super U> fn;
1213 final T arg1;
1214 final U arg2;
1215 final CompletableFuture<Void> dst;
1216 AsyncBiBlock(T arg1, U arg2,
1217 BiBlock<? super T,? super U> fn,
1218 CompletableFuture<Void> dst) {
1219 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1220 }
1221 public final boolean exec() {
1222 BiBlock<? super T,? super U> fn;
1223 CompletableFuture<Void> d;
1224 if ((fn = this.fn) == null || (d = this.dst) == null)
1225 throw new NullPointerException();
1226 try {
1227 fn.accept(arg1, arg2);
1228 d.complete(null);
1229 } catch (Throwable ex) {
1230 d.completeExceptionally(ex);
1231 }
1232 return true;
1233 }
1234 private static final long serialVersionUID = 5232453952276885070L;
1235 }
1236
1237 /* ------------- Completions -------------- */
1238
1239 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1240 static abstract class Completion extends AtomicInteger implements Runnable {
1241 }
1242
1243 static final class ThenFunction<T,U> extends Completion {
1244 final CompletableFuture<? extends T> src;
1245 final Function<? super T,? extends U> fn;
1246 final CompletableFuture<U> dst;
1247 final Executor executor;
1248 ThenFunction(CompletableFuture<? extends T> src,
1249 final Function<? super T,? extends U> fn,
1250 final CompletableFuture<U> dst, Executor executor) {
1251 this.src = src; this.fn = fn; this.dst = dst;
1252 this.executor = executor;
1253 }
1254 public void run() {
1255 CompletableFuture<? extends T> a;
1256 Function<? super T,? extends U> fn;
1257 CompletableFuture<U> dst;
1258 Object r; T t; Throwable ex;
1259 if ((dst = this.dst) != null &&
1260 (fn = this.fn) != null &&
1261 (a = this.src) != null &&
1262 (r = a.result) != null &&
1263 compareAndSet(0, 1)) {
1264 if (r instanceof AltResult) {
1265 if ((ex = ((AltResult)r).ex) != null) {
1266 dst.completeExceptionally(new RuntimeException(ex));
1267 return;
1268 }
1269 t = null;
1270 }
1271 else
1272 t = (T) r;
1273 try {
1274 if (executor != null)
1275 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1276 else
1277 dst.complete(fn.apply(t));
1278 } catch (Throwable rex) {
1279 dst.completeExceptionally(rex);
1280 }
1281 }
1282 }
1283 }
1284
1285 static final class ThenBlock<T> extends Completion {
1286 final CompletableFuture<? extends T> src;
1287 final Block<? super T> fn;
1288 final CompletableFuture<Void> dst;
1289 final Executor executor;
1290 ThenBlock(CompletableFuture<? extends T> src,
1291 final Block<? super T> fn,
1292 final CompletableFuture<Void> dst, Executor executor) {
1293 this.src = src; this.fn = fn; this.dst = dst;
1294 this.executor = executor;
1295 }
1296 public void run() {
1297 CompletableFuture<? extends T> a;
1298 Block<? super T> fn;
1299 CompletableFuture<Void> dst;
1300 Object r; T t; Throwable ex;
1301 if ((dst = this.dst) != null &&
1302 (fn = this.fn) != null &&
1303 (a = this.src) != null &&
1304 (r = a.result) != null &&
1305 compareAndSet(0, 1)) {
1306 if (r instanceof AltResult) {
1307 if ((ex = ((AltResult)r).ex) != null) {
1308 dst.completeExceptionally(new RuntimeException(ex));
1309 return;
1310 }
1311 t = null;
1312 }
1313 else
1314 t = (T) r;
1315 try {
1316 if (executor != null)
1317 executor.execute(new AsyncBlock<T>(t, fn, dst));
1318 else {
1319 fn.accept(t);
1320 dst.complete(null);
1321 }
1322 } catch (Throwable rex) {
1323 dst.completeExceptionally(rex);
1324 }
1325 }
1326 }
1327 }
1328
1329 static final class ThenRunnable<T> extends Completion {
1330 final CompletableFuture<? extends T> src;
1331 final Runnable fn;
1332 final CompletableFuture<Void> dst;
1333 final Executor executor;
1334 ThenRunnable(CompletableFuture<? extends T> src,
1335 Runnable fn,
1336 CompletableFuture<Void> dst,
1337 Executor executor) {
1338 this.src = src; this.fn = fn; this.dst = dst;
1339 this.executor = executor;
1340 }
1341 public void run() {
1342 CompletableFuture<? extends T> a;
1343 Runnable fn;
1344 CompletableFuture<Void> dst;
1345 Object r; Throwable ex;
1346 if ((dst = this.dst) != null &&
1347 (fn = this.fn) != null &&
1348 (a = this.src) != null &&
1349 (r = a.result) != null &&
1350 compareAndSet(0, 1)) {
1351 if (r instanceof AltResult) {
1352 if ((ex = ((AltResult)r).ex) != null) {
1353 dst.completeExceptionally(new RuntimeException(ex));
1354 return;
1355 }
1356 }
1357 try {
1358 if (executor != null)
1359 executor.execute(new AsyncRunnable(fn, dst));
1360 else {
1361 fn.run();
1362 dst.complete(null);
1363 }
1364 } catch (Throwable rex) {
1365 dst.completeExceptionally(rex);
1366 }
1367 }
1368 }
1369 }
1370
1371 static final class AndFunction<T,U,V> extends Completion {
1372 final CompletableFuture<? extends T> src;
1373 final CompletableFuture<? extends U> snd;
1374 final BiFunction<? super T,? super U,? extends V> fn;
1375 final CompletableFuture<V> dst;
1376 final Executor executor;
1377 AndFunction(CompletableFuture<? extends T> src,
1378 CompletableFuture<? extends U> snd,
1379 BiFunction<? super T,? super U,? extends V> fn,
1380 CompletableFuture<V> dst, Executor executor) {
1381 this.src = src; this.snd = snd;
1382 this.fn = fn; this.dst = dst;
1383 this.executor = executor;
1384 }
1385 public void run() {
1386 Object r, s; T t; U u; Throwable ex;
1387 CompletableFuture<? extends T> a;
1388 CompletableFuture<? extends U> b;
1389 BiFunction<? super T,? super U,? extends V> fn;
1390 CompletableFuture<V> dst;
1391 if ((dst = this.dst) != null &&
1392 (fn = this.fn) != null &&
1393 (a = this.src) != null &&
1394 (r = a.result) != null &&
1395 (b = this.snd) != null &&
1396 (s = b.result) != null &&
1397 compareAndSet(0, 1)) {
1398 if (r instanceof AltResult) {
1399 if ((ex = ((AltResult)r).ex) != null) {
1400 dst.completeExceptionally(new RuntimeException(ex));
1401 return;
1402 }
1403 t = null;
1404 }
1405 else
1406 t = (T) r;
1407 if (s instanceof AltResult) {
1408 if ((ex = ((AltResult)s).ex) != null) {
1409 dst.completeExceptionally(new RuntimeException(ex));
1410 return;
1411 }
1412 u = null;
1413 }
1414 else
1415 u = (U) s;
1416 try {
1417 if (executor != null)
1418 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1419 else
1420 dst.complete(fn.apply(t, u));
1421 } catch (Throwable rex) {
1422 dst.completeExceptionally(rex);
1423 }
1424 }
1425 }
1426 }
1427
1428 static final class AndBlock<T,U> extends Completion {
1429 final CompletableFuture<? extends T> src;
1430 final CompletableFuture<? extends U> snd;
1431 final BiBlock<? super T,? super U> fn;
1432 final CompletableFuture<Void> dst;
1433 final Executor executor;
1434 AndBlock(CompletableFuture<? extends T> src,
1435 CompletableFuture<? extends U> snd,
1436 BiBlock<? super T,? super U> fn,
1437 CompletableFuture<Void> dst, Executor executor) {
1438 this.src = src; this.snd = snd;
1439 this.fn = fn; this.dst = dst;
1440 this.executor = executor;
1441 }
1442 public void run() {
1443 Object r, s; T t; U u; Throwable ex;
1444 CompletableFuture<? extends T> a;
1445 CompletableFuture<? extends U> b;
1446 BiBlock<? super T,? super U> fn;
1447 CompletableFuture<Void> dst;
1448 if ((dst = this.dst) != null &&
1449 (fn = this.fn) != null &&
1450 (a = this.src) != null &&
1451 (r = a.result) != null &&
1452 (b = this.snd) != null &&
1453 (s = b.result) != null &&
1454 compareAndSet(0, 1)) {
1455 if (r instanceof AltResult) {
1456 if ((ex = ((AltResult)r).ex) != null) {
1457 dst.completeExceptionally(new RuntimeException(ex));
1458 return;
1459 }
1460 t = null;
1461 }
1462 else
1463 t = (T) r;
1464 if (s instanceof AltResult) {
1465 if ((ex = ((AltResult)s).ex) != null) {
1466 dst.completeExceptionally(new RuntimeException(ex));
1467 return;
1468 }
1469 u = null;
1470 }
1471 else
1472 u = (U) s;
1473 try {
1474 if (executor != null)
1475 executor.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1476 else {
1477 fn.accept(t, u);
1478 dst.complete(null);
1479 }
1480 } catch (Throwable rex) {
1481 dst.completeExceptionally(rex);
1482 }
1483 }
1484 }
1485 }
1486
1487 static final class AndRunnable<T> extends Completion {
1488 final CompletableFuture<? extends T> src;
1489 final CompletableFuture<?> snd;
1490 final Runnable fn;
1491 final CompletableFuture<Void> dst;
1492 final Executor executor;
1493 AndRunnable(CompletableFuture<? extends T> src,
1494 CompletableFuture<?> snd,
1495 Runnable fn,
1496 CompletableFuture<Void> dst, Executor executor) {
1497 this.src = src; this.snd = snd;
1498 this.fn = fn; this.dst = dst;
1499 this.executor = executor;
1500 }
1501 public void run() {
1502 Object r, s; Throwable ex;
1503 final CompletableFuture<? extends T> a;
1504 final CompletableFuture<?> b;
1505 final Runnable fn;
1506 final CompletableFuture<Void> dst;
1507 if ((dst = this.dst) != null &&
1508 (fn = this.fn) != null &&
1509 (a = this.src) != null &&
1510 (r = a.result) != null &&
1511 (b = this.snd) != null &&
1512 (s = b.result) != null &&
1513 compareAndSet(0, 1)) {
1514 if (r instanceof AltResult) {
1515 if ((ex = ((AltResult)r).ex) != null) {
1516 dst.completeExceptionally(new RuntimeException(ex));
1517 return;
1518 }
1519 }
1520 if (s instanceof AltResult) {
1521 if ((ex = ((AltResult)s).ex) != null) {
1522 dst.completeExceptionally(new RuntimeException(ex));
1523 return;
1524 }
1525 }
1526 try {
1527 if (executor != null)
1528 executor.execute(new AsyncRunnable(fn, dst));
1529 else {
1530 fn.run();
1531 dst.complete(null);
1532 }
1533 } catch (Throwable rex) {
1534 dst.completeExceptionally(rex);
1535 }
1536 }
1537 }
1538 }
1539
1540 static final class OrFunction<T,U> extends Completion {
1541 final CompletableFuture<? extends T> src;
1542 final CompletableFuture<? extends T> snd;
1543 final Function<? super T,? extends U> fn;
1544 final CompletableFuture<U> dst;
1545 final Executor executor;
1546 OrFunction(CompletableFuture<? extends T> src,
1547 CompletableFuture<? extends T> snd,
1548 Function<? super T,? extends U> fn,
1549 CompletableFuture<U> dst, Executor executor) {
1550 this.src = src; this.snd = snd;
1551 this.fn = fn; this.dst = dst;
1552 this.executor = executor;
1553 }
1554 public void run() {
1555 Object r; T t; Throwable ex;
1556 CompletableFuture<? extends T> a;
1557 CompletableFuture<? extends T> b;
1558 Function<? super T,? extends U> fn;
1559 CompletableFuture<U> dst;
1560 if ((dst = this.dst) != null &&
1561 (fn = this.fn) != null &&
1562 (((a = this.src) != null && (r = a.result) != null) ||
1563 ((b = this.snd) != null && (r = b.result) != null)) &&
1564 compareAndSet(0, 1)) {
1565 if (r instanceof AltResult) {
1566 if ((ex = ((AltResult)r).ex) != null) {
1567 dst.completeExceptionally(new RuntimeException(ex));
1568 return;
1569 }
1570 t = null;
1571 }
1572 else
1573 t = (T) r;
1574 try {
1575 if (executor != null)
1576 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1577 else
1578 dst.complete(fn.apply(t));
1579 } catch (Throwable rex) {
1580 dst.completeExceptionally(rex);
1581 }
1582 }
1583 }
1584 }
1585
1586 static final class OrBlock<T> extends Completion {
1587 final CompletableFuture<? extends T> src;
1588 final CompletableFuture<? extends T> snd;
1589 final Block<? super T> fn;
1590 final CompletableFuture<Void> dst;
1591 final Executor executor;
1592 OrBlock(CompletableFuture<? extends T> src,
1593 CompletableFuture<? extends T> snd,
1594 Block<? super T> fn,
1595 CompletableFuture<Void> dst, Executor executor) {
1596 this.src = src; this.snd = snd;
1597 this.fn = fn; this.dst = dst;
1598 this.executor = executor;
1599 }
1600 public void run() {
1601 Object r; T t; Throwable ex;
1602 CompletableFuture<? extends T> a;
1603 CompletableFuture<? extends T> b;
1604 Block<? super T> fn;
1605 CompletableFuture<Void> dst;
1606 if ((dst = this.dst) != null &&
1607 (fn = this.fn) != null &&
1608 (((a = this.src) != null && (r = a.result) != null) ||
1609 ((b = this.snd) != null && (r = b.result) != null)) &&
1610 compareAndSet(0, 1)) {
1611 if (r instanceof AltResult) {
1612 if ((ex = ((AltResult)r).ex) != null) {
1613 dst.completeExceptionally(new RuntimeException(ex));
1614 return;
1615 }
1616 t = null;
1617 }
1618 else
1619 t = (T) r;
1620 try {
1621 if (executor != null)
1622 executor.execute(new AsyncBlock<T>(t, fn, dst));
1623 else {
1624 fn.accept(t);
1625 dst.complete(null);
1626 }
1627 } catch (Throwable rex) {
1628 dst.completeExceptionally(rex);
1629 }
1630 }
1631 }
1632 }
1633
1634 static final class OrRunnable<T> extends Completion {
1635 final CompletableFuture<? extends T> src;
1636 final CompletableFuture<?> snd;
1637 final Runnable fn;
1638 final CompletableFuture<Void> dst;
1639 final Executor executor;
1640 OrRunnable(CompletableFuture<? extends T> src,
1641 CompletableFuture<?> snd,
1642 Runnable fn,
1643 CompletableFuture<Void> dst, Executor executor) {
1644 this.src = src; this.snd = snd;
1645 this.fn = fn; this.dst = dst;
1646 this.executor = executor;
1647 }
1648 public void run() {
1649 Object r; Throwable ex;
1650 CompletableFuture<? extends T> a;
1651 final CompletableFuture<?> b;
1652 final Runnable fn;
1653 final CompletableFuture<Void> dst;
1654 if ((dst = this.dst) != null &&
1655 (fn = this.fn) != null &&
1656 (((a = this.src) != null && (r = a.result) != null) ||
1657 ((b = this.snd) != null && (r = b.result) != null)) &&
1658 compareAndSet(0, 1)) {
1659 if ((r instanceof AltResult) &&
1660 (ex = ((AltResult)r).ex) != null) {
1661 dst.completeExceptionally(new RuntimeException(ex));
1662 }
1663 else {
1664 try {
1665 if (executor != null)
1666 executor.execute(new AsyncRunnable(fn, dst));
1667 else {
1668 fn.run();
1669 dst.complete(null);
1670 }
1671 } catch (Throwable rex) {
1672 dst.completeExceptionally(rex);
1673 }
1674 }
1675 }
1676 }
1677 }
1678
1679 static final class ExceptionAction<T> extends Completion {
1680 final CompletableFuture<? extends T> src;
1681 final Function<? super Throwable, ? extends T> fn;
1682 final CompletableFuture<T> dst;
1683 ExceptionAction(CompletableFuture<? extends T> src,
1684 Function<? super Throwable, ? extends T> fn,
1685 CompletableFuture<T> dst) {
1686 this.src = src; this.fn = fn; this.dst = dst;
1687 }
1688 public void run() {
1689 CompletableFuture<? extends T> a;
1690 Function<? super Throwable, ? extends T> fn;
1691 CompletableFuture<T> dst;
1692 Object r; T t; Throwable ex;
1693 if ((dst = this.dst) != null &&
1694 (fn = this.fn) != null &&
1695 (a = this.src) != null &&
1696 (r = a.result) != null &&
1697 compareAndSet(0, 1)) {
1698 if (r instanceof AltResult) {
1699 if ((ex = ((AltResult)r).ex) != null) {
1700 try {
1701 dst.complete(fn.apply(ex));
1702 } catch (Throwable rex) {
1703 dst.completeExceptionally(rex);
1704 }
1705 return;
1706 }
1707 t = null;
1708 }
1709 else
1710 t = (T) r;
1711 dst.complete(t);
1712 }
1713 }
1714 }
1715
1716 /* ------------- then/and/or implementations -------------- */
1717
1718 private <U> CompletableFuture<U> thenFunction(Function<? super T,? extends U> fn,
1719 Executor executor) {
1720
1721 if (fn == null) throw new NullPointerException();
1722 CompletableFuture<U> dst = new CompletableFuture<U>();
1723 ThenFunction<T,U> d = null;
1724 Object r;
1725 if ((r = result) == null) {
1726 CompletionNode p = new CompletionNode
1727 (d = new ThenFunction<T,U>(this, fn, dst, executor));
1728 while ((r = result) == null) {
1729 if (UNSAFE.compareAndSwapObject
1730 (this, COMPLETIONS, p.next = completions, p))
1731 break;
1732 }
1733 }
1734 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1735 T t; Throwable ex = null;
1736 if (r instanceof AltResult) {
1737 if ((ex = ((AltResult)r).ex) != null)
1738 dst.completeExceptionally(new RuntimeException(ex));
1739 t = null;
1740 }
1741 else
1742 t = (T) r;
1743 if (ex == null) {
1744 try {
1745 if (executor != null)
1746 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1747 else
1748 dst.complete(fn.apply(t));
1749 } catch (Throwable rex) {
1750 dst.completeExceptionally(rex);
1751 }
1752 }
1753 postComplete();
1754 }
1755 return dst;
1756 }
1757
1758 private CompletableFuture<Void> thenBlock(Block<? super T> fn,
1759 Executor executor) {
1760
1761 if (fn == null) throw new NullPointerException();
1762 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1763 ThenBlock<T> d = null;
1764 Object r;
1765 if ((r = result) == null) {
1766 CompletionNode p = new CompletionNode
1767 (d = new ThenBlock<T>(this, fn, dst, executor));
1768 while ((r = result) == null) {
1769 if (UNSAFE.compareAndSwapObject
1770 (this, COMPLETIONS, p.next = completions, p))
1771 break;
1772 }
1773 }
1774 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1775 T t; Throwable ex = null;
1776 if (r instanceof AltResult) {
1777 if ((ex = ((AltResult)r).ex) != null)
1778 dst.completeExceptionally(new RuntimeException(ex));
1779 t = null;
1780 }
1781 else
1782 t = (T) r;
1783 if (ex == null) {
1784 try {
1785 if (executor != null)
1786 executor.execute(new AsyncBlock<T>(t, fn, dst));
1787 else {
1788 fn.accept(t);
1789 dst.complete(null);
1790 }
1791 } catch (Throwable rex) {
1792 dst.completeExceptionally(rex);
1793 }
1794 }
1795 postComplete();
1796 }
1797 return dst;
1798 }
1799
1800 private CompletableFuture<Void> thenRunnable(Runnable action,
1801 Executor executor) {
1802 if (action == null) throw new NullPointerException();
1803 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1804 ThenRunnable<T> d = null;
1805 Object r;
1806 if ((r = result) == null) {
1807 CompletionNode p = new CompletionNode
1808 (d = new ThenRunnable<T>(this, action, dst, executor));
1809 while ((r = result) == null) {
1810 if (UNSAFE.compareAndSwapObject
1811 (this, COMPLETIONS, p.next = completions, p))
1812 break;
1813 }
1814 }
1815 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1816 Throwable ex = null;
1817 if (r instanceof AltResult) {
1818 if ((ex = ((AltResult)r).ex) != null)
1819 dst.completeExceptionally(new RuntimeException(ex));
1820 }
1821 if (ex == null) {
1822 try {
1823 if (executor != null)
1824 executor.execute(new AsyncRunnable(action, dst));
1825 else {
1826 action.run();
1827 dst.complete(null);
1828 }
1829 } catch (Throwable rex) {
1830 dst.completeExceptionally(rex);
1831 }
1832 }
1833 postComplete();
1834 }
1835 return dst;
1836 }
1837
1838 private <U,V> CompletableFuture<V> andFunction(CompletableFuture<? extends U> other,
1839 BiFunction<? super T,? super U,? extends V> fn,
1840 Executor executor) {
1841 if (other == null || fn == null) throw new NullPointerException();
1842 CompletableFuture<V> dst = new CompletableFuture<V>();
1843 AndFunction<T,U,V> d = null;
1844 Object r, s = null;
1845 if ((r = result) == null || (s = other.result) == null) {
1846 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
1847 CompletionNode q = null, p = new CompletionNode(d);
1848 while ((r == null && (r = result) == null) ||
1849 (s == null && (s = other.result) == null)) {
1850 if (q != null) {
1851 if (s != null ||
1852 UNSAFE.compareAndSwapObject
1853 (other, COMPLETIONS, q.next = other.completions, q))
1854 break;
1855 }
1856 else if (r != null ||
1857 UNSAFE.compareAndSwapObject
1858 (this, COMPLETIONS, p.next = completions, p)) {
1859 if (s != null)
1860 break;
1861 q = new CompletionNode(d);
1862 }
1863 }
1864 }
1865 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1866 T t; U u; Throwable ex = null;
1867 if (r instanceof AltResult) {
1868 if ((ex = ((AltResult)r).ex) != null)
1869 dst.completeExceptionally(new RuntimeException(ex));
1870 t = null;
1871 }
1872 else
1873 t = (T) r;
1874 if (ex != null)
1875 u = null;
1876 else if (s instanceof AltResult) {
1877 if ((ex = ((AltResult)s).ex) != null)
1878 dst.completeExceptionally(new RuntimeException(ex));
1879 u = null;
1880 }
1881 else
1882 u = (U) s;
1883 if (ex == null) {
1884 try {
1885 if (executor != null)
1886 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1887 else
1888 dst.complete(fn.apply(t, u));
1889 } catch (Throwable rex) {
1890 dst.completeExceptionally(rex);
1891 }
1892 }
1893 }
1894 if (r != null)
1895 postComplete();
1896 if (s != null)
1897 other.postComplete();
1898 return dst;
1899 }
1900
1901 private <U> CompletableFuture<Void> andBlock(CompletableFuture<? extends U> other,
1902 BiBlock<? super T,? super U> fn,
1903 Executor executor) {
1904 if (other == null || fn == null) throw new NullPointerException();
1905 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1906 AndBlock<T,U> d = null;
1907 Object r, s = null;
1908 if ((r = result) == null || (s = other.result) == null) {
1909 d = new AndBlock<T,U>(this, other, fn, dst, executor);
1910 CompletionNode q = null, p = new CompletionNode(d);
1911 while ((r == null && (r = result) == null) ||
1912 (s == null && (s = other.result) == null)) {
1913 if (q != null) {
1914 if (s != null ||
1915 UNSAFE.compareAndSwapObject
1916 (other, COMPLETIONS, q.next = other.completions, q))
1917 break;
1918 }
1919 else if (r != null ||
1920 UNSAFE.compareAndSwapObject
1921 (this, COMPLETIONS, p.next = completions, p)) {
1922 if (s != null)
1923 break;
1924 q = new CompletionNode(d);
1925 }
1926 }
1927 }
1928 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1929 T t; U u; Throwable ex = null;
1930 if (r instanceof AltResult) {
1931 if ((ex = ((AltResult)r).ex) != null)
1932 dst.completeExceptionally(new RuntimeException(ex));
1933 t = null;
1934 }
1935 else
1936 t = (T) r;
1937 if (ex != null)
1938 u = null;
1939 else if (s instanceof AltResult) {
1940 if ((ex = ((AltResult)s).ex) != null)
1941 dst.completeExceptionally(new RuntimeException(ex));
1942 u = null;
1943 }
1944 else
1945 u = (U) s;
1946 if (ex == null) {
1947 try {
1948 if (executor != null)
1949 executor.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1950 else {
1951 fn.accept(t, u);
1952 dst.complete(null);
1953 }
1954 } catch (Throwable rex) {
1955 dst.completeExceptionally(rex);
1956 }
1957 }
1958 }
1959 if (r != null)
1960 postComplete();
1961 if (s != null)
1962 other.postComplete();
1963 return dst;
1964 }
1965
1966 private CompletableFuture<Void> andRunnable(CompletableFuture<?> other,
1967 Runnable action,
1968 Executor executor) {
1969 if (other == null || action == null) throw new NullPointerException();
1970 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1971 AndRunnable<T> d = null;
1972 Object r, s = null;
1973 if ((r = result) == null || (s = other.result) == null) {
1974 d = new AndRunnable<T>(this, other, action, dst, executor);
1975 CompletionNode q = null, p = new CompletionNode(d);
1976 while ((r == null && (r = result) == null) ||
1977 (s == null && (s = other.result) == null)) {
1978 if (q != null) {
1979 if (s != null ||
1980 UNSAFE.compareAndSwapObject
1981 (other, COMPLETIONS, q.next = other.completions, q))
1982 break;
1983 }
1984 else if (r != null ||
1985 UNSAFE.compareAndSwapObject
1986 (this, COMPLETIONS, p.next = completions, p)) {
1987 if (s != null)
1988 break;
1989 q = new CompletionNode(d);
1990 }
1991 }
1992 }
1993 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1994 Throwable ex = null;
1995 if ((r instanceof AltResult) &&
1996 (ex = ((AltResult)r).ex) != null)
1997 dst.completeExceptionally(new RuntimeException(ex));
1998 else if ((s instanceof AltResult) &&
1999 (ex = ((AltResult)s).ex) != null)
2000 dst.completeExceptionally(new RuntimeException(ex));
2001 else {
2002 try {
2003 if (executor != null)
2004 executor.execute(new AsyncRunnable(action, dst));
2005 else {
2006 action.run();
2007 dst.complete(null);
2008 }
2009 } catch (Throwable rex) {
2010 dst.completeExceptionally(rex);
2011 }
2012 }
2013 }
2014 if (r != null)
2015 postComplete();
2016 if (s != null)
2017 other.postComplete();
2018 return dst;
2019 }
2020
2021 private <U> CompletableFuture<U> orFunction(CompletableFuture<? extends T> other,
2022 Function<? super T, U> fn,
2023 Executor executor) {
2024 if (other == null || fn == null) throw new NullPointerException();
2025 CompletableFuture<U> dst = new CompletableFuture<U>();
2026 OrFunction<T,U> d = null;
2027 Object r;
2028 if ((r = result) == null && (r = other.result) == null) {
2029 d = new OrFunction<T,U>(this, other, fn, dst, executor);
2030 CompletionNode q = null, p = new CompletionNode(d);
2031 while ((r = result) == null && (r = other.result) == null) {
2032 if (q != null) {
2033 if (UNSAFE.compareAndSwapObject
2034 (other, COMPLETIONS, q.next = other.completions, q))
2035 break;
2036 }
2037 else if (UNSAFE.compareAndSwapObject
2038 (this, COMPLETIONS, p.next = completions, p))
2039 q = new CompletionNode(d);
2040 }
2041 }
2042 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2043 T t; Throwable ex = null;
2044 if (r instanceof AltResult) {
2045 if ((ex = ((AltResult)r).ex) != null)
2046 dst.completeExceptionally(new RuntimeException(ex));
2047 t = null;
2048 }
2049 else
2050 t = (T) r;
2051 if (ex == null) {
2052 try {
2053 if (executor != null)
2054 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
2055 else
2056 dst.complete(fn.apply(t));
2057 } catch (Throwable rex) {
2058 dst.completeExceptionally(rex);
2059 }
2060 }
2061 }
2062 if (r != null) {
2063 if (result != null)
2064 postComplete();
2065 if (other.result != null)
2066 other.postComplete();
2067 }
2068 return dst;
2069 }
2070
2071 private CompletableFuture<Void> orBlock(CompletableFuture<? extends T> other,
2072 Block<? super T> fn,
2073 Executor executor) {
2074 if (other == null || fn == null) throw new NullPointerException();
2075 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2076 OrBlock<T> d = null;
2077 Object r;
2078 if ((r = result) == null && (r = other.result) == null) {
2079 d = new OrBlock<T>(this, other, fn, dst, executor);
2080 CompletionNode q = null, p = new CompletionNode(d);
2081 while ((r = result) == null && (r = other.result) == null) {
2082 if (q != null) {
2083 if (UNSAFE.compareAndSwapObject
2084 (other, COMPLETIONS, q.next = other.completions, q))
2085 break;
2086 }
2087 else if (UNSAFE.compareAndSwapObject
2088 (this, COMPLETIONS, p.next = completions, p))
2089 q = new CompletionNode(d);
2090 }
2091 }
2092 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2093 T t; Throwable ex = null;
2094 if (r instanceof AltResult) {
2095 if ((ex = ((AltResult)r).ex) != null)
2096 dst.completeExceptionally(new RuntimeException(ex));
2097 t = null;
2098 }
2099 else
2100 t = (T) r;
2101 if (ex == null) {
2102 try {
2103 if (executor != null)
2104 executor.execute(new AsyncBlock<T>(t, fn, dst));
2105 else {
2106 fn.accept(t);
2107 dst.complete(null);
2108 }
2109 } catch (Throwable rex) {
2110 dst.completeExceptionally(rex);
2111 }
2112 }
2113 }
2114 if (r != null) {
2115 if (result != null)
2116 postComplete();
2117 if (other.result != null)
2118 other.postComplete();
2119 }
2120 return dst;
2121 }
2122
2123 private CompletableFuture<Void> orRunnable(CompletableFuture<?> other,
2124 Runnable action,
2125 Executor executor) {
2126 if (other == null || action == null) throw new NullPointerException();
2127 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2128 OrRunnable<T> d = null;
2129 Object r;
2130 if ((r = result) == null && (r = other.result) == null) {
2131 d = new OrRunnable<T>(this, other, action, dst, executor);
2132 CompletionNode q = null, p = new CompletionNode(d);
2133 while ((r = result) == null && (r = other.result) == null) {
2134 if (q != null) {
2135 if (UNSAFE.compareAndSwapObject
2136 (other, COMPLETIONS, q.next = other.completions, q))
2137 break;
2138 }
2139 else if (UNSAFE.compareAndSwapObject
2140 (this, COMPLETIONS, p.next = completions, p))
2141 q = new CompletionNode(d);
2142 }
2143 }
2144 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2145 Throwable ex = null;
2146 if ((r instanceof AltResult) &&
2147 (ex = ((AltResult)r).ex) != null)
2148 dst.completeExceptionally(new RuntimeException(ex));
2149 else {
2150 try {
2151 if (executor != null)
2152 executor.execute(new AsyncRunnable(action, dst));
2153 else {
2154 action.run();
2155 dst.complete(null);
2156 }
2157 } catch (Throwable rex) {
2158 dst.completeExceptionally(rex);
2159 }
2160 }
2161 }
2162 if (r != null) {
2163 if (result != null)
2164 postComplete();
2165 if (other.result != null)
2166 other.postComplete();
2167 }
2168 return dst;
2169 }
2170
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" within CompletableFuture,
// for use with Unsafe's offset-based field operations (such as
// compareAndSwapObject on COMPLETIONS).
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    // Resolve everything into locals first, then publish to the
    // static finals in one place once all lookups have succeeded.
    final sun.misc.Unsafe unsafe;
    final long resultOffset;
    final long waitersOffset;
    final long completionsOffset;
    try {
        unsafe = sun.misc.Unsafe.getUnsafe();
        final Class<?> self = CompletableFuture.class;
        resultOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("result"));
        waitersOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        completionsOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
    } catch (Exception e) {
        // Any failure here leaves the class unusable; surface it as
        // an Error from class initialization.
        throw new Error(e);
    }
    UNSAFE = unsafe;
    RESULT = resultOffset;
    WAITERS = waitersOffset;
    COMPLETIONS = completionsOffset;
}
2190
2191 }