ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.6
Committed: Fri Dec 28 19:18:30 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.5: +56 -39 lines
Log Message:
Allow exceptionally to compute value; rename force

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Function;
10 import java.util.function.BiFunction;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.ForkJoinPool;
14 import java.util.concurrent.ForkJoinTask;
15 import java.util.concurrent.Executor;
16 import java.util.concurrent.ThreadLocalRandom;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.TimeoutException;
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.atomic.AtomicInteger;
21 import java.util.concurrent.locks.LockSupport;
22
23
24 /**
25 * A {@link Future} that may be explicitly completed (setting its
26 * value and status), and may include dependent functions and actions
27 * that trigger upon its completion.
28 *
29 * <p>Similar methods are available for function-based usages in
30 * which dependent stages typically propagate values, as well as
31 * result-less action-based usages, that are normally associated with
32 * {@code CompletableFuture<Void>} Futures. Functions and actions
33 * supplied for dependent completions using {@code then}, {@code
34 * andThen}, {@code orThen}, and {@code exceptionally} may be
35 * performed by the thread that completes the current
36 * CompletableFuture, or by any other caller of these methods. There
37 * are no guarantees about the order of processing completions unless
38 * constrained by method {@code then} and related methods.
39 *
40 * <p>When two or more threads attempt to {@link #complete} or {@link
41 * #completeExceptionally} a CompletableFuture, only one of them will
42 * succeed. Upon exceptional completion, or when a completion entails
43 * computation of a function or action, and it terminates abruptly
44 * with an exception, then further completions act as {@code
45 * completeExceptionally} with that exception.
46 *
47 * <p>CompletableFutures themselves do not execute asynchronously.
48 * However, the {@code async} methods provide commonly useful ways to
49 * to commence asynchronous processing, using either a given {@link
50 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
51 * function or action that will result in the completion of a new
52 * CompletableFuture.
53 *
54 * @author Doug Lea
55 * @since 1.8
56 */
57 public class CompletableFuture<T> implements Future<T> {
58 /*
59 * Quick overview (more to come):
60 *
61 * 1. Non-nullness of field result indicates done. An AltResult is
62 * used to box null as a result, as well as to hold exceptions.
63 *
64 * 2. Waiters are held in a Treiber stack similar to the one used
65 * in FutureTask
66 *
67 * 3. Completions are also kept in a list/stack, and pulled off
68 * and run when completion is triggered.
69 */
70
71 static final class AltResult {
72 final Throwable ex; // null only for NIL
73 AltResult(Throwable ex) { this.ex = ex; }
74 }
75
76 static final AltResult NIL = new AltResult(null);
77
78 /**
79 * Simple linked list nodes to record waiting threads in a Treiber
80 * stack. See other classes such as Phaser and SynchronousQueue
81 * for more detailed explanation.
82 */
83 static final class WaitNode {
84 volatile Thread thread;
85 volatile WaitNode next;
86 }
87
88 /**
89 * Simple linked list nodes to record completions, used in
90 * basically the same way as WaitNodes
91 */
92 static final class CompletionNode {
93 final Completion completion;
94 volatile CompletionNode next;
95 CompletionNode(Completion completion) { this.completion = completion; }
96 }
97
98
99 volatile Object result; // either the result or boxed AltResult
100 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
101 volatile CompletionNode completions; // list (Treiber stack) of completions
102
103 /**
104 * Creates a new incomplete CompletableFuture.
105 */
106 public CompletableFuture() {
107 }
108
109 /**
110 * Asynchronously executes in the {@link
111 * ForkJoinPool#commonPool()}, a task that completes the returned
112 * CompletableFuture with the result of the given Supplier.
113 *
114 * @param supplier a function returning the value to be used
115 * to complete the returned CompletableFuture.
116 * @return the CompletableFuture.
117 */
118 public static <U> CompletableFuture<U> async(Supplier<U> supplier) {
119 if (supplier == null) throw new NullPointerException();
120 CompletableFuture<U> f = new CompletableFuture<U>();
121 ForkJoinPool.commonPool().
122 execute((ForkJoinTask<?>)new AsyncSupplier(supplier, f));
123 return f;
124 }
125
126 /**
127 * Asynchronously executes using the given executor, a task that
128 * completes the returned CompletableFuture with the result of the
129 * given Supplier.
130 *
131 * @param supplier a function returning the value to be used
132 * to complete the returned CompletableFuture.
133 * @param executor the executor to use for asynchronous execution
134 * @return the CompletableFuture.
135 */
136 public static <U> CompletableFuture<U> async(Supplier<U> supplier,
137 Executor executor) {
138 if (executor == null || supplier == null)
139 throw new NullPointerException();
140 CompletableFuture<U> f = new CompletableFuture<U>();
141 executor.execute(new AsyncSupplier(supplier, f));
142 return f;
143 }
144
145 /**
146 * Asynchronously executes in the {@link
147 * ForkJoinPool#commonPool()} a task that runs the given action,
148 * and then completes the returned CompletableFuture
149 *
150 * @param runnable the action to run before completing the
151 * returned CompletableFuture.
152 * @return the CompletableFuture.
153 */
154 public static CompletableFuture<Void> async(Runnable runnable) {
155 if (runnable == null) throw new NullPointerException();
156 CompletableFuture<Void> f = new CompletableFuture<Void>();
157 ForkJoinPool.commonPool().
158 execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
159 return f;
160 }
161
162 /**
163 * Asynchronously executes using the given executor, a task that
164 * runs the given action, and then completes the returned
165 * CompletableFuture
166 *
167 * @param runnable the action to run before completing the
168 * returned CompletableFuture.
169 * @param executor the executor to use for asynchronous execution
170 * @return the CompletableFuture.
171 */
172 public static CompletableFuture<Void> async(Runnable runnable,
173 Executor executor) {
174 if (executor == null || runnable == null)
175 throw new NullPointerException();
176 CompletableFuture<Void> f = new CompletableFuture<Void>();
177 executor.execute(new AsyncRunnable(runnable, f));
178 return f;
179 }
180
181 /**
182 * Returns {@code true} if completed in any fashion: normally,
183 * exceptionally, or cancellation.
184 *
185 * @return {@code true} if completed
186 */
187 public boolean isDone() {
188 return result != null;
189 }
190
191 /**
192 * Returns the result value when complete, or throws an
193 * (unchecked) exception if completed exceptionally. To better
194 * conform with the use of common functional forms, this method
195 * transforms any checked exception possible with {@link
196 * Future#get} into an (unchecked) {@link RuntimeException} with
197 * the underlying exception as its cause. (The checked exception
198 * convention is available using the timed form of get.)
199 *
200 * @return the result value
201 */
202 public T get() {
203 Object r; Throwable ex;
204 if ((r = result) == null)
205 return waitingGet();
206 if (r instanceof AltResult) {
207 if ((ex = ((AltResult)r).ex) != null) {
208 if (ex instanceof Error)
209 throw (Error)ex;
210 if (ex instanceof RuntimeException)
211 throw (RuntimeException)ex;
212 throw new RuntimeException(ex);
213 }
214 return null;
215 }
216 return (T)r;
217 }
218
219 /**
220 * Returns the result value (or throws any encountered exception)
221 * if completed, else returns the given valueIfAbsent.
222 *
223 * @param valueIfAbsent the value to return if not completed
224 * @return the result value, if completed, else the given valueIfAbsent
225 */
226 public T getNow(T valueIfAbsent) {
227 Object r; Throwable ex;
228 if ((r = result) == null)
229 return valueIfAbsent;
230 if (r instanceof AltResult) {
231 if ((ex = ((AltResult)r).ex) != null) {
232 if (ex instanceof Error)
233 throw (Error)ex;
234 if (ex instanceof RuntimeException)
235 throw (RuntimeException)ex;
236 throw new RuntimeException(ex);
237 }
238 return null;
239 }
240 return (T)r;
241 }
242
243 /**
244 * Waits if necessary for at most the given time for completion,
245 * and then retrieves its result, if available.
246 *
247 * @param timeout the maximum time to wait
248 * @param unit the time unit of the timeout argument
249 * @return the computed result
250 * @throws CancellationException if the computation was cancelled
251 * @throws ExecutionException if the computation threw an
252 * exception
253 * @throws InterruptedException if the current thread was interrupted
254 * while waiting
255 * @throws TimeoutException if the wait timed out
256 */
257 public T get(long timeout, TimeUnit unit)
258 throws InterruptedException, ExecutionException, TimeoutException {
259 Object r; Throwable ex;
260 long nanos = unit.toNanos(timeout);
261 if (Thread.interrupted())
262 throw new InterruptedException();
263 if ((r = result) == null)
264 r = timedAwaitDone(nanos);
265 if (r instanceof AltResult) {
266 if ((ex = ((AltResult)r).ex) != null) {
267 if (ex instanceof ExecutionException) // avoid re-wrap
268 throw (ExecutionException)ex;
269 throw new ExecutionException(ex);
270 }
271 return null;
272 }
273 return (T)r;
274 }
275
276 /**
277 * If not already completed, sets the value returned by {@link
278 * #get()} and related methods to the given value.
279 *
280 * @param value the result value
281 * @return true if this invocation caused this CompletableFuture
282 * to transition to a completed state, else false.
283 */
284 public boolean complete(T value) {
285 if (result == null &&
286 UNSAFE.compareAndSwapObject(this, RESULT, null,
287 (value == null) ? NIL : value)) {
288 postComplete();
289 return true;
290 }
291 return false;
292 }
293
294 /**
295 * If not already completed, causes invocations of {@link #get()}
296 * and related methods to throw the given exception.
297 *
298 * @param ex the exception
299 * @return true if this invocation caused this CompletableFuture
300 * to transition to a completed state, else false.
301 */
302 public boolean completeExceptionally(Throwable ex) {
303 if (ex == null) throw new NullPointerException();
304 if (result == null) {
305 Object r = new AltResult(ex);
306 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
307 postComplete();
308 return true;
309 }
310 }
311 return false;
312 }
313
314 /**
315 * Creates and returns a CompletableFuture that is completed with
316 * the result of the given function of this CompletableFuture.
317 * If this CompletableFuture completes exceptionally,
318 * then the returned CompletableFuture also does so,
319 * with a RuntimeException having this exception as
320 * its cause.
321 *
322 * @param fn the function to use to compute the value of
323 * the returned CompletableFuture
324 * @return the new CompletableFuture
325 */
326 public <U> CompletableFuture<U> then(Function<? super T,? extends U> fn) {
327 return thenFunction(fn, null);
328 }
329
330 /**
331 * Creates and returns a CompletableFuture that is asynchronously
332 * completed using the {@link ForkJoinPool#commonPool()} with the
333 * result of the given function of this CompletableFuture. If
334 * this CompletableFuture completes exceptionally, then the
335 * returned CompletableFuture also does so, with a
336 * RuntimeException having this exception as its cause.
337 *
338 * @param fn the function to use to compute the value of
339 * the returned CompletableFuture
340 * @return the new CompletableFuture
341 */
342 public <U> CompletableFuture<U> thenAsync(Function<? super T,? extends U> fn) {
343 return thenFunction(fn, ForkJoinPool.commonPool());
344 }
345
346 /**
347 * Creates and returns a CompletableFuture that is asynchronously
348 * completed using the given executor with the result of the given
349 * function of this CompletableFuture. If this CompletableFuture
350 * completes exceptionally, then the returned CompletableFuture
351 * also does so, with a RuntimeException having this exception as
352 * its cause.
353 *
354 * @param fn the function to use to compute the value of
355 * the returned CompletableFuture
356 * @param executor the executor to use for asynchronous execution
357 * @return the new CompletableFuture
358 */
359 public <U> CompletableFuture<U> thenAsync(Function<? super T,? extends U> fn,
360 Executor executor) {
361 if (executor == null) throw new NullPointerException();
362 return thenFunction(fn, executor);
363 }
364
365 /**
366 * Creates and returns a CompletableFuture that is completed after
367 * performing the given action if/when this CompletableFuture
368 * completes. If this CompletableFuture completes exceptionally,
369 * then the returned CompletableFuture also does so, with a
370 * RuntimeException having this exception as its cause.
371 *
372 * @param action the action to perform before completing the
373 * returned CompletableFuture
374 * @return the new CompletableFuture
375 */
376 public CompletableFuture<Void> then(Runnable action) {
377 return thenRunnable(action, null);
378 }
379
380 /**
381 * Creates and returns a CompletableFuture that is asynchronously
382 * completed using the {@link ForkJoinPool#commonPool()} after
383 * performing the given action if/when this CompletableFuture
384 * completes. If this CompletableFuture completes exceptionally,
385 * then the returned CompletableFuture also does so, with a
386 * RuntimeException having this exception as its cause.
387 *
388 * @param action the action to perform before completing the
389 * returned CompletableFuture
390 * @return the new CompletableFuture
391 */
392 public CompletableFuture<Void> thenAsync(Runnable action) {
393 return thenRunnable(action, ForkJoinPool.commonPool());
394 }
395
396 /**
397 * Creates and returns a CompletableFuture that is asynchronously
398 * completed using the given executor after performing the given
399 * action if/when this CompletableFuture completes. If this
400 * CompletableFuture completes exceptionally, then the returned
401 * CompletableFuture also does so, with a RuntimeException having
402 * this exception as its cause.
403 *
404 * @param action the action to perform before completing the
405 * returned CompletableFuture
406 * @param executor the executor to use for asynchronous execution
407 * @return the new CompletableFuture
408 */
409 public CompletableFuture<Void> thenAsync(Runnable action, Executor executor) {
410 if (executor == null) throw new NullPointerException();
411 return thenRunnable(action, executor);
412 }
413
414 /**
415 * Creates and returns a CompletableFuture that is completed with
416 * the result of the given function of this and the other given
417 * CompletableFuture's results if/when both complete. If this or
418 * the other CompletableFuture complete exceptionally, then the
419 * returned CompletableFuture also does so, with a
420 * RuntimeException having the exception as its cause.
421 *
422 * @param other the other CompletableFuture
423 * @param fn the function to use to compute the value of
424 * the returned CompletableFuture
425 * @return the new CompletableFuture
426 */
427 public <U,V> CompletableFuture<V> andThen(CompletableFuture<? extends U> other,
428 BiFunction<? super T,? super U,? extends V> fn) {
429 return andFunction(other, fn, null);
430 }
431
432 /**
433 * Creates and returns a CompletableFuture that is asynchronously
434 * completed using the {@link ForkJoinPool#commonPool()} with
435 * the result of the given function of this and the other given
436 * CompletableFuture's results if/when both complete. If this or
437 * the other CompletableFuture complete exceptionally, then the
438 * returned CompletableFuture also does so, with a
439 * RuntimeException having the exception as its cause.
440 *
441 * @param other the other CompletableFuture
442 * @param fn the function to use to compute the value of
443 * the returned CompletableFuture
444 * @return the new CompletableFuture
445 */
446 public <U,V> CompletableFuture<V> andThenAsync(CompletableFuture<? extends U> other,
447 BiFunction<? super T,? super U,? extends V> fn) {
448 return andFunction(other, fn, ForkJoinPool.commonPool());
449 }
450
451 /**
452 * Creates and returns a CompletableFuture that is
453 * asynchronously completed using the given executor with the
454 * result of the given function of this and the other given
455 * CompletableFuture's results if/when both complete. If this or
456 * the other CompletableFuture complete exceptionally, then the
457 * returned CompletableFuture also does so, with a
458 * RuntimeException having the exception as its cause.
459 *
460 * @param other the other CompletableFuture
461 * @param fn the function to use to compute the value of
462 * the returned CompletableFuture
463 * @param executor the executor to use for asynchronous execution
464 * @return the new CompletableFuture
465 */
466
467 public <U,V> CompletableFuture<V> andThenAsync(CompletableFuture<? extends U> other,
468 BiFunction<? super T,? super U,? extends V> fn,
469 Executor executor) {
470 if (executor == null) throw new NullPointerException();
471 return andFunction(other, fn, executor);
472 }
473
474 /**
475 * Creates and returns a CompletableFuture that is completed
476 * if/when this and the other given CompletableFuture both
477 * complete. If this and/or the other CompletableFuture complete
478 * exceptionally, then the returned CompletableFuture also does
479 * so, with a RuntimeException having the one of the exceptions as
480 * its cause.
481 *
482 * @param other the other CompletableFuture
483 * @param action the action to perform before completing the
484 * returned CompletableFuture
485 * @return the new CompletableFuture
486 */
487 public CompletableFuture<Void> andThen(CompletableFuture<?> other,
488 Runnable action) {
489 return andRunnable(other, action, null);
490 }
491
492 /**
493 * Creates and returns a CompletableFuture that is completed
494 * asynchronously using the {@link ForkJoinPool#commonPool()}
495 * if/when this and the other given CompletableFuture both
496 * complete. If this and/or the other CompletableFuture complete
497 * exceptionally, then the returned CompletableFuture also does
498 * so, with a RuntimeException having the one of the exceptions as
499 * its cause.
500 *
501 * @param other the other CompletableFuture
502 * @param action the action to perform before completing the
503 * returned CompletableFuture
504 * @return the new CompletableFuture
505 */
506 public CompletableFuture<Void> andThenAsync(CompletableFuture<?> other,
507 Runnable action) {
508 return andRunnable(other, action, ForkJoinPool.commonPool());
509 }
510
511 /**
512 * Creates and returns a CompletableFuture that is completed
513 * asynchronously using the given executor
514 * if/when this and the other given CompletableFuture both
515 * complete. If this and/or the other CompletableFuture complete
516 * exceptionally, then the returned CompletableFuture also does
517 * so, with a RuntimeException having the one of the exceptions as
518 * its cause.
519 *
520 * @param other the other CompletableFuture
521 * @param action the action to perform before completing the
522 * returned CompletableFuture
523 * @param executor the executor to use for asynchronous execution
524 * @return the new CompletableFuture
525 */
526 public CompletableFuture<Void> andThenAsync(CompletableFuture<?> other,
527 Runnable action,
528 Executor executor) {
529 if (executor == null) throw new NullPointerException();
530 return andRunnable(other, action, executor);
531 }
532
533 /**
534 * Creates and returns a CompletableFuture that is completed with
535 * the result of the given function of either this or the other
536 * given CompletableFuture's results if/when either complete. If
537 * this and/or the other CompletableFuture complete exceptionally,
538 * then the returned CompletableFuture may also do so, with a
539 * RuntimeException having one of these exceptions as its cause.
540 * No guarantees are made about which result or exception is used
541 * in the returned CompletableFuture.
542 *
543 * @param other the other CompletableFuture
544 * @param fn the function to use to compute the value of
545 * the returned CompletableFuture
546 * @return the new CompletableFuture
547 */
548 public <U> CompletableFuture<U> orThen(CompletableFuture<? extends T> other,
549 Function<? super T, U> fn) {
550 return orFunction(other, fn, null);
551 }
552
553 /**
554 * Creates and returns a CompletableFuture that is completed
555 * asynchronously using the {@link ForkJoinPool#commonPool()} with
556 * the result of the given function of either this or the other
557 * given CompletableFuture's results if/when either complete. If
558 * this and/or the other CompletableFuture complete exceptionally,
559 * then the returned CompletableFuture may also do so, with a
560 * RuntimeException having one of these exceptions as its cause.
561 * No guarantees are made about which result or exception is used
562 * in the returned CompletableFuture.
563 *
564 * @param other the other CompletableFuture
565 * @param fn the function to use to compute the value of
566 * the returned CompletableFuture
567 * @return the new CompletableFuture
568 */
569 public <U> CompletableFuture<U> orThenAsync(CompletableFuture<? extends T> other,
570 Function<? super T, U> fn) {
571 return orFunction(other, fn, ForkJoinPool.commonPool());
572 }
573
574 /**
575 * Creates and returns a CompletableFuture that is completed
576 * asynchronously using the given executor with the result of the
577 * given function of either this or the other given
578 * CompletableFuture's results if/when either complete. If this
579 * and/or the other CompletableFuture complete exceptionally, then
580 * the returned CompletableFuture may also do so, with a
581 * RuntimeException having one of these exceptions as its cause.
582 * No guarantees are made about which result or exception is used
583 * in the returned CompletableFuture.
584 *
585 * @param other the other CompletableFuture
586 * @param fn the function to use to compute the value of
587 * the returned CompletableFuture
588 * @param executor the executor to use for asynchronous execution
589 * @return the new CompletableFuture
590 */
591 public <U> CompletableFuture<U> orThen(CompletableFuture<? extends T> other,
592 Function<? super T, U> fn,
593 Executor executor) {
594 if (executor == null) throw new NullPointerException();
595 return orFunction(other, fn, executor);
596 }
597
598 /**
599 * Creates and returns a CompletableFuture that is completed
600 * after this or the other given CompletableFuture complete. If
601 * this and/or the other CompletableFuture complete exceptionally,
602 * then the returned CompletableFuture may also do so, with a
603 * RuntimeException having one of these exceptions as its cause.
604 * No guarantees are made about which exception is used in the
605 * returned CompletableFuture.
606 *
607 * @param other the other CompletableFuture
608 * @param action the action to perform before completing the
609 * returned CompletableFuture
610 * @return the new CompletableFuture
611 */
612 public CompletableFuture<Void> orThen(CompletableFuture<?> other,
613 Runnable action) {
614 return orRunnable(other, action, null);
615 }
616
617 /**
618 * Creates and returns a CompletableFuture that is completed
619 * asynchronously using the {@link ForkJoinPool#commonPool()}
620 * after this or the other given CompletableFuture complete. If
621 * this and/or the other CompletableFuture complete exceptionally,
622 * then the returned CompletableFuture may also do so, with a
623 * RuntimeException having one of these exceptions as its cause.
624 * No guarantees are made about which exception is used in the
625 * returned CompletableFuture.
626 *
627 * @param other the other CompletableFuture
628 * @param action the action to perform before completing the
629 * returned CompletableFuture
630 * @return the new CompletableFuture
631 */
632 public CompletableFuture<Void> orThenAsync(CompletableFuture<?> other,
633 Runnable action) {
634 return orRunnable(other, action, ForkJoinPool.commonPool());
635 }
636
637 /**
638 * Creates and returns a CompletableFuture that is completed
639 * asynchronously using the given executor after this or the other
640 * given CompletableFuture complete. If this and/or the other
641 * CompletableFuture complete exceptionally, then the returned
642 * CompletableFuture may also do so, with a RuntimeException
643 * having one of these exceptions as its cause. No guarantees are
644 * made about which exception is used in the returned
645 * CompletableFuture.
646 *
647 * @param other the other CompletableFuture
648 * @param action the action to perform before completing the
649 * returned CompletableFuture
650 * @param executor the executor to use for asynchronous execution
651 * @return the new CompletableFuture
652 */
653 public CompletableFuture<Void> orThenAsync(CompletableFuture<?> other,
654 Runnable action,
655 Executor executor) {
656 if (executor == null) throw new NullPointerException();
657 return orRunnable(other, action, executor);
658 }
659
660 /**
661 * Creates and returns a CompletableFuture that is completed with
662 * the result of the given function of the exception triggering
663 * this CompletableFuture's completion if/when it completes
664 * exceptionally; Otherwise, if this CompletableFuture completes
665 * normally, then the returned CompletableFuture also completes
666 * normally with the same value.
667 *
668 * @param fn the function to use to compute the value of the
669 * returned CompletableFuture if this CompletableFuture completed
670 * exceptionally
671 * @return the new CompletableFuture
672 */
673 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
674 if (fn == null) throw new NullPointerException();
675 CompletableFuture<T> dst = new CompletableFuture<T>();
676 ExceptionAction<T> d = null;
677 Object r;
678 if ((r = result) == null) {
679 CompletionNode p =
680 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
681 while ((r = result) == null) {
682 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
683 p.next = completions, p))
684 break;
685 }
686 }
687 if (r != null && (d == null || d.compareAndSet(0, 1))) {
688 T t; Throwable ex = null;
689 if (r instanceof AltResult) {
690 if ((ex = ((AltResult)r).ex) != null) {
691 try {
692 dst.complete(fn.apply(ex));
693 } catch (Throwable rex) {
694 dst.completeExceptionally(rex);
695 }
696 }
697 t = null;
698 }
699 else
700 t = (T) r;
701 if (ex == null)
702 dst.complete(t);
703 }
704 if (r != null)
705 postComplete();
706 return dst;
707 }
708
709 /**
710 * Attempts to complete this CompletableFuture with
711 * a {@link CancellationException}.
712 *
713 * @param mayInterruptIfRunning this value has no effect in this
714 * implementation because interrupts are not used to control
715 * processing.
716 *
717 * @return {@code true} if this task is now cancelled
718 */
719 public boolean cancel(boolean mayInterruptIfRunning) {
720 Object r;
721 while ((r = result) == null) {
722 r = new AltResult(new CancellationException());
723 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
724 postComplete();
725 return true;
726 }
727 }
728 return ((r instanceof AltResult) &&
729 (((AltResult)r).ex instanceof CancellationException));
730 }
731
732 /**
733 * Returns {@code true} if this CompletableFuture was cancelled
734 * before it completed normally.
735 *
736 * @return {@code true} if this CompletableFuture was cancelled
737 * before it completed normally
738 */
739 public boolean isCancelled() {
740 Object r;
741 return ((r = result) != null &&
742 (r instanceof AltResult) &&
743 (((AltResult)r).ex instanceof CancellationException));
744 }
745
746 /**
747 * Forcibly sets or resets the value subsequently returned by
748 * method get() and related methods, whether or not already
749 * completed. This method is designed for use only in error
750 * recovery actions, and even in such situations may result in
751 * ongoing dependent completions using established versus
752 * overwritten values.
753 *
754 * @param value the completion value
755 */
756 public void obtrudeValue(T value) {
757 result = (value == null) ? NIL : value;
758 postComplete();
759 }
760
761 /**
762 * Removes and signals all waiting threads and runs all completions
763 */
764 private void postComplete() {
765 WaitNode q; Thread t;
766 while ((q = waiters) != null) {
767 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
768 (t = q.thread) != null) {
769 q.thread = null;
770 LockSupport.unpark(t);
771 }
772 }
773
774 CompletionNode h; Completion c;
775 while ((h = completions) != null) {
776 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
777 (c = h.completion) != null)
778 c.run();
779 }
780 }
781
782 /* ------------- waiting for completions -------------- */
783
784 /**
785 * Heuristic spin value for waitingGet() before blocking on
786 * multiprocessors
787 */
788 static final int WAITING_GET_SPINS = 256;
789
790 /**
791 * Returns result after waiting.
792 */
793 private T waitingGet() {
794 WaitNode q = null;
795 boolean queued = false, interrupted = false;
796 int h = 0, spins = 0;
797 for (Object r;;) {
798 if ((r = result) != null) {
799 Throwable ex;
800 if (q != null) // suppress unpark
801 q.thread = null;
802 postComplete(); // help release others
803 if (interrupted)
804 Thread.currentThread().interrupt();
805 if (r instanceof AltResult) {
806 if ((ex = ((AltResult)r).ex) != null) {
807 if (ex instanceof Error)
808 throw (Error)ex;
809 if (ex instanceof RuntimeException)
810 throw (RuntimeException)ex;
811 throw new RuntimeException(ex);
812 }
813 return null;
814 }
815 return (T)r;
816 }
817 else if (h == 0) {
818 h = ThreadLocalRandom.current().nextInt();
819 if (Runtime.getRuntime().availableProcessors() > 1)
820 spins = WAITING_GET_SPINS;
821 }
822 else if (spins > 0) {
823 h ^= h << 1; // xorshift
824 h ^= h >>> 3;
825 if ((h ^= h << 10) >= 0)
826 --spins;
827 }
828 else if (q == null)
829 q = new WaitNode();
830 else if (!queued)
831 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
832 q.next = waiters, q);
833 else if (Thread.interrupted())
834 interrupted = true;
835 else if (q.thread == null)
836 q.thread = Thread.currentThread();
837 else
838 LockSupport.park(this);
839 }
840 }
841
842 /**
843 * Awaits completion or aborts on interrupt or timeout.
844 *
845 * @param nanos time to wait
846 * @return raw result
847 */
848 private Object timedAwaitDone(long nanos)
849 throws InterruptedException, TimeoutException {
850 final long deadline = System.nanoTime() + nanos;
851 WaitNode q = null;
852 boolean queued = false;
853 for (Object r;;) {
854 if (Thread.interrupted()) {
855 removeWaiter(q);
856 throw new InterruptedException();
857 }
858 else if ((r = result) != null) {
859 if (q != null)
860 q.thread = null;
861 postComplete();
862 return r;
863 }
864 else if (q == null)
865 q = new WaitNode();
866 else if (!queued)
867 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
868 q.next = waiters, q);
869 else if ((nanos = deadline - System.nanoTime()) <= 0L) {
870 removeWaiter(q);
871 throw new TimeoutException();
872 }
873 else if (q.thread == null)
874 q.thread = Thread.currentThread();
875 else
876 LockSupport.parkNanos(this, nanos);
877 }
878 }
879
880 /**
881 * Tries to unlink a timed-out or interrupted wait node to avoid
882 * accumulating garbage. Internal nodes are simply unspliced
883 * without CAS since it is harmless if they are traversed anyway
884 * by releasers. To avoid effects of unsplicing from already
885 * removed nodes, the list is retraversed in case of an apparent
886 * race. This is slow when there are a lot of nodes, but we don't
887 * expect lists to be long enough to outweigh higher-overhead
888 * schemes.
889 */
890 private void removeWaiter(WaitNode node) {
891 if (node != null) {
892 node.thread = null;
893 retry:
894 for (;;) { // restart on removeWaiter race
895 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
896 s = q.next;
897 if (q.thread != null)
898 pred = q;
899 else if (pred != null) {
900 pred.next = s;
901 if (pred.thread == null) // check for race
902 continue retry;
903 }
904 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
905 continue retry;
906 }
907 break;
908 }
909 }
910 }
911
912 /* ------------- Async tasks -------------- */
913
914 /** Base class can act as either FJ or plain Runnable */
915 static abstract class Async extends ForkJoinTask<Void> implements Runnable {
916 public final Void getRawResult() { return null; }
917 public final void setRawResult(Void v) { }
918 public final void run() { exec(); }
919 }
920
921 static final class AsyncRunnable extends Async {
922 final Runnable runnable;
923 final CompletableFuture<Void> dst;
924 AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
925 this.runnable = runnable; this.dst = dst;
926 }
927 public final boolean exec() {
928 Runnable fn;
929 CompletableFuture<Void> d;
930 if ((fn = this.runnable) == null || (d = this.dst) == null)
931 throw new NullPointerException();
932 try {
933 fn.run();
934 d.complete(null);
935 } catch (Throwable ex) {
936 d.completeExceptionally(ex);
937 }
938 return true;
939 }
940 private static final long serialVersionUID = 5232453952276885070L;
941 }
942
943 static final class AsyncSupplier<U> extends Async {
944 final Supplier<U> supplier;
945 final CompletableFuture<U> dst;
946 AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
947 this.supplier = supplier; this.dst = dst;
948 }
949 public final boolean exec() {
950 Supplier<U> fn;
951 CompletableFuture<U> d;
952 if ((fn = this.supplier) == null || (d = this.dst) == null)
953 throw new NullPointerException();
954 try {
955 d.complete(fn.get());
956 } catch (Throwable ex) {
957 d.completeExceptionally(ex);
958 }
959 return true;
960 }
961 private static final long serialVersionUID = 5232453952276885070L;
962 }
963
964 static final class AsyncFunction<T,U> extends Async {
965 Function<? super T,? extends U> fn;
966 T arg;
967 final CompletableFuture<U> dst;
968 AsyncFunction(T arg, Function<? super T,? extends U> fn,
969 CompletableFuture<U> dst) {
970 this.arg = arg; this.fn = fn; this.dst = dst;
971 }
972 public final boolean exec() {
973 Function<? super T,? extends U> fn;
974 CompletableFuture<U> d;
975 if ((fn = this.fn) == null || (d = this.dst) == null)
976 throw new NullPointerException();
977 try {
978 d.complete(fn.apply(arg));
979 } catch (Throwable ex) {
980 d.completeExceptionally(ex);
981 }
982 return true;
983 }
984 private static final long serialVersionUID = 5232453952276885070L;
985 }
986
987 static final class AsyncBiFunction<T,U,V> extends Async {
988 final BiFunction<? super T,? super U,? extends V> fn;
989 final T arg1;
990 final U arg2;
991 final CompletableFuture<V> dst;
992 AsyncBiFunction(T arg1, U arg2,
993 BiFunction<? super T,? super U,? extends V> fn,
994 CompletableFuture<V> dst) {
995 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
996 }
997 public final boolean exec() {
998 BiFunction<? super T,? super U,? extends V> fn;
999 CompletableFuture<V> d;
1000 if ((fn = this.fn) == null || (d = this.dst) == null)
1001 throw new NullPointerException();
1002 try {
1003 d.complete(fn.apply(arg1, arg2));
1004 } catch (Throwable ex) {
1005 d.completeExceptionally(ex);
1006 }
1007 return true;
1008 }
1009 private static final long serialVersionUID = 5232453952276885070L;
1010 }
1011
1012 /* ------------- Completions -------------- */
1013
1014 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1015 static abstract class Completion extends AtomicInteger implements Runnable {
1016 }
1017
1018 static final class ThenFunction<T,U> extends Completion {
1019 final CompletableFuture<? extends T> src;
1020 final Function<? super T,? extends U> fn;
1021 final CompletableFuture<U> dst;
1022 final Executor executor;
1023 ThenFunction(CompletableFuture<? extends T> src,
1024 final Function<? super T,? extends U> fn,
1025 final CompletableFuture<U> dst, Executor executor) {
1026 this.src = src; this.fn = fn; this.dst = dst;
1027 this.executor = executor;
1028 }
1029 public void run() {
1030 CompletableFuture<? extends T> a;
1031 Function<? super T,? extends U> fn;
1032 CompletableFuture<U> dst;
1033 Object r; T t; Throwable ex;
1034 if ((dst = this.dst) != null &&
1035 (fn = this.fn) != null &&
1036 (a = this.src) != null &&
1037 (r = a.result) != null &&
1038 compareAndSet(0, 1)) {
1039 if (r instanceof AltResult) {
1040 if ((ex = ((AltResult)r).ex) != null) {
1041 dst.completeExceptionally(new RuntimeException(ex));
1042 return;
1043 }
1044 t = null;
1045 }
1046 else
1047 t = (T) r;
1048 try {
1049 if (executor != null)
1050 executor.execute(new AsyncFunction(t, fn, dst));
1051 else
1052 dst.complete(fn.apply(t));
1053 } catch (Throwable rex) {
1054 dst.completeExceptionally(rex);
1055 }
1056 }
1057 }
1058 }
1059
1060 static final class ThenRunnable<T> extends Completion {
1061 final CompletableFuture<? extends T> src;
1062 final Runnable fn;
1063 final CompletableFuture<Void> dst;
1064 final Executor executor;
1065 ThenRunnable(CompletableFuture<? extends T> src,
1066 Runnable fn,
1067 CompletableFuture<Void> dst,
1068 Executor executor) {
1069 this.src = src; this.fn = fn; this.dst = dst;
1070 this.executor = executor;
1071 }
1072 public void run() {
1073 CompletableFuture<? extends T> a;
1074 Runnable fn;
1075 CompletableFuture<Void> dst;
1076 Object r; Throwable ex;
1077 if ((dst = this.dst) != null &&
1078 (fn = this.fn) != null &&
1079 (a = this.src) != null &&
1080 (r = a.result) != null &&
1081 compareAndSet(0, 1)) {
1082 if (r instanceof AltResult) {
1083 if ((ex = ((AltResult)r).ex) != null) {
1084 dst.completeExceptionally(new RuntimeException(ex));
1085 return;
1086 }
1087 }
1088 try {
1089 if (executor != null)
1090 executor.execute(new AsyncRunnable(fn, dst));
1091 else {
1092 fn.run();
1093 dst.complete(null);
1094 }
1095 } catch (Throwable rex) {
1096 dst.completeExceptionally(rex);
1097 }
1098 }
1099 }
1100 }
1101
1102 static final class AndFunction<T,U,V> extends Completion {
1103 final CompletableFuture<? extends T> src;
1104 final CompletableFuture<? extends U> snd;
1105 final BiFunction<? super T,? super U,? extends V> fn;
1106 final CompletableFuture<V> dst;
1107 final Executor executor;
1108 AndFunction(CompletableFuture<? extends T> src,
1109 CompletableFuture<? extends U> snd,
1110 BiFunction<? super T,? super U,? extends V> fn,
1111 CompletableFuture<V> dst, Executor executor) {
1112 this.src = src; this.snd = snd;
1113 this.fn = fn; this.dst = dst;
1114 this.executor = executor;
1115 }
1116 public void run() {
1117 Object r, s; T t; U u; Throwable ex;
1118 CompletableFuture<? extends T> a;
1119 CompletableFuture<? extends U> b;
1120 BiFunction<? super T,? super U,? extends V> fn;
1121 CompletableFuture<V> dst;
1122 if ((dst = this.dst) != null &&
1123 (fn = this.fn) != null &&
1124 (a = this.src) != null &&
1125 (r = a.result) != null &&
1126 (b = this.snd) != null &&
1127 (s = b.result) != null &&
1128 compareAndSet(0, 1)) {
1129 if (r instanceof AltResult) {
1130 if ((ex = ((AltResult)r).ex) != null) {
1131 dst.completeExceptionally(new RuntimeException(ex));
1132 return;
1133 }
1134 t = null;
1135 }
1136 else
1137 t = (T) r;
1138 if (s instanceof AltResult) {
1139 if ((ex = ((AltResult)s).ex) != null) {
1140 dst.completeExceptionally(new RuntimeException(ex));
1141 return;
1142 }
1143 u = null;
1144 }
1145 else
1146 u = (U) s;
1147 try {
1148 if (executor != null)
1149 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1150 else
1151 dst.complete(fn.apply(t, u));
1152 } catch (Throwable rex) {
1153 dst.completeExceptionally(rex);
1154 }
1155 }
1156 }
1157 }
1158
1159 static final class AndRunnable<T> extends Completion {
1160 final CompletableFuture<? extends T> src;
1161 final CompletableFuture<?> snd;
1162 final Runnable fn;
1163 final CompletableFuture<Void> dst;
1164 final Executor executor;
1165 AndRunnable(CompletableFuture<? extends T> src,
1166 CompletableFuture<?> snd,
1167 Runnable fn,
1168 CompletableFuture<Void> dst, Executor executor) {
1169 this.src = src; this.snd = snd;
1170 this.fn = fn; this.dst = dst;
1171 this.executor = executor;
1172 }
1173 public void run() {
1174 Object r, s; Throwable ex;
1175 final CompletableFuture<? extends T> a;
1176 final CompletableFuture<?> b;
1177 final Runnable fn;
1178 final CompletableFuture<Void> dst;
1179 if ((dst = this.dst) != null &&
1180 (fn = this.fn) != null &&
1181 (a = this.src) != null &&
1182 (r = a.result) != null &&
1183 (b = this.snd) != null &&
1184 (s = b.result) != null &&
1185 compareAndSet(0, 1)) {
1186 if (r instanceof AltResult) {
1187 if ((ex = ((AltResult)r).ex) != null) {
1188 dst.completeExceptionally(new RuntimeException(ex));
1189 return;
1190 }
1191 }
1192 if (s instanceof AltResult) {
1193 if ((ex = ((AltResult)s).ex) != null) {
1194 dst.completeExceptionally(new RuntimeException(ex));
1195 return;
1196 }
1197 }
1198 try {
1199 if (executor != null)
1200 executor.execute(new AsyncRunnable(fn, dst));
1201 else {
1202 fn.run();
1203 dst.complete(null);
1204 }
1205 } catch (Throwable rex) {
1206 dst.completeExceptionally(rex);
1207 }
1208 }
1209 }
1210 }
1211
1212 static final class OrFunction<T,U> extends Completion {
1213 final CompletableFuture<? extends T> src;
1214 final CompletableFuture<? extends T> snd;
1215 final Function<? super T,? extends U> fn;
1216 final CompletableFuture<U> dst;
1217 final Executor executor;
1218 OrFunction(CompletableFuture<? extends T> src,
1219 CompletableFuture<? extends T> snd,
1220 Function<? super T,? extends U> fn,
1221 CompletableFuture<U> dst, Executor executor) {
1222 this.src = src; this.snd = snd;
1223 this.fn = fn; this.dst = dst;
1224 this.executor = executor;
1225 }
1226 public void run() {
1227 Object r; T t; Throwable ex;
1228 CompletableFuture<? extends T> a;
1229 CompletableFuture<? extends T> b;
1230 Function<? super T,? extends U> fn;
1231 CompletableFuture<U> dst;
1232 if ((dst = this.dst) != null &&
1233 (fn = this.fn) != null &&
1234 (((a = this.src) != null && (r = a.result) != null) ||
1235 ((b = this.snd) != null && (r = b.result) != null)) &&
1236 compareAndSet(0, 1)) {
1237 if (r instanceof AltResult) {
1238 if ((ex = ((AltResult)r).ex) != null) {
1239 dst.completeExceptionally(new RuntimeException(ex));
1240 return;
1241 }
1242 t = null;
1243 }
1244 else
1245 t = (T) r;
1246 try {
1247 if (executor != null)
1248 executor.execute(new AsyncFunction(t, fn, dst));
1249 else
1250 dst.complete(fn.apply(t));
1251 } catch (Throwable rex) {
1252 dst.completeExceptionally(rex);
1253 }
1254 }
1255 }
1256 }
1257
1258 static final class OrRunnable<T> extends Completion {
1259 final CompletableFuture<? extends T> src;
1260 final CompletableFuture<?> snd;
1261 final Runnable fn;
1262 final CompletableFuture<Void> dst;
1263 final Executor executor;
1264 OrRunnable(CompletableFuture<? extends T> src,
1265 CompletableFuture<?> snd,
1266 Runnable fn,
1267 CompletableFuture<Void> dst, Executor executor) {
1268 this.src = src; this.snd = snd;
1269 this.fn = fn; this.dst = dst;
1270 this.executor = executor;
1271 }
1272 public void run() {
1273 Object r; Throwable ex;
1274 CompletableFuture<? extends T> a;
1275 final CompletableFuture<?> b;
1276 final Runnable fn;
1277 final CompletableFuture<Void> dst;
1278 if ((dst = this.dst) != null &&
1279 (fn = this.fn) != null &&
1280 (((a = this.src) != null && (r = a.result) != null) ||
1281 ((b = this.snd) != null && (r = b.result) != null)) &&
1282 compareAndSet(0, 1)) {
1283 if ((r instanceof AltResult) &&
1284 (ex = ((AltResult)r).ex) != null) {
1285 dst.completeExceptionally(new RuntimeException(ex));
1286 }
1287 else {
1288 try {
1289 if (executor != null)
1290 executor.execute(new AsyncRunnable(fn, dst));
1291 else {
1292 fn.run();
1293 dst.complete(null);
1294 }
1295 } catch (Throwable rex) {
1296 dst.completeExceptionally(rex);
1297 }
1298 }
1299 }
1300 }
1301 }
1302
1303 static final class ExceptionAction<T> extends Completion {
1304 final CompletableFuture<? extends T> src;
1305 final Function<? super Throwable, ? extends T> fn;
1306 final CompletableFuture<T> dst;
1307 ExceptionAction(CompletableFuture<? extends T> src,
1308 Function<? super Throwable, ? extends T> fn,
1309 CompletableFuture<T> dst) {
1310 this.src = src; this.fn = fn; this.dst = dst;
1311 }
1312 public void run() {
1313 CompletableFuture<? extends T> a;
1314 Function<? super Throwable, ? extends T> fn;
1315 CompletableFuture<T> dst;
1316 Object r; T t; Throwable ex;
1317 if ((dst = this.dst) != null &&
1318 (fn = this.fn) != null &&
1319 (a = this.src) != null &&
1320 (r = a.result) != null &&
1321 compareAndSet(0, 1)) {
1322 if (r instanceof AltResult) {
1323 if ((ex = ((AltResult)r).ex) != null) {
1324 try {
1325 dst.complete(fn.apply(ex));
1326 } catch (Throwable rex) {
1327 dst.completeExceptionally(rex);
1328 }
1329 return;
1330 }
1331 t = null;
1332 }
1333 else
1334 t = (T) r;
1335 dst.complete(t);
1336 }
1337 }
1338 }
1339
1340 /* ------------- then/and/or implementations -------------- */
1341
1342 private <U> CompletableFuture<U> thenFunction(Function<? super T,? extends U> fn,
1343 Executor executor) {
1344
1345 if (fn == null) throw new NullPointerException();
1346 CompletableFuture<U> dst = new CompletableFuture<U>();
1347 ThenFunction<T,U> d = null;
1348 Object r;
1349 if ((r = result) == null) {
1350 CompletionNode p = new CompletionNode
1351 (d = new ThenFunction<T,U>(this, fn, dst, executor));
1352 while ((r = result) == null) {
1353 if (UNSAFE.compareAndSwapObject
1354 (this, COMPLETIONS, p.next = completions, p))
1355 break;
1356 }
1357 }
1358 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1359 T t; Throwable ex = null;
1360 if (r instanceof AltResult) {
1361 if ((ex = ((AltResult)r).ex) != null)
1362 dst.completeExceptionally(new RuntimeException(ex));
1363 t = null;
1364 }
1365 else
1366 t = (T) r;
1367 if (ex == null) {
1368 try {
1369 if (executor != null)
1370 executor.execute(new AsyncFunction(t, fn, dst));
1371 else
1372 dst.complete(fn.apply(t));
1373 } catch (Throwable rex) {
1374 dst.completeExceptionally(rex);
1375 }
1376 }
1377 postComplete();
1378 }
1379 return dst;
1380 }
1381
1382 private CompletableFuture<Void> thenRunnable(Runnable action,
1383 Executor executor) {
1384 if (action == null) throw new NullPointerException();
1385 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1386 ThenRunnable<T> d = null;
1387 Object r;
1388 if ((r = result) == null) {
1389 CompletionNode p = new CompletionNode
1390 (d = new ThenRunnable<T>(this, action, dst, executor));
1391 while ((r = result) == null) {
1392 if (UNSAFE.compareAndSwapObject
1393 (this, COMPLETIONS, p.next = completions, p))
1394 break;
1395 }
1396 }
1397 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1398 Throwable ex = null;
1399 if (r instanceof AltResult) {
1400 if ((ex = ((AltResult)r).ex) != null)
1401 dst.completeExceptionally(new RuntimeException(ex));
1402 }
1403 if (ex == null) {
1404 try {
1405 if (executor != null)
1406 executor.execute(new AsyncRunnable(action, dst));
1407 else {
1408 action.run();
1409 dst.complete(null);
1410 }
1411 } catch (Throwable rex) {
1412 dst.completeExceptionally(rex);
1413 }
1414 }
1415 postComplete();
1416 }
1417 return dst;
1418 }
1419
1420 private <U,V> CompletableFuture<V> andFunction(CompletableFuture<? extends U> other,
1421 BiFunction<? super T,? super U,? extends V> fn,
1422 Executor executor) {
1423 if (other == null || fn == null) throw new NullPointerException();
1424 CompletableFuture<V> dst = new CompletableFuture<V>();
1425 AndFunction<T,U,V> d = null;
1426 Object r, s = null;
1427 if ((r = result) == null || (s = other.result) == null) {
1428 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
1429 CompletionNode q = null, p = new CompletionNode(d);
1430 while ((r == null && (r = result) == null) ||
1431 (s == null && (s = other.result) == null)) {
1432 if (q != null) {
1433 if (s != null ||
1434 UNSAFE.compareAndSwapObject
1435 (other, COMPLETIONS, q.next = other.completions, q))
1436 break;
1437 }
1438 else if (r != null ||
1439 UNSAFE.compareAndSwapObject
1440 (this, COMPLETIONS, p.next = completions, p)) {
1441 if (s != null)
1442 break;
1443 q = new CompletionNode(d);
1444 }
1445 }
1446 }
1447 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1448 T t; U u; Throwable ex = null;
1449 if (r instanceof AltResult) {
1450 if ((ex = ((AltResult)r).ex) != null)
1451 dst.completeExceptionally(new RuntimeException(ex));
1452 t = null;
1453 }
1454 else
1455 t = (T) r;
1456 if (ex != null)
1457 u = null;
1458 else if (s instanceof AltResult) {
1459 if ((ex = ((AltResult)s).ex) != null)
1460 dst.completeExceptionally(new RuntimeException(ex));
1461 u = null;
1462 }
1463 else
1464 u = (U) s;
1465 if (ex == null) {
1466 try {
1467 if (executor != null)
1468 executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1469 else
1470 dst.complete(fn.apply(t, u));
1471 } catch (Throwable rex) {
1472 dst.completeExceptionally(rex);
1473 }
1474 }
1475 }
1476 if (r != null)
1477 postComplete();
1478 if (s != null)
1479 other.postComplete();
1480 return dst;
1481 }
1482
1483 private CompletableFuture<Void> andRunnable(CompletableFuture<?> other,
1484 Runnable action,
1485 Executor executor) {
1486 if (other == null || action == null) throw new NullPointerException();
1487 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1488 AndRunnable<T> d = null;
1489 Object r, s = null;
1490 if ((r = result) == null || (s = other.result) == null) {
1491 d = new AndRunnable<T>(this, other, action, dst, executor);
1492 CompletionNode q = null, p = new CompletionNode(d);
1493 while ((r == null && (r = result) == null) ||
1494 (s == null && (s = other.result) == null)) {
1495 if (q != null) {
1496 if (s != null ||
1497 UNSAFE.compareAndSwapObject
1498 (other, COMPLETIONS, q.next = other.completions, q))
1499 break;
1500 }
1501 else if (r != null ||
1502 UNSAFE.compareAndSwapObject
1503 (this, COMPLETIONS, p.next = completions, p)) {
1504 if (s != null)
1505 break;
1506 q = new CompletionNode(d);
1507 }
1508 }
1509 }
1510 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1511 Throwable ex = null;
1512 if ((r instanceof AltResult) &&
1513 (ex = ((AltResult)r).ex) != null)
1514 dst.completeExceptionally(new RuntimeException(ex));
1515 else if ((s instanceof AltResult) &&
1516 (ex = ((AltResult)s).ex) != null)
1517 dst.completeExceptionally(new RuntimeException(ex));
1518 else {
1519 try {
1520 if (executor != null)
1521 executor.execute(new AsyncRunnable(action, dst));
1522 else {
1523 action.run();
1524 dst.complete(null);
1525 }
1526 } catch (Throwable rex) {
1527 dst.completeExceptionally(rex);
1528 }
1529 }
1530 }
1531 if (r != null)
1532 postComplete();
1533 if (s != null)
1534 other.postComplete();
1535 return dst;
1536 }
1537
1538 private <U> CompletableFuture<U> orFunction(CompletableFuture<? extends T> other,
1539 Function<? super T, U> fn,
1540 Executor executor) {
1541 if (other == null || fn == null) throw new NullPointerException();
1542 CompletableFuture<U> dst = new CompletableFuture<U>();
1543 OrFunction<T,U> d = null;
1544 Object r;
1545 if ((r = result) == null && (r = other.result) == null) {
1546 d = new OrFunction<T,U>(this, other, fn, dst, executor);
1547 CompletionNode q = null, p = new CompletionNode(d);
1548 while ((r = result) == null && (r = other.result) == null) {
1549 if (q != null) {
1550 if (UNSAFE.compareAndSwapObject
1551 (other, COMPLETIONS, q.next = other.completions, q))
1552 break;
1553 }
1554 else if (UNSAFE.compareAndSwapObject
1555 (this, COMPLETIONS, p.next = completions, p))
1556 q = new CompletionNode(d);
1557 }
1558 }
1559 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1560 T t; Throwable ex = null;
1561 if (r instanceof AltResult) {
1562 if ((ex = ((AltResult)r).ex) != null)
1563 dst.completeExceptionally(new RuntimeException(ex));
1564 t = null;
1565 }
1566 else
1567 t = (T) r;
1568 if (ex == null) {
1569 try {
1570 if (executor != null)
1571 executor.execute(new AsyncFunction(t, fn, dst));
1572 else
1573 dst.complete(fn.apply(t));
1574 } catch (Throwable rex) {
1575 dst.completeExceptionally(rex);
1576 }
1577 }
1578 }
1579 if (r != null) {
1580 if (result != null)
1581 postComplete();
1582 if (other.result != null)
1583 other.postComplete();
1584 }
1585 return dst;
1586 }
1587
1588 private CompletableFuture<Void> orRunnable(CompletableFuture<?> other,
1589 Runnable action,
1590 Executor executor) {
1591 if (other == null || action == null) throw new NullPointerException();
1592 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1593 OrRunnable<T> d = null;
1594 Object r;
1595 if ((r = result) == null && (r = other.result) == null) {
1596 d = new OrRunnable<T>(this, other, action, dst, executor);
1597 CompletionNode q = null, p = new CompletionNode(d);
1598 while ((r = result) == null && (r = other.result) == null) {
1599 if (q != null) {
1600 if (UNSAFE.compareAndSwapObject
1601 (other, COMPLETIONS, q.next = other.completions, q))
1602 break;
1603 }
1604 else if (UNSAFE.compareAndSwapObject
1605 (this, COMPLETIONS, p.next = completions, p))
1606 q = new CompletionNode(d);
1607 }
1608 }
1609 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1610 Throwable ex = null;
1611 if ((r instanceof AltResult) &&
1612 (ex = ((AltResult)r).ex) != null)
1613 dst.completeExceptionally(new RuntimeException(ex));
1614 else {
1615 try {
1616 if (executor != null)
1617 executor.execute(new AsyncRunnable(action, dst));
1618 else {
1619 action.run();
1620 dst.complete(null);
1621 }
1622 } catch (Throwable rex) {
1623 dst.completeExceptionally(rex);
1624 }
1625 }
1626 }
1627 if (r != null) {
1628 if (result != null)
1629 postComplete();
1630 if (other.result != null)
1631 other.postComplete();
1632 }
1633 return dst;
1634 }
1635
1636 // Unsafe mechanics
1637 private static final sun.misc.Unsafe UNSAFE;
1638 private static final long RESULT;
1639 private static final long WAITERS;
1640 private static final long COMPLETIONS;
1641 static {
1642 try {
1643 UNSAFE = sun.misc.Unsafe.getUnsafe();
1644 Class<?> k = CompletableFuture.class;
1645 RESULT = UNSAFE.objectFieldOffset
1646 (k.getDeclaredField("result"));
1647 WAITERS = UNSAFE.objectFieldOffset
1648 (k.getDeclaredField("waiters"));
1649 COMPLETIONS = UNSAFE.objectFieldOffset
1650 (k.getDeclaredField("completions"));
1651 } catch (Exception e) {
1652 throw new Error(e);
1653 }
1654 }
1655
1656 }