ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.6
Committed: Fri Dec 28 19:18:30 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.5: +56 -39 lines
Log Message:
Allow exceptionally to compute value; rename force

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9     import java.util.function.Function;
10     import java.util.function.BiFunction;
11     import java.util.concurrent.Future;
12     import java.util.concurrent.TimeUnit;
13     import java.util.concurrent.ForkJoinPool;
14     import java.util.concurrent.ForkJoinTask;
15     import java.util.concurrent.Executor;
16 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
17 dl 1.1 import java.util.concurrent.ExecutionException;
18     import java.util.concurrent.TimeoutException;
19     import java.util.concurrent.CancellationException;
20     import java.util.concurrent.atomic.AtomicInteger;
21     import java.util.concurrent.locks.LockSupport;
22    
23    
24     /**
25     * A {@link Future} that may be explicitly completed (setting its
26     * value and status), and may include dependent functions and actions
27     * that trigger upon its completion.
28     *
29 jsr166 1.4 * <p>Similar methods are available for function-based usages in
30 dl 1.1 * which dependent stages typically propagate values, as well as
31     * result-less action-based usages, that are normally associated with
32     * {@code CompletableFuture<Void>} Futures. Functions and actions
33     * supplied for dependent completions using {@code then}, {@code
34     * andThen}, {@code orThen}, and {@code exceptionally} may be
35     * performed by the thread that completes the current
36     * CompletableFuture, or by any other caller of these methods. There
37     * are no guarantees about the order of processing completions unless
38     * constrained by method {@code then} and related methods.
39     *
40 jsr166 1.4 * <p>When two or more threads attempt to {@link #complete} or {@link
41 dl 1.1 * #completeExceptionally} a CompletableFuture, only one of them will
42 dl 1.5 * succeed. Upon exceptional completion, or when a completion entails
43     * computation of a function or action, and it terminates abruptly
44     * with an exception, then further completions act as {@code
45     * completeExceptionally} with that exception.
46 dl 1.1 *
47     * <p>CompletableFutures themselves do not execute asynchronously.
48     * However, the {@code async} methods provide commonly useful ways to
49     * to commence asynchronous processing, using either a given {@link
50     * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
51     * function or action that will result in the completion of a new
52     * CompletableFuture.
53     *
54     * @author Doug Lea
55     * @since 1.8
56     */
57     public class CompletableFuture<T> implements Future<T> {
58     /*
59     * Quick overview (more to come):
60     *
61     * 1. Non-nullness of field result indicates done. An AltResult is
62     * used to box null as a result, as well as to hold exceptions.
63     *
64     * 2. Waiters are held in a Treiber stack similar to the one used
65     * in FutureTask
66     *
67     * 3. Completions are also kept in a list/stack, and pulled off
68     * and run when completion is triggered.
69     */
70    
71     static final class AltResult {
72     final Throwable ex; // null only for NIL
73 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
74 dl 1.1 }
75    
76     static final AltResult NIL = new AltResult(null);
77    
78     /**
79     * Simple linked list nodes to record waiting threads in a Treiber
80     * stack. See other classes such as Phaser and SynchronousQueue
81     * for more detailed explanation.
82     */
83     static final class WaitNode {
84     volatile Thread thread;
85     volatile WaitNode next;
86     }
87    
88     /**
89     * Simple linked list nodes to record completions, used in
90     * basically the same way as WaitNodes
91     */
92     static final class CompletionNode {
93 jsr166 1.2 final Completion completion;
94 dl 1.1 volatile CompletionNode next;
95 jsr166 1.2 CompletionNode(Completion completion) { this.completion = completion; }
96 dl 1.1 }
97    
98    
99     volatile Object result; // either the result or boxed AltResult
100     volatile WaitNode waiters; // Treiber stack of threads blocked on get()
101     volatile CompletionNode completions; // list (Treiber stack) of completions
102    
103     /**
104     * Creates a new incomplete CompletableFuture.
105     */
106     public CompletableFuture() {
107     }
108    
109     /**
110     * Asynchronously executes in the {@link
111     * ForkJoinPool#commonPool()}, a task that completes the returned
112     * CompletableFuture with the result of the given Supplier.
113     *
114     * @param supplier a function returning the value to be used
115     * to complete the returned CompletableFuture.
116     * @return the CompletableFuture.
117     */
118     public static <U> CompletableFuture<U> async(Supplier<U> supplier) {
119     if (supplier == null) throw new NullPointerException();
120     CompletableFuture<U> f = new CompletableFuture<U>();
121     ForkJoinPool.commonPool().
122     execute((ForkJoinTask<?>)new AsyncSupplier(supplier, f));
123     return f;
124     }
125    
126     /**
127     * Asynchronously executes using the given executor, a task that
128     * completes the returned CompletableFuture with the result of the
129     * given Supplier.
130     *
131     * @param supplier a function returning the value to be used
132     * to complete the returned CompletableFuture.
133     * @param executor the executor to use for asynchronous execution
134     * @return the CompletableFuture.
135     */
136     public static <U> CompletableFuture<U> async(Supplier<U> supplier,
137     Executor executor) {
138     if (executor == null || supplier == null)
139     throw new NullPointerException();
140     CompletableFuture<U> f = new CompletableFuture<U>();
141     executor.execute(new AsyncSupplier(supplier, f));
142     return f;
143     }
144    
145     /**
146     * Asynchronously executes in the {@link
147     * ForkJoinPool#commonPool()} a task that runs the given action,
148     * and then completes the returned CompletableFuture
149     *
150     * @param runnable the action to run before completing the
151     * returned CompletableFuture.
152     * @return the CompletableFuture.
153     */
154     public static CompletableFuture<Void> async(Runnable runnable) {
155     if (runnable == null) throw new NullPointerException();
156     CompletableFuture<Void> f = new CompletableFuture<Void>();
157     ForkJoinPool.commonPool().
158     execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
159     return f;
160     }
161    
162     /**
163     * Asynchronously executes using the given executor, a task that
164     * runs the given action, and then completes the returned
165     * CompletableFuture
166     *
167     * @param runnable the action to run before completing the
168     * returned CompletableFuture.
169     * @param executor the executor to use for asynchronous execution
170     * @return the CompletableFuture.
171     */
172     public static CompletableFuture<Void> async(Runnable runnable,
173     Executor executor) {
174     if (executor == null || runnable == null)
175     throw new NullPointerException();
176     CompletableFuture<Void> f = new CompletableFuture<Void>();
177     executor.execute(new AsyncRunnable(runnable, f));
178     return f;
179     }
180    
181     /**
182     * Returns {@code true} if completed in any fashion: normally,
183     * exceptionally, or cancellation.
184     *
185     * @return {@code true} if completed
186     */
187     public boolean isDone() {
188     return result != null;
189     }
190    
191     /**
192     * Returns the result value when complete, or throws an
193     * (unchecked) exception if completed exceptionally. To better
194     * conform with the use of common functional forms, this method
195     * transforms any checked exception possible with {@link
196     * Future#get} into an (unchecked) {@link RuntimeException} with
197     * the underlying exception as its cause. (The checked exception
198     * convention is available using the timed form of get.)
199     *
200     * @return the result value
201     */
202     public T get() {
203     Object r; Throwable ex;
204 jsr166 1.2 if ((r = result) == null)
205     return waitingGet();
206     if (r instanceof AltResult) {
207 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
208     if (ex instanceof Error)
209     throw (Error)ex;
210     if (ex instanceof RuntimeException)
211     throw (RuntimeException)ex;
212     throw new RuntimeException(ex);
213     }
214 jsr166 1.2 return null;
215     }
216     return (T)r;
217 dl 1.1 }
218    
219     /**
220     * Returns the result value (or throws any encountered exception)
221     * if completed, else returns the given valueIfAbsent.
222     *
223     * @param valueIfAbsent the value to return if not completed
224     * @return the result value, if completed, else the given valueIfAbsent
225     */
226     public T getNow(T valueIfAbsent) {
227     Object r; Throwable ex;
228 jsr166 1.2 if ((r = result) == null)
229     return valueIfAbsent;
230     if (r instanceof AltResult) {
231 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
232     if (ex instanceof Error)
233     throw (Error)ex;
234     if (ex instanceof RuntimeException)
235     throw (RuntimeException)ex;
236     throw new RuntimeException(ex);
237     }
238 jsr166 1.2 return null;
239     }
240     return (T)r;
241 dl 1.1 }
242    
243     /**
244     * Waits if necessary for at most the given time for completion,
245     * and then retrieves its result, if available.
246     *
247     * @param timeout the maximum time to wait
248     * @param unit the time unit of the timeout argument
249     * @return the computed result
250     * @throws CancellationException if the computation was cancelled
251     * @throws ExecutionException if the computation threw an
252     * exception
253     * @throws InterruptedException if the current thread was interrupted
254     * while waiting
255     * @throws TimeoutException if the wait timed out
256     */
257     public T get(long timeout, TimeUnit unit)
258     throws InterruptedException, ExecutionException, TimeoutException {
259     Object r; Throwable ex;
260     long nanos = unit.toNanos(timeout);
261     if (Thread.interrupted())
262     throw new InterruptedException();
263 jsr166 1.2 if ((r = result) == null)
264     r = timedAwaitDone(nanos);
265     if (r instanceof AltResult) {
266 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
267     if (ex instanceof ExecutionException) // avoid re-wrap
268     throw (ExecutionException)ex;
269 dl 1.1 throw new ExecutionException(ex);
270 dl 1.5 }
271 jsr166 1.2 return null;
272     }
273     return (T)r;
274 dl 1.1 }
275    
276     /**
277     * If not already completed, sets the value returned by {@link
278     * #get()} and related methods to the given value.
279     *
280     * @param value the result value
281     * @return true if this invocation caused this CompletableFuture
282     * to transition to a completed state, else false.
283     */
284     public boolean complete(T value) {
285     if (result == null &&
286     UNSAFE.compareAndSwapObject(this, RESULT, null,
287 jsr166 1.2 (value == null) ? NIL : value)) {
288 dl 1.1 postComplete();
289     return true;
290     }
291     return false;
292     }
293    
294     /**
295     * If not already completed, causes invocations of {@link #get()}
296     * and related methods to throw the given exception.
297     *
298     * @param ex the exception
299     * @return true if this invocation caused this CompletableFuture
300     * to transition to a completed state, else false.
301     */
302     public boolean completeExceptionally(Throwable ex) {
303     if (ex == null) throw new NullPointerException();
304     if (result == null) {
305     Object r = new AltResult(ex);
306     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
307     postComplete();
308     return true;
309     }
310     }
311     return false;
312     }
313    
314     /**
315     * Creates and returns a CompletableFuture that is completed with
316     * the result of the given function of this CompletableFuture.
317     * If this CompletableFuture completes exceptionally,
318     * then the returned CompletableFuture also does so,
319     * with a RuntimeException having this exception as
320     * its cause.
321     *
322     * @param fn the function to use to compute the value of
323     * the returned CompletableFuture
324     * @return the new CompletableFuture
325     */
326     public <U> CompletableFuture<U> then(Function<? super T,? extends U> fn) {
327     return thenFunction(fn, null);
328     }
329    
330     /**
331     * Creates and returns a CompletableFuture that is asynchronously
332     * completed using the {@link ForkJoinPool#commonPool()} with the
333     * result of the given function of this CompletableFuture. If
334     * this CompletableFuture completes exceptionally, then the
335     * returned CompletableFuture also does so, with a
336     * RuntimeException having this exception as its cause.
337     *
338     * @param fn the function to use to compute the value of
339     * the returned CompletableFuture
340     * @return the new CompletableFuture
341     */
342     public <U> CompletableFuture<U> thenAsync(Function<? super T,? extends U> fn) {
343     return thenFunction(fn, ForkJoinPool.commonPool());
344     }
345    
346     /**
347     * Creates and returns a CompletableFuture that is asynchronously
348     * completed using the given executor with the result of the given
349     * function of this CompletableFuture. If this CompletableFuture
350     * completes exceptionally, then the returned CompletableFuture
351     * also does so, with a RuntimeException having this exception as
352     * its cause.
353     *
354     * @param fn the function to use to compute the value of
355     * the returned CompletableFuture
356     * @param executor the executor to use for asynchronous execution
357     * @return the new CompletableFuture
358     */
359     public <U> CompletableFuture<U> thenAsync(Function<? super T,? extends U> fn,
360     Executor executor) {
361     if (executor == null) throw new NullPointerException();
362     return thenFunction(fn, executor);
363     }
364    
365     /**
366     * Creates and returns a CompletableFuture that is completed after
367     * performing the given action if/when this CompletableFuture
368     * completes. If this CompletableFuture completes exceptionally,
369     * then the returned CompletableFuture also does so, with a
370     * RuntimeException having this exception as its cause.
371     *
372     * @param action the action to perform before completing the
373     * returned CompletableFuture
374     * @return the new CompletableFuture
375     */
376     public CompletableFuture<Void> then(Runnable action) {
377     return thenRunnable(action, null);
378     }
379    
380     /**
381     * Creates and returns a CompletableFuture that is asynchronously
382     * completed using the {@link ForkJoinPool#commonPool()} after
383     * performing the given action if/when this CompletableFuture
384     * completes. If this CompletableFuture completes exceptionally,
385     * then the returned CompletableFuture also does so, with a
386     * RuntimeException having this exception as its cause.
387     *
388     * @param action the action to perform before completing the
389     * returned CompletableFuture
390     * @return the new CompletableFuture
391     */
392     public CompletableFuture<Void> thenAsync(Runnable action) {
393     return thenRunnable(action, ForkJoinPool.commonPool());
394     }
395    
396     /**
397     * Creates and returns a CompletableFuture that is asynchronously
398     * completed using the given executor after performing the given
399     * action if/when this CompletableFuture completes. If this
400     * CompletableFuture completes exceptionally, then the returned
401     * CompletableFuture also does so, with a RuntimeException having
402     * this exception as its cause.
403     *
404     * @param action the action to perform before completing the
405     * returned CompletableFuture
406     * @param executor the executor to use for asynchronous execution
407     * @return the new CompletableFuture
408     */
409     public CompletableFuture<Void> thenAsync(Runnable action, Executor executor) {
410     if (executor == null) throw new NullPointerException();
411     return thenRunnable(action, executor);
412     }
413    
414     /**
415     * Creates and returns a CompletableFuture that is completed with
416     * the result of the given function of this and the other given
417     * CompletableFuture's results if/when both complete. If this or
418     * the other CompletableFuture complete exceptionally, then the
419     * returned CompletableFuture also does so, with a
420     * RuntimeException having the exception as its cause.
421     *
422     * @param other the other CompletableFuture
423     * @param fn the function to use to compute the value of
424     * the returned CompletableFuture
425     * @return the new CompletableFuture
426     */
427     public <U,V> CompletableFuture<V> andThen(CompletableFuture<? extends U> other,
428     BiFunction<? super T,? super U,? extends V> fn) {
429     return andFunction(other, fn, null);
430     }
431    
432     /**
433 jsr166 1.3 * Creates and returns a CompletableFuture that is asynchronously
434 dl 1.1 * completed using the {@link ForkJoinPool#commonPool()} with
435     * the result of the given function of this and the other given
436     * CompletableFuture's results if/when both complete. If this or
437     * the other CompletableFuture complete exceptionally, then the
438     * returned CompletableFuture also does so, with a
439     * RuntimeException having the exception as its cause.
440     *
441     * @param other the other CompletableFuture
442     * @param fn the function to use to compute the value of
443     * the returned CompletableFuture
444     * @return the new CompletableFuture
445     */
446     public <U,V> CompletableFuture<V> andThenAsync(CompletableFuture<? extends U> other,
447     BiFunction<? super T,? super U,? extends V> fn) {
448     return andFunction(other, fn, ForkJoinPool.commonPool());
449     }
450    
451     /**
452 jsr166 1.3 * Creates and returns a CompletableFuture that is
453 dl 1.1 * asynchronously completed using the given executor with the
454     * result of the given function of this and the other given
455     * CompletableFuture's results if/when both complete. If this or
456     * the other CompletableFuture complete exceptionally, then the
457     * returned CompletableFuture also does so, with a
458     * RuntimeException having the exception as its cause.
459     *
460     * @param other the other CompletableFuture
461     * @param fn the function to use to compute the value of
462     * the returned CompletableFuture
463     * @param executor the executor to use for asynchronous execution
464     * @return the new CompletableFuture
465     */
466    
467     public <U,V> CompletableFuture<V> andThenAsync(CompletableFuture<? extends U> other,
468     BiFunction<? super T,? super U,? extends V> fn,
469     Executor executor) {
470     if (executor == null) throw new NullPointerException();
471     return andFunction(other, fn, executor);
472     }
473    
474     /**
475     * Creates and returns a CompletableFuture that is completed
476     * if/when this and the other given CompletableFuture both
477     * complete. If this and/or the other CompletableFuture complete
478     * exceptionally, then the returned CompletableFuture also does
479     * so, with a RuntimeException having the one of the exceptions as
480     * its cause.
481     *
482     * @param other the other CompletableFuture
483     * @param action the action to perform before completing the
484     * returned CompletableFuture
485     * @return the new CompletableFuture
486     */
487     public CompletableFuture<Void> andThen(CompletableFuture<?> other,
488     Runnable action) {
489     return andRunnable(other, action, null);
490     }
491    
492     /**
493     * Creates and returns a CompletableFuture that is completed
494     * asynchronously using the {@link ForkJoinPool#commonPool()}
495     * if/when this and the other given CompletableFuture both
496     * complete. If this and/or the other CompletableFuture complete
497     * exceptionally, then the returned CompletableFuture also does
498     * so, with a RuntimeException having the one of the exceptions as
499     * its cause.
500     *
501     * @param other the other CompletableFuture
502     * @param action the action to perform before completing the
503     * returned CompletableFuture
504     * @return the new CompletableFuture
505     */
506     public CompletableFuture<Void> andThenAsync(CompletableFuture<?> other,
507     Runnable action) {
508     return andRunnable(other, action, ForkJoinPool.commonPool());
509     }
510    
511     /**
512     * Creates and returns a CompletableFuture that is completed
513     * asynchronously using the given executor
514     * if/when this and the other given CompletableFuture both
515     * complete. If this and/or the other CompletableFuture complete
516     * exceptionally, then the returned CompletableFuture also does
517     * so, with a RuntimeException having the one of the exceptions as
518     * its cause.
519     *
520     * @param other the other CompletableFuture
521     * @param action the action to perform before completing the
522     * returned CompletableFuture
523     * @param executor the executor to use for asynchronous execution
524     * @return the new CompletableFuture
525     */
526     public CompletableFuture<Void> andThenAsync(CompletableFuture<?> other,
527     Runnable action,
528     Executor executor) {
529     if (executor == null) throw new NullPointerException();
530     return andRunnable(other, action, executor);
531     }
532    
533     /**
534     * Creates and returns a CompletableFuture that is completed with
535     * the result of the given function of either this or the other
536     * given CompletableFuture's results if/when either complete. If
537     * this and/or the other CompletableFuture complete exceptionally,
538     * then the returned CompletableFuture may also do so, with a
539     * RuntimeException having one of these exceptions as its cause.
540     * No guarantees are made about which result or exception is used
541     * in the returned CompletableFuture.
542     *
543     * @param other the other CompletableFuture
544     * @param fn the function to use to compute the value of
545     * the returned CompletableFuture
546     * @return the new CompletableFuture
547     */
548     public <U> CompletableFuture<U> orThen(CompletableFuture<? extends T> other,
549     Function<? super T, U> fn) {
550     return orFunction(other, fn, null);
551     }
552    
553     /**
554     * Creates and returns a CompletableFuture that is completed
555     * asynchronously using the {@link ForkJoinPool#commonPool()} with
556     * the result of the given function of either this or the other
557     * given CompletableFuture's results if/when either complete. If
558     * this and/or the other CompletableFuture complete exceptionally,
559     * then the returned CompletableFuture may also do so, with a
560     * RuntimeException having one of these exceptions as its cause.
561     * No guarantees are made about which result or exception is used
562     * in the returned CompletableFuture.
563     *
564     * @param other the other CompletableFuture
565     * @param fn the function to use to compute the value of
566     * the returned CompletableFuture
567     * @return the new CompletableFuture
568     */
569     public <U> CompletableFuture<U> orThenAsync(CompletableFuture<? extends T> other,
570     Function<? super T, U> fn) {
571     return orFunction(other, fn, ForkJoinPool.commonPool());
572     }
573    
574     /**
575     * Creates and returns a CompletableFuture that is completed
576     * asynchronously using the given executor with the result of the
577     * given function of either this or the other given
578     * CompletableFuture's results if/when either complete. If this
579     * and/or the other CompletableFuture complete exceptionally, then
580     * the returned CompletableFuture may also do so, with a
581     * RuntimeException having one of these exceptions as its cause.
582     * No guarantees are made about which result or exception is used
583     * in the returned CompletableFuture.
584     *
585     * @param other the other CompletableFuture
586     * @param fn the function to use to compute the value of
587     * the returned CompletableFuture
588     * @param executor the executor to use for asynchronous execution
589     * @return the new CompletableFuture
590     */
591     public <U> CompletableFuture<U> orThen(CompletableFuture<? extends T> other,
592     Function<? super T, U> fn,
593     Executor executor) {
594     if (executor == null) throw new NullPointerException();
595     return orFunction(other, fn, executor);
596     }
597    
598     /**
599     * Creates and returns a CompletableFuture that is completed
600     * after this or the other given CompletableFuture complete. If
601     * this and/or the other CompletableFuture complete exceptionally,
602     * then the returned CompletableFuture may also do so, with a
603     * RuntimeException having one of these exceptions as its cause.
604     * No guarantees are made about which exception is used in the
605     * returned CompletableFuture.
606     *
607     * @param other the other CompletableFuture
608     * @param action the action to perform before completing the
609     * returned CompletableFuture
610     * @return the new CompletableFuture
611     */
612     public CompletableFuture<Void> orThen(CompletableFuture<?> other,
613     Runnable action) {
614     return orRunnable(other, action, null);
615     }
616    
617     /**
618     * Creates and returns a CompletableFuture that is completed
619     * asynchronously using the {@link ForkJoinPool#commonPool()}
620     * after this or the other given CompletableFuture complete. If
621     * this and/or the other CompletableFuture complete exceptionally,
622     * then the returned CompletableFuture may also do so, with a
623     * RuntimeException having one of these exceptions as its cause.
624     * No guarantees are made about which exception is used in the
625     * returned CompletableFuture.
626     *
627     * @param other the other CompletableFuture
628     * @param action the action to perform before completing the
629     * returned CompletableFuture
630     * @return the new CompletableFuture
631     */
632     public CompletableFuture<Void> orThenAsync(CompletableFuture<?> other,
633     Runnable action) {
634     return orRunnable(other, action, ForkJoinPool.commonPool());
635     }
636    
637     /**
638     * Creates and returns a CompletableFuture that is completed
639     * asynchronously using the given executor after this or the other
640     * given CompletableFuture complete. If this and/or the other
641     * CompletableFuture complete exceptionally, then the returned
642     * CompletableFuture may also do so, with a RuntimeException
643     * having one of these exceptions as its cause. No guarantees are
644     * made about which exception is used in the returned
645     * CompletableFuture.
646     *
647     * @param other the other CompletableFuture
648     * @param action the action to perform before completing the
649     * returned CompletableFuture
650     * @param executor the executor to use for asynchronous execution
651     * @return the new CompletableFuture
652     */
653     public CompletableFuture<Void> orThenAsync(CompletableFuture<?> other,
654     Runnable action,
655     Executor executor) {
656     if (executor == null) throw new NullPointerException();
657     return orRunnable(other, action, executor);
658     }
659    
660     /**
661 dl 1.6 * Creates and returns a CompletableFuture that is completed with
662     * the result of the given function of the exception triggering
663     * this CompletableFuture's completion if/when it completes
664     * exceptionally; Otherwise, if this CompletableFuture completes
665     * normally, then the returned CompletableFuture also completes
666     * normally with the same value.
667     *
668     * @param fn the function to use to compute the value of the
669     * returned CompletableFuture if this CompletableFuture completed
670     * exceptionally
671 dl 1.1 * @return the new CompletableFuture
672     */
673 dl 1.6 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
674     if (fn == null) throw new NullPointerException();
675     CompletableFuture<T> dst = new CompletableFuture<T>();
676 dl 1.1 ExceptionAction<T> d = null;
677 dl 1.6 Object r;
678 jsr166 1.2 if ((r = result) == null) {
679 dl 1.1 CompletionNode p =
680 dl 1.6 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
681 dl 1.1 while ((r = result) == null) {
682     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
683     p.next = completions, p))
684     break;
685     }
686     }
687 dl 1.6 if (r != null && (d == null || d.compareAndSet(0, 1))) {
688     T t; Throwable ex = null;
689     if (r instanceof AltResult) {
690     if ((ex = ((AltResult)r).ex) != null) {
691     try {
692     dst.complete(fn.apply(ex));
693     } catch (Throwable rex) {
694     dst.completeExceptionally(rex);
695     }
696     }
697     t = null;
698 dl 1.1 }
699 dl 1.6 else
700     t = (T) r;
701     if (ex == null)
702     dst.complete(t);
703 dl 1.1 }
704     if (r != null)
705     postComplete();
706     return dst;
707     }
708    
709     /**
710     * Attempts to complete this CompletableFuture with
711     * a {@link CancellationException}.
712     *
713     * @param mayInterruptIfRunning this value has no effect in this
714     * implementation because interrupts are not used to control
715     * processing.
716     *
717     * @return {@code true} if this task is now cancelled
718     */
719     public boolean cancel(boolean mayInterruptIfRunning) {
720     Object r;
721     while ((r = result) == null) {
722     r = new AltResult(new CancellationException());
723     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
724     postComplete();
725     return true;
726     }
727     }
728     return ((r instanceof AltResult) &&
729     (((AltResult)r).ex instanceof CancellationException));
730     }
731    
732     /**
733     * Returns {@code true} if this CompletableFuture was cancelled
734     * before it completed normally.
735     *
736     * @return {@code true} if this CompletableFuture was cancelled
737     * before it completed normally
738     */
739     public boolean isCancelled() {
740     Object r;
741     return ((r = result) != null &&
742     (r instanceof AltResult) &&
743     (((AltResult)r).ex instanceof CancellationException));
744     }
745    
746     /**
747 dl 1.6 * Forcibly sets or resets the value subsequently returned by
748     * method get() and related methods, whether or not already
749     * completed. This method is designed for use only in error
750     * recovery actions, and even in such situations may result in
751     * ongoing dependent completions using established versus
752     * overwritten values.
753 dl 1.1 *
754     * @param value the completion value
755     */
756 dl 1.6 public void obtrudeValue(T value) {
757 jsr166 1.2 result = (value == null) ? NIL : value;
758 dl 1.1 postComplete();
759     }
760    
761     /**
762     * Removes and signals all waiting threads and runs all completions
763     */
764     private void postComplete() {
765     WaitNode q; Thread t;
766     while ((q = waiters) != null) {
767     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
768     (t = q.thread) != null) {
769     q.thread = null;
770     LockSupport.unpark(t);
771     }
772     }
773    
774     CompletionNode h; Completion c;
775     while ((h = completions) != null) {
776     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
777     (c = h.completion) != null)
778     c.run();
779     }
780     }
781    
782     /* ------------- waiting for completions -------------- */
783    
784 jsr166 1.2 /**
785 dl 1.1 * Heuristic spin value for waitingGet() before blocking on
786     * multiprocessors
787     */
788     static final int WAITING_GET_SPINS = 256;
789    
790     /**
791     * Returns result after waiting.
792     */
793     private T waitingGet() {
794     WaitNode q = null;
795     boolean queued = false, interrupted = false;
796     int h = 0, spins = 0;
797     for (Object r;;) {
798     if ((r = result) != null) {
799 dl 1.5 Throwable ex;
800 dl 1.1 if (q != null) // suppress unpark
801     q.thread = null;
802     postComplete(); // help release others
803     if (interrupted)
804     Thread.currentThread().interrupt();
805     if (r instanceof AltResult) {
806 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
807     if (ex instanceof Error)
808     throw (Error)ex;
809     if (ex instanceof RuntimeException)
810     throw (RuntimeException)ex;
811     throw new RuntimeException(ex);
812     }
813 dl 1.1 return null;
814     }
815     return (T)r;
816     }
817     else if (h == 0) {
818     h = ThreadLocalRandom.current().nextInt();
819     if (Runtime.getRuntime().availableProcessors() > 1)
820     spins = WAITING_GET_SPINS;
821     }
822     else if (spins > 0) {
823     h ^= h << 1; // xorshift
824 jsr166 1.2 h ^= h >>> 3;
825 dl 1.1 if ((h ^= h << 10) >= 0)
826     --spins;
827 jsr166 1.2 }
828 dl 1.1 else if (q == null)
829     q = new WaitNode();
830     else if (!queued)
831     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
832     q.next = waiters, q);
833     else if (Thread.interrupted())
834     interrupted = true;
835     else if (q.thread == null)
836     q.thread = Thread.currentThread();
837     else
838     LockSupport.park(this);
839     }
840     }
841    
842     /**
843     * Awaits completion or aborts on interrupt or timeout.
844     *
845     * @param nanos time to wait
846     * @return raw result
847     */
848     private Object timedAwaitDone(long nanos)
849     throws InterruptedException, TimeoutException {
850     final long deadline = System.nanoTime() + nanos;
851     WaitNode q = null;
852     boolean queued = false;
853     for (Object r;;) {
854     if (Thread.interrupted()) {
855     removeWaiter(q);
856     throw new InterruptedException();
857     }
858     else if ((r = result) != null) {
859     if (q != null)
860     q.thread = null;
861     postComplete();
862     return r;
863     }
864     else if (q == null)
865     q = new WaitNode();
866     else if (!queued)
867     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
868     q.next = waiters, q);
869     else if ((nanos = deadline - System.nanoTime()) <= 0L) {
870     removeWaiter(q);
871     throw new TimeoutException();
872     }
873     else if (q.thread == null)
874     q.thread = Thread.currentThread();
875     else
876     LockSupport.parkNanos(this, nanos);
877     }
878     }
879    
880     /**
881     * Tries to unlink a timed-out or interrupted wait node to avoid
882     * accumulating garbage. Internal nodes are simply unspliced
883     * without CAS since it is harmless if they are traversed anyway
884     * by releasers. To avoid effects of unsplicing from already
885     * removed nodes, the list is retraversed in case of an apparent
886     * race. This is slow when there are a lot of nodes, but we don't
887     * expect lists to be long enough to outweigh higher-overhead
888     * schemes.
889     */
890     private void removeWaiter(WaitNode node) {
891     if (node != null) {
892     node.thread = null;
893     retry:
894     for (;;) { // restart on removeWaiter race
895     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
896     s = q.next;
897     if (q.thread != null)
898     pred = q;
899     else if (pred != null) {
900     pred.next = s;
901     if (pred.thread == null) // check for race
902     continue retry;
903     }
904     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
905     continue retry;
906     }
907     break;
908     }
909     }
910     }
911    
912     /* ------------- Async tasks -------------- */
913    
914     /** Base class can act as either FJ or plain Runnable */
915     static abstract class Async extends ForkJoinTask<Void> implements Runnable {
916     public final Void getRawResult() { return null; }
917     public final void setRawResult(Void v) { }
918     public final void run() { exec(); }
919     }
920    
921     static final class AsyncRunnable extends Async {
922     final Runnable runnable;
923     final CompletableFuture<Void> dst;
924     AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
925     this.runnable = runnable; this.dst = dst;
926     }
927     public final boolean exec() {
928     Runnable fn;
929     CompletableFuture<Void> d;
930     if ((fn = this.runnable) == null || (d = this.dst) == null)
931     throw new NullPointerException();
932     try {
933     fn.run();
934     d.complete(null);
935     } catch (Throwable ex) {
936     d.completeExceptionally(ex);
937     }
938     return true;
939     }
940     private static final long serialVersionUID = 5232453952276885070L;
941     }
942    
943     static final class AsyncSupplier<U> extends Async {
944     final Supplier<U> supplier;
945     final CompletableFuture<U> dst;
946     AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
947     this.supplier = supplier; this.dst = dst;
948     }
949     public final boolean exec() {
950     Supplier<U> fn;
951     CompletableFuture<U> d;
952     if ((fn = this.supplier) == null || (d = this.dst) == null)
953     throw new NullPointerException();
954     try {
955     d.complete(fn.get());
956     } catch (Throwable ex) {
957     d.completeExceptionally(ex);
958     }
959     return true;
960     }
961     private static final long serialVersionUID = 5232453952276885070L;
962     }
963    
964     static final class AsyncFunction<T,U> extends Async {
965     Function<? super T,? extends U> fn;
966     T arg;
967     final CompletableFuture<U> dst;
968     AsyncFunction(T arg, Function<? super T,? extends U> fn,
969     CompletableFuture<U> dst) {
970     this.arg = arg; this.fn = fn; this.dst = dst;
971     }
972     public final boolean exec() {
973     Function<? super T,? extends U> fn;
974     CompletableFuture<U> d;
975     if ((fn = this.fn) == null || (d = this.dst) == null)
976     throw new NullPointerException();
977     try {
978     d.complete(fn.apply(arg));
979     } catch (Throwable ex) {
980     d.completeExceptionally(ex);
981     }
982     return true;
983     }
984     private static final long serialVersionUID = 5232453952276885070L;
985     }
986    
987     static final class AsyncBiFunction<T,U,V> extends Async {
988     final BiFunction<? super T,? super U,? extends V> fn;
989     final T arg1;
990     final U arg2;
991     final CompletableFuture<V> dst;
992     AsyncBiFunction(T arg1, U arg2,
993     BiFunction<? super T,? super U,? extends V> fn,
994     CompletableFuture<V> dst) {
995     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
996     }
997     public final boolean exec() {
998     BiFunction<? super T,? super U,? extends V> fn;
999     CompletableFuture<V> d;
1000     if ((fn = this.fn) == null || (d = this.dst) == null)
1001     throw new NullPointerException();
1002     try {
1003     d.complete(fn.apply(arg1, arg2));
1004     } catch (Throwable ex) {
1005     d.completeExceptionally(ex);
1006     }
1007     return true;
1008     }
1009     private static final long serialVersionUID = 5232453952276885070L;
1010     }
1011    
1012     /* ------------- Completions -------------- */
1013    
1014     // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1015     static abstract class Completion extends AtomicInteger implements Runnable {
1016     }
1017    
1018     static final class ThenFunction<T,U> extends Completion {
1019 jsr166 1.2 final CompletableFuture<? extends T> src;
1020     final Function<? super T,? extends U> fn;
1021     final CompletableFuture<U> dst;
1022 dl 1.1 final Executor executor;
1023     ThenFunction(CompletableFuture<? extends T> src,
1024     final Function<? super T,? extends U> fn,
1025     final CompletableFuture<U> dst, Executor executor) {
1026     this.src = src; this.fn = fn; this.dst = dst;
1027     this.executor = executor;
1028     }
1029 jsr166 1.2 public void run() {
1030 dl 1.1 CompletableFuture<? extends T> a;
1031     Function<? super T,? extends U> fn;
1032     CompletableFuture<U> dst;
1033 jsr166 1.2 Object r; T t; Throwable ex;
1034     if ((dst = this.dst) != null &&
1035 dl 1.1 (fn = this.fn) != null &&
1036     (a = this.src) != null &&
1037     (r = a.result) != null &&
1038     compareAndSet(0, 1)) {
1039 jsr166 1.2 if (r instanceof AltResult) {
1040 dl 1.1 if ((ex = ((AltResult)r).ex) != null) {
1041     dst.completeExceptionally(new RuntimeException(ex));
1042     return;
1043     }
1044     t = null;
1045     }
1046     else
1047     t = (T) r;
1048     try {
1049     if (executor != null)
1050     executor.execute(new AsyncFunction(t, fn, dst));
1051     else
1052     dst.complete(fn.apply(t));
1053     } catch (Throwable rex) {
1054     dst.completeExceptionally(rex);
1055     }
1056     }
1057     }
1058     }
1059    
1060     static final class ThenRunnable<T> extends Completion {
1061 jsr166 1.2 final CompletableFuture<? extends T> src;
1062     final Runnable fn;
1063     final CompletableFuture<Void> dst;
1064 dl 1.1 final Executor executor;
1065     ThenRunnable(CompletableFuture<? extends T> src,
1066     Runnable fn,
1067     CompletableFuture<Void> dst,
1068     Executor executor) {
1069     this.src = src; this.fn = fn; this.dst = dst;
1070     this.executor = executor;
1071     }
1072 jsr166 1.2 public void run() {
1073 dl 1.1 CompletableFuture<? extends T> a;
1074     Runnable fn;
1075     CompletableFuture<Void> dst;
1076 jsr166 1.2 Object r; Throwable ex;
1077     if ((dst = this.dst) != null &&
1078 dl 1.1 (fn = this.fn) != null &&
1079     (a = this.src) != null &&
1080     (r = a.result) != null &&
1081     compareAndSet(0, 1)) {
1082 jsr166 1.2 if (r instanceof AltResult) {
1083 dl 1.1 if ((ex = ((AltResult)r).ex) != null) {
1084     dst.completeExceptionally(new RuntimeException(ex));
1085     return;
1086     }
1087     }
1088     try {
1089     if (executor != null)
1090     executor.execute(new AsyncRunnable(fn, dst));
1091     else {
1092     fn.run();
1093     dst.complete(null);
1094     }
1095     } catch (Throwable rex) {
1096     dst.completeExceptionally(rex);
1097     }
1098     }
1099     }
1100     }
1101    
1102     static final class AndFunction<T,U,V> extends Completion {
1103 jsr166 1.2 final CompletableFuture<? extends T> src;
1104     final CompletableFuture<? extends U> snd;
1105     final BiFunction<? super T,? super U,? extends V> fn;
1106     final CompletableFuture<V> dst;
1107 dl 1.1 final Executor executor;
1108     AndFunction(CompletableFuture<? extends T> src,
1109     CompletableFuture<? extends U> snd,
1110     BiFunction<? super T,? super U,? extends V> fn,
1111     CompletableFuture<V> dst, Executor executor) {
1112     this.src = src; this.snd = snd;
1113     this.fn = fn; this.dst = dst;
1114     this.executor = executor;
1115     }
1116 jsr166 1.2 public void run() {
1117     Object r, s; T t; U u; Throwable ex;
1118 dl 1.1 CompletableFuture<? extends T> a;
1119     CompletableFuture<? extends U> b;
1120     BiFunction<? super T,? super U,? extends V> fn;
1121     CompletableFuture<V> dst;
1122 jsr166 1.2 if ((dst = this.dst) != null &&
1123 dl 1.1 (fn = this.fn) != null &&
1124     (a = this.src) != null &&
1125     (r = a.result) != null &&
1126     (b = this.snd) != null &&
1127     (s = b.result) != null &&
1128     compareAndSet(0, 1)) {
1129 jsr166 1.2 if (r instanceof AltResult) {
1130 dl 1.1 if ((ex = ((AltResult)r).ex) != null) {
1131     dst.completeExceptionally(new RuntimeException(ex));
1132     return;
1133     }
1134 jsr166 1.2 t = null;
1135     }
1136     else
1137     t = (T) r;
1138     if (s instanceof AltResult) {
1139 dl 1.1 if ((ex = ((AltResult)s).ex) != null) {
1140     dst.completeExceptionally(new RuntimeException(ex));
1141     return;
1142     }
1143 jsr166 1.2 u = null;
1144     }
1145     else
1146     u = (U) s;
1147 dl 1.1 try {
1148     if (executor != null)
1149     executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1150     else
1151     dst.complete(fn.apply(t, u));
1152     } catch (Throwable rex) {
1153     dst.completeExceptionally(rex);
1154     }
1155 jsr166 1.2 }
1156     }
1157 dl 1.1 }
1158    
1159     static final class AndRunnable<T> extends Completion {
1160 jsr166 1.2 final CompletableFuture<? extends T> src;
1161     final CompletableFuture<?> snd;
1162     final Runnable fn;
1163     final CompletableFuture<Void> dst;
1164 dl 1.1 final Executor executor;
1165     AndRunnable(CompletableFuture<? extends T> src,
1166     CompletableFuture<?> snd,
1167     Runnable fn,
1168     CompletableFuture<Void> dst, Executor executor) {
1169     this.src = src; this.snd = snd;
1170     this.fn = fn; this.dst = dst;
1171     this.executor = executor;
1172     }
1173 jsr166 1.2 public void run() {
1174     Object r, s; Throwable ex;
1175 dl 1.1 final CompletableFuture<? extends T> a;
1176     final CompletableFuture<?> b;
1177     final Runnable fn;
1178     final CompletableFuture<Void> dst;
1179 jsr166 1.2 if ((dst = this.dst) != null &&
1180 dl 1.1 (fn = this.fn) != null &&
1181     (a = this.src) != null &&
1182     (r = a.result) != null &&
1183     (b = this.snd) != null &&
1184     (s = b.result) != null &&
1185     compareAndSet(0, 1)) {
1186 jsr166 1.2 if (r instanceof AltResult) {
1187 dl 1.1 if ((ex = ((AltResult)r).ex) != null) {
1188     dst.completeExceptionally(new RuntimeException(ex));
1189     return;
1190     }
1191 jsr166 1.2 }
1192     if (s instanceof AltResult) {
1193 dl 1.1 if ((ex = ((AltResult)s).ex) != null) {
1194     dst.completeExceptionally(new RuntimeException(ex));
1195     return;
1196     }
1197 jsr166 1.2 }
1198 dl 1.1 try {
1199     if (executor != null)
1200     executor.execute(new AsyncRunnable(fn, dst));
1201     else {
1202     fn.run();
1203     dst.complete(null);
1204     }
1205     } catch (Throwable rex) {
1206     dst.completeExceptionally(rex);
1207     }
1208 jsr166 1.2 }
1209     }
1210 dl 1.1 }
1211    
1212     static final class OrFunction<T,U> extends Completion {
1213 jsr166 1.2 final CompletableFuture<? extends T> src;
1214     final CompletableFuture<? extends T> snd;
1215     final Function<? super T,? extends U> fn;
1216     final CompletableFuture<U> dst;
1217 dl 1.1 final Executor executor;
1218     OrFunction(CompletableFuture<? extends T> src,
1219     CompletableFuture<? extends T> snd,
1220     Function<? super T,? extends U> fn,
1221     CompletableFuture<U> dst, Executor executor) {
1222     this.src = src; this.snd = snd;
1223     this.fn = fn; this.dst = dst;
1224     this.executor = executor;
1225     }
1226 jsr166 1.2 public void run() {
1227     Object r; T t; Throwable ex;
1228 dl 1.1 CompletableFuture<? extends T> a;
1229     CompletableFuture<? extends T> b;
1230     Function<? super T,? extends U> fn;
1231     CompletableFuture<U> dst;
1232 jsr166 1.2 if ((dst = this.dst) != null &&
1233 dl 1.1 (fn = this.fn) != null &&
1234     (((a = this.src) != null && (r = a.result) != null) ||
1235     ((b = this.snd) != null && (r = b.result) != null)) &&
1236     compareAndSet(0, 1)) {
1237 jsr166 1.2 if (r instanceof AltResult) {
1238 dl 1.1 if ((ex = ((AltResult)r).ex) != null) {
1239     dst.completeExceptionally(new RuntimeException(ex));
1240     return;
1241     }
1242 jsr166 1.2 t = null;
1243     }
1244     else
1245     t = (T) r;
1246 dl 1.1 try {
1247     if (executor != null)
1248     executor.execute(new AsyncFunction(t, fn, dst));
1249     else
1250     dst.complete(fn.apply(t));
1251     } catch (Throwable rex) {
1252     dst.completeExceptionally(rex);
1253     }
1254 jsr166 1.2 }
1255     }
1256 dl 1.1 }
1257    
1258     static final class OrRunnable<T> extends Completion {
1259 jsr166 1.2 final CompletableFuture<? extends T> src;
1260     final CompletableFuture<?> snd;
1261     final Runnable fn;
1262     final CompletableFuture<Void> dst;
1263 dl 1.1 final Executor executor;
1264     OrRunnable(CompletableFuture<? extends T> src,
1265     CompletableFuture<?> snd,
1266     Runnable fn,
1267     CompletableFuture<Void> dst, Executor executor) {
1268     this.src = src; this.snd = snd;
1269     this.fn = fn; this.dst = dst;
1270     this.executor = executor;
1271     }
1272 jsr166 1.2 public void run() {
1273     Object r; Throwable ex;
1274 dl 1.1 CompletableFuture<? extends T> a;
1275     final CompletableFuture<?> b;
1276     final Runnable fn;
1277     final CompletableFuture<Void> dst;
1278 jsr166 1.2 if ((dst = this.dst) != null &&
1279 dl 1.1 (fn = this.fn) != null &&
1280     (((a = this.src) != null && (r = a.result) != null) ||
1281     ((b = this.snd) != null && (r = b.result) != null)) &&
1282     compareAndSet(0, 1)) {
1283 jsr166 1.2 if ((r instanceof AltResult) &&
1284 dl 1.1 (ex = ((AltResult)r).ex) != null) {
1285     dst.completeExceptionally(new RuntimeException(ex));
1286     }
1287 jsr166 1.2 else {
1288 dl 1.1 try {
1289     if (executor != null)
1290     executor.execute(new AsyncRunnable(fn, dst));
1291     else {
1292     fn.run();
1293     dst.complete(null);
1294     }
1295     } catch (Throwable rex) {
1296     dst.completeExceptionally(rex);
1297     }
1298     }
1299 jsr166 1.2 }
1300     }
1301 dl 1.1 }
1302    
1303     static final class ExceptionAction<T> extends Completion {
1304 jsr166 1.2 final CompletableFuture<? extends T> src;
1305 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1306     final CompletableFuture<T> dst;
1307 dl 1.1 ExceptionAction(CompletableFuture<? extends T> src,
1308 dl 1.6 Function<? super Throwable, ? extends T> fn,
1309     CompletableFuture<T> dst) {
1310 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1311     }
1312 jsr166 1.2 public void run() {
1313 dl 1.1 CompletableFuture<? extends T> a;
1314 dl 1.6 Function<? super Throwable, ? extends T> fn;
1315     CompletableFuture<T> dst;
1316     Object r; T t; Throwable ex;
1317 jsr166 1.2 if ((dst = this.dst) != null &&
1318 dl 1.1 (fn = this.fn) != null &&
1319     (a = this.src) != null &&
1320     (r = a.result) != null &&
1321     compareAndSet(0, 1)) {
1322 dl 1.6 if (r instanceof AltResult) {
1323     if ((ex = ((AltResult)r).ex) != null) {
1324     try {
1325     dst.complete(fn.apply(ex));
1326     } catch (Throwable rex) {
1327     dst.completeExceptionally(rex);
1328     }
1329     return;
1330 dl 1.1 }
1331 dl 1.6 t = null;
1332 dl 1.1 }
1333 dl 1.6 else
1334     t = (T) r;
1335     dst.complete(t);
1336 dl 1.1 }
1337     }
1338     }
1339    
1340     /* ------------- then/and/or implementations -------------- */
1341    
1342     private <U> CompletableFuture<U> thenFunction(Function<? super T,? extends U> fn,
1343     Executor executor) {
1344    
1345     if (fn == null) throw new NullPointerException();
1346     CompletableFuture<U> dst = new CompletableFuture<U>();
1347     ThenFunction<T,U> d = null;
1348 jsr166 1.2 Object r;
1349     if ((r = result) == null) {
1350 dl 1.1 CompletionNode p = new CompletionNode
1351     (d = new ThenFunction<T,U>(this, fn, dst, executor));
1352     while ((r = result) == null) {
1353     if (UNSAFE.compareAndSwapObject
1354     (this, COMPLETIONS, p.next = completions, p))
1355     break;
1356     }
1357     }
1358 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1359     T t; Throwable ex = null;
1360     if (r instanceof AltResult) {
1361 dl 1.1 if ((ex = ((AltResult)r).ex) != null)
1362 jsr166 1.2 dst.completeExceptionally(new RuntimeException(ex));
1363     t = null;
1364 dl 1.1 }
1365     else
1366 jsr166 1.2 t = (T) r;
1367     if (ex == null) {
1368 dl 1.1 try {
1369     if (executor != null)
1370     executor.execute(new AsyncFunction(t, fn, dst));
1371     else
1372     dst.complete(fn.apply(t));
1373     } catch (Throwable rex) {
1374     dst.completeExceptionally(rex);
1375     }
1376     }
1377 jsr166 1.2 postComplete();
1378     }
1379 dl 1.1 return dst;
1380     }
1381    
1382 jsr166 1.2 private CompletableFuture<Void> thenRunnable(Runnable action,
1383 dl 1.1 Executor executor) {
1384     if (action == null) throw new NullPointerException();
1385     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1386     ThenRunnable<T> d = null;
1387 jsr166 1.2 Object r;
1388     if ((r = result) == null) {
1389 dl 1.1 CompletionNode p = new CompletionNode
1390     (d = new ThenRunnable<T>(this, action, dst, executor));
1391     while ((r = result) == null) {
1392     if (UNSAFE.compareAndSwapObject
1393     (this, COMPLETIONS, p.next = completions, p))
1394     break;
1395     }
1396     }
1397 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1398 dl 1.1 Throwable ex = null;
1399 jsr166 1.2 if (r instanceof AltResult) {
1400 dl 1.1 if ((ex = ((AltResult)r).ex) != null)
1401 jsr166 1.2 dst.completeExceptionally(new RuntimeException(ex));
1402 dl 1.1 }
1403 jsr166 1.2 if (ex == null) {
1404 dl 1.1 try {
1405     if (executor != null)
1406     executor.execute(new AsyncRunnable(action, dst));
1407     else {
1408     action.run();
1409     dst.complete(null);
1410     }
1411     } catch (Throwable rex) {
1412     dst.completeExceptionally(rex);
1413     }
1414     }
1415 jsr166 1.2 postComplete();
1416     }
1417 dl 1.1 return dst;
1418     }
1419    
1420     private <U,V> CompletableFuture<V> andFunction(CompletableFuture<? extends U> other,
1421     BiFunction<? super T,? super U,? extends V> fn,
1422     Executor executor) {
1423     if (other == null || fn == null) throw new NullPointerException();
1424 jsr166 1.2 CompletableFuture<V> dst = new CompletableFuture<V>();
1425     AndFunction<T,U,V> d = null;
1426     Object r, s = null;
1427     if ((r = result) == null || (s = other.result) == null) {
1428 dl 1.1 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
1429     CompletionNode q = null, p = new CompletionNode(d);
1430     while ((r == null && (r = result) == null) ||
1431     (s == null && (s = other.result) == null)) {
1432     if (q != null) {
1433     if (s != null ||
1434     UNSAFE.compareAndSwapObject
1435     (other, COMPLETIONS, q.next = other.completions, q))
1436     break;
1437     }
1438     else if (r != null ||
1439     UNSAFE.compareAndSwapObject
1440     (this, COMPLETIONS, p.next = completions, p)) {
1441     if (s != null)
1442     break;
1443     q = new CompletionNode(d);
1444     }
1445     }
1446     }
1447 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1448     T t; U u; Throwable ex = null;
1449     if (r instanceof AltResult) {
1450 dl 1.1 if ((ex = ((AltResult)r).ex) != null)
1451 jsr166 1.2 dst.completeExceptionally(new RuntimeException(ex));
1452     t = null;
1453 dl 1.1 }
1454     else
1455 jsr166 1.2 t = (T) r;
1456 dl 1.1 if (ex != null)
1457     u = null;
1458     else if (s instanceof AltResult) {
1459     if ((ex = ((AltResult)s).ex) != null)
1460     dst.completeExceptionally(new RuntimeException(ex));
1461     u = null;
1462     }
1463     else
1464 jsr166 1.2 u = (U) s;
1465     if (ex == null) {
1466 dl 1.1 try {
1467     if (executor != null)
1468     executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1469     else
1470     dst.complete(fn.apply(t, u));
1471     } catch (Throwable rex) {
1472     dst.completeExceptionally(rex);
1473     }
1474     }
1475 jsr166 1.2 }
1476 dl 1.1 if (r != null)
1477     postComplete();
1478     if (s != null)
1479     other.postComplete();
1480 jsr166 1.2 return dst;
1481 dl 1.1 }
1482    
1483     private CompletableFuture<Void> andRunnable(CompletableFuture<?> other,
1484     Runnable action,
1485     Executor executor) {
1486     if (other == null || action == null) throw new NullPointerException();
1487 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1488     AndRunnable<T> d = null;
1489     Object r, s = null;
1490     if ((r = result) == null || (s = other.result) == null) {
1491 dl 1.1 d = new AndRunnable<T>(this, other, action, dst, executor);
1492     CompletionNode q = null, p = new CompletionNode(d);
1493     while ((r == null && (r = result) == null) ||
1494     (s == null && (s = other.result) == null)) {
1495     if (q != null) {
1496     if (s != null ||
1497     UNSAFE.compareAndSwapObject
1498     (other, COMPLETIONS, q.next = other.completions, q))
1499     break;
1500     }
1501     else if (r != null ||
1502     UNSAFE.compareAndSwapObject
1503     (this, COMPLETIONS, p.next = completions, p)) {
1504     if (s != null)
1505     break;
1506     q = new CompletionNode(d);
1507     }
1508     }
1509     }
1510 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1511     Throwable ex = null;
1512     if ((r instanceof AltResult) &&
1513 dl 1.1 (ex = ((AltResult)r).ex) != null)
1514     dst.completeExceptionally(new RuntimeException(ex));
1515     else if ((s instanceof AltResult) &&
1516     (ex = ((AltResult)s).ex) != null)
1517     dst.completeExceptionally(new RuntimeException(ex));
1518 jsr166 1.2 else {
1519 dl 1.1 try {
1520     if (executor != null)
1521     executor.execute(new AsyncRunnable(action, dst));
1522     else {
1523     action.run();
1524     dst.complete(null);
1525     }
1526     } catch (Throwable rex) {
1527     dst.completeExceptionally(rex);
1528     }
1529     }
1530 jsr166 1.2 }
1531 dl 1.1 if (r != null)
1532     postComplete();
1533     if (s != null)
1534     other.postComplete();
1535 jsr166 1.2 return dst;
1536 dl 1.1 }
1537    
1538     private <U> CompletableFuture<U> orFunction(CompletableFuture<? extends T> other,
1539     Function<? super T, U> fn,
1540     Executor executor) {
1541     if (other == null || fn == null) throw new NullPointerException();
1542 jsr166 1.2 CompletableFuture<U> dst = new CompletableFuture<U>();
1543     OrFunction<T,U> d = null;
1544     Object r;
1545     if ((r = result) == null && (r = other.result) == null) {
1546 dl 1.1 d = new OrFunction<T,U>(this, other, fn, dst, executor);
1547     CompletionNode q = null, p = new CompletionNode(d);
1548     while ((r = result) == null && (r = other.result) == null) {
1549     if (q != null) {
1550     if (UNSAFE.compareAndSwapObject
1551     (other, COMPLETIONS, q.next = other.completions, q))
1552     break;
1553     }
1554     else if (UNSAFE.compareAndSwapObject
1555     (this, COMPLETIONS, p.next = completions, p))
1556     q = new CompletionNode(d);
1557     }
1558 jsr166 1.2 }
1559     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1560     T t; Throwable ex = null;
1561     if (r instanceof AltResult) {
1562 dl 1.1 if ((ex = ((AltResult)r).ex) != null)
1563 jsr166 1.2 dst.completeExceptionally(new RuntimeException(ex));
1564     t = null;
1565 dl 1.1 }
1566     else
1567 jsr166 1.2 t = (T) r;
1568     if (ex == null) {
1569 dl 1.1 try {
1570     if (executor != null)
1571     executor.execute(new AsyncFunction(t, fn, dst));
1572     else
1573     dst.complete(fn.apply(t));
1574     } catch (Throwable rex) {
1575     dst.completeExceptionally(rex);
1576     }
1577     }
1578 jsr166 1.2 }
1579 dl 1.1 if (r != null) {
1580     if (result != null)
1581     postComplete();
1582     if (other.result != null)
1583     other.postComplete();
1584     }
1585 jsr166 1.2 return dst;
1586 dl 1.1 }
1587    
1588     private CompletableFuture<Void> orRunnable(CompletableFuture<?> other,
1589     Runnable action,
1590     Executor executor) {
1591     if (other == null || action == null) throw new NullPointerException();
1592 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1593     OrRunnable<T> d = null;
1594     Object r;
1595     if ((r = result) == null && (r = other.result) == null) {
1596 dl 1.1 d = new OrRunnable<T>(this, other, action, dst, executor);
1597     CompletionNode q = null, p = new CompletionNode(d);
1598     while ((r = result) == null && (r = other.result) == null) {
1599     if (q != null) {
1600     if (UNSAFE.compareAndSwapObject
1601     (other, COMPLETIONS, q.next = other.completions, q))
1602     break;
1603     }
1604     else if (UNSAFE.compareAndSwapObject
1605     (this, COMPLETIONS, p.next = completions, p))
1606     q = new CompletionNode(d);
1607     }
1608 jsr166 1.2 }
1609     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1610     Throwable ex = null;
1611     if ((r instanceof AltResult) &&
1612 dl 1.1 (ex = ((AltResult)r).ex) != null)
1613     dst.completeExceptionally(new RuntimeException(ex));
1614     else {
1615     try {
1616     if (executor != null)
1617     executor.execute(new AsyncRunnable(action, dst));
1618     else {
1619     action.run();
1620     dst.complete(null);
1621     }
1622     } catch (Throwable rex) {
1623     dst.completeExceptionally(rex);
1624     }
1625     }
1626 jsr166 1.2 }
1627 dl 1.1 if (r != null) {
1628     if (result != null)
1629     postComplete();
1630     if (other.result != null)
1631     other.postComplete();
1632     }
1633 jsr166 1.2 return dst;
1634 dl 1.1 }
1635    
1636     // Unsafe mechanics
1637     private static final sun.misc.Unsafe UNSAFE;
1638     private static final long RESULT;
1639     private static final long WAITERS;
1640     private static final long COMPLETIONS;
1641     static {
1642     try {
1643     UNSAFE = sun.misc.Unsafe.getUnsafe();
1644     Class<?> k = CompletableFuture.class;
1645     RESULT = UNSAFE.objectFieldOffset
1646     (k.getDeclaredField("result"));
1647     WAITERS = UNSAFE.objectFieldOffset
1648     (k.getDeclaredField("waiters"));
1649     COMPLETIONS = UNSAFE.objectFieldOffset
1650     (k.getDeclaredField("completions"));
1651     } catch (Exception e) {
1652     throw new Error(e);
1653     }
1654     }
1655    
1656     }