ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.5
Committed: Fri Dec 28 14:03:11 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.4: +31 -37 lines
Log Message:
avoid bad casts in exception reporting

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Block;
9 import java.util.function.Supplier;
10 import java.util.function.Function;
11 import java.util.function.BiFunction;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.ForkJoinPool;
15 import java.util.concurrent.ForkJoinTask;
16 import java.util.concurrent.Executor;
17 import java.util.concurrent.ThreadLocalRandom;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.TimeoutException;
20 import java.util.concurrent.CancellationException;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.locks.LockSupport;
23
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>Similar methods are available for function-based usages in
31 * which dependent stages typically propagate values, as well as
32 * result-less action-based usages, that are normally associated with
33 * {@code CompletableFuture<Void>} Futures. Functions and actions
34 * supplied for dependent completions using {@code then}, {@code
35 * andThen}, {@code orThen}, and {@code exceptionally} may be
36 * performed by the thread that completes the current
37 * CompletableFuture, or by any other caller of these methods. There
38 * are no guarantees about the order of processing completions unless
39 * constrained by method {@code then} and related methods.
40 *
41 * <p>When two or more threads attempt to {@link #complete} or {@link
42 * #completeExceptionally} a CompletableFuture, only one of them will
43 * succeed. Upon exceptional completion, or when a completion entails
44 * computation of a function or action, and it terminates abruptly
45 * with an exception, then further completions act as {@code
46 * completeExceptionally} with that exception.
47 *
48 * <p>CompletableFutures themselves do not execute asynchronously.
49 * However, the {@code async} methods provide commonly useful ways to
50 * commence asynchronous processing, using either a given {@link
51 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
52 * function or action that will result in the completion of a new
53 * CompletableFuture.
54 *
55 * @author Doug Lea
56 * @since 1.8
57 */
58 public class CompletableFuture<T> implements Future<T> {
59 /*
60 * Quick overview (more to come):
61 *
62 * 1. Non-nullness of field result indicates done. An AltResult is
63 * used to box null as a result, as well as to hold exceptions.
64 *
65 * 2. Waiters are held in a Treiber stack similar to the one used
66 * in FutureTask
67 *
68 * 3. Completions are also kept in a list/stack, and pulled off
69 * and run when completion is triggered.
70 */
71
72 static final class AltResult {
73 final Throwable ex; // null only for NIL
74 AltResult(Throwable ex) { this.ex = ex; }
75 }
76
77 static final AltResult NIL = new AltResult(null); // shared sentinel stored in place of a null result (its ex is null)
78
79 /**
80 * Simple linked list nodes to record waiting threads in a Treiber
81 * stack. See other classes such as Phaser and SynchronousQueue
82 * for more detailed explanation.
83 */
84 static final class WaitNode {
85 volatile Thread thread;
86 volatile WaitNode next;
87 }
88
89 /**
90 * Simple linked list nodes to record completions, used in
91 * basically the same way as WaitNodes
92 */
93 static final class CompletionNode {
94 final Completion completion;
95 volatile CompletionNode next;
96 CompletionNode(Completion completion) { this.completion = completion; }
97 }
98
99
100 volatile Object result; // null until completed; then the value itself, or an AltResult boxing null or an exception
101 volatile WaitNode waiters; // head of the Treiber stack of threads blocked on get()
102 volatile CompletionNode completions; // head of the Treiber stack of dependent completions
103
104 /**
105 * Creates a new incomplete CompletableFuture.
106 */
107 public CompletableFuture() {
108 }
109
110 /**
111 * Asynchronously executes in the {@link
112 * ForkJoinPool#commonPool()}, a task that completes the returned
113 * CompletableFuture with the result of the given Supplier.
114 *
115 * @param supplier a function returning the value to be used
116 * to complete the returned CompletableFuture.
117 * @return the CompletableFuture.
118 */
119 public static <U> CompletableFuture<U> async(Supplier<U> supplier) {
120 if (supplier == null) throw new NullPointerException();
121 CompletableFuture<U> f = new CompletableFuture<U>();
122 ForkJoinPool.commonPool().
123 execute((ForkJoinTask<?>)new AsyncSupplier(supplier, f));
124 return f;
125 }
126
127 /**
128 * Asynchronously executes using the given executor, a task that
129 * completes the returned CompletableFuture with the result of the
130 * given Supplier.
131 *
132 * @param supplier a function returning the value to be used
133 * to complete the returned CompletableFuture.
134 * @param executor the executor to use for asynchronous execution
135 * @return the CompletableFuture.
136 */
137 public static <U> CompletableFuture<U> async(Supplier<U> supplier,
138 Executor executor) {
139 if (executor == null || supplier == null)
140 throw new NullPointerException();
141 CompletableFuture<U> f = new CompletableFuture<U>();
142 executor.execute(new AsyncSupplier(supplier, f));
143 return f;
144 }
145
146 /**
147 * Asynchronously executes in the {@link
148 * ForkJoinPool#commonPool()} a task that runs the given action,
149 * and then completes the returned CompletableFuture
150 *
151 * @param runnable the action to run before completing the
152 * returned CompletableFuture.
153 * @return the CompletableFuture.
154 */
155 public static CompletableFuture<Void> async(Runnable runnable) {
156 if (runnable == null) throw new NullPointerException();
157 CompletableFuture<Void> f = new CompletableFuture<Void>();
158 ForkJoinPool.commonPool().
159 execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
160 return f;
161 }
162
163 /**
164 * Asynchronously executes using the given executor, a task that
165 * runs the given action, and then completes the returned
166 * CompletableFuture
167 *
168 * @param runnable the action to run before completing the
169 * returned CompletableFuture.
170 * @param executor the executor to use for asynchronous execution
171 * @return the CompletableFuture.
172 */
173 public static CompletableFuture<Void> async(Runnable runnable,
174 Executor executor) {
175 if (executor == null || runnable == null)
176 throw new NullPointerException();
177 CompletableFuture<Void> f = new CompletableFuture<Void>();
178 executor.execute(new AsyncRunnable(runnable, f));
179 return f;
180 }
181
182 /**
183 * Returns {@code true} if completed in any fashion: normally,
184 * exceptionally, or cancellation.
185 *
186 * @return {@code true} if completed
187 */
188 public boolean isDone() {
189 return result != null;
190 }
191
192 /**
193 * Returns the result value when complete, or throws an
194 * (unchecked) exception if completed exceptionally. To better
195 * conform with the use of common functional forms, this method
196 * transforms any checked exception possible with {@link
197 * Future#get} into an (unchecked) {@link RuntimeException} with
198 * the underlying exception as its cause. (The checked exception
199 * convention is available using the timed form of get.)
200 *
201 * @return the result value
202 */
203 public T get() {
204 Object r; Throwable ex;
205 if ((r = result) == null)
206 return waitingGet();
207 if (r instanceof AltResult) {
208 if ((ex = ((AltResult)r).ex) != null) {
209 if (ex instanceof Error)
210 throw (Error)ex;
211 if (ex instanceof RuntimeException)
212 throw (RuntimeException)ex;
213 throw new RuntimeException(ex);
214 }
215 return null;
216 }
217 return (T)r;
218 }
219
220 /**
221 * Returns the result value (or throws any encountered exception)
222 * if completed, else returns the given valueIfAbsent.
223 *
224 * @param valueIfAbsent the value to return if not completed
225 * @return the result value, if completed, else the given valueIfAbsent
226 */
227 public T getNow(T valueIfAbsent) {
228 Object r; Throwable ex;
229 if ((r = result) == null)
230 return valueIfAbsent;
231 if (r instanceof AltResult) {
232 if ((ex = ((AltResult)r).ex) != null) {
233 if (ex instanceof Error)
234 throw (Error)ex;
235 if (ex instanceof RuntimeException)
236 throw (RuntimeException)ex;
237 throw new RuntimeException(ex);
238 }
239 return null;
240 }
241 return (T)r;
242 }
243
244 /**
245 * Waits if necessary for at most the given time for completion,
246 * and then retrieves its result, if available.
247 *
248 * @param timeout the maximum time to wait
249 * @param unit the time unit of the timeout argument
250 * @return the computed result
251 * @throws CancellationException if the computation was cancelled
252 * @throws ExecutionException if the computation threw an
253 * exception
254 * @throws InterruptedException if the current thread was interrupted
255 * while waiting
256 * @throws TimeoutException if the wait timed out
257 */
258 public T get(long timeout, TimeUnit unit)
259 throws InterruptedException, ExecutionException, TimeoutException {
260 Object r; Throwable ex;
261 long nanos = unit.toNanos(timeout);
262 if (Thread.interrupted())
263 throw new InterruptedException();
264 if ((r = result) == null)
265 r = timedAwaitDone(nanos);
266 if (r instanceof AltResult) {
267 if ((ex = ((AltResult)r).ex) != null) {
268 if (ex instanceof ExecutionException) // avoid re-wrap
269 throw (ExecutionException)ex;
270 throw new ExecutionException(ex);
271 }
272 return null;
273 }
274 return (T)r;
275 }
276
277 /**
278 * If not already completed, sets the value returned by {@link
279 * #get()} and related methods to the given value.
280 *
281 * @param value the result value
282 * @return true if this invocation caused this CompletableFuture
283 * to transition to a completed state, else false.
284 */
285 public boolean complete(T value) {
286 if (result == null &&
287 UNSAFE.compareAndSwapObject(this, RESULT, null,
288 (value == null) ? NIL : value)) {
289 postComplete();
290 return true;
291 }
292 return false;
293 }
294
295 /**
296 * If not already completed, causes invocations of {@link #get()}
297 * and related methods to throw the given exception.
298 *
299 * @param ex the exception
300 * @return true if this invocation caused this CompletableFuture
301 * to transition to a completed state, else false.
302 */
303 public boolean completeExceptionally(Throwable ex) {
304 if (ex == null) throw new NullPointerException();
305 if (result == null) {
306 Object r = new AltResult(ex);
307 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
308 postComplete();
309 return true;
310 }
311 }
312 return false;
313 }
314
315 /**
316 * Creates and returns a CompletableFuture that is completed with
317 * the result of the given function of this CompletableFuture.
318 * If this CompletableFuture completes exceptionally,
319 * then the returned CompletableFuture also does so,
320 * with a RuntimeException having this exception as
321 * its cause.
322 *
323 * @param fn the function to use to compute the value of
324 * the returned CompletableFuture
325 * @return the new CompletableFuture
326 */
327 public <U> CompletableFuture<U> then(Function<? super T,? extends U> fn) {
328 return thenFunction(fn, null);
329 }
330
331 /**
332 * Creates and returns a CompletableFuture that is asynchronously
333 * completed using the {@link ForkJoinPool#commonPool()} with the
334 * result of the given function of this CompletableFuture. If
335 * this CompletableFuture completes exceptionally, then the
336 * returned CompletableFuture also does so, with a
337 * RuntimeException having this exception as its cause.
338 *
339 * @param fn the function to use to compute the value of
340 * the returned CompletableFuture
341 * @return the new CompletableFuture
342 */
343 public <U> CompletableFuture<U> thenAsync(Function<? super T,? extends U> fn) {
344 return thenFunction(fn, ForkJoinPool.commonPool());
345 }
346
347 /**
348 * Creates and returns a CompletableFuture that is asynchronously
349 * completed using the given executor with the result of the given
350 * function of this CompletableFuture. If this CompletableFuture
351 * completes exceptionally, then the returned CompletableFuture
352 * also does so, with a RuntimeException having this exception as
353 * its cause.
354 *
355 * @param fn the function to use to compute the value of
356 * the returned CompletableFuture
357 * @param executor the executor to use for asynchronous execution
358 * @return the new CompletableFuture
359 */
360 public <U> CompletableFuture<U> thenAsync(Function<? super T,? extends U> fn,
361 Executor executor) {
362 if (executor == null) throw new NullPointerException();
363 return thenFunction(fn, executor);
364 }
365
366 /**
367 * Creates and returns a CompletableFuture that is completed after
368 * performing the given action if/when this CompletableFuture
369 * completes. If this CompletableFuture completes exceptionally,
370 * then the returned CompletableFuture also does so, with a
371 * RuntimeException having this exception as its cause.
372 *
373 * @param action the action to perform before completing the
374 * returned CompletableFuture
375 * @return the new CompletableFuture
376 */
377 public CompletableFuture<Void> then(Runnable action) {
378 return thenRunnable(action, null);
379 }
380
381 /**
382 * Creates and returns a CompletableFuture that is asynchronously
383 * completed using the {@link ForkJoinPool#commonPool()} after
384 * performing the given action if/when this CompletableFuture
385 * completes. If this CompletableFuture completes exceptionally,
386 * then the returned CompletableFuture also does so, with a
387 * RuntimeException having this exception as its cause.
388 *
389 * @param action the action to perform before completing the
390 * returned CompletableFuture
391 * @return the new CompletableFuture
392 */
393 public CompletableFuture<Void> thenAsync(Runnable action) {
394 return thenRunnable(action, ForkJoinPool.commonPool());
395 }
396
397 /**
398 * Creates and returns a CompletableFuture that is asynchronously
399 * completed using the given executor after performing the given
400 * action if/when this CompletableFuture completes. If this
401 * CompletableFuture completes exceptionally, then the returned
402 * CompletableFuture also does so, with a RuntimeException having
403 * this exception as its cause.
404 *
405 * @param action the action to perform before completing the
406 * returned CompletableFuture
407 * @param executor the executor to use for asynchronous execution
408 * @return the new CompletableFuture
409 */
410 public CompletableFuture<Void> thenAsync(Runnable action, Executor executor) {
411 if (executor == null) throw new NullPointerException();
412 return thenRunnable(action, executor);
413 }
414
415 /**
416 * Creates and returns a CompletableFuture that is completed with
417 * the result of the given function of this and the other given
418 * CompletableFuture's results if/when both complete. If this or
419 * the other CompletableFuture complete exceptionally, then the
420 * returned CompletableFuture also does so, with a
421 * RuntimeException having the exception as its cause.
422 *
423 * @param other the other CompletableFuture
424 * @param fn the function to use to compute the value of
425 * the returned CompletableFuture
426 * @return the new CompletableFuture
427 */
428 public <U,V> CompletableFuture<V> andThen(CompletableFuture<? extends U> other,
429 BiFunction<? super T,? super U,? extends V> fn) {
430 return andFunction(other, fn, null);
431 }
432
433 /**
434 * Creates and returns a CompletableFuture that is asynchronously
435 * completed using the {@link ForkJoinPool#commonPool()} with
436 * the result of the given function of this and the other given
437 * CompletableFuture's results if/when both complete. If this or
438 * the other CompletableFuture complete exceptionally, then the
439 * returned CompletableFuture also does so, with a
440 * RuntimeException having the exception as its cause.
441 *
442 * @param other the other CompletableFuture
443 * @param fn the function to use to compute the value of
444 * the returned CompletableFuture
445 * @return the new CompletableFuture
446 */
447 public <U,V> CompletableFuture<V> andThenAsync(CompletableFuture<? extends U> other,
448 BiFunction<? super T,? super U,? extends V> fn) {
449 return andFunction(other, fn, ForkJoinPool.commonPool());
450 }
451
452 /**
453 * Creates and returns a CompletableFuture that is
454 * asynchronously completed using the given executor with the
455 * result of the given function of this and the other given
456 * CompletableFuture's results if/when both complete. If this or
457 * the other CompletableFuture complete exceptionally, then the
458 * returned CompletableFuture also does so, with a
459 * RuntimeException having the exception as its cause.
460 *
461 * @param other the other CompletableFuture
462 * @param fn the function to use to compute the value of
463 * the returned CompletableFuture
464 * @param executor the executor to use for asynchronous execution
465 * @return the new CompletableFuture
466 */
467
468 public <U,V> CompletableFuture<V> andThenAsync(CompletableFuture<? extends U> other,
469 BiFunction<? super T,? super U,? extends V> fn,
470 Executor executor) {
471 if (executor == null) throw new NullPointerException();
472 return andFunction(other, fn, executor);
473 }
474
475 /**
476 * Creates and returns a CompletableFuture that is completed
477 * if/when this and the other given CompletableFuture both
478 * complete. If this and/or the other CompletableFuture complete
479 * exceptionally, then the returned CompletableFuture also does
480 * so, with a RuntimeException having the one of the exceptions as
481 * its cause.
482 *
483 * @param other the other CompletableFuture
484 * @param action the action to perform before completing the
485 * returned CompletableFuture
486 * @return the new CompletableFuture
487 */
488 public CompletableFuture<Void> andThen(CompletableFuture<?> other,
489 Runnable action) {
490 return andRunnable(other, action, null);
491 }
492
493 /**
494 * Creates and returns a CompletableFuture that is completed
495 * asynchronously using the {@link ForkJoinPool#commonPool()}
496 * if/when this and the other given CompletableFuture both
497 * complete. If this and/or the other CompletableFuture complete
498 * exceptionally, then the returned CompletableFuture also does
499 * so, with a RuntimeException having the one of the exceptions as
500 * its cause.
501 *
502 * @param other the other CompletableFuture
503 * @param action the action to perform before completing the
504 * returned CompletableFuture
505 * @return the new CompletableFuture
506 */
507 public CompletableFuture<Void> andThenAsync(CompletableFuture<?> other,
508 Runnable action) {
509 return andRunnable(other, action, ForkJoinPool.commonPool());
510 }
511
512 /**
513 * Creates and returns a CompletableFuture that is completed
514 * asynchronously using the given executor
515 * if/when this and the other given CompletableFuture both
516 * complete. If this and/or the other CompletableFuture complete
517 * exceptionally, then the returned CompletableFuture also does
518 * so, with a RuntimeException having the one of the exceptions as
519 * its cause.
520 *
521 * @param other the other CompletableFuture
522 * @param action the action to perform before completing the
523 * returned CompletableFuture
524 * @param executor the executor to use for asynchronous execution
525 * @return the new CompletableFuture
526 */
527 public CompletableFuture<Void> andThenAsync(CompletableFuture<?> other,
528 Runnable action,
529 Executor executor) {
530 if (executor == null) throw new NullPointerException();
531 return andRunnable(other, action, executor);
532 }
533
534 /**
535 * Creates and returns a CompletableFuture that is completed with
536 * the result of the given function of either this or the other
537 * given CompletableFuture's results if/when either complete. If
538 * this and/or the other CompletableFuture complete exceptionally,
539 * then the returned CompletableFuture may also do so, with a
540 * RuntimeException having one of these exceptions as its cause.
541 * No guarantees are made about which result or exception is used
542 * in the returned CompletableFuture.
543 *
544 * @param other the other CompletableFuture
545 * @param fn the function to use to compute the value of
546 * the returned CompletableFuture
547 * @return the new CompletableFuture
548 */
549 public <U> CompletableFuture<U> orThen(CompletableFuture<? extends T> other,
550 Function<? super T, U> fn) {
551 return orFunction(other, fn, null);
552 }
553
554 /**
555 * Creates and returns a CompletableFuture that is completed
556 * asynchronously using the {@link ForkJoinPool#commonPool()} with
557 * the result of the given function of either this or the other
558 * given CompletableFuture's results if/when either complete. If
559 * this and/or the other CompletableFuture complete exceptionally,
560 * then the returned CompletableFuture may also do so, with a
561 * RuntimeException having one of these exceptions as its cause.
562 * No guarantees are made about which result or exception is used
563 * in the returned CompletableFuture.
564 *
565 * @param other the other CompletableFuture
566 * @param fn the function to use to compute the value of
567 * the returned CompletableFuture
568 * @return the new CompletableFuture
569 */
570 public <U> CompletableFuture<U> orThenAsync(CompletableFuture<? extends T> other,
571 Function<? super T, U> fn) {
572 return orFunction(other, fn, ForkJoinPool.commonPool());
573 }
574
575 /**
576 * Creates and returns a CompletableFuture that is completed
577 * asynchronously using the given executor with the result of the
578 * given function of either this or the other given
579 * CompletableFuture's results if/when either complete. If this
580 * and/or the other CompletableFuture complete exceptionally, then
581 * the returned CompletableFuture may also do so, with a
582 * RuntimeException having one of these exceptions as its cause.
583 * No guarantees are made about which result or exception is used
584 * in the returned CompletableFuture.
585 *
586 * @param other the other CompletableFuture
587 * @param fn the function to use to compute the value of
588 * the returned CompletableFuture
589 * @param executor the executor to use for asynchronous execution
590 * @return the new CompletableFuture
591 */
592 public <U> CompletableFuture<U> orThen(CompletableFuture<? extends T> other,
593 Function<? super T, U> fn,
594 Executor executor) {
595 if (executor == null) throw new NullPointerException();
596 return orFunction(other, fn, executor);
597 }
598
599 /**
600 * Creates and returns a CompletableFuture that is completed
601 * after this or the other given CompletableFuture complete. If
602 * this and/or the other CompletableFuture complete exceptionally,
603 * then the returned CompletableFuture may also do so, with a
604 * RuntimeException having one of these exceptions as its cause.
605 * No guarantees are made about which exception is used in the
606 * returned CompletableFuture.
607 *
608 * @param other the other CompletableFuture
609 * @param action the action to perform before completing the
610 * returned CompletableFuture
611 * @return the new CompletableFuture
612 */
613 public CompletableFuture<Void> orThen(CompletableFuture<?> other,
614 Runnable action) {
615 return orRunnable(other, action, null);
616 }
617
618 /**
619 * Creates and returns a CompletableFuture that is completed
620 * asynchronously using the {@link ForkJoinPool#commonPool()}
621 * after this or the other given CompletableFuture complete. If
622 * this and/or the other CompletableFuture complete exceptionally,
623 * then the returned CompletableFuture may also do so, with a
624 * RuntimeException having one of these exceptions as its cause.
625 * No guarantees are made about which exception is used in the
626 * returned CompletableFuture.
627 *
628 * @param other the other CompletableFuture
629 * @param action the action to perform before completing the
630 * returned CompletableFuture
631 * @return the new CompletableFuture
632 */
633 public CompletableFuture<Void> orThenAsync(CompletableFuture<?> other,
634 Runnable action) {
635 return orRunnable(other, action, ForkJoinPool.commonPool());
636 }
637
638 /**
639 * Creates and returns a CompletableFuture that is completed
640 * asynchronously using the given executor after this or the other
641 * given CompletableFuture complete. If this and/or the other
642 * CompletableFuture complete exceptionally, then the returned
643 * CompletableFuture may also do so, with a RuntimeException
644 * having one of these exceptions as its cause. No guarantees are
645 * made about which exception is used in the returned
646 * CompletableFuture.
647 *
648 * @param other the other CompletableFuture
649 * @param action the action to perform before completing the
650 * returned CompletableFuture
651 * @param executor the executor to use for asynchronous execution
652 * @return the new CompletableFuture
653 */
654 public CompletableFuture<Void> orThenAsync(CompletableFuture<?> other,
655 Runnable action,
656 Executor executor) {
657 if (executor == null) throw new NullPointerException();
658 return orRunnable(other, action, executor);
659 }
660
661 /**
662 * Creates and returns a CompletableFuture that is completed after
663 * performing the given action with the exception triggering this
664 * CompletableFuture's completion if/when it completes
665 * exceptionally.
666 *
667 * @param action the action to perform before completing the
668 * returned CompletableFuture
669 * @return the new CompletableFuture
670 */
671 public CompletableFuture<Void> exceptionally(Block<Throwable> action) {
672 if (action == null) throw new NullPointerException();
673 CompletableFuture<Void> dst = new CompletableFuture<Void>();
674 ExceptionAction<T> d = null;
675 Object r; Throwable ex;
676 if ((r = result) == null) {
677 CompletionNode p =
678 new CompletionNode(d = new ExceptionAction<T>(this, action, dst));
679 while ((r = result) == null) {
680 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
681 p.next = completions, p))
682 break;
683 }
684 }
685 if (r != null && (d == null || d.compareAndSet(0, 1)) &&
686 (r instanceof AltResult) && (ex = ((AltResult)r).ex) != null) {
687 try {
688 action.accept(ex);
689 dst.complete(null);
690 } catch (Throwable rex) {
691 dst.completeExceptionally(rex);
692 }
693 }
694 if (r != null)
695 postComplete();
696 return dst;
697 }
698
699 /**
700 * Attempts to complete this CompletableFuture with
701 * a {@link CancellationException}.
702 *
703 * @param mayInterruptIfRunning this value has no effect in this
704 * implementation because interrupts are not used to control
705 * processing.
706 *
707 * @return {@code true} if this task is now cancelled
708 */
709 public boolean cancel(boolean mayInterruptIfRunning) {
710 Object r;
711 while ((r = result) == null) {
712 r = new AltResult(new CancellationException());
713 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
714 postComplete();
715 return true;
716 }
717 }
718 return ((r instanceof AltResult) &&
719 (((AltResult)r).ex instanceof CancellationException));
720 }
721
722 /**
723 * Returns {@code true} if this CompletableFuture was cancelled
724 * before it completed normally.
725 *
726 * @return {@code true} if this CompletableFuture was cancelled
727 * before it completed normally
728 */
729 public boolean isCancelled() {
730 Object r;
731 return ((r = result) != null &&
732 (r instanceof AltResult) &&
733 (((AltResult)r).ex instanceof CancellationException));
734 }
735
736 /**
737 * Whether or not already completed, sets the value subsequently
738 * returned by method get() and related methods to the given
739 * value. This method is designed for use in error recovery
740 * actions, and is very unlikely to be useful otherwise.
741 *
742 * @param value the completion value
743 */
744 public void force(T value) {
745 result = (value == null) ? NIL : value;
746 postComplete();
747 }
748
749 /**
750 * Removes and signals all waiting threads and runs all completions
751 */
752 private void postComplete() {
753 WaitNode q; Thread t;
754 while ((q = waiters) != null) {
755 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
756 (t = q.thread) != null) {
757 q.thread = null;
758 LockSupport.unpark(t);
759 }
760 }
761
762 CompletionNode h; Completion c;
763 while ((h = completions) != null) {
764 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
765 (c = h.completion) != null)
766 c.run();
767 }
768 }
769
770 /* ------------- waiting for completions -------------- */
771
772 /**
773 * Initial spin budget used by waitingGet() before it enqueues a wait node
774 * and blocks; applied only when more than one processor is available
775 */
776 static final int WAITING_GET_SPINS = 256;
777
778 /**
779 * Returns result after waiting.
780 */
781 private T waitingGet() {
782 WaitNode q = null;
783 boolean queued = false, interrupted = false;
784 int h = 0, spins = 0;
785 for (Object r;;) {
786 if ((r = result) != null) {
787 Throwable ex;
788 if (q != null) // suppress unpark
789 q.thread = null;
790 postComplete(); // help release others
791 if (interrupted)
792 Thread.currentThread().interrupt();
793 if (r instanceof AltResult) {
794 if ((ex = ((AltResult)r).ex) != null) {
795 if (ex instanceof Error)
796 throw (Error)ex;
797 if (ex instanceof RuntimeException)
798 throw (RuntimeException)ex;
799 throw new RuntimeException(ex);
800 }
801 return null;
802 }
803 return (T)r;
804 }
805 else if (h == 0) {
806 h = ThreadLocalRandom.current().nextInt();
807 if (Runtime.getRuntime().availableProcessors() > 1)
808 spins = WAITING_GET_SPINS;
809 }
810 else if (spins > 0) {
811 h ^= h << 1; // xorshift
812 h ^= h >>> 3;
813 if ((h ^= h << 10) >= 0)
814 --spins;
815 }
816 else if (q == null)
817 q = new WaitNode();
818 else if (!queued)
819 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
820 q.next = waiters, q);
821 else if (Thread.interrupted())
822 interrupted = true;
823 else if (q.thread == null)
824 q.thread = Thread.currentThread();
825 else
826 LockSupport.park(this);
827 }
828 }
829
830 /**
831 * Awaits completion or aborts on interrupt or timeout.
832 *
833 * @param nanos time to wait
834 * @return raw result
835 */
836 private Object timedAwaitDone(long nanos)
837 throws InterruptedException, TimeoutException {
838 final long deadline = System.nanoTime() + nanos;
839 WaitNode q = null;
840 boolean queued = false;
841 for (Object r;;) {
842 if (Thread.interrupted()) {
843 removeWaiter(q);
844 throw new InterruptedException();
845 }
846 else if ((r = result) != null) {
847 if (q != null)
848 q.thread = null;
849 postComplete();
850 return r;
851 }
852 else if (q == null)
853 q = new WaitNode();
854 else if (!queued)
855 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
856 q.next = waiters, q);
857 else if ((nanos = deadline - System.nanoTime()) <= 0L) {
858 removeWaiter(q);
859 throw new TimeoutException();
860 }
861 else if (q.thread == null)
862 q.thread = Thread.currentThread();
863 else
864 LockSupport.parkNanos(this, nanos);
865 }
866 }
867
868 /**
869 * Tries to unlink a timed-out or interrupted wait node to avoid
870 * accumulating garbage. Internal nodes are simply unspliced
871 * without CAS since it is harmless if they are traversed anyway
872 * by releasers. To avoid effects of unsplicing from already
873 * removed nodes, the list is retraversed in case of an apparent
874 * race. This is slow when there are a lot of nodes, but we don't
875 * expect lists to be long enough to outweigh higher-overhead
876 * schemes.
877 */
878 private void removeWaiter(WaitNode node) {
879 if (node != null) {
880 node.thread = null;
881 retry:
882 for (;;) { // restart on removeWaiter race
883 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
884 s = q.next;
885 if (q.thread != null)
886 pred = q;
887 else if (pred != null) {
888 pred.next = s;
889 if (pred.thread == null) // check for race
890 continue retry;
891 }
892 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
893 continue retry;
894 }
895 break;
896 }
897 }
898 }
899
900 /* ------------- Async tasks -------------- */
901
902 /** Base class can act as either FJ or plain Runnable */
903 static abstract class Async extends ForkJoinTask<Void> implements Runnable {
904 public final Void getRawResult() { return null; }
905 public final void setRawResult(Void v) { }
906 public final void run() { exec(); }
907 }
908
909 static final class AsyncRunnable extends Async {
910 final Runnable runnable;
911 final CompletableFuture<Void> dst;
912 AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
913 this.runnable = runnable; this.dst = dst;
914 }
915 public final boolean exec() {
916 Runnable fn;
917 CompletableFuture<Void> d;
918 if ((fn = this.runnable) == null || (d = this.dst) == null)
919 throw new NullPointerException();
920 try {
921 fn.run();
922 d.complete(null);
923 } catch (Throwable ex) {
924 d.completeExceptionally(ex);
925 }
926 return true;
927 }
928 private static final long serialVersionUID = 5232453952276885070L;
929 }
930
931 static final class AsyncSupplier<U> extends Async {
932 final Supplier<U> supplier;
933 final CompletableFuture<U> dst;
934 AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
935 this.supplier = supplier; this.dst = dst;
936 }
937 public final boolean exec() {
938 Supplier<U> fn;
939 CompletableFuture<U> d;
940 if ((fn = this.supplier) == null || (d = this.dst) == null)
941 throw new NullPointerException();
942 try {
943 d.complete(fn.get());
944 } catch (Throwable ex) {
945 d.completeExceptionally(ex);
946 }
947 return true;
948 }
949 private static final long serialVersionUID = 5232453952276885070L;
950 }
951
952 static final class AsyncFunction<T,U> extends Async {
953 Function<? super T,? extends U> fn;
954 T arg;
955 final CompletableFuture<U> dst;
956 AsyncFunction(T arg, Function<? super T,? extends U> fn,
957 CompletableFuture<U> dst) {
958 this.arg = arg; this.fn = fn; this.dst = dst;
959 }
960 public final boolean exec() {
961 Function<? super T,? extends U> fn;
962 CompletableFuture<U> d;
963 if ((fn = this.fn) == null || (d = this.dst) == null)
964 throw new NullPointerException();
965 try {
966 d.complete(fn.apply(arg));
967 } catch (Throwable ex) {
968 d.completeExceptionally(ex);
969 }
970 return true;
971 }
972 private static final long serialVersionUID = 5232453952276885070L;
973 }
974
975 static final class AsyncBiFunction<T,U,V> extends Async {
976 final BiFunction<? super T,? super U,? extends V> fn;
977 final T arg1;
978 final U arg2;
979 final CompletableFuture<V> dst;
980 AsyncBiFunction(T arg1, U arg2,
981 BiFunction<? super T,? super U,? extends V> fn,
982 CompletableFuture<V> dst) {
983 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
984 }
985 public final boolean exec() {
986 BiFunction<? super T,? super U,? extends V> fn;
987 CompletableFuture<V> d;
988 if ((fn = this.fn) == null || (d = this.dst) == null)
989 throw new NullPointerException();
990 try {
991 d.complete(fn.apply(arg1, arg2));
992 } catch (Throwable ex) {
993 d.completeExceptionally(ex);
994 }
995 return true;
996 }
997 private static final long serialVersionUID = 5232453952276885070L;
998 }
999
1000 /* ------------- Completions -------------- */
1001
1002 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1003 static abstract class Completion extends AtomicInteger implements Runnable {
1004 }
1005
1006 static final class ThenFunction<T,U> extends Completion {
1007 final CompletableFuture<? extends T> src;
1008 final Function<? super T,? extends U> fn;
1009 final CompletableFuture<U> dst;
1010 final Executor executor;
1011 ThenFunction(CompletableFuture<? extends T> src,
1012 final Function<? super T,? extends U> fn,
1013 final CompletableFuture<U> dst, Executor executor) {
1014 this.src = src; this.fn = fn; this.dst = dst;
1015 this.executor = executor;
1016 }
1017 public void run() {
1018 CompletableFuture<? extends T> a;
1019 Function<? super T,? extends U> fn;
1020 CompletableFuture<U> dst;
1021 Object r; T t; Throwable ex;
1022 if ((dst = this.dst) != null &&
1023 (fn = this.fn) != null &&
1024 (a = this.src) != null &&
1025 (r = a.result) != null &&
1026 compareAndSet(0, 1)) {
1027 if (r instanceof AltResult) {
1028 if ((ex = ((AltResult)r).ex) != null) {
1029 dst.completeExceptionally(new RuntimeException(ex));
1030 return;
1031 }
1032 t = null;
1033 }
1034 else
1035 t = (T) r;
1036 try {
1037 if (executor != null)
1038 executor.execute(new AsyncFunction(t, fn, dst));
1039 else
1040 dst.complete(fn.apply(t));
1041 } catch (Throwable rex) {
1042 dst.completeExceptionally(rex);
1043 }
1044 }
1045 }
1046 }
1047
1048 static final class ThenRunnable<T> extends Completion {
1049 final CompletableFuture<? extends T> src;
1050 final Runnable fn;
1051 final CompletableFuture<Void> dst;
1052 final Executor executor;
1053 ThenRunnable(CompletableFuture<? extends T> src,
1054 Runnable fn,
1055 CompletableFuture<Void> dst,
1056 Executor executor) {
1057 this.src = src; this.fn = fn; this.dst = dst;
1058 this.executor = executor;
1059 }
1060 public void run() {
1061 CompletableFuture<? extends T> a;
1062 Runnable fn;
1063 CompletableFuture<Void> dst;
1064 Object r; Throwable ex;
1065 if ((dst = this.dst) != null &&
1066 (fn = this.fn) != null &&
1067 (a = this.src) != null &&
1068 (r = a.result) != null &&
1069 compareAndSet(0, 1)) {
1070 if (r instanceof AltResult) {
1071 if ((ex = ((AltResult)r).ex) != null) {
1072 dst.completeExceptionally(new RuntimeException(ex));
1073 return;
1074 }
1075 }
1076 try {
1077 if (executor != null)
1078 executor.execute(new AsyncRunnable(fn, dst));
1079 else {
1080 fn.run();
1081 dst.complete(null);
1082 }
1083 } catch (Throwable rex) {
1084 dst.completeExceptionally(rex);
1085 }
1086 }
1087 }
1088 }
1089
1090 static final class AndFunction<T,U,V> extends Completion {
1091 final CompletableFuture<? extends T> src;
1092 final CompletableFuture<? extends U> snd;
1093 final BiFunction<? super T,? super U,? extends V> fn;
1094 final CompletableFuture<V> dst;
1095 final Executor executor;
1096 AndFunction(CompletableFuture<? extends T> src,
1097 CompletableFuture<? extends U> snd,
1098 BiFunction<? super T,? super U,? extends V> fn,
1099 CompletableFuture<V> dst, Executor executor) {
1100 this.src = src; this.snd = snd;
1101 this.fn = fn; this.dst = dst;
1102 this.executor = executor;
1103 }
1104 public void run() {
1105 Object r, s; T t; U u; Throwable ex;
1106 CompletableFuture<? extends T> a;
1107 CompletableFuture<? extends U> b;
1108 BiFunction<? super T,? super U,? extends V> fn;
1109 CompletableFuture<V> dst;
1110 if ((dst = this.dst) != null &&
1111 (fn = this.fn) != null &&
1112 (a = this.src) != null &&
1113 (r = a.result) != null &&
1114 (b = this.snd) != null &&
1115 (s = b.result) != null &&
1116 compareAndSet(0, 1)) {
1117 if (r instanceof AltResult) {
1118 if ((ex = ((AltResult)r).ex) != null) {
1119 dst.completeExceptionally(new RuntimeException(ex));
1120 return;
1121 }
1122 t = null;
1123 }
1124 else
1125 t = (T) r;
1126 if (s instanceof AltResult) {
1127 if ((ex = ((AltResult)s).ex) != null) {
1128 dst.completeExceptionally(new RuntimeException(ex));
1129 return;
1130 }
1131 u = null;
1132 }
1133 else
1134 u = (U) s;
1135 try {
1136 if (executor != null)
1137 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1138 else
1139 dst.complete(fn.apply(t, u));
1140 } catch (Throwable rex) {
1141 dst.completeExceptionally(rex);
1142 }
1143 }
1144 }
1145 }
1146
1147 static final class AndRunnable<T> extends Completion {
1148 final CompletableFuture<? extends T> src;
1149 final CompletableFuture<?> snd;
1150 final Runnable fn;
1151 final CompletableFuture<Void> dst;
1152 final Executor executor;
1153 AndRunnable(CompletableFuture<? extends T> src,
1154 CompletableFuture<?> snd,
1155 Runnable fn,
1156 CompletableFuture<Void> dst, Executor executor) {
1157 this.src = src; this.snd = snd;
1158 this.fn = fn; this.dst = dst;
1159 this.executor = executor;
1160 }
1161 public void run() {
1162 Object r, s; Throwable ex;
1163 final CompletableFuture<? extends T> a;
1164 final CompletableFuture<?> b;
1165 final Runnable fn;
1166 final CompletableFuture<Void> dst;
1167 if ((dst = this.dst) != null &&
1168 (fn = this.fn) != null &&
1169 (a = this.src) != null &&
1170 (r = a.result) != null &&
1171 (b = this.snd) != null &&
1172 (s = b.result) != null &&
1173 compareAndSet(0, 1)) {
1174 if (r instanceof AltResult) {
1175 if ((ex = ((AltResult)r).ex) != null) {
1176 dst.completeExceptionally(new RuntimeException(ex));
1177 return;
1178 }
1179 }
1180 if (s instanceof AltResult) {
1181 if ((ex = ((AltResult)s).ex) != null) {
1182 dst.completeExceptionally(new RuntimeException(ex));
1183 return;
1184 }
1185 }
1186 try {
1187 if (executor != null)
1188 executor.execute(new AsyncRunnable(fn, dst));
1189 else {
1190 fn.run();
1191 dst.complete(null);
1192 }
1193 } catch (Throwable rex) {
1194 dst.completeExceptionally(rex);
1195 }
1196 }
1197 }
1198 }
1199
1200 static final class OrFunction<T,U> extends Completion {
1201 final CompletableFuture<? extends T> src;
1202 final CompletableFuture<? extends T> snd;
1203 final Function<? super T,? extends U> fn;
1204 final CompletableFuture<U> dst;
1205 final Executor executor;
1206 OrFunction(CompletableFuture<? extends T> src,
1207 CompletableFuture<? extends T> snd,
1208 Function<? super T,? extends U> fn,
1209 CompletableFuture<U> dst, Executor executor) {
1210 this.src = src; this.snd = snd;
1211 this.fn = fn; this.dst = dst;
1212 this.executor = executor;
1213 }
1214 public void run() {
1215 Object r; T t; Throwable ex;
1216 CompletableFuture<? extends T> a;
1217 CompletableFuture<? extends T> b;
1218 Function<? super T,? extends U> fn;
1219 CompletableFuture<U> dst;
1220 if ((dst = this.dst) != null &&
1221 (fn = this.fn) != null &&
1222 (((a = this.src) != null && (r = a.result) != null) ||
1223 ((b = this.snd) != null && (r = b.result) != null)) &&
1224 compareAndSet(0, 1)) {
1225 if (r instanceof AltResult) {
1226 if ((ex = ((AltResult)r).ex) != null) {
1227 dst.completeExceptionally(new RuntimeException(ex));
1228 return;
1229 }
1230 t = null;
1231 }
1232 else
1233 t = (T) r;
1234 try {
1235 if (executor != null)
1236 executor.execute(new AsyncFunction(t, fn, dst));
1237 else
1238 dst.complete(fn.apply(t));
1239 } catch (Throwable rex) {
1240 dst.completeExceptionally(rex);
1241 }
1242 }
1243 }
1244 }
1245
1246 static final class OrRunnable<T> extends Completion {
1247 final CompletableFuture<? extends T> src;
1248 final CompletableFuture<?> snd;
1249 final Runnable fn;
1250 final CompletableFuture<Void> dst;
1251 final Executor executor;
1252 OrRunnable(CompletableFuture<? extends T> src,
1253 CompletableFuture<?> snd,
1254 Runnable fn,
1255 CompletableFuture<Void> dst, Executor executor) {
1256 this.src = src; this.snd = snd;
1257 this.fn = fn; this.dst = dst;
1258 this.executor = executor;
1259 }
1260 public void run() {
1261 Object r; Throwable ex;
1262 CompletableFuture<? extends T> a;
1263 final CompletableFuture<?> b;
1264 final Runnable fn;
1265 final CompletableFuture<Void> dst;
1266 if ((dst = this.dst) != null &&
1267 (fn = this.fn) != null &&
1268 (((a = this.src) != null && (r = a.result) != null) ||
1269 ((b = this.snd) != null && (r = b.result) != null)) &&
1270 compareAndSet(0, 1)) {
1271 if ((r instanceof AltResult) &&
1272 (ex = ((AltResult)r).ex) != null) {
1273 dst.completeExceptionally(new RuntimeException(ex));
1274 }
1275 else {
1276 try {
1277 if (executor != null)
1278 executor.execute(new AsyncRunnable(fn, dst));
1279 else {
1280 fn.run();
1281 dst.complete(null);
1282 }
1283 } catch (Throwable rex) {
1284 dst.completeExceptionally(rex);
1285 }
1286 }
1287 }
1288 }
1289 }
1290
1291 static final class ExceptionAction<T> extends Completion {
1292 final CompletableFuture<? extends T> src;
1293 final Block<? super Throwable> fn;
1294 final CompletableFuture<Void> dst;
1295 ExceptionAction(CompletableFuture<? extends T> src,
1296 Block<? super Throwable> fn,
1297 CompletableFuture<Void> dst) {
1298 this.src = src; this.fn = fn; this.dst = dst;
1299 }
1300 public void run() {
1301 CompletableFuture<? extends T> a;
1302 Block<? super Throwable> fn;
1303 CompletableFuture<Void> dst;
1304 Object r; Throwable ex;
1305 if ((dst = this.dst) != null &&
1306 (fn = this.fn) != null &&
1307 (a = this.src) != null &&
1308 (r = a.result) != null &&
1309 compareAndSet(0, 1)) {
1310 if ((r instanceof AltResult) &&
1311 (ex = ((AltResult)r).ex) != null) {
1312 try {
1313 fn.accept(ex);
1314 dst.complete(null);
1315 } catch (Throwable rex) {
1316 dst.completeExceptionally(rex);
1317 }
1318 }
1319 }
1320 }
1321 }
1322
1323 /* ------------- then/and/or implementations -------------- */
1324
1325 private <U> CompletableFuture<U> thenFunction(Function<? super T,? extends U> fn,
1326 Executor executor) {
1327
1328 if (fn == null) throw new NullPointerException();
1329 CompletableFuture<U> dst = new CompletableFuture<U>();
1330 ThenFunction<T,U> d = null;
1331 Object r;
1332 if ((r = result) == null) {
1333 CompletionNode p = new CompletionNode
1334 (d = new ThenFunction<T,U>(this, fn, dst, executor));
1335 while ((r = result) == null) {
1336 if (UNSAFE.compareAndSwapObject
1337 (this, COMPLETIONS, p.next = completions, p))
1338 break;
1339 }
1340 }
1341 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1342 T t; Throwable ex = null;
1343 if (r instanceof AltResult) {
1344 if ((ex = ((AltResult)r).ex) != null)
1345 dst.completeExceptionally(new RuntimeException(ex));
1346 t = null;
1347 }
1348 else
1349 t = (T) r;
1350 if (ex == null) {
1351 try {
1352 if (executor != null)
1353 executor.execute(new AsyncFunction(t, fn, dst));
1354 else
1355 dst.complete(fn.apply(t));
1356 } catch (Throwable rex) {
1357 dst.completeExceptionally(rex);
1358 }
1359 }
1360 postComplete();
1361 }
1362 return dst;
1363 }
1364
1365 private CompletableFuture<Void> thenRunnable(Runnable action,
1366 Executor executor) {
1367 if (action == null) throw new NullPointerException();
1368 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1369 ThenRunnable<T> d = null;
1370 Object r;
1371 if ((r = result) == null) {
1372 CompletionNode p = new CompletionNode
1373 (d = new ThenRunnable<T>(this, action, dst, executor));
1374 while ((r = result) == null) {
1375 if (UNSAFE.compareAndSwapObject
1376 (this, COMPLETIONS, p.next = completions, p))
1377 break;
1378 }
1379 }
1380 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1381 Throwable ex = null;
1382 if (r instanceof AltResult) {
1383 if ((ex = ((AltResult)r).ex) != null)
1384 dst.completeExceptionally(new RuntimeException(ex));
1385 }
1386 if (ex == null) {
1387 try {
1388 if (executor != null)
1389 executor.execute(new AsyncRunnable(action, dst));
1390 else {
1391 action.run();
1392 dst.complete(null);
1393 }
1394 } catch (Throwable rex) {
1395 dst.completeExceptionally(rex);
1396 }
1397 }
1398 postComplete();
1399 }
1400 return dst;
1401 }
1402
1403 private <U,V> CompletableFuture<V> andFunction(CompletableFuture<? extends U> other,
1404 BiFunction<? super T,? super U,? extends V> fn,
1405 Executor executor) {
1406 if (other == null || fn == null) throw new NullPointerException();
1407 CompletableFuture<V> dst = new CompletableFuture<V>();
1408 AndFunction<T,U,V> d = null;
1409 Object r, s = null;
1410 if ((r = result) == null || (s = other.result) == null) {
1411 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
1412 CompletionNode q = null, p = new CompletionNode(d);
1413 while ((r == null && (r = result) == null) ||
1414 (s == null && (s = other.result) == null)) {
1415 if (q != null) {
1416 if (s != null ||
1417 UNSAFE.compareAndSwapObject
1418 (other, COMPLETIONS, q.next = other.completions, q))
1419 break;
1420 }
1421 else if (r != null ||
1422 UNSAFE.compareAndSwapObject
1423 (this, COMPLETIONS, p.next = completions, p)) {
1424 if (s != null)
1425 break;
1426 q = new CompletionNode(d);
1427 }
1428 }
1429 }
1430 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1431 T t; U u; Throwable ex = null;
1432 if (r instanceof AltResult) {
1433 if ((ex = ((AltResult)r).ex) != null)
1434 dst.completeExceptionally(new RuntimeException(ex));
1435 t = null;
1436 }
1437 else
1438 t = (T) r;
1439 if (ex != null)
1440 u = null;
1441 else if (s instanceof AltResult) {
1442 if ((ex = ((AltResult)s).ex) != null)
1443 dst.completeExceptionally(new RuntimeException(ex));
1444 u = null;
1445 }
1446 else
1447 u = (U) s;
1448 if (ex == null) {
1449 try {
1450 if (executor != null)
1451 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1452 else
1453 dst.complete(fn.apply(t, u));
1454 } catch (Throwable rex) {
1455 dst.completeExceptionally(rex);
1456 }
1457 }
1458 }
1459 if (r != null)
1460 postComplete();
1461 if (s != null)
1462 other.postComplete();
1463 return dst;
1464 }
1465
1466 private CompletableFuture<Void> andRunnable(CompletableFuture<?> other,
1467 Runnable action,
1468 Executor executor) {
1469 if (other == null || action == null) throw new NullPointerException();
1470 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1471 AndRunnable<T> d = null;
1472 Object r, s = null;
1473 if ((r = result) == null || (s = other.result) == null) {
1474 d = new AndRunnable<T>(this, other, action, dst, executor);
1475 CompletionNode q = null, p = new CompletionNode(d);
1476 while ((r == null && (r = result) == null) ||
1477 (s == null && (s = other.result) == null)) {
1478 if (q != null) {
1479 if (s != null ||
1480 UNSAFE.compareAndSwapObject
1481 (other, COMPLETIONS, q.next = other.completions, q))
1482 break;
1483 }
1484 else if (r != null ||
1485 UNSAFE.compareAndSwapObject
1486 (this, COMPLETIONS, p.next = completions, p)) {
1487 if (s != null)
1488 break;
1489 q = new CompletionNode(d);
1490 }
1491 }
1492 }
1493 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1494 Throwable ex = null;
1495 if ((r instanceof AltResult) &&
1496 (ex = ((AltResult)r).ex) != null)
1497 dst.completeExceptionally(new RuntimeException(ex));
1498 else if ((s instanceof AltResult) &&
1499 (ex = ((AltResult)s).ex) != null)
1500 dst.completeExceptionally(new RuntimeException(ex));
1501 else {
1502 try {
1503 if (executor != null)
1504 executor.execute(new AsyncRunnable(action, dst));
1505 else {
1506 action.run();
1507 dst.complete(null);
1508 }
1509 } catch (Throwable rex) {
1510 dst.completeExceptionally(rex);
1511 }
1512 }
1513 }
1514 if (r != null)
1515 postComplete();
1516 if (s != null)
1517 other.postComplete();
1518 return dst;
1519 }
1520
1521 private <U> CompletableFuture<U> orFunction(CompletableFuture<? extends T> other,
1522 Function<? super T, U> fn,
1523 Executor executor) {
1524 if (other == null || fn == null) throw new NullPointerException();
1525 CompletableFuture<U> dst = new CompletableFuture<U>();
1526 OrFunction<T,U> d = null;
1527 Object r;
1528 if ((r = result) == null && (r = other.result) == null) {
1529 d = new OrFunction<T,U>(this, other, fn, dst, executor);
1530 CompletionNode q = null, p = new CompletionNode(d);
1531 while ((r = result) == null && (r = other.result) == null) {
1532 if (q != null) {
1533 if (UNSAFE.compareAndSwapObject
1534 (other, COMPLETIONS, q.next = other.completions, q))
1535 break;
1536 }
1537 else if (UNSAFE.compareAndSwapObject
1538 (this, COMPLETIONS, p.next = completions, p))
1539 q = new CompletionNode(d);
1540 }
1541 }
1542 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1543 T t; Throwable ex = null;
1544 if (r instanceof AltResult) {
1545 if ((ex = ((AltResult)r).ex) != null)
1546 dst.completeExceptionally(new RuntimeException(ex));
1547 t = null;
1548 }
1549 else
1550 t = (T) r;
1551 if (ex == null) {
1552 try {
1553 if (executor != null)
1554 executor.execute(new AsyncFunction(t, fn, dst));
1555 else
1556 dst.complete(fn.apply(t));
1557 } catch (Throwable rex) {
1558 dst.completeExceptionally(rex);
1559 }
1560 }
1561 }
1562 if (r != null) {
1563 if (result != null)
1564 postComplete();
1565 if (other.result != null)
1566 other.postComplete();
1567 }
1568 return dst;
1569 }
1570
1571 private CompletableFuture<Void> orRunnable(CompletableFuture<?> other,
1572 Runnable action,
1573 Executor executor) {
1574 if (other == null || action == null) throw new NullPointerException();
1575 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1576 OrRunnable<T> d = null;
1577 Object r;
1578 if ((r = result) == null && (r = other.result) == null) {
1579 d = new OrRunnable<T>(this, other, action, dst, executor);
1580 CompletionNode q = null, p = new CompletionNode(d);
1581 while ((r = result) == null && (r = other.result) == null) {
1582 if (q != null) {
1583 if (UNSAFE.compareAndSwapObject
1584 (other, COMPLETIONS, q.next = other.completions, q))
1585 break;
1586 }
1587 else if (UNSAFE.compareAndSwapObject
1588 (this, COMPLETIONS, p.next = completions, p))
1589 q = new CompletionNode(d);
1590 }
1591 }
1592 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1593 Throwable ex = null;
1594 if ((r instanceof AltResult) &&
1595 (ex = ((AltResult)r).ex) != null)
1596 dst.completeExceptionally(new RuntimeException(ex));
1597 else {
1598 try {
1599 if (executor != null)
1600 executor.execute(new AsyncRunnable(action, dst));
1601 else {
1602 action.run();
1603 dst.complete(null);
1604 }
1605 } catch (Throwable rex) {
1606 dst.completeExceptionally(rex);
1607 }
1608 }
1609 }
1610 if (r != null) {
1611 if (result != null)
1612 postComplete();
1613 if (other.result != null)
1614 other.postComplete();
1615 }
1616 return dst;
1617 }
1618
// Unsafe mechanics: the Unsafe instance plus the offsets of the
// "result", "waiters" and "completions" fields, which the CAS
// operations in this class take as arguments.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        final sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
        final Class<?> self = CompletableFuture.class;
        UNSAFE = unsafe;
        RESULT = unsafe.objectFieldOffset(self.getDeclaredField("result"));
        WAITERS = unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        COMPLETIONS = unsafe.objectFieldOffset(self.getDeclaredField("completions"));
    } catch (Exception cause) {
        // Any failure here makes the class unusable.
        throw new Error(cause);
    }
}
1638
1639 }