ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.4
Committed: Thu Dec 27 20:49:19 2012 UTC (11 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.3: +2 -2 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Block;
9 import java.util.function.Supplier;
10 import java.util.function.Function;
11 import java.util.function.BiFunction;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.ForkJoinPool;
15 import java.util.concurrent.ForkJoinTask;
16 import java.util.concurrent.Executor;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.TimeoutException;
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.atomic.AtomicInteger;
21 import java.util.concurrent.locks.LockSupport;
22
23
24 /**
25 * A {@link Future} that may be explicitly completed (setting its
26 * value and status), and may include dependent functions and actions
27 * that trigger upon its completion.
28 *
29 * <p>Similar methods are available for function-based usages in
30 * which dependent stages typically propagate values, as well as
31 * result-less action-based usages, that are normally associated with
32 * {@code CompletableFuture<Void>} Futures. Functions and actions
33 * supplied for dependent completions using {@code then}, {@code
34 * andThen}, {@code orThen}, and {@code exceptionally} may be
35 * performed by the thread that completes the current
36 * CompletableFuture, or by any other caller of these methods. There
37 * are no guarantees about the order of processing completions unless
38 * constrained by method {@code then} and related methods.
39 *
40 * <p>When two or more threads attempt to {@link #complete} or {@link
41 * #completeExceptionally} a CompletableFuture, only one of them will
42 * succeed. When completion entails computation of a function or
43 * action, it is executed <em>after</em> establishing precedence. If
44 * this function terminates abruptly with an exception, then method
45 * {@code complete} acts as {@code completeExceptionally} with that
46 * exception.
47 *
48 * <p>CompletableFutures themselves do not execute asynchronously.
49 * However, the {@code async} methods provide commonly useful ways to
50 * commence asynchronous processing, using either a given {@link
51 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
52 * function or action that will result in the completion of a new
53 * CompletableFuture.
54 *
55 * @author Doug Lea
56 * @since 1.8
57 */
58 public class CompletableFuture<T> implements Future<T> {
59 /*
60 * Quick overview (more to come):
61 *
62 * 1. Non-nullness of field result indicates done. An AltResult is
63 * used to box null as a result, as well as to hold exceptions.
64 *
65 * 2. Waiters are held in a Treiber stack similar to the one used
66 * in FutureTask
67 *
68 * 3. Completions are also kept in a list/stack, and pulled off
69 * and run when completion is triggered.
70 */
71
72 static final class AltResult {
73 final Throwable ex; // null only for NIL
74 AltResult(Throwable ex) { this.ex = ex; }
75 }
76
77 static final AltResult NIL = new AltResult(null);
78
79 /**
80 * Simple linked list nodes to record waiting threads in a Treiber
81 * stack. See other classes such as Phaser and SynchronousQueue
82 * for more detailed explanation.
83 */
84 static final class WaitNode {
85 volatile Thread thread;
86 volatile WaitNode next;
87 }
88
89 /**
90 * Simple linked list nodes to record completions, used in
91 * basically the same way as WaitNodes
92 */
93 static final class CompletionNode {
94 final Completion completion;
95 volatile CompletionNode next;
96 CompletionNode(Completion completion) { this.completion = completion; }
97 }
98
99
100 volatile Object result; // either the result or boxed AltResult
101 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
102 volatile CompletionNode completions; // list (Treiber stack) of completions
103
104 /**
105 * Creates a new incomplete CompletableFuture.
106 */
107 public CompletableFuture() {
108 }
109
110 /**
111 * Asynchronously executes in the {@link
112 * ForkJoinPool#commonPool()}, a task that completes the returned
113 * CompletableFuture with the result of the given Supplier.
114 *
115 * @param supplier a function returning the value to be used
116 * to complete the returned CompletableFuture.
117 * @return the CompletableFuture.
118 */
119 public static <U> CompletableFuture<U> async(Supplier<U> supplier) {
120 if (supplier == null) throw new NullPointerException();
121 CompletableFuture<U> f = new CompletableFuture<U>();
122 ForkJoinPool.commonPool().
123 execute((ForkJoinTask<?>)new AsyncSupplier(supplier, f));
124 return f;
125 }
126
127 /**
128 * Asynchronously executes using the given executor, a task that
129 * completes the returned CompletableFuture with the result of the
130 * given Supplier.
131 *
132 * @param supplier a function returning the value to be used
133 * to complete the returned CompletableFuture.
134 * @param executor the executor to use for asynchronous execution
135 * @return the CompletableFuture.
136 */
137 public static <U> CompletableFuture<U> async(Supplier<U> supplier,
138 Executor executor) {
139 if (executor == null || supplier == null)
140 throw new NullPointerException();
141 CompletableFuture<U> f = new CompletableFuture<U>();
142 executor.execute(new AsyncSupplier(supplier, f));
143 return f;
144 }
145
146 /**
147 * Asynchronously executes in the {@link
148 * ForkJoinPool#commonPool()} a task that runs the given action,
149 * and then completes the returned CompletableFuture
150 *
151 * @param runnable the action to run before completing the
152 * returned CompletableFuture.
153 * @return the CompletableFuture.
154 */
155 public static CompletableFuture<Void> async(Runnable runnable) {
156 if (runnable == null) throw new NullPointerException();
157 CompletableFuture<Void> f = new CompletableFuture<Void>();
158 ForkJoinPool.commonPool().
159 execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
160 return f;
161 }
162
163 /**
164 * Asynchronously executes using the given executor, a task that
165 * runs the given action, and then completes the returned
166 * CompletableFuture
167 *
168 * @param runnable the action to run before completing the
169 * returned CompletableFuture.
170 * @param executor the executor to use for asynchronous execution
171 * @return the CompletableFuture.
172 */
173 public static CompletableFuture<Void> async(Runnable runnable,
174 Executor executor) {
175 if (executor == null || runnable == null)
176 throw new NullPointerException();
177 CompletableFuture<Void> f = new CompletableFuture<Void>();
178 executor.execute(new AsyncRunnable(runnable, f));
179 return f;
180 }
181
182 /**
183 * Returns {@code true} if completed in any fashion: normally,
184 * exceptionally, or cancellation.
185 *
186 * @return {@code true} if completed
187 */
188 public boolean isDone() {
189 return result != null;
190 }
191
192 /**
193 * Returns the result value when complete, or throws an
194 * (unchecked) exception if completed exceptionally. To better
195 * conform with the use of common functional forms, this method
196 * transforms any checked exception possible with {@link
197 * Future#get} into an (unchecked) {@link RuntimeException} with
198 * the underlying exception as its cause. (The checked exception
199 * convention is available using the timed form of get.)
200 *
201 * @return the result value
202 */
203 public T get() {
204 Object r; Throwable ex;
205 if ((r = result) == null)
206 return waitingGet();
207 if (r instanceof AltResult) {
208 if ((ex = ((AltResult)r).ex) != null)
209 rethrow(ex);
210 return null;
211 }
212 return (T)r;
213 }
214
215 /**
216 * Returns the result value (or throws any encountered exception)
217 * if completed, else returns the given valueIfAbsent.
218 *
219 * @param valueIfAbsent the value to return if not completed
220 * @return the result value, if completed, else the given valueIfAbsent
221 */
222 public T getNow(T valueIfAbsent) {
223 Object r; Throwable ex;
224 if ((r = result) == null)
225 return valueIfAbsent;
226 if (r instanceof AltResult) {
227 if ((ex = ((AltResult)r).ex) != null)
228 rethrow(ex);
229 return null;
230 }
231 return (T)r;
232 }
233
234 /**
235 * Waits if necessary for at most the given time for completion,
236 * and then retrieves its result, if available.
237 *
238 * @param timeout the maximum time to wait
239 * @param unit the time unit of the timeout argument
240 * @return the computed result
241 * @throws CancellationException if the computation was cancelled
242 * @throws ExecutionException if the computation threw an
243 * exception
244 * @throws InterruptedException if the current thread was interrupted
245 * while waiting
246 * @throws TimeoutException if the wait timed out
247 */
248 public T get(long timeout, TimeUnit unit)
249 throws InterruptedException, ExecutionException, TimeoutException {
250 Object r; Throwable ex;
251 long nanos = unit.toNanos(timeout);
252 if (Thread.interrupted())
253 throw new InterruptedException();
254 if ((r = result) == null)
255 r = timedAwaitDone(nanos);
256 if (r instanceof AltResult) {
257 if ((ex = ((AltResult)r).ex) != null)
258 throw new ExecutionException(ex);
259 return null;
260 }
261 return (T)r;
262 }
263
264 /**
265 * If not already completed, sets the value returned by {@link
266 * #get()} and related methods to the given value.
267 *
268 * @param value the result value
269 * @return true if this invocation caused this CompletableFuture
270 * to transition to a completed state, else false.
271 */
272 public boolean complete(T value) {
273 if (result == null &&
274 UNSAFE.compareAndSwapObject(this, RESULT, null,
275 (value == null) ? NIL : value)) {
276 postComplete();
277 return true;
278 }
279 return false;
280 }
281
282 /**
283 * If not already completed, causes invocations of {@link #get()}
284 * and related methods to throw the given exception.
285 *
286 * @param ex the exception
287 * @return true if this invocation caused this CompletableFuture
288 * to transition to a completed state, else false.
289 */
290 public boolean completeExceptionally(Throwable ex) {
291 if (ex == null) throw new NullPointerException();
292 if (result == null) {
293 Object r = new AltResult(ex);
294 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
295 postComplete();
296 return true;
297 }
298 }
299 return false;
300 }
301
302 /**
303 * Creates and returns a CompletableFuture that is completed with
304 * the result of the given function of this CompletableFuture.
305 * If this CompletableFuture completes exceptionally,
306 * then the returned CompletableFuture also does so,
307 * with a RuntimeException having this exception as
308 * its cause.
309 *
310 * @param fn the function to use to compute the value of
311 * the returned CompletableFuture
312 * @return the new CompletableFuture
313 */
314 public <U> CompletableFuture<U> then(Function<? super T,? extends U> fn) {
315 return thenFunction(fn, null);
316 }
317
318 /**
319 * Creates and returns a CompletableFuture that is asynchronously
320 * completed using the {@link ForkJoinPool#commonPool()} with the
321 * result of the given function of this CompletableFuture. If
322 * this CompletableFuture completes exceptionally, then the
323 * returned CompletableFuture also does so, with a
324 * RuntimeException having this exception as its cause.
325 *
326 * @param fn the function to use to compute the value of
327 * the returned CompletableFuture
328 * @return the new CompletableFuture
329 */
330 public <U> CompletableFuture<U> thenAsync(Function<? super T,? extends U> fn) {
331 return thenFunction(fn, ForkJoinPool.commonPool());
332 }
333
334 /**
335 * Creates and returns a CompletableFuture that is asynchronously
336 * completed using the given executor with the result of the given
337 * function of this CompletableFuture. If this CompletableFuture
338 * completes exceptionally, then the returned CompletableFuture
339 * also does so, with a RuntimeException having this exception as
340 * its cause.
341 *
342 * @param fn the function to use to compute the value of
343 * the returned CompletableFuture
344 * @param executor the executor to use for asynchronous execution
345 * @return the new CompletableFuture
346 */
347 public <U> CompletableFuture<U> thenAsync(Function<? super T,? extends U> fn,
348 Executor executor) {
349 if (executor == null) throw new NullPointerException();
350 return thenFunction(fn, executor);
351 }
352
353 /**
354 * Creates and returns a CompletableFuture that is completed after
355 * performing the given action if/when this CompletableFuture
356 * completes. If this CompletableFuture completes exceptionally,
357 * then the returned CompletableFuture also does so, with a
358 * RuntimeException having this exception as its cause.
359 *
360 * @param action the action to perform before completing the
361 * returned CompletableFuture
362 * @return the new CompletableFuture
363 */
364 public CompletableFuture<Void> then(Runnable action) {
365 return thenRunnable(action, null);
366 }
367
368 /**
369 * Creates and returns a CompletableFuture that is asynchronously
370 * completed using the {@link ForkJoinPool#commonPool()} after
371 * performing the given action if/when this CompletableFuture
372 * completes. If this CompletableFuture completes exceptionally,
373 * then the returned CompletableFuture also does so, with a
374 * RuntimeException having this exception as its cause.
375 *
376 * @param action the action to perform before completing the
377 * returned CompletableFuture
378 * @return the new CompletableFuture
379 */
380 public CompletableFuture<Void> thenAsync(Runnable action) {
381 return thenRunnable(action, ForkJoinPool.commonPool());
382 }
383
384 /**
385 * Creates and returns a CompletableFuture that is asynchronously
386 * completed using the given executor after performing the given
387 * action if/when this CompletableFuture completes. If this
388 * CompletableFuture completes exceptionally, then the returned
389 * CompletableFuture also does so, with a RuntimeException having
390 * this exception as its cause.
391 *
392 * @param action the action to perform before completing the
393 * returned CompletableFuture
394 * @param executor the executor to use for asynchronous execution
395 * @return the new CompletableFuture
396 */
397 public CompletableFuture<Void> thenAsync(Runnable action, Executor executor) {
398 if (executor == null) throw new NullPointerException();
399 return thenRunnable(action, executor);
400 }
401
402 /**
403 * Creates and returns a CompletableFuture that is completed with
404 * the result of the given function of this and the other given
405 * CompletableFuture's results if/when both complete. If this or
406 * the other CompletableFuture complete exceptionally, then the
407 * returned CompletableFuture also does so, with a
408 * RuntimeException having the exception as its cause.
409 *
410 * @param other the other CompletableFuture
411 * @param fn the function to use to compute the value of
412 * the returned CompletableFuture
413 * @return the new CompletableFuture
414 */
415 public <U,V> CompletableFuture<V> andThen(CompletableFuture<? extends U> other,
416 BiFunction<? super T,? super U,? extends V> fn) {
417 return andFunction(other, fn, null);
418 }
419
420 /**
421 * Creates and returns a CompletableFuture that is asynchronously
422 * completed using the {@link ForkJoinPool#commonPool()} with
423 * the result of the given function of this and the other given
424 * CompletableFuture's results if/when both complete. If this or
425 * the other CompletableFuture complete exceptionally, then the
426 * returned CompletableFuture also does so, with a
427 * RuntimeException having the exception as its cause.
428 *
429 * @param other the other CompletableFuture
430 * @param fn the function to use to compute the value of
431 * the returned CompletableFuture
432 * @return the new CompletableFuture
433 */
434 public <U,V> CompletableFuture<V> andThenAsync(CompletableFuture<? extends U> other,
435 BiFunction<? super T,? super U,? extends V> fn) {
436 return andFunction(other, fn, ForkJoinPool.commonPool());
437 }
438
439 /**
440 * Creates and returns a CompletableFuture that is
441 * asynchronously completed using the given executor with the
442 * result of the given function of this and the other given
443 * CompletableFuture's results if/when both complete. If this or
444 * the other CompletableFuture complete exceptionally, then the
445 * returned CompletableFuture also does so, with a
446 * RuntimeException having the exception as its cause.
447 *
448 * @param other the other CompletableFuture
449 * @param fn the function to use to compute the value of
450 * the returned CompletableFuture
451 * @param executor the executor to use for asynchronous execution
452 * @return the new CompletableFuture
453 */
454
455 public <U,V> CompletableFuture<V> andThenAsync(CompletableFuture<? extends U> other,
456 BiFunction<? super T,? super U,? extends V> fn,
457 Executor executor) {
458 if (executor == null) throw new NullPointerException();
459 return andFunction(other, fn, executor);
460 }
461
462 /**
463 * Creates and returns a CompletableFuture that is completed
464 * if/when this and the other given CompletableFuture both
465 * complete. If this and/or the other CompletableFuture complete
466 * exceptionally, then the returned CompletableFuture also does
467 * so, with a RuntimeException having the one of the exceptions as
468 * its cause.
469 *
470 * @param other the other CompletableFuture
471 * @param action the action to perform before completing the
472 * returned CompletableFuture
473 * @return the new CompletableFuture
474 */
475 public CompletableFuture<Void> andThen(CompletableFuture<?> other,
476 Runnable action) {
477 return andRunnable(other, action, null);
478 }
479
480 /**
481 * Creates and returns a CompletableFuture that is completed
482 * asynchronously using the {@link ForkJoinPool#commonPool()}
483 * if/when this and the other given CompletableFuture both
484 * complete. If this and/or the other CompletableFuture complete
485 * exceptionally, then the returned CompletableFuture also does
486 * so, with a RuntimeException having the one of the exceptions as
487 * its cause.
488 *
489 * @param other the other CompletableFuture
490 * @param action the action to perform before completing the
491 * returned CompletableFuture
492 * @return the new CompletableFuture
493 */
494 public CompletableFuture<Void> andThenAsync(CompletableFuture<?> other,
495 Runnable action) {
496 return andRunnable(other, action, ForkJoinPool.commonPool());
497 }
498
499 /**
500 * Creates and returns a CompletableFuture that is completed
501 * asynchronously using the given executor
502 * if/when this and the other given CompletableFuture both
503 * complete. If this and/or the other CompletableFuture complete
504 * exceptionally, then the returned CompletableFuture also does
505 * so, with a RuntimeException having the one of the exceptions as
506 * its cause.
507 *
508 * @param other the other CompletableFuture
509 * @param action the action to perform before completing the
510 * returned CompletableFuture
511 * @param executor the executor to use for asynchronous execution
512 * @return the new CompletableFuture
513 */
514 public CompletableFuture<Void> andThenAsync(CompletableFuture<?> other,
515 Runnable action,
516 Executor executor) {
517 if (executor == null) throw new NullPointerException();
518 return andRunnable(other, action, executor);
519 }
520
521 /**
522 * Creates and returns a CompletableFuture that is completed with
523 * the result of the given function of either this or the other
524 * given CompletableFuture's results if/when either complete. If
525 * this and/or the other CompletableFuture complete exceptionally,
526 * then the returned CompletableFuture may also do so, with a
527 * RuntimeException having one of these exceptions as its cause.
528 * No guarantees are made about which result or exception is used
529 * in the returned CompletableFuture.
530 *
531 * @param other the other CompletableFuture
532 * @param fn the function to use to compute the value of
533 * the returned CompletableFuture
534 * @return the new CompletableFuture
535 */
536 public <U> CompletableFuture<U> orThen(CompletableFuture<? extends T> other,
537 Function<? super T, U> fn) {
538 return orFunction(other, fn, null);
539 }
540
541 /**
542 * Creates and returns a CompletableFuture that is completed
543 * asynchronously using the {@link ForkJoinPool#commonPool()} with
544 * the result of the given function of either this or the other
545 * given CompletableFuture's results if/when either complete. If
546 * this and/or the other CompletableFuture complete exceptionally,
547 * then the returned CompletableFuture may also do so, with a
548 * RuntimeException having one of these exceptions as its cause.
549 * No guarantees are made about which result or exception is used
550 * in the returned CompletableFuture.
551 *
552 * @param other the other CompletableFuture
553 * @param fn the function to use to compute the value of
554 * the returned CompletableFuture
555 * @return the new CompletableFuture
556 */
557 public <U> CompletableFuture<U> orThenAsync(CompletableFuture<? extends T> other,
558 Function<? super T, U> fn) {
559 return orFunction(other, fn, ForkJoinPool.commonPool());
560 }
561
562 /**
563 * Creates and returns a CompletableFuture that is completed
564 * asynchronously using the given executor with the result of the
565 * given function of either this or the other given
566 * CompletableFuture's results if/when either complete. If this
567 * and/or the other CompletableFuture complete exceptionally, then
568 * the returned CompletableFuture may also do so, with a
569 * RuntimeException having one of these exceptions as its cause.
570 * No guarantees are made about which result or exception is used
571 * in the returned CompletableFuture.
572 *
573 * @param other the other CompletableFuture
574 * @param fn the function to use to compute the value of
575 * the returned CompletableFuture
576 * @param executor the executor to use for asynchronous execution
577 * @return the new CompletableFuture
578 */
579 public <U> CompletableFuture<U> orThen(CompletableFuture<? extends T> other,
580 Function<? super T, U> fn,
581 Executor executor) {
582 if (executor == null) throw new NullPointerException();
583 return orFunction(other, fn, executor);
584 }
585
586 /**
587 * Creates and returns a CompletableFuture that is completed
588 * after this or the other given CompletableFuture complete. If
589 * this and/or the other CompletableFuture complete exceptionally,
590 * then the returned CompletableFuture may also do so, with a
591 * RuntimeException having one of these exceptions as its cause.
592 * No guarantees are made about which exception is used in the
593 * returned CompletableFuture.
594 *
595 * @param other the other CompletableFuture
596 * @param action the action to perform before completing the
597 * returned CompletableFuture
598 * @return the new CompletableFuture
599 */
600 public CompletableFuture<Void> orThen(CompletableFuture<?> other,
601 Runnable action) {
602 return orRunnable(other, action, null);
603 }
604
605 /**
606 * Creates and returns a CompletableFuture that is completed
607 * asynchronously using the {@link ForkJoinPool#commonPool()}
608 * after this or the other given CompletableFuture complete. If
609 * this and/or the other CompletableFuture complete exceptionally,
610 * then the returned CompletableFuture may also do so, with a
611 * RuntimeException having one of these exceptions as its cause.
612 * No guarantees are made about which exception is used in the
613 * returned CompletableFuture.
614 *
615 * @param other the other CompletableFuture
616 * @param action the action to perform before completing the
617 * returned CompletableFuture
618 * @return the new CompletableFuture
619 */
620 public CompletableFuture<Void> orThenAsync(CompletableFuture<?> other,
621 Runnable action) {
622 return orRunnable(other, action, ForkJoinPool.commonPool());
623 }
624
625 /**
626 * Creates and returns a CompletableFuture that is completed
627 * asynchronously using the given executor after this or the other
628 * given CompletableFuture complete. If this and/or the other
629 * CompletableFuture complete exceptionally, then the returned
630 * CompletableFuture may also do so, with a RuntimeException
631 * having one of these exceptions as its cause. No guarantees are
632 * made about which exception is used in the returned
633 * CompletableFuture.
634 *
635 * @param other the other CompletableFuture
636 * @param action the action to perform before completing the
637 * returned CompletableFuture
638 * @param executor the executor to use for asynchronous execution
639 * @return the new CompletableFuture
640 */
641 public CompletableFuture<Void> orThenAsync(CompletableFuture<?> other,
642 Runnable action,
643 Executor executor) {
644 if (executor == null) throw new NullPointerException();
645 return orRunnable(other, action, executor);
646 }
647
648 /**
649 * Creates and returns a CompletableFuture that is completed after
650 * performing the given action with the exception triggering this
651 * CompletableFuture's completion if/when it completes
652 * exceptionally.
653 *
654 * @param action the action to perform before completing the
655 * returned CompletableFuture
656 * @return the new CompletableFuture
657 */
658 public CompletableFuture<Void> exceptionally(Block<Throwable> action) {
659 if (action == null) throw new NullPointerException();
660 CompletableFuture<Void> dst = new CompletableFuture<Void>();
661 ExceptionAction<T> d = null;
662 Object r; Throwable ex;
663 if ((r = result) == null) {
664 CompletionNode p =
665 new CompletionNode(d = new ExceptionAction<T>(this, action, dst));
666 while ((r = result) == null) {
667 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
668 p.next = completions, p))
669 break;
670 }
671 }
672 if (r != null && (d == null || d.compareAndSet(0, 1)) &&
673 (r instanceof AltResult) && (ex = ((AltResult)r).ex) != null) {
674 try {
675 action.accept(ex);
676 dst.complete(null);
677 } catch (Throwable rex) {
678 dst.completeExceptionally(rex);
679 }
680 }
681 if (r != null)
682 postComplete();
683 return dst;
684 }
685
686 /**
687 * Attempts to complete this CompletableFuture with
688 * a {@link CancellationException}.
689 *
690 * @param mayInterruptIfRunning this value has no effect in this
691 * implementation because interrupts are not used to control
692 * processing.
693 *
694 * @return {@code true} if this task is now cancelled
695 */
696 public boolean cancel(boolean mayInterruptIfRunning) {
697 Object r;
698 while ((r = result) == null) {
699 r = new AltResult(new CancellationException());
700 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
701 postComplete();
702 return true;
703 }
704 }
705 return ((r instanceof AltResult) &&
706 (((AltResult)r).ex instanceof CancellationException));
707 }
708
709 /**
710 * Returns {@code true} if this CompletableFuture was cancelled
711 * before it completed normally.
712 *
713 * @return {@code true} if this CompletableFuture was cancelled
714 * before it completed normally
715 */
716 public boolean isCancelled() {
717 Object r;
718 return ((r = result) != null &&
719 (r instanceof AltResult) &&
720 (((AltResult)r).ex instanceof CancellationException));
721 }
722
723 /**
724 * Whether or not already completed, sets the value subsequently
725 * returned by method get() and related methods to the given
726 * value. This method is designed for use in error recovery
727 * actions, and is very unlikely to be useful otherwise.
728 *
729 * @param value the completion value
730 */
731 public void force(T value) {
732 result = (value == null) ? NIL : value;
733 postComplete();
734 }
735
736 /**
737 * Removes and signals all waiting threads and runs all completions
738 */
739 private void postComplete() {
740 WaitNode q; Thread t;
741 while ((q = waiters) != null) {
742 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
743 (t = q.thread) != null) {
744 q.thread = null;
745 LockSupport.unpark(t);
746 }
747 }
748
749 CompletionNode h; Completion c;
750 while ((h = completions) != null) {
751 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
752 (c = h.completion) != null)
753 c.run();
754 }
755 }
756
757 /* ------------- waiting for completions -------------- */
758
759 /**
760 * Heuristic spin value for waitingGet() before blocking on
761 * multiprocessors
762 */
763 static final int WAITING_GET_SPINS = 256;
764
765 /**
766 * Returns result after waiting.
767 */
768 private T waitingGet() {
769 WaitNode q = null;
770 boolean queued = false, interrupted = false;
771 int h = 0, spins = 0;
772 for (Object r;;) {
773 if ((r = result) != null) {
774 if (q != null) // suppress unpark
775 q.thread = null;
776 postComplete(); // help release others
777 if (interrupted)
778 Thread.currentThread().interrupt();
779 if (r instanceof AltResult) {
780 if (r != NIL)
781 rethrow(((AltResult)r).ex);
782 return null;
783 }
784 return (T)r;
785 }
786 else if (h == 0) {
787 h = ThreadLocalRandom.current().nextInt();
788 if (Runtime.getRuntime().availableProcessors() > 1)
789 spins = WAITING_GET_SPINS;
790 }
791 else if (spins > 0) {
792 h ^= h << 1; // xorshift
793 h ^= h >>> 3;
794 if ((h ^= h << 10) >= 0)
795 --spins;
796 }
797 else if (q == null)
798 q = new WaitNode();
799 else if (!queued)
800 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
801 q.next = waiters, q);
802 else if (Thread.interrupted())
803 interrupted = true;
804 else if (q.thread == null)
805 q.thread = Thread.currentThread();
806 else
807 LockSupport.park(this);
808 }
809 }
810
811 /**
812 * Awaits completion or aborts on interrupt or timeout.
813 *
814 * @param nanos time to wait
815 * @return raw result
816 */
817 private Object timedAwaitDone(long nanos)
818 throws InterruptedException, TimeoutException {
819 final long deadline = System.nanoTime() + nanos;
820 WaitNode q = null;
821 boolean queued = false;
822 for (Object r;;) {
823 if (Thread.interrupted()) {
824 removeWaiter(q);
825 throw new InterruptedException();
826 }
827 else if ((r = result) != null) {
828 if (q != null)
829 q.thread = null;
830 postComplete();
831 return r;
832 }
833 else if (q == null)
834 q = new WaitNode();
835 else if (!queued)
836 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
837 q.next = waiters, q);
838 else if ((nanos = deadline - System.nanoTime()) <= 0L) {
839 removeWaiter(q);
840 throw new TimeoutException();
841 }
842 else if (q.thread == null)
843 q.thread = Thread.currentThread();
844 else
845 LockSupport.parkNanos(this, nanos);
846 }
847 }
848
849 /**
850 * Tries to unlink a timed-out or interrupted wait node to avoid
851 * accumulating garbage. Internal nodes are simply unspliced
852 * without CAS since it is harmless if they are traversed anyway
853 * by releasers. To avoid effects of unsplicing from already
854 * removed nodes, the list is retraversed in case of an apparent
855 * race. This is slow when there are a lot of nodes, but we don't
856 * expect lists to be long enough to outweigh higher-overhead
857 * schemes.
858 */
859 private void removeWaiter(WaitNode node) {
860 if (node != null) {
861 node.thread = null;
862 retry:
863 for (;;) { // restart on removeWaiter race
864 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
865 s = q.next;
866 if (q.thread != null)
867 pred = q;
868 else if (pred != null) {
869 pred.next = s;
870 if (pred.thread == null) // check for race
871 continue retry;
872 }
873 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
874 continue retry;
875 }
876 break;
877 }
878 }
879 }
880
881 /* ------------- Async tasks -------------- */
882
883 /** Base class can act as either FJ or plain Runnable */
884 static abstract class Async extends ForkJoinTask<Void> implements Runnable {
885 public final Void getRawResult() { return null; }
886 public final void setRawResult(Void v) { }
887 public final void run() { exec(); }
888 }
889
890 static final class AsyncRunnable extends Async {
891 final Runnable runnable;
892 final CompletableFuture<Void> dst;
893 AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
894 this.runnable = runnable; this.dst = dst;
895 }
896 public final boolean exec() {
897 Runnable fn;
898 CompletableFuture<Void> d;
899 if ((fn = this.runnable) == null || (d = this.dst) == null)
900 throw new NullPointerException();
901 try {
902 fn.run();
903 d.complete(null);
904 } catch (Throwable ex) {
905 d.completeExceptionally(ex);
906 }
907 return true;
908 }
909 private static final long serialVersionUID = 5232453952276885070L;
910 }
911
912 static final class AsyncSupplier<U> extends Async {
913 final Supplier<U> supplier;
914 final CompletableFuture<U> dst;
915 AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
916 this.supplier = supplier; this.dst = dst;
917 }
918 public final boolean exec() {
919 Supplier<U> fn;
920 CompletableFuture<U> d;
921 if ((fn = this.supplier) == null || (d = this.dst) == null)
922 throw new NullPointerException();
923 try {
924 d.complete(fn.get());
925 } catch (Throwable ex) {
926 d.completeExceptionally(ex);
927 }
928 return true;
929 }
930 private static final long serialVersionUID = 5232453952276885070L;
931 }
932
933 static final class AsyncFunction<T,U> extends Async {
934 Function<? super T,? extends U> fn;
935 T arg;
936 final CompletableFuture<U> dst;
937 AsyncFunction(T arg, Function<? super T,? extends U> fn,
938 CompletableFuture<U> dst) {
939 this.arg = arg; this.fn = fn; this.dst = dst;
940 }
941 public final boolean exec() {
942 Function<? super T,? extends U> fn;
943 CompletableFuture<U> d;
944 if ((fn = this.fn) == null || (d = this.dst) == null)
945 throw new NullPointerException();
946 try {
947 d.complete(fn.apply(arg));
948 } catch (Throwable ex) {
949 d.completeExceptionally(ex);
950 }
951 return true;
952 }
953 private static final long serialVersionUID = 5232453952276885070L;
954 }
955
956 static final class AsyncBiFunction<T,U,V> extends Async {
957 final BiFunction<? super T,? super U,? extends V> fn;
958 final T arg1;
959 final U arg2;
960 final CompletableFuture<V> dst;
961 AsyncBiFunction(T arg1, U arg2,
962 BiFunction<? super T,? super U,? extends V> fn,
963 CompletableFuture<V> dst) {
964 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
965 }
966 public final boolean exec() {
967 BiFunction<? super T,? super U,? extends V> fn;
968 CompletableFuture<V> d;
969 if ((fn = this.fn) == null || (d = this.dst) == null)
970 throw new NullPointerException();
971 try {
972 d.complete(fn.apply(arg1, arg2));
973 } catch (Throwable ex) {
974 d.completeExceptionally(ex);
975 }
976 return true;
977 }
978 private static final long serialVersionUID = 5232453952276885070L;
979 }
980
981 /* ------------- Completions -------------- */
982
983 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
984 static abstract class Completion extends AtomicInteger implements Runnable {
985 }
986
987 static final class ThenFunction<T,U> extends Completion {
988 final CompletableFuture<? extends T> src;
989 final Function<? super T,? extends U> fn;
990 final CompletableFuture<U> dst;
991 final Executor executor;
992 ThenFunction(CompletableFuture<? extends T> src,
993 final Function<? super T,? extends U> fn,
994 final CompletableFuture<U> dst, Executor executor) {
995 this.src = src; this.fn = fn; this.dst = dst;
996 this.executor = executor;
997 }
998 public void run() {
999 CompletableFuture<? extends T> a;
1000 Function<? super T,? extends U> fn;
1001 CompletableFuture<U> dst;
1002 Object r; T t; Throwable ex;
1003 if ((dst = this.dst) != null &&
1004 (fn = this.fn) != null &&
1005 (a = this.src) != null &&
1006 (r = a.result) != null &&
1007 compareAndSet(0, 1)) {
1008 if (r instanceof AltResult) {
1009 if ((ex = ((AltResult)r).ex) != null) {
1010 dst.completeExceptionally(new RuntimeException(ex));
1011 return;
1012 }
1013 t = null;
1014 }
1015 else
1016 t = (T) r;
1017 try {
1018 if (executor != null)
1019 executor.execute(new AsyncFunction(t, fn, dst));
1020 else
1021 dst.complete(fn.apply(t));
1022 } catch (Throwable rex) {
1023 dst.completeExceptionally(rex);
1024 }
1025 }
1026 }
1027 }
1028
1029 static final class ThenRunnable<T> extends Completion {
1030 final CompletableFuture<? extends T> src;
1031 final Runnable fn;
1032 final CompletableFuture<Void> dst;
1033 final Executor executor;
1034 ThenRunnable(CompletableFuture<? extends T> src,
1035 Runnable fn,
1036 CompletableFuture<Void> dst,
1037 Executor executor) {
1038 this.src = src; this.fn = fn; this.dst = dst;
1039 this.executor = executor;
1040 }
1041 public void run() {
1042 CompletableFuture<? extends T> a;
1043 Runnable fn;
1044 CompletableFuture<Void> dst;
1045 Object r; Throwable ex;
1046 if ((dst = this.dst) != null &&
1047 (fn = this.fn) != null &&
1048 (a = this.src) != null &&
1049 (r = a.result) != null &&
1050 compareAndSet(0, 1)) {
1051 if (r instanceof AltResult) {
1052 if ((ex = ((AltResult)r).ex) != null) {
1053 dst.completeExceptionally(new RuntimeException(ex));
1054 return;
1055 }
1056 }
1057 try {
1058 if (executor != null)
1059 executor.execute(new AsyncRunnable(fn, dst));
1060 else {
1061 fn.run();
1062 dst.complete(null);
1063 }
1064 } catch (Throwable rex) {
1065 dst.completeExceptionally(rex);
1066 }
1067 }
1068 }
1069 }
1070
1071 static final class AndFunction<T,U,V> extends Completion {
1072 final CompletableFuture<? extends T> src;
1073 final CompletableFuture<? extends U> snd;
1074 final BiFunction<? super T,? super U,? extends V> fn;
1075 final CompletableFuture<V> dst;
1076 final Executor executor;
1077 AndFunction(CompletableFuture<? extends T> src,
1078 CompletableFuture<? extends U> snd,
1079 BiFunction<? super T,? super U,? extends V> fn,
1080 CompletableFuture<V> dst, Executor executor) {
1081 this.src = src; this.snd = snd;
1082 this.fn = fn; this.dst = dst;
1083 this.executor = executor;
1084 }
1085 public void run() {
1086 Object r, s; T t; U u; Throwable ex;
1087 CompletableFuture<? extends T> a;
1088 CompletableFuture<? extends U> b;
1089 BiFunction<? super T,? super U,? extends V> fn;
1090 CompletableFuture<V> dst;
1091 if ((dst = this.dst) != null &&
1092 (fn = this.fn) != null &&
1093 (a = this.src) != null &&
1094 (r = a.result) != null &&
1095 (b = this.snd) != null &&
1096 (s = b.result) != null &&
1097 compareAndSet(0, 1)) {
1098 if (r instanceof AltResult) {
1099 if ((ex = ((AltResult)r).ex) != null) {
1100 dst.completeExceptionally(new RuntimeException(ex));
1101 return;
1102 }
1103 t = null;
1104 }
1105 else
1106 t = (T) r;
1107 if (s instanceof AltResult) {
1108 if ((ex = ((AltResult)s).ex) != null) {
1109 dst.completeExceptionally(new RuntimeException(ex));
1110 return;
1111 }
1112 u = null;
1113 }
1114 else
1115 u = (U) s;
1116 try {
1117 if (executor != null)
1118 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1119 else
1120 dst.complete(fn.apply(t, u));
1121 } catch (Throwable rex) {
1122 dst.completeExceptionally(rex);
1123 }
1124 }
1125 }
1126 }
1127
1128 static final class AndRunnable<T> extends Completion {
1129 final CompletableFuture<? extends T> src;
1130 final CompletableFuture<?> snd;
1131 final Runnable fn;
1132 final CompletableFuture<Void> dst;
1133 final Executor executor;
1134 AndRunnable(CompletableFuture<? extends T> src,
1135 CompletableFuture<?> snd,
1136 Runnable fn,
1137 CompletableFuture<Void> dst, Executor executor) {
1138 this.src = src; this.snd = snd;
1139 this.fn = fn; this.dst = dst;
1140 this.executor = executor;
1141 }
1142 public void run() {
1143 Object r, s; Throwable ex;
1144 final CompletableFuture<? extends T> a;
1145 final CompletableFuture<?> b;
1146 final Runnable fn;
1147 final CompletableFuture<Void> dst;
1148 if ((dst = this.dst) != null &&
1149 (fn = this.fn) != null &&
1150 (a = this.src) != null &&
1151 (r = a.result) != null &&
1152 (b = this.snd) != null &&
1153 (s = b.result) != null &&
1154 compareAndSet(0, 1)) {
1155 if (r instanceof AltResult) {
1156 if ((ex = ((AltResult)r).ex) != null) {
1157 dst.completeExceptionally(new RuntimeException(ex));
1158 return;
1159 }
1160 }
1161 if (s instanceof AltResult) {
1162 if ((ex = ((AltResult)s).ex) != null) {
1163 dst.completeExceptionally(new RuntimeException(ex));
1164 return;
1165 }
1166 }
1167 try {
1168 if (executor != null)
1169 executor.execute(new AsyncRunnable(fn, dst));
1170 else {
1171 fn.run();
1172 dst.complete(null);
1173 }
1174 } catch (Throwable rex) {
1175 dst.completeExceptionally(rex);
1176 }
1177 }
1178 }
1179 }
1180
1181 static final class OrFunction<T,U> extends Completion {
1182 final CompletableFuture<? extends T> src;
1183 final CompletableFuture<? extends T> snd;
1184 final Function<? super T,? extends U> fn;
1185 final CompletableFuture<U> dst;
1186 final Executor executor;
1187 OrFunction(CompletableFuture<? extends T> src,
1188 CompletableFuture<? extends T> snd,
1189 Function<? super T,? extends U> fn,
1190 CompletableFuture<U> dst, Executor executor) {
1191 this.src = src; this.snd = snd;
1192 this.fn = fn; this.dst = dst;
1193 this.executor = executor;
1194 }
1195 public void run() {
1196 Object r; T t; Throwable ex;
1197 CompletableFuture<? extends T> a;
1198 CompletableFuture<? extends T> b;
1199 Function<? super T,? extends U> fn;
1200 CompletableFuture<U> dst;
1201 if ((dst = this.dst) != null &&
1202 (fn = this.fn) != null &&
1203 (((a = this.src) != null && (r = a.result) != null) ||
1204 ((b = this.snd) != null && (r = b.result) != null)) &&
1205 compareAndSet(0, 1)) {
1206 if (r instanceof AltResult) {
1207 if ((ex = ((AltResult)r).ex) != null) {
1208 dst.completeExceptionally(new RuntimeException(ex));
1209 return;
1210 }
1211 t = null;
1212 }
1213 else
1214 t = (T) r;
1215 try {
1216 if (executor != null)
1217 executor.execute(new AsyncFunction(t, fn, dst));
1218 else
1219 dst.complete(fn.apply(t));
1220 } catch (Throwable rex) {
1221 dst.completeExceptionally(rex);
1222 }
1223 }
1224 }
1225 }
1226
1227 static final class OrRunnable<T> extends Completion {
1228 final CompletableFuture<? extends T> src;
1229 final CompletableFuture<?> snd;
1230 final Runnable fn;
1231 final CompletableFuture<Void> dst;
1232 final Executor executor;
1233 OrRunnable(CompletableFuture<? extends T> src,
1234 CompletableFuture<?> snd,
1235 Runnable fn,
1236 CompletableFuture<Void> dst, Executor executor) {
1237 this.src = src; this.snd = snd;
1238 this.fn = fn; this.dst = dst;
1239 this.executor = executor;
1240 }
1241 public void run() {
1242 Object r; Throwable ex;
1243 CompletableFuture<? extends T> a;
1244 final CompletableFuture<?> b;
1245 final Runnable fn;
1246 final CompletableFuture<Void> dst;
1247 if ((dst = this.dst) != null &&
1248 (fn = this.fn) != null &&
1249 (((a = this.src) != null && (r = a.result) != null) ||
1250 ((b = this.snd) != null && (r = b.result) != null)) &&
1251 compareAndSet(0, 1)) {
1252 if ((r instanceof AltResult) &&
1253 (ex = ((AltResult)r).ex) != null) {
1254 dst.completeExceptionally(new RuntimeException(ex));
1255 }
1256 else {
1257 try {
1258 if (executor != null)
1259 executor.execute(new AsyncRunnable(fn, dst));
1260 else {
1261 fn.run();
1262 dst.complete(null);
1263 }
1264 } catch (Throwable rex) {
1265 dst.completeExceptionally(rex);
1266 }
1267 }
1268 }
1269 }
1270 }
1271
1272 static final class ExceptionAction<T> extends Completion {
1273 final CompletableFuture<? extends T> src;
1274 final Block<? super Throwable> fn;
1275 final CompletableFuture<Void> dst;
1276 ExceptionAction(CompletableFuture<? extends T> src,
1277 Block<? super Throwable> fn,
1278 CompletableFuture<Void> dst) {
1279 this.src = src; this.fn = fn; this.dst = dst;
1280 }
1281 public void run() {
1282 CompletableFuture<? extends T> a;
1283 Block<? super Throwable> fn;
1284 CompletableFuture<Void> dst;
1285 Object r; Throwable ex;
1286 if ((dst = this.dst) != null &&
1287 (fn = this.fn) != null &&
1288 (a = this.src) != null &&
1289 (r = a.result) != null &&
1290 compareAndSet(0, 1)) {
1291 if ((r instanceof AltResult) &&
1292 (ex = ((AltResult)r).ex) != null) {
1293 try {
1294 fn.accept(ex);
1295 dst.complete(null);
1296 } catch (Throwable rex) {
1297 dst.completeExceptionally(rex);
1298 }
1299 }
1300 }
1301 }
1302 }
1303
1304 /* ------------- then/and/or implementations -------------- */
1305
1306 private <U> CompletableFuture<U> thenFunction(Function<? super T,? extends U> fn,
1307 Executor executor) {
1308
1309 if (fn == null) throw new NullPointerException();
1310 CompletableFuture<U> dst = new CompletableFuture<U>();
1311 ThenFunction<T,U> d = null;
1312 Object r;
1313 if ((r = result) == null) {
1314 CompletionNode p = new CompletionNode
1315 (d = new ThenFunction<T,U>(this, fn, dst, executor));
1316 while ((r = result) == null) {
1317 if (UNSAFE.compareAndSwapObject
1318 (this, COMPLETIONS, p.next = completions, p))
1319 break;
1320 }
1321 }
1322 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1323 T t; Throwable ex = null;
1324 if (r instanceof AltResult) {
1325 if ((ex = ((AltResult)r).ex) != null)
1326 dst.completeExceptionally(new RuntimeException(ex));
1327 t = null;
1328 }
1329 else
1330 t = (T) r;
1331 if (ex == null) {
1332 try {
1333 if (executor != null)
1334 executor.execute(new AsyncFunction(t, fn, dst));
1335 else
1336 dst.complete(fn.apply(t));
1337 } catch (Throwable rex) {
1338 dst.completeExceptionally(rex);
1339 }
1340 }
1341 postComplete();
1342 }
1343 return dst;
1344 }
1345
1346 private CompletableFuture<Void> thenRunnable(Runnable action,
1347 Executor executor) {
1348 if (action == null) throw new NullPointerException();
1349 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1350 ThenRunnable<T> d = null;
1351 Object r;
1352 if ((r = result) == null) {
1353 CompletionNode p = new CompletionNode
1354 (d = new ThenRunnable<T>(this, action, dst, executor));
1355 while ((r = result) == null) {
1356 if (UNSAFE.compareAndSwapObject
1357 (this, COMPLETIONS, p.next = completions, p))
1358 break;
1359 }
1360 }
1361 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1362 Throwable ex = null;
1363 if (r instanceof AltResult) {
1364 if ((ex = ((AltResult)r).ex) != null)
1365 dst.completeExceptionally(new RuntimeException(ex));
1366 }
1367 if (ex == null) {
1368 try {
1369 if (executor != null)
1370 executor.execute(new AsyncRunnable(action, dst));
1371 else {
1372 action.run();
1373 dst.complete(null);
1374 }
1375 } catch (Throwable rex) {
1376 dst.completeExceptionally(rex);
1377 }
1378 }
1379 postComplete();
1380 }
1381 return dst;
1382 }
1383
1384 private <U,V> CompletableFuture<V> andFunction(CompletableFuture<? extends U> other,
1385 BiFunction<? super T,? super U,? extends V> fn,
1386 Executor executor) {
1387 if (other == null || fn == null) throw new NullPointerException();
1388 CompletableFuture<V> dst = new CompletableFuture<V>();
1389 AndFunction<T,U,V> d = null;
1390 Object r, s = null;
1391 if ((r = result) == null || (s = other.result) == null) {
1392 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
1393 CompletionNode q = null, p = new CompletionNode(d);
1394 while ((r == null && (r = result) == null) ||
1395 (s == null && (s = other.result) == null)) {
1396 if (q != null) {
1397 if (s != null ||
1398 UNSAFE.compareAndSwapObject
1399 (other, COMPLETIONS, q.next = other.completions, q))
1400 break;
1401 }
1402 else if (r != null ||
1403 UNSAFE.compareAndSwapObject
1404 (this, COMPLETIONS, p.next = completions, p)) {
1405 if (s != null)
1406 break;
1407 q = new CompletionNode(d);
1408 }
1409 }
1410 }
1411 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1412 T t; U u; Throwable ex = null;
1413 if (r instanceof AltResult) {
1414 if ((ex = ((AltResult)r).ex) != null)
1415 dst.completeExceptionally(new RuntimeException(ex));
1416 t = null;
1417 }
1418 else
1419 t = (T) r;
1420 if (ex != null)
1421 u = null;
1422 else if (s instanceof AltResult) {
1423 if ((ex = ((AltResult)s).ex) != null)
1424 dst.completeExceptionally(new RuntimeException(ex));
1425 u = null;
1426 }
1427 else
1428 u = (U) s;
1429 if (ex == null) {
1430 try {
1431 if (executor != null)
1432 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1433 else
1434 dst.complete(fn.apply(t, u));
1435 } catch (Throwable rex) {
1436 dst.completeExceptionally(rex);
1437 }
1438 }
1439 }
1440 if (r != null)
1441 postComplete();
1442 if (s != null)
1443 other.postComplete();
1444 return dst;
1445 }
1446
1447 private CompletableFuture<Void> andRunnable(CompletableFuture<?> other,
1448 Runnable action,
1449 Executor executor) {
1450 if (other == null || action == null) throw new NullPointerException();
1451 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1452 AndRunnable<T> d = null;
1453 Object r, s = null;
1454 if ((r = result) == null || (s = other.result) == null) {
1455 d = new AndRunnable<T>(this, other, action, dst, executor);
1456 CompletionNode q = null, p = new CompletionNode(d);
1457 while ((r == null && (r = result) == null) ||
1458 (s == null && (s = other.result) == null)) {
1459 if (q != null) {
1460 if (s != null ||
1461 UNSAFE.compareAndSwapObject
1462 (other, COMPLETIONS, q.next = other.completions, q))
1463 break;
1464 }
1465 else if (r != null ||
1466 UNSAFE.compareAndSwapObject
1467 (this, COMPLETIONS, p.next = completions, p)) {
1468 if (s != null)
1469 break;
1470 q = new CompletionNode(d);
1471 }
1472 }
1473 }
1474 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1475 Throwable ex = null;
1476 if ((r instanceof AltResult) &&
1477 (ex = ((AltResult)r).ex) != null)
1478 dst.completeExceptionally(new RuntimeException(ex));
1479 else if ((s instanceof AltResult) &&
1480 (ex = ((AltResult)s).ex) != null)
1481 dst.completeExceptionally(new RuntimeException(ex));
1482 else {
1483 try {
1484 if (executor != null)
1485 executor.execute(new AsyncRunnable(action, dst));
1486 else {
1487 action.run();
1488 dst.complete(null);
1489 }
1490 } catch (Throwable rex) {
1491 dst.completeExceptionally(rex);
1492 }
1493 }
1494 }
1495 if (r != null)
1496 postComplete();
1497 if (s != null)
1498 other.postComplete();
1499 return dst;
1500 }
1501
1502 private <U> CompletableFuture<U> orFunction(CompletableFuture<? extends T> other,
1503 Function<? super T, U> fn,
1504 Executor executor) {
1505 if (other == null || fn == null) throw new NullPointerException();
1506 CompletableFuture<U> dst = new CompletableFuture<U>();
1507 OrFunction<T,U> d = null;
1508 Object r;
1509 if ((r = result) == null && (r = other.result) == null) {
1510 d = new OrFunction<T,U>(this, other, fn, dst, executor);
1511 CompletionNode q = null, p = new CompletionNode(d);
1512 while ((r = result) == null && (r = other.result) == null) {
1513 if (q != null) {
1514 if (UNSAFE.compareAndSwapObject
1515 (other, COMPLETIONS, q.next = other.completions, q))
1516 break;
1517 }
1518 else if (UNSAFE.compareAndSwapObject
1519 (this, COMPLETIONS, p.next = completions, p))
1520 q = new CompletionNode(d);
1521 }
1522 }
1523 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1524 T t; Throwable ex = null;
1525 if (r instanceof AltResult) {
1526 if ((ex = ((AltResult)r).ex) != null)
1527 dst.completeExceptionally(new RuntimeException(ex));
1528 t = null;
1529 }
1530 else
1531 t = (T) r;
1532 if (ex == null) {
1533 try {
1534 if (executor != null)
1535 executor.execute(new AsyncFunction(t, fn, dst));
1536 else
1537 dst.complete(fn.apply(t));
1538 } catch (Throwable rex) {
1539 dst.completeExceptionally(rex);
1540 }
1541 }
1542 }
1543 if (r != null) {
1544 if (result != null)
1545 postComplete();
1546 if (other.result != null)
1547 other.postComplete();
1548 }
1549 return dst;
1550 }
1551
1552 private CompletableFuture<Void> orRunnable(CompletableFuture<?> other,
1553 Runnable action,
1554 Executor executor) {
1555 if (other == null || action == null) throw new NullPointerException();
1556 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1557 OrRunnable<T> d = null;
1558 Object r;
1559 if ((r = result) == null && (r = other.result) == null) {
1560 d = new OrRunnable<T>(this, other, action, dst, executor);
1561 CompletionNode q = null, p = new CompletionNode(d);
1562 while ((r = result) == null && (r = other.result) == null) {
1563 if (q != null) {
1564 if (UNSAFE.compareAndSwapObject
1565 (other, COMPLETIONS, q.next = other.completions, q))
1566 break;
1567 }
1568 else if (UNSAFE.compareAndSwapObject
1569 (this, COMPLETIONS, p.next = completions, p))
1570 q = new CompletionNode(d);
1571 }
1572 }
1573 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1574 Throwable ex = null;
1575 if ((r instanceof AltResult) &&
1576 (ex = ((AltResult)r).ex) != null)
1577 dst.completeExceptionally(new RuntimeException(ex));
1578 else {
1579 try {
1580 if (executor != null)
1581 executor.execute(new AsyncRunnable(action, dst));
1582 else {
1583 action.run();
1584 dst.complete(null);
1585 }
1586 } catch (Throwable rex) {
1587 dst.completeExceptionally(rex);
1588 }
1589 }
1590 }
1591 if (r != null) {
1592 if (result != null)
1593 postComplete();
1594 if (other.result != null)
1595 other.postComplete();
1596 }
1597 return dst;
1598 }
1599
1600 /* ------------- misc -------------- */
1601
1602 /**
1603 * A version of "sneaky throw" to relay exceptions
1604 */
1605 static void rethrow(final Throwable ex) {
1606 if (ex != null) {
1607 if (ex instanceof Error)
1608 throw (Error)ex;
1609 if (ex instanceof RuntimeException)
1610 throw (RuntimeException)ex;
1611 throw uncheckedThrowable(ex, RuntimeException.class);
1612 }
1613 }
1614
1615 /**
1616 * The sneaky part of sneaky throw, relying on generics
1617 * limitations to evade compiler complaints about rethrowing
1618 * unchecked exceptions
1619 */
1620 @SuppressWarnings("unchecked") static <T extends Throwable>
1621 T uncheckedThrowable(final Throwable t, final Class<T> c) {
1622 return (T)t; // rely on vacuous cast
1623 }
1624
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" used for CAS operations.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;

static {
    try {
        final sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
        final Class<?> self = CompletableFuture.class;
        UNSAFE = unsafe;
        RESULT = unsafe.objectFieldOffset(self.getDeclaredField("result"));
        WAITERS = unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        COMPLETIONS = unsafe.objectFieldOffset(self.getDeclaredField("completions"));
    } catch (Exception e) {
        // Any lookup failure is fatal for class initialization.
        throw new Error(e);
    }
}
1644
1645 }