ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.7
Committed: Sat Dec 29 18:13:39 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.6: +595 -61 lines
Log Message:
Rename and expand methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Block;
10 import java.util.function.BiBlock;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>Similar methods are available for Functions, Blocks, and
31 * Runnables, depending on whether actions require arguments and/or
32 * produce results. Functions and actions supplied for dependent
33 * completions (mainly using methods with prefix {@code then}) may be
34 * performed by the thread that completes the current
35 * CompletableFuture, or by any other caller of these methods. There
36 * are no guarantees about the order of processing completions unless
37 * constrained by method {@code then} and related methods.
38 *
39 * <p>When two or more threads attempt to {@link #complete} or {@link
40 * #completeExceptionally} a CompletableFuture, only one of them will
41 * succeed. Upon exceptional completion, or when a completion entails
42 * computation of a function or action, and it terminates abruptly
43 * with an exception, then further completions act as {@code
44 * completeExceptionally} with that exception.
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, the {@code async} methods provide commonly useful ways to
48 * commence asynchronous processing, using either a given {@link
49 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
50 * function or action that will result in the completion of a new
51 * CompletableFuture.
52 *
53 * @author Doug Lea
54 * @since 1.8
55 */
56 public class CompletableFuture<T> implements Future<T> {
57 /*
58 * Quick overview (more to come):
59 *
60 * 1. Non-nullness of field result indicates done. An AltResult is
61 * used to box null as a result, as well as to hold exceptions.
62 *
63 * 2. Waiters are held in a Treiber stack similar to the one used
64 * in FutureTask.
65 *
66 * 3. Completions are also kept in a list/stack, and pulled off
67 * and run when completion is triggered.
68 */
69
70 static final class AltResult {
71 final Throwable ex; // null only for NIL
72 AltResult(Throwable ex) { this.ex = ex; }
73 }
74
75 static final AltResult NIL = new AltResult(null); // shared AltResult with a null ex; stored by complete() to box a null value
76
77 /**
78 * Simple linked list nodes to record waiting threads in a Treiber
79 * stack. See other classes such as Phaser and SynchronousQueue
80 * for more detailed explanation.
81 */
82 static final class WaitNode {
83 volatile Thread thread; // presumably the waiting thread this node records -- confirm against the wait/release code
84 volatile WaitNode next; // link to the next node in the stack
85 }
86
87 /**
88 * Simple linked list nodes to record completions, used in
89 * basically the same way as WaitNodes.
90 */
91 static final class CompletionNode {
92 final Completion completion;
93 volatile CompletionNode next;
94 CompletionNode(Completion completion) { this.completion = completion; }
95 }
96
97
98 volatile Object result; // null until completed; then either the result or a boxed AltResult (null value or exception)
99 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
100 volatile CompletionNode completions; // list (Treiber stack) of completions
101
102 /**
103 * Creates a new incomplete CompletableFuture.
104 */
105 public CompletableFuture() { // nothing to set up: result, waiters and completions all start out null (not done)
106 }
107
108 /**
109 * Asynchronously executes in the {@link
110 * ForkJoinPool#commonPool()}, a task that completes the returned
111 * CompletableFuture with the result of the given Supplier.
112 *
113 * @param supplier a function returning the value to be used
114 * to complete the returned CompletableFuture.
115 * @return the CompletableFuture.
116 */
117 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
118 if (supplier == null) throw new NullPointerException();
119 CompletableFuture<U> f = new CompletableFuture<U>();
120 ForkJoinPool.commonPool().
121 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
122 return f;
123 }
124
125 /**
126 * Asynchronously executes using the given executor, a task that
127 * completes the returned CompletableFuture with the result of the
128 * given Supplier.
129 *
130 * @param supplier a function returning the value to be used
131 * to complete the returned CompletableFuture.
132 * @param executor the executor to use for asynchronous execution
133 * @return the CompletableFuture.
134 */
135 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
136 Executor executor) {
137 if (executor == null || supplier == null)
138 throw new NullPointerException();
139 CompletableFuture<U> f = new CompletableFuture<U>();
140 executor.execute(new AsyncSupplier<U>(supplier, f));
141 return f;
142 }
143
144 /**
145 * Asynchronously executes in the {@link
146 * ForkJoinPool#commonPool()} a task that runs the given action,
147 * and then completes the returned CompletableFuture.
148 *
149 * @param runnable the action to run before completing the
150 * returned CompletableFuture.
151 * @return the CompletableFuture.
152 */
153 public static CompletableFuture<Void> runAsync(Runnable runnable) {
154 if (runnable == null) throw new NullPointerException();
155 CompletableFuture<Void> f = new CompletableFuture<Void>();
156 ForkJoinPool.commonPool().
157 execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
158 return f;
159 }
160
161 /**
162 * Asynchronously executes using the given executor, a task that
163 * runs the given action, and then completes the returned
164 * CompletableFuture.
165 *
166 * @param runnable the action to run before completing the
167 * returned CompletableFuture.
168 * @param executor the executor to use for asynchronous execution
169 * @return the CompletableFuture.
170 */
171 public static CompletableFuture<Void> runAsync(Runnable runnable,
172 Executor executor) {
173 if (executor == null || runnable == null)
174 throw new NullPointerException();
175 CompletableFuture<Void> f = new CompletableFuture<Void>();
176 executor.execute(new AsyncRunnable(runnable, f));
177 return f;
178 }
179
180 /**
181 * Returns {@code true} if completed in any fashion: normally,
182 * exceptionally, or via cancellation.
183 *
184 * @return {@code true} if completed
185 */
186 public boolean isDone() {
187 return result != null;
188 }
189
190 /**
191 * Returns the result value when complete, or throws an
192 * (unchecked) exception if completed exceptionally. To better
193 * conform with the use of common functional forms, this method
194 * transforms any checked exception possible with {@link
195 * Future#get} into an (unchecked) {@link RuntimeException} with
196 * the underlying exception as its cause. (The checked exception
197 * convention is available using the timed form of get.)
198 *
199 * @return the result value
200 */
201 public T get() {
202 Object r; Throwable ex;
203 if ((r = result) == null)
204 return waitingGet();
205 if (r instanceof AltResult) {
206 if ((ex = ((AltResult)r).ex) != null) {
207 if (ex instanceof Error)
208 throw (Error)ex;
209 if (ex instanceof RuntimeException)
210 throw (RuntimeException)ex;
211 throw new RuntimeException(ex);
212 }
213 return null;
214 }
215 return (T)r;
216 }
217
218 /**
219 * Returns the result value (or throws any encountered exception)
220 * if completed, else returns the given valueIfAbsent.
221 *
222 * @param valueIfAbsent the value to return if not completed
223 * @return the result value, if completed, else the given valueIfAbsent
224 */
225 public T getNow(T valueIfAbsent) {
226 Object r; Throwable ex;
227 if ((r = result) == null)
228 return valueIfAbsent;
229 if (r instanceof AltResult) {
230 if ((ex = ((AltResult)r).ex) != null) {
231 if (ex instanceof Error)
232 throw (Error)ex;
233 if (ex instanceof RuntimeException)
234 throw (RuntimeException)ex;
235 throw new RuntimeException(ex);
236 }
237 return null;
238 }
239 return (T)r;
240 }
241
242 /**
243 * Waits if necessary for at most the given time for completion,
244 * and then retrieves its result, if available.
245 *
246 * @param timeout the maximum time to wait
247 * @param unit the time unit of the timeout argument
248 * @return the computed result
249 * @throws CancellationException if the computation was cancelled
250 * @throws ExecutionException if the computation threw an
251 * exception
252 * @throws InterruptedException if the current thread was interrupted
253 * while waiting
254 * @throws TimeoutException if the wait timed out
255 */
256 public T get(long timeout, TimeUnit unit)
257 throws InterruptedException, ExecutionException, TimeoutException {
258 Object r; Throwable ex;
259 long nanos = unit.toNanos(timeout);
260 if (Thread.interrupted())
261 throw new InterruptedException();
262 if ((r = result) == null)
263 r = timedAwaitDone(nanos);
264 if (r instanceof AltResult) {
265 if ((ex = ((AltResult)r).ex) != null) {
266 if (ex instanceof ExecutionException) // avoid re-wrap
267 throw (ExecutionException)ex;
268 throw new ExecutionException(ex);
269 }
270 return null;
271 }
272 return (T)r;
273 }
274
275 /**
276 * If not already completed, sets the value returned by {@link
277 * #get()} and related methods to the given value.
278 *
279 * @param value the result value
280 * @return true if this invocation caused this CompletableFuture
281 * to transition to a completed state, else false.
282 */
283 public boolean complete(T value) {
284 if (result == null &&
285 UNSAFE.compareAndSwapObject(this, RESULT, null,
286 (value == null) ? NIL : value)) {
287 postComplete();
288 return true;
289 }
290 return false;
291 }
292
293 /**
294 * If not already completed, causes invocations of {@link #get()}
295 * and related methods to throw the given exception.
296 *
297 * @param ex the exception
298 * @return true if this invocation caused this CompletableFuture
299 * to transition to a completed state, else false.
300 */
301 public boolean completeExceptionally(Throwable ex) {
302 if (ex == null) throw new NullPointerException();
303 if (result == null) {
304 Object r = new AltResult(ex);
305 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
306 postComplete();
307 return true;
308 }
309 }
310 return false;
311 }
312
313 /**
314 * Creates and returns a CompletableFuture that is completed with
315 * the result of the given function of this CompletableFuture.
316 * If this CompletableFuture completes exceptionally,
317 * then the returned CompletableFuture also does so,
318 * with a RuntimeException having this exception as
319 * its cause.
320 *
321 * @param fn the function to use to compute the value of
322 * the returned CompletableFuture
323 * @return the new CompletableFuture
324 */
325 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
326 return thenFunction(fn, null);
327 }
328
329 /**
330 * Creates and returns a CompletableFuture that is asynchronously
331 * completed using the {@link ForkJoinPool#commonPool()} with the
332 * result of the given function of this CompletableFuture. If
333 * this CompletableFuture completes exceptionally, then the
334 * returned CompletableFuture also does so, with a
335 * RuntimeException having this exception as its cause.
336 *
337 * @param fn the function to use to compute the value of
338 * the returned CompletableFuture
339 * @return the new CompletableFuture
340 */
341 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
342 return thenFunction(fn, ForkJoinPool.commonPool());
343 }
344
345 /**
346 * Creates and returns a CompletableFuture that is asynchronously
347 * completed using the given executor with the result of the given
348 * function of this CompletableFuture. If this CompletableFuture
349 * completes exceptionally, then the returned CompletableFuture
350 * also does so, with a RuntimeException having this exception as
351 * its cause.
352 *
353 * @param fn the function to use to compute the value of
354 * the returned CompletableFuture
355 * @param executor the executor to use for asynchronous execution
356 * @return the new CompletableFuture
357 */
358 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
359 Executor executor) {
360 if (executor == null) throw new NullPointerException();
361 return thenFunction(fn, executor);
362 }
363
364 /**
365 * Creates and returns a CompletableFuture that is completed after
366 * performing the given action with this CompletableFuture's
367 * result when it completes. If this CompletableFuture
368 * completes exceptionally, then the returned CompletableFuture
369 * also does so, with a RuntimeException having this exception as
370 * its cause.
371 *
372 * @param block the action to perform before completing the
373 * returned CompletableFuture
374 * @return the new CompletableFuture
375 */
376 public CompletableFuture<Void> thenAccept(Block<? super T> block) {
377 return thenBlock(block, null);
378 }
379
380 /**
381 * Creates and returns a CompletableFuture that is asynchronously
382 * completed using the {@link ForkJoinPool#commonPool()} with this
383 * CompletableFuture's result when it completes. If this
384 * CompletableFuture completes exceptionally, then the returned
385 * CompletableFuture also does so, with a RuntimeException having
386 * this exception as its cause.
387 *
388 * @param block the action to perform before completing the
389 * returned CompletableFuture
390 * @return the new CompletableFuture
391 */
392 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
393 return thenBlock(block, ForkJoinPool.commonPool());
394 }
395
396 /**
397 * Creates and returns a CompletableFuture that is asynchronously
398 * completed using the given executor with this
399 * CompletableFuture's result when it completes. If this
400 * CompletableFuture completes exceptionally, then the returned
401 * CompletableFuture also does so, with a RuntimeException having
402 * this exception as its cause.
403 *
404 * @param block the action to perform before completing the
405 * returned CompletableFuture
406 * @param executor the executor to use for asynchronous execution
407 * @return the new CompletableFuture
408 */
409 public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
410 Executor executor) {
411 if (executor == null) throw new NullPointerException();
412 return thenBlock(block, executor);
413 }
414
415 /**
416 * Creates and returns a CompletableFuture that is completed after
417 * performing the given action when this CompletableFuture
418 * completes. If this CompletableFuture completes exceptionally,
419 * then the returned CompletableFuture also does so, with a
420 * RuntimeException having this exception as its cause.
421 *
422 * @param action the action to perform before completing the
423 * returned CompletableFuture
424 * @return the new CompletableFuture
425 */
426 public CompletableFuture<Void> thenRun(Runnable action) {
427 return thenRunnable(action, null);
428 }
429
430 /**
431 * Creates and returns a CompletableFuture that is asynchronously
432 * completed using the {@link ForkJoinPool#commonPool()} after
433 * performing the given action when this CompletableFuture
434 * completes. If this CompletableFuture completes exceptionally,
435 * then the returned CompletableFuture also does so, with a
436 * RuntimeException having this exception as its cause.
437 *
438 * @param action the action to perform before completing the
439 * returned CompletableFuture
440 * @return the new CompletableFuture
441 */
442 public CompletableFuture<Void> thenRunAsync(Runnable action) {
443 return thenRunnable(action, ForkJoinPool.commonPool());
444 }
445
446 /**
447 * Creates and returns a CompletableFuture that is asynchronously
448 * completed using the given executor after performing the given
449 * action when this CompletableFuture completes. If this
450 * CompletableFuture completes exceptionally, then the returned
451 * CompletableFuture also does so, with a RuntimeException having
452 * this exception as its cause.
453 *
454 * @param action the action to perform before completing the
455 * returned CompletableFuture
456 * @param executor the executor to use for asynchronous execution
457 * @return the new CompletableFuture
458 */
459 public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
460 if (executor == null) throw new NullPointerException();
461 return thenRunnable(action, executor);
462 }
463
464 /**
465 * Creates and returns a CompletableFuture that is completed with
466 * the result of the given function of this and the other given
467 * CompletableFuture's results when both complete. If this or
468 * the other CompletableFuture complete exceptionally, then the
469 * returned CompletableFuture also does so, with a
470 * RuntimeException having the exception as its cause.
471 *
472 * @param other the other CompletableFuture
473 * @param fn the function to use to compute the value of
474 * the returned CompletableFuture
475 * @return the new CompletableFuture
476 */
477 public <U,V> CompletableFuture<V> thenApply(CompletableFuture<? extends U> other,
478 BiFunction<? super T,? super U,? extends V> fn) {
479 return andFunction(other, fn, null);
480 }
481
482 /**
483 * Creates and returns a CompletableFuture that is asynchronously
484 * completed using the {@link ForkJoinPool#commonPool()} with
485 * the result of the given function of this and the other given
486 * CompletableFuture's results when both complete. If this or
487 * the other CompletableFuture complete exceptionally, then the
488 * returned CompletableFuture also does so, with a
489 * RuntimeException having the exception as its cause.
490 *
491 * @param other the other CompletableFuture
492 * @param fn the function to use to compute the value of
493 * the returned CompletableFuture
494 * @return the new CompletableFuture
495 */
496 public <U,V> CompletableFuture<V> thenApplyAsync(CompletableFuture<? extends U> other,
497 BiFunction<? super T,? super U,? extends V> fn) {
498 return andFunction(other, fn, ForkJoinPool.commonPool());
499 }
500
501 /**
502 * Creates and returns a CompletableFuture that is
503 * asynchronously completed using the given executor with the
504 * result of the given function of this and the other given
505 * CompletableFuture's results when both complete. If this or
506 * the other CompletableFuture complete exceptionally, then the
507 * returned CompletableFuture also does so, with a
508 * RuntimeException having the exception as its cause.
509 *
510 * @param other the other CompletableFuture
511 * @param fn the function to use to compute the value of
512 * the returned CompletableFuture
513 * @param executor the executor to use for asynchronous execution
514 * @return the new CompletableFuture
515 */
516
517 public <U,V> CompletableFuture<V> thenApplyAsync(CompletableFuture<? extends U> other,
518 BiFunction<? super T,? super U,? extends V> fn,
519 Executor executor) {
520 if (executor == null) throw new NullPointerException();
521 return andFunction(other, fn, executor);
522 }
523
524 /**
525 * Creates and returns a CompletableFuture that is completed with
526 * the results of this and the other given CompletableFuture if
527 * both complete. If this and/or the other CompletableFuture
528 * complete exceptionally, then the returned CompletableFuture
529 * also does so, with a RuntimeException having one of the
530 * exceptions as its cause.
531 *
532 * @param other the other CompletableFuture
533 * @param block the action to perform before completing the
534 * returned CompletableFuture
535 * @return the new CompletableFuture
536 */
537 public <U> CompletableFuture<Void> thenAccept(CompletableFuture<? extends U> other,
538 BiBlock<? super T, ? super U> block) {
539 return andBlock(other, block, null);
540 }
541
542 /**
543 * Creates and returns a CompletableFuture that is completed
544 * asynchronously using the {@link ForkJoinPool#commonPool()} with
545 * the results of this and the other given CompletableFuture when
546 * both complete. If this and/or the other CompletableFuture
547 * complete exceptionally, then the returned CompletableFuture
548 * also does so, with a RuntimeException having one of the
549 * exceptions as its cause.
550 *
551 * @param other the other CompletableFuture
552 * @param block the action to perform before completing the
553 * returned CompletableFuture
554 * @return the new CompletableFuture
555 */
556 public <U> CompletableFuture<Void> thenAcceptAsync(CompletableFuture<? extends U> other,
557 BiBlock<? super T, ? super U> block) {
558 return andBlock(other, block, ForkJoinPool.commonPool());
559 }
560
561 /**
562 * Creates and returns a CompletableFuture that is completed
563 * asynchronously using the given executor with the results of
564 * this and the other given CompletableFuture when both complete.
565 * If this and/or the other CompletableFuture complete
566 * exceptionally, then the returned CompletableFuture also does
567 * so, with a RuntimeException having one of the exceptions as
568 * its cause.
569 *
570 * @param other the other CompletableFuture
571 * @param block the action to perform before completing the
572 * returned CompletableFuture
573 * @param executor the executor to use for asynchronous execution
574 * @return the new CompletableFuture
575 */
576 public <U> CompletableFuture<Void> thenAcceptAsync(CompletableFuture<? extends U> other,
577 BiBlock<? super T, ? super U> block,
578 Executor executor) {
579 if (executor == null) throw new NullPointerException();
580 return andBlock(other, block, executor);
581 }
582
583 /**
584 * Creates and returns a CompletableFuture that is completed
585 * when this and the other given CompletableFuture both
586 * complete. If this and/or the other CompletableFuture complete
587 * exceptionally, then the returned CompletableFuture also does
588 * so, with a RuntimeException having one of the exceptions as
589 * its cause.
590 *
591 * @param other the other CompletableFuture
592 * @param action the action to perform before completing the
593 * returned CompletableFuture
594 * @return the new CompletableFuture
595 */
596 public CompletableFuture<Void> thenRun(CompletableFuture<?> other,
597 Runnable action) {
598 return andRunnable(other, action, null);
599 }
600
601 /**
602 * Creates and returns a CompletableFuture that is completed
603 * asynchronously using the {@link ForkJoinPool#commonPool()}
604 * when this and the other given CompletableFuture both
605 * complete. If this and/or the other CompletableFuture complete
606 * exceptionally, then the returned CompletableFuture also does
607 * so, with a RuntimeException having one of the exceptions as
608 * its cause.
609 *
610 * @param other the other CompletableFuture
611 * @param action the action to perform before completing the
612 * returned CompletableFuture
613 * @return the new CompletableFuture
614 */
615 public CompletableFuture<Void> thenRunAsync(CompletableFuture<?> other,
616 Runnable action) {
617 return andRunnable(other, action, ForkJoinPool.commonPool());
618 }
619
620 /**
621 * Creates and returns a CompletableFuture that is completed
622 * asynchronously using the given executor
623 * when this and the other given CompletableFuture both
624 * complete. If this and/or the other CompletableFuture complete
625 * exceptionally, then the returned CompletableFuture also does
626 * so, with a RuntimeException having one of the exceptions as
627 * its cause.
628 *
629 * @param other the other CompletableFuture
630 * @param action the action to perform before completing the
631 * returned CompletableFuture
632 * @param executor the executor to use for asynchronous execution
633 * @return the new CompletableFuture
634 */
635 public CompletableFuture<Void> thenRunAsync(CompletableFuture<?> other,
636 Runnable action,
637 Executor executor) {
638 if (executor == null) throw new NullPointerException();
639 return andRunnable(other, action, executor);
640 }
641
642 /**
643 * Creates and returns a CompletableFuture that is completed with
644 * the result of the given function of either this or the other
645 * given CompletableFuture's results when either complete. If
646 * this and/or the other CompletableFuture complete exceptionally,
647 * then the returned CompletableFuture may also do so, with a
648 * RuntimeException having one of these exceptions as its cause.
649 * No guarantees are made about which result or exception is used
650 * in the returned CompletableFuture.
651 *
652 * @param other the other CompletableFuture
653 * @param fn the function to use to compute the value of
654 * the returned CompletableFuture
655 * @return the new CompletableFuture
656 */
657 public <U> CompletableFuture<U> orApply(CompletableFuture<? extends T> other,
658 Function<? super T, U> fn) {
659 return orFunction(other, fn, null);
660 }
661
662 /**
663 * Creates and returns a CompletableFuture that is completed
664 * asynchronously using the {@link ForkJoinPool#commonPool()} with
665 * the result of the given function of either this or the other
666 * given CompletableFuture's results when either complete. If
667 * this and/or the other CompletableFuture complete exceptionally,
668 * then the returned CompletableFuture may also do so, with a
669 * RuntimeException having one of these exceptions as its cause.
670 * No guarantees are made about which result or exception is used
671 * in the returned CompletableFuture.
672 *
673 * @param other the other CompletableFuture
674 * @param fn the function to use to compute the value of
675 * the returned CompletableFuture
676 * @return the new CompletableFuture
677 */
678 public <U> CompletableFuture<U> orApplyAsync(CompletableFuture<? extends T> other,
679 Function<? super T, U> fn) {
680 return orFunction(other, fn, ForkJoinPool.commonPool());
681 }
682
683 /**
684 * Creates and returns a CompletableFuture that is completed
685 * asynchronously using the given executor with the result of the
686 * given function of either this or the other given
687 * CompletableFuture's results when either complete. If this
688 * and/or the other CompletableFuture complete exceptionally, then
689 * the returned CompletableFuture may also do so, with a
690 * RuntimeException having one of these exceptions as its cause.
691 * No guarantees are made about which result or exception is used
692 * in the returned CompletableFuture.
693 *
694 * @param other the other CompletableFuture
695 * @param fn the function to use to compute the value of
696 * the returned CompletableFuture
697 * @param executor the executor to use for asynchronous execution
698 * @return the new CompletableFuture
699 */
700 public <U> CompletableFuture<U> orApplyAsync(CompletableFuture<? extends T> other,
701 Function<? super T, U> fn,
702 Executor executor) {
703 if (executor == null) throw new NullPointerException();
704 return orFunction(other, fn, executor);
705 }
706
707 /**
708 * Creates and returns a CompletableFuture that is completed after
709 * performing the given action with the result of either this or the
710 * other given CompletableFuture's result, when either complete.
711 * If this and/or the other CompletableFuture complete
712 * exceptionally, then the returned CompletableFuture may also do
713 * so, with a RuntimeException having one of these exceptions as
714 * its cause. No guarantees are made about which exception is
715 * used in the returned CompletableFuture.
716 *
717 * @param other the other CompletableFuture
718 * @param block the action to perform before completing the
719 * returned CompletableFuture
720 * @return the new CompletableFuture
721 */
722 public CompletableFuture<Void> orAccept(CompletableFuture<? extends T> other,
723 Block<? super T> block) {
724 return orBlock(other, block, null);
725 }
726
727 /**
728 * Creates and returns a CompletableFuture that is completed
729 * asynchronously using the {@link ForkJoinPool#commonPool()},
730 * performing the given action with the result of either this or
731 * the other given CompletableFuture's result, when either
732 * complete. If this and/or the other CompletableFuture complete
733 * exceptionally, then the returned CompletableFuture may also do
734 * so, with a RuntimeException having one of these exceptions as
735 * its cause. No guarantees are made about which exception is
736 * used in the returned CompletableFuture.
737 *
738 * @param other the other CompletableFuture
739 * @param block the action to perform before completing the
740 * returned CompletableFuture
741 * @return the new CompletableFuture
742 */
743 public CompletableFuture<Void> orAcceptAsync(CompletableFuture<? extends T> other,
744 Block<? super T> block) {
745 return orBlock(other, block, ForkJoinPool.commonPool());
746 }
747
748 /**
749 * Creates and returns a CompletableFuture that is completed
750 * asynchronously using the given executor,
751 * performing the given action with the result of either this or
752 * the other given CompletableFuture's result, when either
753 * complete. If this and/or the other CompletableFuture complete
754 * exceptionally, then the returned CompletableFuture may also do
755 * so, with a RuntimeException having one of these exceptions as
756 * its cause. No guarantees are made about which exception is
757 * used in the returned CompletableFuture.
758 *
759 * @param other the other CompletableFuture
760 * @param block the action to perform before completing the
761 * returned CompletableFuture
762 * @param executor the executor to use for asynchronous execution
763 * @return the new CompletableFuture
764 */
765 public CompletableFuture<Void> orRunAsync(CompletableFuture<? extends T> other,
766 Block<? super T> block,
767 Executor executor) {
768 if (executor == null) throw new NullPointerException();
769 return orBlock(other, block, executor);
770 }
771
772 /**
773 * Creates and returns a CompletableFuture that is completed
774 * after this or the other given CompletableFuture complete. If
775 * this and/or the other CompletableFuture complete exceptionally,
776 * then the returned CompletableFuture may also do so, with a
777 * RuntimeException having one of these exceptions as its cause.
778 * No guarantees are made about which exception is used in the
779 * returned CompletableFuture.
780 *
781 * @param other the other CompletableFuture
782 * @param action the action to perform before completing the
783 * returned CompletableFuture
784 * @return the new CompletableFuture
785 */
786 public CompletableFuture<Void> orRun(CompletableFuture<?> other,
787 Runnable action) {
788 return orRunnable(other, action, null);
789 }
790
791 /**
792 * Creates and returns a CompletableFuture that is completed
793 * asynchronously using the {@link ForkJoinPool#commonPool()}
794 * after this or the other given CompletableFuture complete. If
795 * this and/or the other CompletableFuture complete exceptionally,
796 * then the returned CompletableFuture may also do so, with a
797 * RuntimeException having one of these exceptions as its cause.
798 * No guarantees are made about which exception is used in the
799 * returned CompletableFuture.
800 *
801 * @param other the other CompletableFuture
802 * @param action the action to perform before completing the
803 * returned CompletableFuture
804 * @return the new CompletableFuture
805 */
806 public CompletableFuture<Void> orRunAsync(CompletableFuture<?> other,
807 Runnable action) {
808 return orRunnable(other, action, ForkJoinPool.commonPool());
809 }
810
811 /**
812 * Creates and returns a CompletableFuture that is completed
813 * asynchronously using the given executor after this or the other
814 * given CompletableFuture complete. If this and/or the other
815 * CompletableFuture complete exceptionally, then the returned
816 * CompletableFuture may also do so, with a RuntimeException
817 * having one of these exceptions as its cause. No guarantees are
818 * made about which exception is used in the returned
819 * CompletableFuture.
820 *
821 * @param other the other CompletableFuture
822 * @param action the action to perform before completing the
823 * returned CompletableFuture
824 * @param executor the executor to use for asynchronous execution
825 * @return the new CompletableFuture
826 */
827 public CompletableFuture<Void> orRunAsync(CompletableFuture<?> other,
828 Runnable action,
829 Executor executor) {
830 if (executor == null) throw new NullPointerException();
831 return orRunnable(other, action, executor);
832 }
833
834 /**
835 * Creates and returns a CompletableFuture that is completed with
836 * the result of the given function of the exception triggering
837 * this CompletableFuture's completion when it completes
838 * exceptionally; Otherwise, if this CompletableFuture completes
839 * normally, then the returned CompletableFuture also completes
840 * normally with the same value.
841 *
842 * @param fn the function to use to compute the value of the
843 * returned CompletableFuture if this CompletableFuture completed
844 * exceptionally
845 * @return the new CompletableFuture
846 */
847 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
848 if (fn == null) throw new NullPointerException();
849 CompletableFuture<T> dst = new CompletableFuture<T>();
850 ExceptionAction<T> d = null;
851 Object r;
852 if ((r = result) == null) {
853 CompletionNode p =
854 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
855 while ((r = result) == null) {
856 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
857 p.next = completions, p))
858 break;
859 }
860 }
861 if (r != null && (d == null || d.compareAndSet(0, 1))) {
862 T t; Throwable ex = null;
863 if (r instanceof AltResult) {
864 if ((ex = ((AltResult)r).ex) != null) {
865 try {
866 dst.complete(fn.apply(ex));
867 } catch (Throwable rex) {
868 dst.completeExceptionally(rex);
869 }
870 }
871 t = null;
872 }
873 else
874 t = (T) r;
875 if (ex == null)
876 dst.complete(t);
877 }
878 if (r != null)
879 postComplete();
880 return dst;
881 }
882
883 /**
884 * Attempts to complete this CompletableFuture with
885 * a {@link CancellationException}.
886 *
887 * @param mayInterruptIfRunning this value has no effect in this
888 * implementation because interrupts are not used to control
889 * processing.
890 *
891 * @return {@code true} if this task is now cancelled
892 */
893 public boolean cancel(boolean mayInterruptIfRunning) {
894 Object r;
895 while ((r = result) == null) {
896 r = new AltResult(new CancellationException());
897 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
898 postComplete();
899 return true;
900 }
901 }
902 return ((r instanceof AltResult) &&
903 (((AltResult)r).ex instanceof CancellationException));
904 }
905
906 /**
907 * Returns {@code true} if this CompletableFuture was cancelled
908 * before it completed normally.
909 *
910 * @return {@code true} if this CompletableFuture was cancelled
911 * before it completed normally
912 */
913 public boolean isCancelled() {
914 Object r;
915 return ((r = result) != null &&
916 (r instanceof AltResult) &&
917 (((AltResult)r).ex instanceof CancellationException));
918 }
919
920 /**
921 * Forcibly sets or resets the value subsequently returned by
922 * method get() and related methods, whether or not already
923 * completed. This method is designed for use only in error
924 * recovery actions, and even in such situations may result in
925 * ongoing dependent completions using established versus
926 * overwritten values.
927 *
928 * @param value the completion value
929 */
930 public void obtrudeValue(T value) {
931 result = (value == null) ? NIL : value;
932 postComplete();
933 }
934
935 /**
936 * Removes and signals all waiting threads and runs all completions
937 */
938 private void postComplete() {
939 WaitNode q; Thread t;
940 while ((q = waiters) != null) {
941 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
942 (t = q.thread) != null) {
943 q.thread = null;
944 LockSupport.unpark(t);
945 }
946 }
947
948 CompletionNode h; Completion c;
949 while ((h = completions) != null) {
950 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
951 (c = h.completion) != null)
952 c.run();
953 }
954 }
955
956 /* ------------- waiting for completions -------------- */
957
958 /**
959 * Heuristic spin value for waitingGet() before blocking on
960 * multiprocessors
961 */
962 static final int WAITING_GET_SPINS = 256;
963
964 /**
965 * Returns result after waiting.
966 */
967 private T waitingGet() {
968 WaitNode q = null;
969 boolean queued = false, interrupted = false;
970 int h = 0, spins = 0;
971 for (Object r;;) {
972 if ((r = result) != null) {
973 Throwable ex;
974 if (q != null) // suppress unpark
975 q.thread = null;
976 postComplete(); // help release others
977 if (interrupted)
978 Thread.currentThread().interrupt();
979 if (r instanceof AltResult) {
980 if ((ex = ((AltResult)r).ex) != null) {
981 if (ex instanceof Error)
982 throw (Error)ex;
983 if (ex instanceof RuntimeException)
984 throw (RuntimeException)ex;
985 throw new RuntimeException(ex);
986 }
987 return null;
988 }
989 return (T)r;
990 }
991 else if (h == 0) {
992 h = ThreadLocalRandom.current().nextInt();
993 if (Runtime.getRuntime().availableProcessors() > 1)
994 spins = WAITING_GET_SPINS;
995 }
996 else if (spins > 0) {
997 h ^= h << 1; // xorshift
998 h ^= h >>> 3;
999 if ((h ^= h << 10) >= 0)
1000 --spins;
1001 }
1002 else if (q == null)
1003 q = new WaitNode();
1004 else if (!queued)
1005 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1006 q.next = waiters, q);
1007 else if (Thread.interrupted())
1008 interrupted = true;
1009 else if (q.thread == null)
1010 q.thread = Thread.currentThread();
1011 else
1012 LockSupport.park(this);
1013 }
1014 }
1015
1016 /**
1017 * Awaits completion or aborts on interrupt or timeout.
1018 *
1019 * @param nanos time to wait
1020 * @return raw result
1021 */
1022 private Object timedAwaitDone(long nanos)
1023 throws InterruptedException, TimeoutException {
1024 final long deadline = System.nanoTime() + nanos;
1025 WaitNode q = null;
1026 boolean queued = false;
1027 for (Object r;;) {
1028 if (Thread.interrupted()) {
1029 removeWaiter(q);
1030 throw new InterruptedException();
1031 }
1032 else if ((r = result) != null) {
1033 if (q != null)
1034 q.thread = null;
1035 postComplete();
1036 return r;
1037 }
1038 else if (q == null)
1039 q = new WaitNode();
1040 else if (!queued)
1041 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1042 q.next = waiters, q);
1043 else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1044 removeWaiter(q);
1045 throw new TimeoutException();
1046 }
1047 else if (q.thread == null)
1048 q.thread = Thread.currentThread();
1049 else
1050 LockSupport.parkNanos(this, nanos);
1051 }
1052 }
1053
1054 /**
1055 * Tries to unlink a timed-out or interrupted wait node to avoid
1056 * accumulating garbage. Internal nodes are simply unspliced
1057 * without CAS since it is harmless if they are traversed anyway
1058 * by releasers. To avoid effects of unsplicing from already
1059 * removed nodes, the list is retraversed in case of an apparent
1060 * race. This is slow when there are a lot of nodes, but we don't
1061 * expect lists to be long enough to outweigh higher-overhead
1062 * schemes.
1063 */
1064 private void removeWaiter(WaitNode node) {
1065 if (node != null) {
1066 node.thread = null;
1067 retry:
1068 for (;;) { // restart on removeWaiter race
1069 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
1070 s = q.next;
1071 if (q.thread != null)
1072 pred = q;
1073 else if (pred != null) {
1074 pred.next = s;
1075 if (pred.thread == null) // check for race
1076 continue retry;
1077 }
1078 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
1079 continue retry;
1080 }
1081 break;
1082 }
1083 }
1084 }
1085
1086 /* ------------- Async tasks -------------- */
1087
1088 /** Base class can act as either FJ or plain Runnable */
1089 static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1090 public final Void getRawResult() { return null; }
1091 public final void setRawResult(Void v) { }
1092 public final void run() { exec(); }
1093 }
1094
1095 static final class AsyncRunnable extends Async {
1096 final Runnable runnable;
1097 final CompletableFuture<Void> dst;
1098 AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
1099 this.runnable = runnable; this.dst = dst;
1100 }
1101 public final boolean exec() {
1102 Runnable fn;
1103 CompletableFuture<Void> d;
1104 if ((fn = this.runnable) == null || (d = this.dst) == null)
1105 throw new NullPointerException();
1106 try {
1107 fn.run();
1108 d.complete(null);
1109 } catch (Throwable ex) {
1110 d.completeExceptionally(ex);
1111 }
1112 return true;
1113 }
1114 private static final long serialVersionUID = 5232453952276885070L;
1115 }
1116
1117 static final class AsyncSupplier<U> extends Async {
1118 final Supplier<U> supplier;
1119 final CompletableFuture<U> dst;
1120 AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
1121 this.supplier = supplier; this.dst = dst;
1122 }
1123 public final boolean exec() {
1124 Supplier<U> fn;
1125 CompletableFuture<U> d;
1126 if ((fn = this.supplier) == null || (d = this.dst) == null)
1127 throw new NullPointerException();
1128 try {
1129 d.complete(fn.get());
1130 } catch (Throwable ex) {
1131 d.completeExceptionally(ex);
1132 }
1133 return true;
1134 }
1135 private static final long serialVersionUID = 5232453952276885070L;
1136 }
1137
1138 static final class AsyncFunction<T,U> extends Async {
1139 Function<? super T,? extends U> fn;
1140 T arg;
1141 final CompletableFuture<U> dst;
1142 AsyncFunction(T arg, Function<? super T,? extends U> fn,
1143 CompletableFuture<U> dst) {
1144 this.arg = arg; this.fn = fn; this.dst = dst;
1145 }
1146 public final boolean exec() {
1147 Function<? super T,? extends U> fn;
1148 CompletableFuture<U> d;
1149 if ((fn = this.fn) == null || (d = this.dst) == null)
1150 throw new NullPointerException();
1151 try {
1152 d.complete(fn.apply(arg));
1153 } catch (Throwable ex) {
1154 d.completeExceptionally(ex);
1155 }
1156 return true;
1157 }
1158 private static final long serialVersionUID = 5232453952276885070L;
1159 }
1160
1161 static final class AsyncBiFunction<T,U,V> extends Async {
1162 final BiFunction<? super T,? super U,? extends V> fn;
1163 final T arg1;
1164 final U arg2;
1165 final CompletableFuture<V> dst;
1166 AsyncBiFunction(T arg1, U arg2,
1167 BiFunction<? super T,? super U,? extends V> fn,
1168 CompletableFuture<V> dst) {
1169 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1170 }
1171 public final boolean exec() {
1172 BiFunction<? super T,? super U,? extends V> fn;
1173 CompletableFuture<V> d;
1174 if ((fn = this.fn) == null || (d = this.dst) == null)
1175 throw new NullPointerException();
1176 try {
1177 d.complete(fn.apply(arg1, arg2));
1178 } catch (Throwable ex) {
1179 d.completeExceptionally(ex);
1180 }
1181 return true;
1182 }
1183 private static final long serialVersionUID = 5232453952276885070L;
1184 }
1185
1186 static final class AsyncBlock<T> extends Async {
1187 Block<? super T> fn;
1188 T arg;
1189 final CompletableFuture<Void> dst;
1190 AsyncBlock(T arg, Block<? super T> fn,
1191 CompletableFuture<Void> dst) {
1192 this.arg = arg; this.fn = fn; this.dst = dst;
1193 }
1194 public final boolean exec() {
1195 Block<? super T> fn;
1196 CompletableFuture<Void> d;
1197 if ((fn = this.fn) == null || (d = this.dst) == null)
1198 throw new NullPointerException();
1199 try {
1200 fn.accept(arg);
1201 d.complete(null);
1202 } catch (Throwable ex) {
1203 d.completeExceptionally(ex);
1204 }
1205 return true;
1206 }
1207 private static final long serialVersionUID = 5232453952276885070L;
1208 }
1209
1210 static final class AsyncBiBlock<T,U> extends Async {
1211 final BiBlock<? super T,? super U> fn;
1212 final T arg1;
1213 final U arg2;
1214 final CompletableFuture<Void> dst;
1215 AsyncBiBlock(T arg1, U arg2,
1216 BiBlock<? super T,? super U> fn,
1217 CompletableFuture<Void> dst) {
1218 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1219 }
1220 public final boolean exec() {
1221 BiBlock<? super T,? super U> fn;
1222 CompletableFuture<Void> d;
1223 if ((fn = this.fn) == null || (d = this.dst) == null)
1224 throw new NullPointerException();
1225 try {
1226 fn.accept(arg1, arg2);
1227 d.complete(null);
1228 } catch (Throwable ex) {
1229 d.completeExceptionally(ex);
1230 }
1231 return true;
1232 }
1233 private static final long serialVersionUID = 5232453952276885070L;
1234 }
1235
1236 /* ------------- Completions -------------- */
1237
1238 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1239 static abstract class Completion extends AtomicInteger implements Runnable {
1240 }
1241
1242 static final class ThenFunction<T,U> extends Completion {
1243 final CompletableFuture<? extends T> src;
1244 final Function<? super T,? extends U> fn;
1245 final CompletableFuture<U> dst;
1246 final Executor executor;
1247 ThenFunction(CompletableFuture<? extends T> src,
1248 final Function<? super T,? extends U> fn,
1249 final CompletableFuture<U> dst, Executor executor) {
1250 this.src = src; this.fn = fn; this.dst = dst;
1251 this.executor = executor;
1252 }
1253 public void run() {
1254 CompletableFuture<? extends T> a;
1255 Function<? super T,? extends U> fn;
1256 CompletableFuture<U> dst;
1257 Object r; T t; Throwable ex;
1258 if ((dst = this.dst) != null &&
1259 (fn = this.fn) != null &&
1260 (a = this.src) != null &&
1261 (r = a.result) != null &&
1262 compareAndSet(0, 1)) {
1263 if (r instanceof AltResult) {
1264 if ((ex = ((AltResult)r).ex) != null) {
1265 dst.completeExceptionally(new RuntimeException(ex));
1266 return;
1267 }
1268 t = null;
1269 }
1270 else
1271 t = (T) r;
1272 try {
1273 if (executor != null)
1274 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1275 else
1276 dst.complete(fn.apply(t));
1277 } catch (Throwable rex) {
1278 dst.completeExceptionally(rex);
1279 }
1280 }
1281 }
1282 }
1283
1284 static final class ThenBlock<T> extends Completion {
1285 final CompletableFuture<? extends T> src;
1286 final Block<? super T> fn;
1287 final CompletableFuture<Void> dst;
1288 final Executor executor;
1289 ThenBlock(CompletableFuture<? extends T> src,
1290 final Block<? super T> fn,
1291 final CompletableFuture<Void> dst, Executor executor) {
1292 this.src = src; this.fn = fn; this.dst = dst;
1293 this.executor = executor;
1294 }
1295 public void run() {
1296 CompletableFuture<? extends T> a;
1297 Block<? super T> fn;
1298 CompletableFuture<Void> dst;
1299 Object r; T t; Throwable ex;
1300 if ((dst = this.dst) != null &&
1301 (fn = this.fn) != null &&
1302 (a = this.src) != null &&
1303 (r = a.result) != null &&
1304 compareAndSet(0, 1)) {
1305 if (r instanceof AltResult) {
1306 if ((ex = ((AltResult)r).ex) != null) {
1307 dst.completeExceptionally(new RuntimeException(ex));
1308 return;
1309 }
1310 t = null;
1311 }
1312 else
1313 t = (T) r;
1314 try {
1315 if (executor != null)
1316 executor.execute(new AsyncBlock<T>(t, fn, dst));
1317 else {
1318 fn.accept(t);
1319 dst.complete(null);
1320 }
1321 } catch (Throwable rex) {
1322 dst.completeExceptionally(rex);
1323 }
1324 }
1325 }
1326 }
1327
1328 static final class ThenRunnable<T> extends Completion {
1329 final CompletableFuture<? extends T> src;
1330 final Runnable fn;
1331 final CompletableFuture<Void> dst;
1332 final Executor executor;
1333 ThenRunnable(CompletableFuture<? extends T> src,
1334 Runnable fn,
1335 CompletableFuture<Void> dst,
1336 Executor executor) {
1337 this.src = src; this.fn = fn; this.dst = dst;
1338 this.executor = executor;
1339 }
1340 public void run() {
1341 CompletableFuture<? extends T> a;
1342 Runnable fn;
1343 CompletableFuture<Void> dst;
1344 Object r; Throwable ex;
1345 if ((dst = this.dst) != null &&
1346 (fn = this.fn) != null &&
1347 (a = this.src) != null &&
1348 (r = a.result) != null &&
1349 compareAndSet(0, 1)) {
1350 if (r instanceof AltResult) {
1351 if ((ex = ((AltResult)r).ex) != null) {
1352 dst.completeExceptionally(new RuntimeException(ex));
1353 return;
1354 }
1355 }
1356 try {
1357 if (executor != null)
1358 executor.execute(new AsyncRunnable(fn, dst));
1359 else {
1360 fn.run();
1361 dst.complete(null);
1362 }
1363 } catch (Throwable rex) {
1364 dst.completeExceptionally(rex);
1365 }
1366 }
1367 }
1368 }
1369
1370 static final class AndFunction<T,U,V> extends Completion {
1371 final CompletableFuture<? extends T> src;
1372 final CompletableFuture<? extends U> snd;
1373 final BiFunction<? super T,? super U,? extends V> fn;
1374 final CompletableFuture<V> dst;
1375 final Executor executor;
1376 AndFunction(CompletableFuture<? extends T> src,
1377 CompletableFuture<? extends U> snd,
1378 BiFunction<? super T,? super U,? extends V> fn,
1379 CompletableFuture<V> dst, Executor executor) {
1380 this.src = src; this.snd = snd;
1381 this.fn = fn; this.dst = dst;
1382 this.executor = executor;
1383 }
1384 public void run() {
1385 Object r, s; T t; U u; Throwable ex;
1386 CompletableFuture<? extends T> a;
1387 CompletableFuture<? extends U> b;
1388 BiFunction<? super T,? super U,? extends V> fn;
1389 CompletableFuture<V> dst;
1390 if ((dst = this.dst) != null &&
1391 (fn = this.fn) != null &&
1392 (a = this.src) != null &&
1393 (r = a.result) != null &&
1394 (b = this.snd) != null &&
1395 (s = b.result) != null &&
1396 compareAndSet(0, 1)) {
1397 if (r instanceof AltResult) {
1398 if ((ex = ((AltResult)r).ex) != null) {
1399 dst.completeExceptionally(new RuntimeException(ex));
1400 return;
1401 }
1402 t = null;
1403 }
1404 else
1405 t = (T) r;
1406 if (s instanceof AltResult) {
1407 if ((ex = ((AltResult)s).ex) != null) {
1408 dst.completeExceptionally(new RuntimeException(ex));
1409 return;
1410 }
1411 u = null;
1412 }
1413 else
1414 u = (U) s;
1415 try {
1416 if (executor != null)
1417 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1418 else
1419 dst.complete(fn.apply(t, u));
1420 } catch (Throwable rex) {
1421 dst.completeExceptionally(rex);
1422 }
1423 }
1424 }
1425 }
1426
1427 static final class AndBlock<T,U> extends Completion {
1428 final CompletableFuture<? extends T> src;
1429 final CompletableFuture<? extends U> snd;
1430 final BiBlock<? super T,? super U> fn;
1431 final CompletableFuture<Void> dst;
1432 final Executor executor;
1433 AndBlock(CompletableFuture<? extends T> src,
1434 CompletableFuture<? extends U> snd,
1435 BiBlock<? super T,? super U> fn,
1436 CompletableFuture<Void> dst, Executor executor) {
1437 this.src = src; this.snd = snd;
1438 this.fn = fn; this.dst = dst;
1439 this.executor = executor;
1440 }
1441 public void run() {
1442 Object r, s; T t; U u; Throwable ex;
1443 CompletableFuture<? extends T> a;
1444 CompletableFuture<? extends U> b;
1445 BiBlock<? super T,? super U> fn;
1446 CompletableFuture<Void> dst;
1447 if ((dst = this.dst) != null &&
1448 (fn = this.fn) != null &&
1449 (a = this.src) != null &&
1450 (r = a.result) != null &&
1451 (b = this.snd) != null &&
1452 (s = b.result) != null &&
1453 compareAndSet(0, 1)) {
1454 if (r instanceof AltResult) {
1455 if ((ex = ((AltResult)r).ex) != null) {
1456 dst.completeExceptionally(new RuntimeException(ex));
1457 return;
1458 }
1459 t = null;
1460 }
1461 else
1462 t = (T) r;
1463 if (s instanceof AltResult) {
1464 if ((ex = ((AltResult)s).ex) != null) {
1465 dst.completeExceptionally(new RuntimeException(ex));
1466 return;
1467 }
1468 u = null;
1469 }
1470 else
1471 u = (U) s;
1472 try {
1473 if (executor != null)
1474 executor.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1475 else {
1476 fn.accept(t, u);
1477 dst.complete(null);
1478 }
1479 } catch (Throwable rex) {
1480 dst.completeExceptionally(rex);
1481 }
1482 }
1483 }
1484 }
1485
1486 static final class AndRunnable<T> extends Completion {
1487 final CompletableFuture<? extends T> src;
1488 final CompletableFuture<?> snd;
1489 final Runnable fn;
1490 final CompletableFuture<Void> dst;
1491 final Executor executor;
1492 AndRunnable(CompletableFuture<? extends T> src,
1493 CompletableFuture<?> snd,
1494 Runnable fn,
1495 CompletableFuture<Void> dst, Executor executor) {
1496 this.src = src; this.snd = snd;
1497 this.fn = fn; this.dst = dst;
1498 this.executor = executor;
1499 }
1500 public void run() {
1501 Object r, s; Throwable ex;
1502 final CompletableFuture<? extends T> a;
1503 final CompletableFuture<?> b;
1504 final Runnable fn;
1505 final CompletableFuture<Void> dst;
1506 if ((dst = this.dst) != null &&
1507 (fn = this.fn) != null &&
1508 (a = this.src) != null &&
1509 (r = a.result) != null &&
1510 (b = this.snd) != null &&
1511 (s = b.result) != null &&
1512 compareAndSet(0, 1)) {
1513 if (r instanceof AltResult) {
1514 if ((ex = ((AltResult)r).ex) != null) {
1515 dst.completeExceptionally(new RuntimeException(ex));
1516 return;
1517 }
1518 }
1519 if (s instanceof AltResult) {
1520 if ((ex = ((AltResult)s).ex) != null) {
1521 dst.completeExceptionally(new RuntimeException(ex));
1522 return;
1523 }
1524 }
1525 try {
1526 if (executor != null)
1527 executor.execute(new AsyncRunnable(fn, dst));
1528 else {
1529 fn.run();
1530 dst.complete(null);
1531 }
1532 } catch (Throwable rex) {
1533 dst.completeExceptionally(rex);
1534 }
1535 }
1536 }
1537 }
1538
1539 static final class OrFunction<T,U> extends Completion {
1540 final CompletableFuture<? extends T> src;
1541 final CompletableFuture<? extends T> snd;
1542 final Function<? super T,? extends U> fn;
1543 final CompletableFuture<U> dst;
1544 final Executor executor;
1545 OrFunction(CompletableFuture<? extends T> src,
1546 CompletableFuture<? extends T> snd,
1547 Function<? super T,? extends U> fn,
1548 CompletableFuture<U> dst, Executor executor) {
1549 this.src = src; this.snd = snd;
1550 this.fn = fn; this.dst = dst;
1551 this.executor = executor;
1552 }
1553 public void run() {
1554 Object r; T t; Throwable ex;
1555 CompletableFuture<? extends T> a;
1556 CompletableFuture<? extends T> b;
1557 Function<? super T,? extends U> fn;
1558 CompletableFuture<U> dst;
1559 if ((dst = this.dst) != null &&
1560 (fn = this.fn) != null &&
1561 (((a = this.src) != null && (r = a.result) != null) ||
1562 ((b = this.snd) != null && (r = b.result) != null)) &&
1563 compareAndSet(0, 1)) {
1564 if (r instanceof AltResult) {
1565 if ((ex = ((AltResult)r).ex) != null) {
1566 dst.completeExceptionally(new RuntimeException(ex));
1567 return;
1568 }
1569 t = null;
1570 }
1571 else
1572 t = (T) r;
1573 try {
1574 if (executor != null)
1575 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1576 else
1577 dst.complete(fn.apply(t));
1578 } catch (Throwable rex) {
1579 dst.completeExceptionally(rex);
1580 }
1581 }
1582 }
1583 }
1584
1585 static final class OrBlock<T> extends Completion {
1586 final CompletableFuture<? extends T> src;
1587 final CompletableFuture<? extends T> snd;
1588 final Block<? super T> fn;
1589 final CompletableFuture<Void> dst;
1590 final Executor executor;
1591 OrBlock(CompletableFuture<? extends T> src,
1592 CompletableFuture<? extends T> snd,
1593 Block<? super T> fn,
1594 CompletableFuture<Void> dst, Executor executor) {
1595 this.src = src; this.snd = snd;
1596 this.fn = fn; this.dst = dst;
1597 this.executor = executor;
1598 }
1599 public void run() {
1600 Object r; T t; Throwable ex;
1601 CompletableFuture<? extends T> a;
1602 CompletableFuture<? extends T> b;
1603 Block<? super T> fn;
1604 CompletableFuture<Void> dst;
1605 if ((dst = this.dst) != null &&
1606 (fn = this.fn) != null &&
1607 (((a = this.src) != null && (r = a.result) != null) ||
1608 ((b = this.snd) != null && (r = b.result) != null)) &&
1609 compareAndSet(0, 1)) {
1610 if (r instanceof AltResult) {
1611 if ((ex = ((AltResult)r).ex) != null) {
1612 dst.completeExceptionally(new RuntimeException(ex));
1613 return;
1614 }
1615 t = null;
1616 }
1617 else
1618 t = (T) r;
1619 try {
1620 if (executor != null)
1621 executor.execute(new AsyncBlock<T>(t, fn, dst));
1622 else {
1623 fn.accept(t);
1624 dst.complete(null);
1625 }
1626 } catch (Throwable rex) {
1627 dst.completeExceptionally(rex);
1628 }
1629 }
1630 }
1631 }
1632
1633 static final class OrRunnable<T> extends Completion {
1634 final CompletableFuture<? extends T> src;
1635 final CompletableFuture<?> snd;
1636 final Runnable fn;
1637 final CompletableFuture<Void> dst;
1638 final Executor executor;
1639 OrRunnable(CompletableFuture<? extends T> src,
1640 CompletableFuture<?> snd,
1641 Runnable fn,
1642 CompletableFuture<Void> dst, Executor executor) {
1643 this.src = src; this.snd = snd;
1644 this.fn = fn; this.dst = dst;
1645 this.executor = executor;
1646 }
1647 public void run() {
1648 Object r; Throwable ex;
1649 CompletableFuture<? extends T> a;
1650 final CompletableFuture<?> b;
1651 final Runnable fn;
1652 final CompletableFuture<Void> dst;
1653 if ((dst = this.dst) != null &&
1654 (fn = this.fn) != null &&
1655 (((a = this.src) != null && (r = a.result) != null) ||
1656 ((b = this.snd) != null && (r = b.result) != null)) &&
1657 compareAndSet(0, 1)) {
1658 if ((r instanceof AltResult) &&
1659 (ex = ((AltResult)r).ex) != null) {
1660 dst.completeExceptionally(new RuntimeException(ex));
1661 }
1662 else {
1663 try {
1664 if (executor != null)
1665 executor.execute(new AsyncRunnable(fn, dst));
1666 else {
1667 fn.run();
1668 dst.complete(null);
1669 }
1670 } catch (Throwable rex) {
1671 dst.completeExceptionally(rex);
1672 }
1673 }
1674 }
1675 }
1676 }
1677
1678 static final class ExceptionAction<T> extends Completion {
1679 final CompletableFuture<? extends T> src;
1680 final Function<? super Throwable, ? extends T> fn;
1681 final CompletableFuture<T> dst;
1682 ExceptionAction(CompletableFuture<? extends T> src,
1683 Function<? super Throwable, ? extends T> fn,
1684 CompletableFuture<T> dst) {
1685 this.src = src; this.fn = fn; this.dst = dst;
1686 }
1687 public void run() {
1688 CompletableFuture<? extends T> a;
1689 Function<? super Throwable, ? extends T> fn;
1690 CompletableFuture<T> dst;
1691 Object r; T t; Throwable ex;
1692 if ((dst = this.dst) != null &&
1693 (fn = this.fn) != null &&
1694 (a = this.src) != null &&
1695 (r = a.result) != null &&
1696 compareAndSet(0, 1)) {
1697 if (r instanceof AltResult) {
1698 if ((ex = ((AltResult)r).ex) != null) {
1699 try {
1700 dst.complete(fn.apply(ex));
1701 } catch (Throwable rex) {
1702 dst.completeExceptionally(rex);
1703 }
1704 return;
1705 }
1706 t = null;
1707 }
1708 else
1709 t = (T) r;
1710 dst.complete(t);
1711 }
1712 }
1713 }
1714
1715 /* ------------- then/and/or implementations -------------- */
1716
1717 private <U> CompletableFuture<U> thenFunction(Function<? super T,? extends U> fn,
1718 Executor executor) {
1719
1720 if (fn == null) throw new NullPointerException();
1721 CompletableFuture<U> dst = new CompletableFuture<U>();
1722 ThenFunction<T,U> d = null;
1723 Object r;
1724 if ((r = result) == null) {
1725 CompletionNode p = new CompletionNode
1726 (d = new ThenFunction<T,U>(this, fn, dst, executor));
1727 while ((r = result) == null) {
1728 if (UNSAFE.compareAndSwapObject
1729 (this, COMPLETIONS, p.next = completions, p))
1730 break;
1731 }
1732 }
1733 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1734 T t; Throwable ex = null;
1735 if (r instanceof AltResult) {
1736 if ((ex = ((AltResult)r).ex) != null)
1737 dst.completeExceptionally(new RuntimeException(ex));
1738 t = null;
1739 }
1740 else
1741 t = (T) r;
1742 if (ex == null) {
1743 try {
1744 if (executor != null)
1745 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1746 else
1747 dst.complete(fn.apply(t));
1748 } catch (Throwable rex) {
1749 dst.completeExceptionally(rex);
1750 }
1751 }
1752 postComplete();
1753 }
1754 return dst;
1755 }
1756
1757 private CompletableFuture<Void> thenBlock(Block<? super T> fn,
1758 Executor executor) {
1759
1760 if (fn == null) throw new NullPointerException();
1761 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1762 ThenBlock<T> d = null;
1763 Object r;
1764 if ((r = result) == null) {
1765 CompletionNode p = new CompletionNode
1766 (d = new ThenBlock<T>(this, fn, dst, executor));
1767 while ((r = result) == null) {
1768 if (UNSAFE.compareAndSwapObject
1769 (this, COMPLETIONS, p.next = completions, p))
1770 break;
1771 }
1772 }
1773 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1774 T t; Throwable ex = null;
1775 if (r instanceof AltResult) {
1776 if ((ex = ((AltResult)r).ex) != null)
1777 dst.completeExceptionally(new RuntimeException(ex));
1778 t = null;
1779 }
1780 else
1781 t = (T) r;
1782 if (ex == null) {
1783 try {
1784 if (executor != null)
1785 executor.execute(new AsyncBlock<T>(t, fn, dst));
1786 else {
1787 fn.accept(t);
1788 dst.complete(null);
1789 }
1790 } catch (Throwable rex) {
1791 dst.completeExceptionally(rex);
1792 }
1793 }
1794 postComplete();
1795 }
1796 return dst;
1797 }
1798
1799 private CompletableFuture<Void> thenRunnable(Runnable action,
1800 Executor executor) {
1801 if (action == null) throw new NullPointerException();
1802 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1803 ThenRunnable<T> d = null;
1804 Object r;
1805 if ((r = result) == null) {
1806 CompletionNode p = new CompletionNode
1807 (d = new ThenRunnable<T>(this, action, dst, executor));
1808 while ((r = result) == null) {
1809 if (UNSAFE.compareAndSwapObject
1810 (this, COMPLETIONS, p.next = completions, p))
1811 break;
1812 }
1813 }
1814 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1815 Throwable ex = null;
1816 if (r instanceof AltResult) {
1817 if ((ex = ((AltResult)r).ex) != null)
1818 dst.completeExceptionally(new RuntimeException(ex));
1819 }
1820 if (ex == null) {
1821 try {
1822 if (executor != null)
1823 executor.execute(new AsyncRunnable(action, dst));
1824 else {
1825 action.run();
1826 dst.complete(null);
1827 }
1828 } catch (Throwable rex) {
1829 dst.completeExceptionally(rex);
1830 }
1831 }
1832 postComplete();
1833 }
1834 return dst;
1835 }
1836
1837 private <U,V> CompletableFuture<V> andFunction(CompletableFuture<? extends U> other,
1838 BiFunction<? super T,? super U,? extends V> fn,
1839 Executor executor) {
1840 if (other == null || fn == null) throw new NullPointerException();
1841 CompletableFuture<V> dst = new CompletableFuture<V>();
1842 AndFunction<T,U,V> d = null;
1843 Object r, s = null;
1844 if ((r = result) == null || (s = other.result) == null) {
1845 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
1846 CompletionNode q = null, p = new CompletionNode(d);
1847 while ((r == null && (r = result) == null) ||
1848 (s == null && (s = other.result) == null)) {
1849 if (q != null) {
1850 if (s != null ||
1851 UNSAFE.compareAndSwapObject
1852 (other, COMPLETIONS, q.next = other.completions, q))
1853 break;
1854 }
1855 else if (r != null ||
1856 UNSAFE.compareAndSwapObject
1857 (this, COMPLETIONS, p.next = completions, p)) {
1858 if (s != null)
1859 break;
1860 q = new CompletionNode(d);
1861 }
1862 }
1863 }
1864 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1865 T t; U u; Throwable ex = null;
1866 if (r instanceof AltResult) {
1867 if ((ex = ((AltResult)r).ex) != null)
1868 dst.completeExceptionally(new RuntimeException(ex));
1869 t = null;
1870 }
1871 else
1872 t = (T) r;
1873 if (ex != null)
1874 u = null;
1875 else if (s instanceof AltResult) {
1876 if ((ex = ((AltResult)s).ex) != null)
1877 dst.completeExceptionally(new RuntimeException(ex));
1878 u = null;
1879 }
1880 else
1881 u = (U) s;
1882 if (ex == null) {
1883 try {
1884 if (executor != null)
1885 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1886 else
1887 dst.complete(fn.apply(t, u));
1888 } catch (Throwable rex) {
1889 dst.completeExceptionally(rex);
1890 }
1891 }
1892 }
1893 if (r != null)
1894 postComplete();
1895 if (s != null)
1896 other.postComplete();
1897 return dst;
1898 }
1899
1900 private <U> CompletableFuture<Void> andBlock(CompletableFuture<? extends U> other,
1901 BiBlock<? super T,? super U> fn,
1902 Executor executor) {
1903 if (other == null || fn == null) throw new NullPointerException();
1904 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1905 AndBlock<T,U> d = null;
1906 Object r, s = null;
1907 if ((r = result) == null || (s = other.result) == null) {
1908 d = new AndBlock<T,U>(this, other, fn, dst, executor);
1909 CompletionNode q = null, p = new CompletionNode(d);
1910 while ((r == null && (r = result) == null) ||
1911 (s == null && (s = other.result) == null)) {
1912 if (q != null) {
1913 if (s != null ||
1914 UNSAFE.compareAndSwapObject
1915 (other, COMPLETIONS, q.next = other.completions, q))
1916 break;
1917 }
1918 else if (r != null ||
1919 UNSAFE.compareAndSwapObject
1920 (this, COMPLETIONS, p.next = completions, p)) {
1921 if (s != null)
1922 break;
1923 q = new CompletionNode(d);
1924 }
1925 }
1926 }
1927 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1928 T t; U u; Throwable ex = null;
1929 if (r instanceof AltResult) {
1930 if ((ex = ((AltResult)r).ex) != null)
1931 dst.completeExceptionally(new RuntimeException(ex));
1932 t = null;
1933 }
1934 else
1935 t = (T) r;
1936 if (ex != null)
1937 u = null;
1938 else if (s instanceof AltResult) {
1939 if ((ex = ((AltResult)s).ex) != null)
1940 dst.completeExceptionally(new RuntimeException(ex));
1941 u = null;
1942 }
1943 else
1944 u = (U) s;
1945 if (ex == null) {
1946 try {
1947 if (executor != null)
1948 executor.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1949 else {
1950 fn.accept(t, u);
1951 dst.complete(null);
1952 }
1953 } catch (Throwable rex) {
1954 dst.completeExceptionally(rex);
1955 }
1956 }
1957 }
1958 if (r != null)
1959 postComplete();
1960 if (s != null)
1961 other.postComplete();
1962 return dst;
1963 }
1964
1965 private CompletableFuture<Void> andRunnable(CompletableFuture<?> other,
1966 Runnable action,
1967 Executor executor) {
1968 if (other == null || action == null) throw new NullPointerException();
1969 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1970 AndRunnable<T> d = null;
1971 Object r, s = null;
1972 if ((r = result) == null || (s = other.result) == null) {
1973 d = new AndRunnable<T>(this, other, action, dst, executor);
1974 CompletionNode q = null, p = new CompletionNode(d);
1975 while ((r == null && (r = result) == null) ||
1976 (s == null && (s = other.result) == null)) {
1977 if (q != null) {
1978 if (s != null ||
1979 UNSAFE.compareAndSwapObject
1980 (other, COMPLETIONS, q.next = other.completions, q))
1981 break;
1982 }
1983 else if (r != null ||
1984 UNSAFE.compareAndSwapObject
1985 (this, COMPLETIONS, p.next = completions, p)) {
1986 if (s != null)
1987 break;
1988 q = new CompletionNode(d);
1989 }
1990 }
1991 }
1992 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1993 Throwable ex = null;
1994 if ((r instanceof AltResult) &&
1995 (ex = ((AltResult)r).ex) != null)
1996 dst.completeExceptionally(new RuntimeException(ex));
1997 else if ((s instanceof AltResult) &&
1998 (ex = ((AltResult)s).ex) != null)
1999 dst.completeExceptionally(new RuntimeException(ex));
2000 else {
2001 try {
2002 if (executor != null)
2003 executor.execute(new AsyncRunnable(action, dst));
2004 else {
2005 action.run();
2006 dst.complete(null);
2007 }
2008 } catch (Throwable rex) {
2009 dst.completeExceptionally(rex);
2010 }
2011 }
2012 }
2013 if (r != null)
2014 postComplete();
2015 if (s != null)
2016 other.postComplete();
2017 return dst;
2018 }
2019
2020 private <U> CompletableFuture<U> orFunction(CompletableFuture<? extends T> other,
2021 Function<? super T, U> fn,
2022 Executor executor) {
2023 if (other == null || fn == null) throw new NullPointerException();
2024 CompletableFuture<U> dst = new CompletableFuture<U>();
2025 OrFunction<T,U> d = null;
2026 Object r;
2027 if ((r = result) == null && (r = other.result) == null) {
2028 d = new OrFunction<T,U>(this, other, fn, dst, executor);
2029 CompletionNode q = null, p = new CompletionNode(d);
2030 while ((r = result) == null && (r = other.result) == null) {
2031 if (q != null) {
2032 if (UNSAFE.compareAndSwapObject
2033 (other, COMPLETIONS, q.next = other.completions, q))
2034 break;
2035 }
2036 else if (UNSAFE.compareAndSwapObject
2037 (this, COMPLETIONS, p.next = completions, p))
2038 q = new CompletionNode(d);
2039 }
2040 }
2041 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2042 T t; Throwable ex = null;
2043 if (r instanceof AltResult) {
2044 if ((ex = ((AltResult)r).ex) != null)
2045 dst.completeExceptionally(new RuntimeException(ex));
2046 t = null;
2047 }
2048 else
2049 t = (T) r;
2050 if (ex == null) {
2051 try {
2052 if (executor != null)
2053 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
2054 else
2055 dst.complete(fn.apply(t));
2056 } catch (Throwable rex) {
2057 dst.completeExceptionally(rex);
2058 }
2059 }
2060 }
2061 if (r != null) {
2062 if (result != null)
2063 postComplete();
2064 if (other.result != null)
2065 other.postComplete();
2066 }
2067 return dst;
2068 }
2069
2070 private CompletableFuture<Void> orBlock(CompletableFuture<? extends T> other,
2071 Block<? super T> fn,
2072 Executor executor) {
2073 if (other == null || fn == null) throw new NullPointerException();
2074 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2075 OrBlock<T> d = null;
2076 Object r;
2077 if ((r = result) == null && (r = other.result) == null) {
2078 d = new OrBlock<T>(this, other, fn, dst, executor);
2079 CompletionNode q = null, p = new CompletionNode(d);
2080 while ((r = result) == null && (r = other.result) == null) {
2081 if (q != null) {
2082 if (UNSAFE.compareAndSwapObject
2083 (other, COMPLETIONS, q.next = other.completions, q))
2084 break;
2085 }
2086 else if (UNSAFE.compareAndSwapObject
2087 (this, COMPLETIONS, p.next = completions, p))
2088 q = new CompletionNode(d);
2089 }
2090 }
2091 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2092 T t; Throwable ex = null;
2093 if (r instanceof AltResult) {
2094 if ((ex = ((AltResult)r).ex) != null)
2095 dst.completeExceptionally(new RuntimeException(ex));
2096 t = null;
2097 }
2098 else
2099 t = (T) r;
2100 if (ex == null) {
2101 try {
2102 if (executor != null)
2103 executor.execute(new AsyncBlock<T>(t, fn, dst));
2104 else {
2105 fn.accept(t);
2106 dst.complete(null);
2107 }
2108 } catch (Throwable rex) {
2109 dst.completeExceptionally(rex);
2110 }
2111 }
2112 }
2113 if (r != null) {
2114 if (result != null)
2115 postComplete();
2116 if (other.result != null)
2117 other.postComplete();
2118 }
2119 return dst;
2120 }
2121
2122 private CompletableFuture<Void> orRunnable(CompletableFuture<?> other,
2123 Runnable action,
2124 Executor executor) {
2125 if (other == null || action == null) throw new NullPointerException();
2126 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2127 OrRunnable<T> d = null;
2128 Object r;
2129 if ((r = result) == null && (r = other.result) == null) {
2130 d = new OrRunnable<T>(this, other, action, dst, executor);
2131 CompletionNode q = null, p = new CompletionNode(d);
2132 while ((r = result) == null && (r = other.result) == null) {
2133 if (q != null) {
2134 if (UNSAFE.compareAndSwapObject
2135 (other, COMPLETIONS, q.next = other.completions, q))
2136 break;
2137 }
2138 else if (UNSAFE.compareAndSwapObject
2139 (this, COMPLETIONS, p.next = completions, p))
2140 q = new CompletionNode(d);
2141 }
2142 }
2143 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2144 Throwable ex = null;
2145 if ((r instanceof AltResult) &&
2146 (ex = ((AltResult)r).ex) != null)
2147 dst.completeExceptionally(new RuntimeException(ex));
2148 else {
2149 try {
2150 if (executor != null)
2151 executor.execute(new AsyncRunnable(action, dst));
2152 else {
2153 action.run();
2154 dst.complete(null);
2155 }
2156 } catch (Throwable rex) {
2157 dst.completeExceptionally(rex);
2158 }
2159 }
2160 }
2161 if (r != null) {
2162 if (result != null)
2163 postComplete();
2164 if (other.result != null)
2165 other.postComplete();
2166 }
2167 return dst;
2168 }
2169
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" used for CAS operations.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    final sun.misc.Unsafe unsafe;
    final long resultOffset;
    final long waitersOffset;
    final long completionsOffset;
    try {
        unsafe = sun.misc.Unsafe.getUnsafe();
        Class<?> self = CompletableFuture.class;
        java.lang.reflect.Field resultField = self.getDeclaredField("result");
        resultOffset = unsafe.objectFieldOffset(resultField);
        java.lang.reflect.Field waitersField = self.getDeclaredField("waiters");
        waitersOffset = unsafe.objectFieldOffset(waitersField);
        java.lang.reflect.Field completionsField = self.getDeclaredField("completions");
        completionsOffset = unsafe.objectFieldOffset(completionsField);
    } catch (Exception e) {
        // Any lookup failure aborts class initialization.
        throw new Error(e);
    }
    UNSAFE = unsafe;
    RESULT = resultOffset;
    WAITERS = waitersOffset;
    COMPLETIONS = completionsOffset;
}
2189
2190 }