ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.5
Committed: Fri Dec 28 14:03:11 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.4: +31 -37 lines
Log Message:
avoid bad casts in exception reporting

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Block;
9     import java.util.function.Supplier;
10     import java.util.function.Function;
11     import java.util.function.BiFunction;
12     import java.util.concurrent.Future;
13     import java.util.concurrent.TimeUnit;
14     import java.util.concurrent.ForkJoinPool;
15     import java.util.concurrent.ForkJoinTask;
16     import java.util.concurrent.Executor;
17 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
18 dl 1.1 import java.util.concurrent.ExecutionException;
19     import java.util.concurrent.TimeoutException;
20     import java.util.concurrent.CancellationException;
21     import java.util.concurrent.atomic.AtomicInteger;
22     import java.util.concurrent.locks.LockSupport;
23    
24    
25     /**
26     * A {@link Future} that may be explicitly completed (setting its
27     * value and status), and may include dependent functions and actions
28     * that trigger upon its completion.
29     *
30 jsr166 1.4 * <p>Similar methods are available for function-based usages in
31 dl 1.1 * which dependent stages typically propagate values, as well as
32     * result-less action-based usages, that are normally associated with
33     * {@code CompletableFuture<Void>} Futures. Functions and actions
34     * supplied for dependent completions using {@code then}, {@code
35     * andThen}, {@code orThen}, and {@code exceptionally} may be
36     * performed by the thread that completes the current
37     * CompletableFuture, or by any other caller of these methods. There
38     * are no guarantees about the order of processing completions unless
39     * constrained by method {@code then} and related methods.
40     *
41 jsr166 1.4 * <p>When two or more threads attempt to {@link #complete} or {@link
42 dl 1.1 * #completeExceptionally} a CompletableFuture, only one of them will
43 dl 1.5 * succeed. Upon exceptional completion, or when a completion entails
44     * computation of a function or action, and it terminates abruptly
45     * with an exception, then further completions act as {@code
46     * completeExceptionally} with that exception.
47 dl 1.1 *
48     * <p>CompletableFutures themselves do not execute asynchronously.
49     * However, the {@code async} methods provide commonly useful ways
50     * to commence asynchronous processing, using either a given {@link
51     * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
52     * function or action that will result in the completion of a new
53     * CompletableFuture.
54     *
55     * @author Doug Lea
56     * @since 1.8
57     */
58     public class CompletableFuture<T> implements Future<T> {
59     /*
60     * Quick overview (more to come):
61     *
62     * 1. Non-nullness of field result indicates done. An AltResult is
63     * used to box null as a result, as well as to hold exceptions.
64     *
65     * 2. Waiters are held in a Treiber stack similar to the one used
66     * in FutureTask
67     *
68     * 3. Completions are also kept in a list/stack, and pulled off
69     * and run when completion is triggered.
70     */
71    
72     static final class AltResult {
73     final Throwable ex; // null only for NIL
74 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
75 dl 1.1 }
76    
77     static final AltResult NIL = new AltResult(null);
78    
79     /**
80     * Simple linked list nodes to record waiting threads in a Treiber
81     * stack. See other classes such as Phaser and SynchronousQueue
82     * for more detailed explanation.
83     */
84     static final class WaitNode {
85     volatile Thread thread;
86     volatile WaitNode next;
87     }
88    
89     /**
90     * Simple linked list nodes to record completions, used in
91     * basically the same way as WaitNodes
92     */
93     static final class CompletionNode {
94 jsr166 1.2 final Completion completion;
95 dl 1.1 volatile CompletionNode next;
96 jsr166 1.2 CompletionNode(Completion completion) { this.completion = completion; }
97 dl 1.1 }
98    
99    
// Completion state. null means "not done"; once non-null it is either
// the actual value or an AltResult (NIL for a null value, otherwise
// a wrapper around the exception).
volatile Object result; // either the result or boxed AltResult
// Head of the stack of threads parked while waiting for a result.
volatile WaitNode waiters; // Treiber stack of threads blocked on get()
// Head of the stack of dependent actions to run upon completion.
volatile CompletionNode completions; // list (Treiber stack) of completions
103    
/**
 * Creates a new incomplete CompletableFuture.
 * No field is assigned here, so {@code result} starts out
 * {@code null} and {@link #isDone} reports {@code false}.
 */
public CompletableFuture() {
}
109    
110     /**
111     * Asynchronously executes in the {@link
112     * ForkJoinPool#commonPool()}, a task that completes the returned
113     * CompletableFuture with the result of the given Supplier.
114     *
115     * @param supplier a function returning the value to be used
116     * to complete the returned CompletableFuture.
117     * @return the CompletableFuture.
118     */
119     public static <U> CompletableFuture<U> async(Supplier<U> supplier) {
120     if (supplier == null) throw new NullPointerException();
121     CompletableFuture<U> f = new CompletableFuture<U>();
122     ForkJoinPool.commonPool().
123     execute((ForkJoinTask<?>)new AsyncSupplier(supplier, f));
124     return f;
125     }
126    
127     /**
128     * Asynchronously executes using the given executor, a task that
129     * completes the returned CompletableFuture with the result of the
130     * given Supplier.
131     *
132     * @param supplier a function returning the value to be used
133     * to complete the returned CompletableFuture.
134     * @param executor the executor to use for asynchronous execution
135     * @return the CompletableFuture.
136     */
137     public static <U> CompletableFuture<U> async(Supplier<U> supplier,
138     Executor executor) {
139     if (executor == null || supplier == null)
140     throw new NullPointerException();
141     CompletableFuture<U> f = new CompletableFuture<U>();
142     executor.execute(new AsyncSupplier(supplier, f));
143     return f;
144     }
145    
146     /**
147     * Asynchronously executes in the {@link
148     * ForkJoinPool#commonPool()} a task that runs the given action,
149     * and then completes the returned CompletableFuture
150     *
151     * @param runnable the action to run before completing the
152     * returned CompletableFuture.
153     * @return the CompletableFuture.
154     */
155     public static CompletableFuture<Void> async(Runnable runnable) {
156     if (runnable == null) throw new NullPointerException();
157     CompletableFuture<Void> f = new CompletableFuture<Void>();
158     ForkJoinPool.commonPool().
159     execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
160     return f;
161     }
162    
163     /**
164     * Asynchronously executes using the given executor, a task that
165     * runs the given action, and then completes the returned
166     * CompletableFuture
167     *
168     * @param runnable the action to run before completing the
169     * returned CompletableFuture.
170     * @param executor the executor to use for asynchronous execution
171     * @return the CompletableFuture.
172     */
173     public static CompletableFuture<Void> async(Runnable runnable,
174     Executor executor) {
175     if (executor == null || runnable == null)
176     throw new NullPointerException();
177     CompletableFuture<Void> f = new CompletableFuture<Void>();
178     executor.execute(new AsyncRunnable(runnable, f));
179     return f;
180     }
181    
182     /**
183     * Returns {@code true} if completed in any fashion: normally,
184     * exceptionally, or cancellation.
185     *
186     * @return {@code true} if completed
187     */
188     public boolean isDone() {
189     return result != null;
190     }
191    
192     /**
193     * Returns the result value when complete, or throws an
194     * (unchecked) exception if completed exceptionally. To better
195     * conform with the use of common functional forms, this method
196     * transforms any checked exception possible with {@link
197     * Future#get} into an (unchecked) {@link RuntimeException} with
198     * the underlying exception as its cause. (The checked exception
199     * convention is available using the timed form of get.)
200     *
201     * @return the result value
202     */
203     public T get() {
204     Object r; Throwable ex;
205 jsr166 1.2 if ((r = result) == null)
206     return waitingGet();
207     if (r instanceof AltResult) {
208 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
209     if (ex instanceof Error)
210     throw (Error)ex;
211     if (ex instanceof RuntimeException)
212     throw (RuntimeException)ex;
213     throw new RuntimeException(ex);
214     }
215 jsr166 1.2 return null;
216     }
217     return (T)r;
218 dl 1.1 }
219    
220     /**
221     * Returns the result value (or throws any encountered exception)
222     * if completed, else returns the given valueIfAbsent.
223     *
224     * @param valueIfAbsent the value to return if not completed
225     * @return the result value, if completed, else the given valueIfAbsent
226     */
227     public T getNow(T valueIfAbsent) {
228     Object r; Throwable ex;
229 jsr166 1.2 if ((r = result) == null)
230     return valueIfAbsent;
231     if (r instanceof AltResult) {
232 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
233     if (ex instanceof Error)
234     throw (Error)ex;
235     if (ex instanceof RuntimeException)
236     throw (RuntimeException)ex;
237     throw new RuntimeException(ex);
238     }
239 jsr166 1.2 return null;
240     }
241     return (T)r;
242 dl 1.1 }
243    
244     /**
245     * Waits if necessary for at most the given time for completion,
246     * and then retrieves its result, if available.
247     *
248     * @param timeout the maximum time to wait
249     * @param unit the time unit of the timeout argument
250     * @return the computed result
251     * @throws CancellationException if the computation was cancelled
252     * @throws ExecutionException if the computation threw an
253     * exception
254     * @throws InterruptedException if the current thread was interrupted
255     * while waiting
256     * @throws TimeoutException if the wait timed out
257     */
258     public T get(long timeout, TimeUnit unit)
259     throws InterruptedException, ExecutionException, TimeoutException {
260     Object r; Throwable ex;
261     long nanos = unit.toNanos(timeout);
262     if (Thread.interrupted())
263     throw new InterruptedException();
264 jsr166 1.2 if ((r = result) == null)
265     r = timedAwaitDone(nanos);
266     if (r instanceof AltResult) {
267 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
268     if (ex instanceof ExecutionException) // avoid re-wrap
269     throw (ExecutionException)ex;
270 dl 1.1 throw new ExecutionException(ex);
271 dl 1.5 }
272 jsr166 1.2 return null;
273     }
274     return (T)r;
275 dl 1.1 }
276    
277     /**
278     * If not already completed, sets the value returned by {@link
279     * #get()} and related methods to the given value.
280     *
281     * @param value the result value
282     * @return true if this invocation caused this CompletableFuture
283     * to transition to a completed state, else false.
284     */
285     public boolean complete(T value) {
286     if (result == null &&
287     UNSAFE.compareAndSwapObject(this, RESULT, null,
288 jsr166 1.2 (value == null) ? NIL : value)) {
289 dl 1.1 postComplete();
290     return true;
291     }
292     return false;
293     }
294    
295     /**
296     * If not already completed, causes invocations of {@link #get()}
297     * and related methods to throw the given exception.
298     *
299     * @param ex the exception
300     * @return true if this invocation caused this CompletableFuture
301     * to transition to a completed state, else false.
302     */
303     public boolean completeExceptionally(Throwable ex) {
304     if (ex == null) throw new NullPointerException();
305     if (result == null) {
306     Object r = new AltResult(ex);
307     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
308     postComplete();
309     return true;
310     }
311     }
312     return false;
313     }
314    
315     /**
316     * Creates and returns a CompletableFuture that is completed with
317     * the result of the given function of this CompletableFuture.
318     * If this CompletableFuture completes exceptionally,
319     * then the returned CompletableFuture also does so,
320     * with a RuntimeException having this exception as
321     * its cause.
322     *
323     * @param fn the function to use to compute the value of
324     * the returned CompletableFuture
325     * @return the new CompletableFuture
326     */
327     public <U> CompletableFuture<U> then(Function<? super T,? extends U> fn) {
328     return thenFunction(fn, null);
329     }
330    
331     /**
332     * Creates and returns a CompletableFuture that is asynchronously
333     * completed using the {@link ForkJoinPool#commonPool()} with the
334     * result of the given function of this CompletableFuture. If
335     * this CompletableFuture completes exceptionally, then the
336     * returned CompletableFuture also does so, with a
337     * RuntimeException having this exception as its cause.
338     *
339     * @param fn the function to use to compute the value of
340     * the returned CompletableFuture
341     * @return the new CompletableFuture
342     */
343     public <U> CompletableFuture<U> thenAsync(Function<? super T,? extends U> fn) {
344     return thenFunction(fn, ForkJoinPool.commonPool());
345     }
346    
347     /**
348     * Creates and returns a CompletableFuture that is asynchronously
349     * completed using the given executor with the result of the given
350     * function of this CompletableFuture. If this CompletableFuture
351     * completes exceptionally, then the returned CompletableFuture
352     * also does so, with a RuntimeException having this exception as
353     * its cause.
354     *
355     * @param fn the function to use to compute the value of
356     * the returned CompletableFuture
357     * @param executor the executor to use for asynchronous execution
358     * @return the new CompletableFuture
359     */
360     public <U> CompletableFuture<U> thenAsync(Function<? super T,? extends U> fn,
361     Executor executor) {
362     if (executor == null) throw new NullPointerException();
363     return thenFunction(fn, executor);
364     }
365    
366     /**
367     * Creates and returns a CompletableFuture that is completed after
368     * performing the given action if/when this CompletableFuture
369     * completes. If this CompletableFuture completes exceptionally,
370     * then the returned CompletableFuture also does so, with a
371     * RuntimeException having this exception as its cause.
372     *
373     * @param action the action to perform before completing the
374     * returned CompletableFuture
375     * @return the new CompletableFuture
376     */
377     public CompletableFuture<Void> then(Runnable action) {
378     return thenRunnable(action, null);
379     }
380    
381     /**
382     * Creates and returns a CompletableFuture that is asynchronously
383     * completed using the {@link ForkJoinPool#commonPool()} after
384     * performing the given action if/when this CompletableFuture
385     * completes. If this CompletableFuture completes exceptionally,
386     * then the returned CompletableFuture also does so, with a
387     * RuntimeException having this exception as its cause.
388     *
389     * @param action the action to perform before completing the
390     * returned CompletableFuture
391     * @return the new CompletableFuture
392     */
393     public CompletableFuture<Void> thenAsync(Runnable action) {
394     return thenRunnable(action, ForkJoinPool.commonPool());
395     }
396    
397     /**
398     * Creates and returns a CompletableFuture that is asynchronously
399     * completed using the given executor after performing the given
400     * action if/when this CompletableFuture completes. If this
401     * CompletableFuture completes exceptionally, then the returned
402     * CompletableFuture also does so, with a RuntimeException having
403     * this exception as its cause.
404     *
405     * @param action the action to perform before completing the
406     * returned CompletableFuture
407     * @param executor the executor to use for asynchronous execution
408     * @return the new CompletableFuture
409     */
410     public CompletableFuture<Void> thenAsync(Runnable action, Executor executor) {
411     if (executor == null) throw new NullPointerException();
412     return thenRunnable(action, executor);
413     }
414    
415     /**
416     * Creates and returns a CompletableFuture that is completed with
417     * the result of the given function of this and the other given
418     * CompletableFuture's results if/when both complete. If this or
419     * the other CompletableFuture complete exceptionally, then the
420     * returned CompletableFuture also does so, with a
421     * RuntimeException having the exception as its cause.
422     *
423     * @param other the other CompletableFuture
424     * @param fn the function to use to compute the value of
425     * the returned CompletableFuture
426     * @return the new CompletableFuture
427     */
428     public <U,V> CompletableFuture<V> andThen(CompletableFuture<? extends U> other,
429     BiFunction<? super T,? super U,? extends V> fn) {
430     return andFunction(other, fn, null);
431     }
432    
433     /**
434 jsr166 1.3 * Creates and returns a CompletableFuture that is asynchronously
435 dl 1.1 * completed using the {@link ForkJoinPool#commonPool()} with
436     * the result of the given function of this and the other given
437     * CompletableFuture's results if/when both complete. If this or
438     * the other CompletableFuture complete exceptionally, then the
439     * returned CompletableFuture also does so, with a
440     * RuntimeException having the exception as its cause.
441     *
442     * @param other the other CompletableFuture
443     * @param fn the function to use to compute the value of
444     * the returned CompletableFuture
445     * @return the new CompletableFuture
446     */
447     public <U,V> CompletableFuture<V> andThenAsync(CompletableFuture<? extends U> other,
448     BiFunction<? super T,? super U,? extends V> fn) {
449     return andFunction(other, fn, ForkJoinPool.commonPool());
450     }
451    
452     /**
453 jsr166 1.3 * Creates and returns a CompletableFuture that is
454 dl 1.1 * asynchronously completed using the given executor with the
455     * result of the given function of this and the other given
456     * CompletableFuture's results if/when both complete. If this or
457     * the other CompletableFuture complete exceptionally, then the
458     * returned CompletableFuture also does so, with a
459     * RuntimeException having the exception as its cause.
460     *
461     * @param other the other CompletableFuture
462     * @param fn the function to use to compute the value of
463     * the returned CompletableFuture
464     * @param executor the executor to use for asynchronous execution
465     * @return the new CompletableFuture
466     */
467    
468     public <U,V> CompletableFuture<V> andThenAsync(CompletableFuture<? extends U> other,
469     BiFunction<? super T,? super U,? extends V> fn,
470     Executor executor) {
471     if (executor == null) throw new NullPointerException();
472     return andFunction(other, fn, executor);
473     }
474    
475     /**
476     * Creates and returns a CompletableFuture that is completed
477     * if/when this and the other given CompletableFuture both
478     * complete. If this and/or the other CompletableFuture complete
479     * exceptionally, then the returned CompletableFuture also does
480     * so, with a RuntimeException having the one of the exceptions as
481     * its cause.
482     *
483     * @param other the other CompletableFuture
484     * @param action the action to perform before completing the
485     * returned CompletableFuture
486     * @return the new CompletableFuture
487     */
488     public CompletableFuture<Void> andThen(CompletableFuture<?> other,
489     Runnable action) {
490     return andRunnable(other, action, null);
491     }
492    
493     /**
494     * Creates and returns a CompletableFuture that is completed
495     * asynchronously using the {@link ForkJoinPool#commonPool()}
496     * if/when this and the other given CompletableFuture both
497     * complete. If this and/or the other CompletableFuture complete
498     * exceptionally, then the returned CompletableFuture also does
499     * so, with a RuntimeException having the one of the exceptions as
500     * its cause.
501     *
502     * @param other the other CompletableFuture
503     * @param action the action to perform before completing the
504     * returned CompletableFuture
505     * @return the new CompletableFuture
506     */
507     public CompletableFuture<Void> andThenAsync(CompletableFuture<?> other,
508     Runnable action) {
509     return andRunnable(other, action, ForkJoinPool.commonPool());
510     }
511    
512     /**
513     * Creates and returns a CompletableFuture that is completed
514     * asynchronously using the given executor
515     * if/when this and the other given CompletableFuture both
516     * complete. If this and/or the other CompletableFuture complete
517     * exceptionally, then the returned CompletableFuture also does
518     * so, with a RuntimeException having the one of the exceptions as
519     * its cause.
520     *
521     * @param other the other CompletableFuture
522     * @param action the action to perform before completing the
523     * returned CompletableFuture
524     * @param executor the executor to use for asynchronous execution
525     * @return the new CompletableFuture
526     */
527     public CompletableFuture<Void> andThenAsync(CompletableFuture<?> other,
528     Runnable action,
529     Executor executor) {
530     if (executor == null) throw new NullPointerException();
531     return andRunnable(other, action, executor);
532     }
533    
534     /**
535     * Creates and returns a CompletableFuture that is completed with
536     * the result of the given function of either this or the other
537     * given CompletableFuture's results if/when either complete. If
538     * this and/or the other CompletableFuture complete exceptionally,
539     * then the returned CompletableFuture may also do so, with a
540     * RuntimeException having one of these exceptions as its cause.
541     * No guarantees are made about which result or exception is used
542     * in the returned CompletableFuture.
543     *
544     * @param other the other CompletableFuture
545     * @param fn the function to use to compute the value of
546     * the returned CompletableFuture
547     * @return the new CompletableFuture
548     */
549     public <U> CompletableFuture<U> orThen(CompletableFuture<? extends T> other,
550     Function<? super T, U> fn) {
551     return orFunction(other, fn, null);
552     }
553    
554     /**
555     * Creates and returns a CompletableFuture that is completed
556     * asynchronously using the {@link ForkJoinPool#commonPool()} with
557     * the result of the given function of either this or the other
558     * given CompletableFuture's results if/when either complete. If
559     * this and/or the other CompletableFuture complete exceptionally,
560     * then the returned CompletableFuture may also do so, with a
561     * RuntimeException having one of these exceptions as its cause.
562     * No guarantees are made about which result or exception is used
563     * in the returned CompletableFuture.
564     *
565     * @param other the other CompletableFuture
566     * @param fn the function to use to compute the value of
567     * the returned CompletableFuture
568     * @return the new CompletableFuture
569     */
570     public <U> CompletableFuture<U> orThenAsync(CompletableFuture<? extends T> other,
571     Function<? super T, U> fn) {
572     return orFunction(other, fn, ForkJoinPool.commonPool());
573     }
574    
575     /**
576     * Creates and returns a CompletableFuture that is completed
577     * asynchronously using the given executor with the result of the
578     * given function of either this or the other given
579     * CompletableFuture's results if/when either complete. If this
580     * and/or the other CompletableFuture complete exceptionally, then
581     * the returned CompletableFuture may also do so, with a
582     * RuntimeException having one of these exceptions as its cause.
583     * No guarantees are made about which result or exception is used
584     * in the returned CompletableFuture.
585     *
586     * @param other the other CompletableFuture
587     * @param fn the function to use to compute the value of
588     * the returned CompletableFuture
589     * @param executor the executor to use for asynchronous execution
590     * @return the new CompletableFuture
591     */
592     public <U> CompletableFuture<U> orThen(CompletableFuture<? extends T> other,
593     Function<? super T, U> fn,
594     Executor executor) {
595     if (executor == null) throw new NullPointerException();
596     return orFunction(other, fn, executor);
597     }
598    
599     /**
600     * Creates and returns a CompletableFuture that is completed
601     * after this or the other given CompletableFuture complete. If
602     * this and/or the other CompletableFuture complete exceptionally,
603     * then the returned CompletableFuture may also do so, with a
604     * RuntimeException having one of these exceptions as its cause.
605     * No guarantees are made about which exception is used in the
606     * returned CompletableFuture.
607     *
608     * @param other the other CompletableFuture
609     * @param action the action to perform before completing the
610     * returned CompletableFuture
611     * @return the new CompletableFuture
612     */
613     public CompletableFuture<Void> orThen(CompletableFuture<?> other,
614     Runnable action) {
615     return orRunnable(other, action, null);
616     }
617    
618     /**
619     * Creates and returns a CompletableFuture that is completed
620     * asynchronously using the {@link ForkJoinPool#commonPool()}
621     * after this or the other given CompletableFuture complete. If
622     * this and/or the other CompletableFuture complete exceptionally,
623     * then the returned CompletableFuture may also do so, with a
624     * RuntimeException having one of these exceptions as its cause.
625     * No guarantees are made about which exception is used in the
626     * returned CompletableFuture.
627     *
628     * @param other the other CompletableFuture
629     * @param action the action to perform before completing the
630     * returned CompletableFuture
631     * @return the new CompletableFuture
632     */
633     public CompletableFuture<Void> orThenAsync(CompletableFuture<?> other,
634     Runnable action) {
635     return orRunnable(other, action, ForkJoinPool.commonPool());
636     }
637    
638     /**
639     * Creates and returns a CompletableFuture that is completed
640     * asynchronously using the given executor after this or the other
641     * given CompletableFuture complete. If this and/or the other
642     * CompletableFuture complete exceptionally, then the returned
643     * CompletableFuture may also do so, with a RuntimeException
644     * having one of these exceptions as its cause. No guarantees are
645     * made about which exception is used in the returned
646     * CompletableFuture.
647     *
648     * @param other the other CompletableFuture
649     * @param action the action to perform before completing the
650     * returned CompletableFuture
651     * @param executor the executor to use for asynchronous execution
652     * @return the new CompletableFuture
653     */
654     public CompletableFuture<Void> orThenAsync(CompletableFuture<?> other,
655     Runnable action,
656     Executor executor) {
657     if (executor == null) throw new NullPointerException();
658     return orRunnable(other, action, executor);
659     }
660    
661     /**
662     * Creates and returns a CompletableFuture that is completed after
663     * performing the given action with the exception triggering this
664     * CompletableFuture's completion if/when it completes
665     * exceptionally.
666     *
667     * @param action the action to perform before completing the
668     * returned CompletableFuture
669     * @return the new CompletableFuture
670     */
671     public CompletableFuture<Void> exceptionally(Block<Throwable> action) {
672     if (action == null) throw new NullPointerException();
673     CompletableFuture<Void> dst = new CompletableFuture<Void>();
674     ExceptionAction<T> d = null;
675 jsr166 1.2 Object r; Throwable ex;
676     if ((r = result) == null) {
677 dl 1.1 CompletionNode p =
678     new CompletionNode(d = new ExceptionAction<T>(this, action, dst));
679     while ((r = result) == null) {
680     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
681     p.next = completions, p))
682     break;
683     }
684     }
685 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1)) &&
686 dl 1.1 (r instanceof AltResult) && (ex = ((AltResult)r).ex) != null) {
687     try {
688     action.accept(ex);
689     dst.complete(null);
690     } catch (Throwable rex) {
691     dst.completeExceptionally(rex);
692     }
693     }
694     if (r != null)
695     postComplete();
696     return dst;
697     }
698    
699     /**
700     * Attempts to complete this CompletableFuture with
701     * a {@link CancellationException}.
702     *
703     * @param mayInterruptIfRunning this value has no effect in this
704     * implementation because interrupts are not used to control
705     * processing.
706     *
707     * @return {@code true} if this task is now cancelled
708     */
709     public boolean cancel(boolean mayInterruptIfRunning) {
710     Object r;
711     while ((r = result) == null) {
712     r = new AltResult(new CancellationException());
713     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
714     postComplete();
715     return true;
716     }
717     }
718     return ((r instanceof AltResult) &&
719     (((AltResult)r).ex instanceof CancellationException));
720     }
721    
722     /**
723     * Returns {@code true} if this CompletableFuture was cancelled
724     * before it completed normally.
725     *
726     * @return {@code true} if this CompletableFuture was cancelled
727     * before it completed normally
728     */
729     public boolean isCancelled() {
730     Object r;
731     return ((r = result) != null &&
732     (r instanceof AltResult) &&
733     (((AltResult)r).ex instanceof CancellationException));
734     }
735    
736     /**
737     * Whether or not already completed, sets the value subsequently
738     * returned by method get() and related methods to the given
739     * value. This method is designed for use in error recovery
740     * actions, and is very unlikely to be useful otherwise.
741     *
742     * @param value the completion value
743     */
744     public void force(T value) {
745 jsr166 1.2 result = (value == null) ? NIL : value;
746 dl 1.1 postComplete();
747     }
748    
749     /**
750     * Removes and signals all waiting threads and runs all completions
751     */
752     private void postComplete() {
753     WaitNode q; Thread t;
754     while ((q = waiters) != null) {
755     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
756     (t = q.thread) != null) {
757     q.thread = null;
758     LockSupport.unpark(t);
759     }
760     }
761    
762     CompletionNode h; Completion c;
763     while ((h = completions) != null) {
764     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
765     (c = h.completion) != null)
766     c.run();
767     }
768     }
769    
770     /* ------------- waiting for completions -------------- */
771    
772 jsr166 1.2 /**
773 dl 1.1 * Heuristic spin value for waitingGet() before blocking on
774     * multiprocessors
775     */
776     static final int WAITING_GET_SPINS = 256;
777    
778     /**
779     * Returns result after waiting.
780     */
781     private T waitingGet() {
782     WaitNode q = null;
783     boolean queued = false, interrupted = false;
784     int h = 0, spins = 0;
785     for (Object r;;) {
786     if ((r = result) != null) {
787 dl 1.5 Throwable ex;
788 dl 1.1 if (q != null) // suppress unpark
789     q.thread = null;
790     postComplete(); // help release others
791     if (interrupted)
792     Thread.currentThread().interrupt();
793     if (r instanceof AltResult) {
794 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
795     if (ex instanceof Error)
796     throw (Error)ex;
797     if (ex instanceof RuntimeException)
798     throw (RuntimeException)ex;
799     throw new RuntimeException(ex);
800     }
801 dl 1.1 return null;
802     }
803     return (T)r;
804     }
805     else if (h == 0) {
806     h = ThreadLocalRandom.current().nextInt();
807     if (Runtime.getRuntime().availableProcessors() > 1)
808     spins = WAITING_GET_SPINS;
809     }
810     else if (spins > 0) {
811     h ^= h << 1; // xorshift
812 jsr166 1.2 h ^= h >>> 3;
813 dl 1.1 if ((h ^= h << 10) >= 0)
814     --spins;
815 jsr166 1.2 }
816 dl 1.1 else if (q == null)
817     q = new WaitNode();
818     else if (!queued)
819     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
820     q.next = waiters, q);
821     else if (Thread.interrupted())
822     interrupted = true;
823     else if (q.thread == null)
824     q.thread = Thread.currentThread();
825     else
826     LockSupport.park(this);
827     }
828     }
829    
830     /**
831     * Awaits completion or aborts on interrupt or timeout.
832     *
833     * @param nanos time to wait
834     * @return raw result
835     */
836     private Object timedAwaitDone(long nanos)
837     throws InterruptedException, TimeoutException {
838     final long deadline = System.nanoTime() + nanos;
839     WaitNode q = null;
840     boolean queued = false;
841     for (Object r;;) {
842     if (Thread.interrupted()) {
843     removeWaiter(q);
844     throw new InterruptedException();
845     }
846     else if ((r = result) != null) {
847     if (q != null)
848     q.thread = null;
849     postComplete();
850     return r;
851     }
852     else if (q == null)
853     q = new WaitNode();
854     else if (!queued)
855     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
856     q.next = waiters, q);
857     else if ((nanos = deadline - System.nanoTime()) <= 0L) {
858     removeWaiter(q);
859     throw new TimeoutException();
860     }
861     else if (q.thread == null)
862     q.thread = Thread.currentThread();
863     else
864     LockSupport.parkNanos(this, nanos);
865     }
866     }
867    
868     /**
869     * Tries to unlink a timed-out or interrupted wait node to avoid
870     * accumulating garbage. Internal nodes are simply unspliced
871     * without CAS since it is harmless if they are traversed anyway
872     * by releasers. To avoid effects of unsplicing from already
873     * removed nodes, the list is retraversed in case of an apparent
874     * race. This is slow when there are a lot of nodes, but we don't
875     * expect lists to be long enough to outweigh higher-overhead
876     * schemes.
877     */
878     private void removeWaiter(WaitNode node) {
879     if (node != null) {
880     node.thread = null;
881     retry:
882     for (;;) { // restart on removeWaiter race
883     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
884     s = q.next;
885     if (q.thread != null)
886     pred = q;
887     else if (pred != null) {
888     pred.next = s;
889     if (pred.thread == null) // check for race
890     continue retry;
891     }
892     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
893     continue retry;
894     }
895     break;
896     }
897     }
898     }
899    
900     /* ------------- Async tasks -------------- */
901    
902     /** Base class can act as either FJ or plain Runnable */
903     static abstract class Async extends ForkJoinTask<Void> implements Runnable {
904     public final Void getRawResult() { return null; }
905     public final void setRawResult(Void v) { }
906     public final void run() { exec(); }
907     }
908    
909     static final class AsyncRunnable extends Async {
910     final Runnable runnable;
911     final CompletableFuture<Void> dst;
912     AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
913     this.runnable = runnable; this.dst = dst;
914     }
915     public final boolean exec() {
916     Runnable fn;
917     CompletableFuture<Void> d;
918     if ((fn = this.runnable) == null || (d = this.dst) == null)
919     throw new NullPointerException();
920     try {
921     fn.run();
922     d.complete(null);
923     } catch (Throwable ex) {
924     d.completeExceptionally(ex);
925     }
926     return true;
927     }
928     private static final long serialVersionUID = 5232453952276885070L;
929     }
930    
931     static final class AsyncSupplier<U> extends Async {
932     final Supplier<U> supplier;
933     final CompletableFuture<U> dst;
934     AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
935     this.supplier = supplier; this.dst = dst;
936     }
937     public final boolean exec() {
938     Supplier<U> fn;
939     CompletableFuture<U> d;
940     if ((fn = this.supplier) == null || (d = this.dst) == null)
941     throw new NullPointerException();
942     try {
943     d.complete(fn.get());
944     } catch (Throwable ex) {
945     d.completeExceptionally(ex);
946     }
947     return true;
948     }
949     private static final long serialVersionUID = 5232453952276885070L;
950     }
951    
952     static final class AsyncFunction<T,U> extends Async {
953     Function<? super T,? extends U> fn;
954     T arg;
955     final CompletableFuture<U> dst;
956     AsyncFunction(T arg, Function<? super T,? extends U> fn,
957     CompletableFuture<U> dst) {
958     this.arg = arg; this.fn = fn; this.dst = dst;
959     }
960     public final boolean exec() {
961     Function<? super T,? extends U> fn;
962     CompletableFuture<U> d;
963     if ((fn = this.fn) == null || (d = this.dst) == null)
964     throw new NullPointerException();
965     try {
966     d.complete(fn.apply(arg));
967     } catch (Throwable ex) {
968     d.completeExceptionally(ex);
969     }
970     return true;
971     }
972     private static final long serialVersionUID = 5232453952276885070L;
973     }
974    
975     static final class AsyncBiFunction<T,U,V> extends Async {
976     final BiFunction<? super T,? super U,? extends V> fn;
977     final T arg1;
978     final U arg2;
979     final CompletableFuture<V> dst;
980     AsyncBiFunction(T arg1, U arg2,
981     BiFunction<? super T,? super U,? extends V> fn,
982     CompletableFuture<V> dst) {
983     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
984     }
985     public final boolean exec() {
986     BiFunction<? super T,? super U,? extends V> fn;
987     CompletableFuture<V> d;
988     if ((fn = this.fn) == null || (d = this.dst) == null)
989     throw new NullPointerException();
990     try {
991     d.complete(fn.apply(arg1, arg2));
992     } catch (Throwable ex) {
993     d.completeExceptionally(ex);
994     }
995     return true;
996     }
997     private static final long serialVersionUID = 5232453952276885070L;
998     }
999    
1000     /* ------------- Completions -------------- */
1001    
1002     // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1003     static abstract class Completion extends AtomicInteger implements Runnable {
1004     }
1005    
1006     static final class ThenFunction<T,U> extends Completion {
1007 jsr166 1.2 final CompletableFuture<? extends T> src;
1008     final Function<? super T,? extends U> fn;
1009     final CompletableFuture<U> dst;
1010 dl 1.1 final Executor executor;
1011     ThenFunction(CompletableFuture<? extends T> src,
1012     final Function<? super T,? extends U> fn,
1013     final CompletableFuture<U> dst, Executor executor) {
1014     this.src = src; this.fn = fn; this.dst = dst;
1015     this.executor = executor;
1016     }
1017 jsr166 1.2 public void run() {
1018 dl 1.1 CompletableFuture<? extends T> a;
1019     Function<? super T,? extends U> fn;
1020     CompletableFuture<U> dst;
1021 jsr166 1.2 Object r; T t; Throwable ex;
1022     if ((dst = this.dst) != null &&
1023 dl 1.1 (fn = this.fn) != null &&
1024     (a = this.src) != null &&
1025     (r = a.result) != null &&
1026     compareAndSet(0, 1)) {
1027 jsr166 1.2 if (r instanceof AltResult) {
1028 dl 1.1 if ((ex = ((AltResult)r).ex) != null) {
1029     dst.completeExceptionally(new RuntimeException(ex));
1030     return;
1031     }
1032     t = null;
1033     }
1034     else
1035     t = (T) r;
1036     try {
1037     if (executor != null)
1038     executor.execute(new AsyncFunction(t, fn, dst));
1039     else
1040     dst.complete(fn.apply(t));
1041     } catch (Throwable rex) {
1042     dst.completeExceptionally(rex);
1043     }
1044     }
1045     }
1046     }
1047    
1048     static final class ThenRunnable<T> extends Completion {
1049 jsr166 1.2 final CompletableFuture<? extends T> src;
1050     final Runnable fn;
1051     final CompletableFuture<Void> dst;
1052 dl 1.1 final Executor executor;
1053     ThenRunnable(CompletableFuture<? extends T> src,
1054     Runnable fn,
1055     CompletableFuture<Void> dst,
1056     Executor executor) {
1057     this.src = src; this.fn = fn; this.dst = dst;
1058     this.executor = executor;
1059     }
1060 jsr166 1.2 public void run() {
1061 dl 1.1 CompletableFuture<? extends T> a;
1062     Runnable fn;
1063     CompletableFuture<Void> dst;
1064 jsr166 1.2 Object r; Throwable ex;
1065     if ((dst = this.dst) != null &&
1066 dl 1.1 (fn = this.fn) != null &&
1067     (a = this.src) != null &&
1068     (r = a.result) != null &&
1069     compareAndSet(0, 1)) {
1070 jsr166 1.2 if (r instanceof AltResult) {
1071 dl 1.1 if ((ex = ((AltResult)r).ex) != null) {
1072     dst.completeExceptionally(new RuntimeException(ex));
1073     return;
1074     }
1075     }
1076     try {
1077     if (executor != null)
1078     executor.execute(new AsyncRunnable(fn, dst));
1079     else {
1080     fn.run();
1081     dst.complete(null);
1082     }
1083     } catch (Throwable rex) {
1084     dst.completeExceptionally(rex);
1085     }
1086     }
1087     }
1088     }
1089    
1090     static final class AndFunction<T,U,V> extends Completion {
1091 jsr166 1.2 final CompletableFuture<? extends T> src;
1092     final CompletableFuture<? extends U> snd;
1093     final BiFunction<? super T,? super U,? extends V> fn;
1094     final CompletableFuture<V> dst;
1095 dl 1.1 final Executor executor;
1096     AndFunction(CompletableFuture<? extends T> src,
1097     CompletableFuture<? extends U> snd,
1098     BiFunction<? super T,? super U,? extends V> fn,
1099     CompletableFuture<V> dst, Executor executor) {
1100     this.src = src; this.snd = snd;
1101     this.fn = fn; this.dst = dst;
1102     this.executor = executor;
1103     }
1104 jsr166 1.2 public void run() {
1105     Object r, s; T t; U u; Throwable ex;
1106 dl 1.1 CompletableFuture<? extends T> a;
1107     CompletableFuture<? extends U> b;
1108     BiFunction<? super T,? super U,? extends V> fn;
1109     CompletableFuture<V> dst;
1110 jsr166 1.2 if ((dst = this.dst) != null &&
1111 dl 1.1 (fn = this.fn) != null &&
1112     (a = this.src) != null &&
1113     (r = a.result) != null &&
1114     (b = this.snd) != null &&
1115     (s = b.result) != null &&
1116     compareAndSet(0, 1)) {
1117 jsr166 1.2 if (r instanceof AltResult) {
1118 dl 1.1 if ((ex = ((AltResult)r).ex) != null) {
1119     dst.completeExceptionally(new RuntimeException(ex));
1120     return;
1121     }
1122 jsr166 1.2 t = null;
1123     }
1124     else
1125     t = (T) r;
1126     if (s instanceof AltResult) {
1127 dl 1.1 if ((ex = ((AltResult)s).ex) != null) {
1128     dst.completeExceptionally(new RuntimeException(ex));
1129     return;
1130     }
1131 jsr166 1.2 u = null;
1132     }
1133     else
1134     u = (U) s;
1135 dl 1.1 try {
1136     if (executor != null)
1137     executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1138     else
1139     dst.complete(fn.apply(t, u));
1140     } catch (Throwable rex) {
1141     dst.completeExceptionally(rex);
1142     }
1143 jsr166 1.2 }
1144     }
1145 dl 1.1 }
1146    
1147     static final class AndRunnable<T> extends Completion {
1148 jsr166 1.2 final CompletableFuture<? extends T> src;
1149     final CompletableFuture<?> snd;
1150     final Runnable fn;
1151     final CompletableFuture<Void> dst;
1152 dl 1.1 final Executor executor;
1153     AndRunnable(CompletableFuture<? extends T> src,
1154     CompletableFuture<?> snd,
1155     Runnable fn,
1156     CompletableFuture<Void> dst, Executor executor) {
1157     this.src = src; this.snd = snd;
1158     this.fn = fn; this.dst = dst;
1159     this.executor = executor;
1160     }
1161 jsr166 1.2 public void run() {
1162     Object r, s; Throwable ex;
1163 dl 1.1 final CompletableFuture<? extends T> a;
1164     final CompletableFuture<?> b;
1165     final Runnable fn;
1166     final CompletableFuture<Void> dst;
1167 jsr166 1.2 if ((dst = this.dst) != null &&
1168 dl 1.1 (fn = this.fn) != null &&
1169     (a = this.src) != null &&
1170     (r = a.result) != null &&
1171     (b = this.snd) != null &&
1172     (s = b.result) != null &&
1173     compareAndSet(0, 1)) {
1174 jsr166 1.2 if (r instanceof AltResult) {
1175 dl 1.1 if ((ex = ((AltResult)r).ex) != null) {
1176     dst.completeExceptionally(new RuntimeException(ex));
1177     return;
1178     }
1179 jsr166 1.2 }
1180     if (s instanceof AltResult) {
1181 dl 1.1 if ((ex = ((AltResult)s).ex) != null) {
1182     dst.completeExceptionally(new RuntimeException(ex));
1183     return;
1184     }
1185 jsr166 1.2 }
1186 dl 1.1 try {
1187     if (executor != null)
1188     executor.execute(new AsyncRunnable(fn, dst));
1189     else {
1190     fn.run();
1191     dst.complete(null);
1192     }
1193     } catch (Throwable rex) {
1194     dst.completeExceptionally(rex);
1195     }
1196 jsr166 1.2 }
1197     }
1198 dl 1.1 }
1199    
1200     static final class OrFunction<T,U> extends Completion {
1201 jsr166 1.2 final CompletableFuture<? extends T> src;
1202     final CompletableFuture<? extends T> snd;
1203     final Function<? super T,? extends U> fn;
1204     final CompletableFuture<U> dst;
1205 dl 1.1 final Executor executor;
1206     OrFunction(CompletableFuture<? extends T> src,
1207     CompletableFuture<? extends T> snd,
1208     Function<? super T,? extends U> fn,
1209     CompletableFuture<U> dst, Executor executor) {
1210     this.src = src; this.snd = snd;
1211     this.fn = fn; this.dst = dst;
1212     this.executor = executor;
1213     }
1214 jsr166 1.2 public void run() {
1215     Object r; T t; Throwable ex;
1216 dl 1.1 CompletableFuture<? extends T> a;
1217     CompletableFuture<? extends T> b;
1218     Function<? super T,? extends U> fn;
1219     CompletableFuture<U> dst;
1220 jsr166 1.2 if ((dst = this.dst) != null &&
1221 dl 1.1 (fn = this.fn) != null &&
1222     (((a = this.src) != null && (r = a.result) != null) ||
1223     ((b = this.snd) != null && (r = b.result) != null)) &&
1224     compareAndSet(0, 1)) {
1225 jsr166 1.2 if (r instanceof AltResult) {
1226 dl 1.1 if ((ex = ((AltResult)r).ex) != null) {
1227     dst.completeExceptionally(new RuntimeException(ex));
1228     return;
1229     }
1230 jsr166 1.2 t = null;
1231     }
1232     else
1233     t = (T) r;
1234 dl 1.1 try {
1235     if (executor != null)
1236     executor.execute(new AsyncFunction(t, fn, dst));
1237     else
1238     dst.complete(fn.apply(t));
1239     } catch (Throwable rex) {
1240     dst.completeExceptionally(rex);
1241     }
1242 jsr166 1.2 }
1243     }
1244 dl 1.1 }
1245    
1246     static final class OrRunnable<T> extends Completion {
1247 jsr166 1.2 final CompletableFuture<? extends T> src;
1248     final CompletableFuture<?> snd;
1249     final Runnable fn;
1250     final CompletableFuture<Void> dst;
1251 dl 1.1 final Executor executor;
1252     OrRunnable(CompletableFuture<? extends T> src,
1253     CompletableFuture<?> snd,
1254     Runnable fn,
1255     CompletableFuture<Void> dst, Executor executor) {
1256     this.src = src; this.snd = snd;
1257     this.fn = fn; this.dst = dst;
1258     this.executor = executor;
1259     }
1260 jsr166 1.2 public void run() {
1261     Object r; Throwable ex;
1262 dl 1.1 CompletableFuture<? extends T> a;
1263     final CompletableFuture<?> b;
1264     final Runnable fn;
1265     final CompletableFuture<Void> dst;
1266 jsr166 1.2 if ((dst = this.dst) != null &&
1267 dl 1.1 (fn = this.fn) != null &&
1268     (((a = this.src) != null && (r = a.result) != null) ||
1269     ((b = this.snd) != null && (r = b.result) != null)) &&
1270     compareAndSet(0, 1)) {
1271 jsr166 1.2 if ((r instanceof AltResult) &&
1272 dl 1.1 (ex = ((AltResult)r).ex) != null) {
1273     dst.completeExceptionally(new RuntimeException(ex));
1274     }
1275 jsr166 1.2 else {
1276 dl 1.1 try {
1277     if (executor != null)
1278     executor.execute(new AsyncRunnable(fn, dst));
1279     else {
1280     fn.run();
1281     dst.complete(null);
1282     }
1283     } catch (Throwable rex) {
1284     dst.completeExceptionally(rex);
1285     }
1286     }
1287 jsr166 1.2 }
1288     }
1289 dl 1.1 }
1290    
1291     static final class ExceptionAction<T> extends Completion {
1292 jsr166 1.2 final CompletableFuture<? extends T> src;
1293     final Block<? super Throwable> fn;
1294     final CompletableFuture<Void> dst;
1295 dl 1.1 ExceptionAction(CompletableFuture<? extends T> src,
1296     Block<? super Throwable> fn,
1297     CompletableFuture<Void> dst) {
1298     this.src = src; this.fn = fn; this.dst = dst;
1299     }
1300 jsr166 1.2 public void run() {
1301 dl 1.1 CompletableFuture<? extends T> a;
1302     Block<? super Throwable> fn;
1303     CompletableFuture<Void> dst;
1304 jsr166 1.2 Object r; Throwable ex;
1305     if ((dst = this.dst) != null &&
1306 dl 1.1 (fn = this.fn) != null &&
1307     (a = this.src) != null &&
1308     (r = a.result) != null &&
1309     compareAndSet(0, 1)) {
1310     if ((r instanceof AltResult) &&
1311     (ex = ((AltResult)r).ex) != null) {
1312     try {
1313     fn.accept(ex);
1314     dst.complete(null);
1315     } catch (Throwable rex) {
1316     dst.completeExceptionally(rex);
1317     }
1318     }
1319     }
1320     }
1321     }
1322    
1323     /* ------------- then/and/or implementations -------------- */
1324    
1325     private <U> CompletableFuture<U> thenFunction(Function<? super T,? extends U> fn,
1326     Executor executor) {
1327    
1328     if (fn == null) throw new NullPointerException();
1329     CompletableFuture<U> dst = new CompletableFuture<U>();
1330     ThenFunction<T,U> d = null;
1331 jsr166 1.2 Object r;
1332     if ((r = result) == null) {
1333 dl 1.1 CompletionNode p = new CompletionNode
1334     (d = new ThenFunction<T,U>(this, fn, dst, executor));
1335     while ((r = result) == null) {
1336     if (UNSAFE.compareAndSwapObject
1337     (this, COMPLETIONS, p.next = completions, p))
1338     break;
1339     }
1340     }
1341 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1342     T t; Throwable ex = null;
1343     if (r instanceof AltResult) {
1344 dl 1.1 if ((ex = ((AltResult)r).ex) != null)
1345 jsr166 1.2 dst.completeExceptionally(new RuntimeException(ex));
1346     t = null;
1347 dl 1.1 }
1348     else
1349 jsr166 1.2 t = (T) r;
1350     if (ex == null) {
1351 dl 1.1 try {
1352     if (executor != null)
1353     executor.execute(new AsyncFunction(t, fn, dst));
1354     else
1355     dst.complete(fn.apply(t));
1356     } catch (Throwable rex) {
1357     dst.completeExceptionally(rex);
1358     }
1359     }
1360 jsr166 1.2 postComplete();
1361     }
1362 dl 1.1 return dst;
1363     }
1364    
1365 jsr166 1.2 private CompletableFuture<Void> thenRunnable(Runnable action,
1366 dl 1.1 Executor executor) {
1367     if (action == null) throw new NullPointerException();
1368     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1369     ThenRunnable<T> d = null;
1370 jsr166 1.2 Object r;
1371     if ((r = result) == null) {
1372 dl 1.1 CompletionNode p = new CompletionNode
1373     (d = new ThenRunnable<T>(this, action, dst, executor));
1374     while ((r = result) == null) {
1375     if (UNSAFE.compareAndSwapObject
1376     (this, COMPLETIONS, p.next = completions, p))
1377     break;
1378     }
1379     }
1380 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1381 dl 1.1 Throwable ex = null;
1382 jsr166 1.2 if (r instanceof AltResult) {
1383 dl 1.1 if ((ex = ((AltResult)r).ex) != null)
1384 jsr166 1.2 dst.completeExceptionally(new RuntimeException(ex));
1385 dl 1.1 }
1386 jsr166 1.2 if (ex == null) {
1387 dl 1.1 try {
1388     if (executor != null)
1389     executor.execute(new AsyncRunnable(action, dst));
1390     else {
1391     action.run();
1392     dst.complete(null);
1393     }
1394     } catch (Throwable rex) {
1395     dst.completeExceptionally(rex);
1396     }
1397     }
1398 jsr166 1.2 postComplete();
1399     }
1400 dl 1.1 return dst;
1401     }
1402    
1403     private <U,V> CompletableFuture<V> andFunction(CompletableFuture<? extends U> other,
1404     BiFunction<? super T,? super U,? extends V> fn,
1405     Executor executor) {
1406     if (other == null || fn == null) throw new NullPointerException();
1407 jsr166 1.2 CompletableFuture<V> dst = new CompletableFuture<V>();
1408     AndFunction<T,U,V> d = null;
1409     Object r, s = null;
1410     if ((r = result) == null || (s = other.result) == null) {
1411 dl 1.1 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
1412     CompletionNode q = null, p = new CompletionNode(d);
1413     while ((r == null && (r = result) == null) ||
1414     (s == null && (s = other.result) == null)) {
1415     if (q != null) {
1416     if (s != null ||
1417     UNSAFE.compareAndSwapObject
1418     (other, COMPLETIONS, q.next = other.completions, q))
1419     break;
1420     }
1421     else if (r != null ||
1422     UNSAFE.compareAndSwapObject
1423     (this, COMPLETIONS, p.next = completions, p)) {
1424     if (s != null)
1425     break;
1426     q = new CompletionNode(d);
1427     }
1428     }
1429     }
1430 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1431     T t; U u; Throwable ex = null;
1432     if (r instanceof AltResult) {
1433 dl 1.1 if ((ex = ((AltResult)r).ex) != null)
1434 jsr166 1.2 dst.completeExceptionally(new RuntimeException(ex));
1435     t = null;
1436 dl 1.1 }
1437     else
1438 jsr166 1.2 t = (T) r;
1439 dl 1.1 if (ex != null)
1440     u = null;
1441     else if (s instanceof AltResult) {
1442     if ((ex = ((AltResult)s).ex) != null)
1443     dst.completeExceptionally(new RuntimeException(ex));
1444     u = null;
1445     }
1446     else
1447 jsr166 1.2 u = (U) s;
1448     if (ex == null) {
1449 dl 1.1 try {
1450     if (executor != null)
1451     executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1452     else
1453     dst.complete(fn.apply(t, u));
1454     } catch (Throwable rex) {
1455     dst.completeExceptionally(rex);
1456     }
1457     }
1458 jsr166 1.2 }
1459 dl 1.1 if (r != null)
1460     postComplete();
1461     if (s != null)
1462     other.postComplete();
1463 jsr166 1.2 return dst;
1464 dl 1.1 }
1465    
1466     private CompletableFuture<Void> andRunnable(CompletableFuture<?> other,
1467     Runnable action,
1468     Executor executor) {
1469     if (other == null || action == null) throw new NullPointerException();
1470 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1471     AndRunnable<T> d = null;
1472     Object r, s = null;
1473     if ((r = result) == null || (s = other.result) == null) {
1474 dl 1.1 d = new AndRunnable<T>(this, other, action, dst, executor);
1475     CompletionNode q = null, p = new CompletionNode(d);
1476     while ((r == null && (r = result) == null) ||
1477     (s == null && (s = other.result) == null)) {
1478     if (q != null) {
1479     if (s != null ||
1480     UNSAFE.compareAndSwapObject
1481     (other, COMPLETIONS, q.next = other.completions, q))
1482     break;
1483     }
1484     else if (r != null ||
1485     UNSAFE.compareAndSwapObject
1486     (this, COMPLETIONS, p.next = completions, p)) {
1487     if (s != null)
1488     break;
1489     q = new CompletionNode(d);
1490     }
1491     }
1492     }
1493 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1494     Throwable ex = null;
1495     if ((r instanceof AltResult) &&
1496 dl 1.1 (ex = ((AltResult)r).ex) != null)
1497     dst.completeExceptionally(new RuntimeException(ex));
1498     else if ((s instanceof AltResult) &&
1499     (ex = ((AltResult)s).ex) != null)
1500     dst.completeExceptionally(new RuntimeException(ex));
1501 jsr166 1.2 else {
1502 dl 1.1 try {
1503     if (executor != null)
1504     executor.execute(new AsyncRunnable(action, dst));
1505     else {
1506     action.run();
1507     dst.complete(null);
1508     }
1509     } catch (Throwable rex) {
1510     dst.completeExceptionally(rex);
1511     }
1512     }
1513 jsr166 1.2 }
1514 dl 1.1 if (r != null)
1515     postComplete();
1516     if (s != null)
1517     other.postComplete();
1518 jsr166 1.2 return dst;
1519 dl 1.1 }
1520    
1521     private <U> CompletableFuture<U> orFunction(CompletableFuture<? extends T> other,
1522     Function<? super T, U> fn,
1523     Executor executor) {
1524     if (other == null || fn == null) throw new NullPointerException();
1525 jsr166 1.2 CompletableFuture<U> dst = new CompletableFuture<U>();
1526     OrFunction<T,U> d = null;
1527     Object r;
1528     if ((r = result) == null && (r = other.result) == null) {
1529 dl 1.1 d = new OrFunction<T,U>(this, other, fn, dst, executor);
1530     CompletionNode q = null, p = new CompletionNode(d);
1531     while ((r = result) == null && (r = other.result) == null) {
1532     if (q != null) {
1533     if (UNSAFE.compareAndSwapObject
1534     (other, COMPLETIONS, q.next = other.completions, q))
1535     break;
1536     }
1537     else if (UNSAFE.compareAndSwapObject
1538     (this, COMPLETIONS, p.next = completions, p))
1539     q = new CompletionNode(d);
1540     }
1541 jsr166 1.2 }
1542     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1543     T t; Throwable ex = null;
1544     if (r instanceof AltResult) {
1545 dl 1.1 if ((ex = ((AltResult)r).ex) != null)
1546 jsr166 1.2 dst.completeExceptionally(new RuntimeException(ex));
1547     t = null;
1548 dl 1.1 }
1549     else
1550 jsr166 1.2 t = (T) r;
1551     if (ex == null) {
1552 dl 1.1 try {
1553     if (executor != null)
1554     executor.execute(new AsyncFunction(t, fn, dst));
1555     else
1556     dst.complete(fn.apply(t));
1557     } catch (Throwable rex) {
1558     dst.completeExceptionally(rex);
1559     }
1560     }
1561 jsr166 1.2 }
1562 dl 1.1 if (r != null) {
1563     if (result != null)
1564     postComplete();
1565     if (other.result != null)
1566     other.postComplete();
1567     }
1568 jsr166 1.2 return dst;
1569 dl 1.1 }
1570    
1571     private CompletableFuture<Void> orRunnable(CompletableFuture<?> other,
1572     Runnable action,
1573     Executor executor) {
1574     if (other == null || action == null) throw new NullPointerException();
1575 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1576     OrRunnable<T> d = null;
1577     Object r;
1578     if ((r = result) == null && (r = other.result) == null) {
1579 dl 1.1 d = new OrRunnable<T>(this, other, action, dst, executor);
1580     CompletionNode q = null, p = new CompletionNode(d);
1581     while ((r = result) == null && (r = other.result) == null) {
1582     if (q != null) {
1583     if (UNSAFE.compareAndSwapObject
1584     (other, COMPLETIONS, q.next = other.completions, q))
1585     break;
1586     }
1587     else if (UNSAFE.compareAndSwapObject
1588     (this, COMPLETIONS, p.next = completions, p))
1589     q = new CompletionNode(d);
1590     }
1591 jsr166 1.2 }
1592     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1593     Throwable ex = null;
1594     if ((r instanceof AltResult) &&
1595 dl 1.1 (ex = ((AltResult)r).ex) != null)
1596     dst.completeExceptionally(new RuntimeException(ex));
1597     else {
1598     try {
1599     if (executor != null)
1600     executor.execute(new AsyncRunnable(action, dst));
1601     else {
1602     action.run();
1603     dst.complete(null);
1604     }
1605     } catch (Throwable rex) {
1606     dst.completeExceptionally(rex);
1607     }
1608     }
1609 jsr166 1.2 }
1610 dl 1.1 if (r != null) {
1611     if (result != null)
1612     postComplete();
1613     if (other.result != null)
1614     other.postComplete();
1615     }
1616 jsr166 1.2 return dst;
1617 dl 1.1 }
1618    
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" used for CAS operations.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        final sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
        final Class<?> self = CompletableFuture.class;
        final long resultOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("result"));
        final long waitersOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        final long completionsOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
        UNSAFE = unsafe;
        RESULT = resultOffset;
        WAITERS = waitersOffset;
        COMPLETIONS = completionsOffset;
    } catch (Exception e) {
        // Any lookup failure is fatal for class initialization.
        throw new Error(e);
    }
}
1638    
1639     }