ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.19
Committed: Sun Dec 30 22:59:10 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.18: +323 -236 lines
Log Message:
Use CompletionExceptions

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.7 import java.util.function.Block;
10     import java.util.function.BiBlock;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.19 import java.util.concurrent.CompletionException;
23 dl 1.1 import java.util.concurrent.atomic.AtomicInteger;
24     import java.util.concurrent.locks.LockSupport;
25    
26     /**
27     * A {@link Future} that may be explicitly completed (setting its
28     * value and status), and may include dependent functions and actions
29     * that trigger upon its completion.
30     *
31 dl 1.7 * <p>Similar methods are available for Functions, Blocks, and
32     * Runnables, depending on whether actions require arguments and/or
33     * produce results. Functions and actions supplied for dependent
34     * completions (mainly using methods with prefix {@code then}) may be
35 dl 1.1 * performed by the thread that completes the current
36     * CompletableFuture, or by any other caller of these methods. There
37     * are no guarantees about the order of processing completions unless
38 dl 1.17 * constrained by these methods.
39 dl 1.1 *
40 jsr166 1.4 * <p>When two or more threads attempt to {@link #complete} or {@link
41 dl 1.17 * #completeExceptionally} a CompletableFuture, only one of them
42 dl 1.19 * succeeds.
43     *
44     * <p> Upon exceptional completion, or when a completion entails
45 dl 1.5 * computation of a function or action, and it terminates abruptly
46 dl 1.19 * with an (unchecked) exception or error, then further completions
47     * act as {@code completeExceptionally} with a {@link
48     * CompletionException} holding that exception as its cause. If a
49     * CompletableFuture completes exceptionally, and is not followed by a
50     * {@link #exceptionally} or {@link #handle} completion, then all of
51     * its dependents (and their dependents) also complete exceptionally
52     * with CompletionExceptions holding the ultimate cause. For
53     * compatibility with {@link Future}, in case of a CompletionException,
54     * methods {@link #get()} and {@link #get(long, TimeUnit)} throw a
55     * {@link ExecutionException} with the same cause as would be held in
56     * the corresponding CompletionException.
57 dl 1.1 *
58     * <p>CompletableFutures themselves do not execute asynchronously.
59     * However, the {@code async} methods provide commonly useful ways to
60 jsr166 1.8 * commence asynchronous processing, using either a given {@link
61 dl 1.1 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
62     * function or action that will result in the completion of a new
63     * CompletableFuture.
64     *
65     * @author Doug Lea
66     * @since 1.8
67     */
68     public class CompletableFuture<T> implements Future<T> {
69     /*
70     * Quick overview (more to come):
71     *
72     * 1. Non-nullness of field result indicates done. An AltResult is
73     * used to box null as a result, as well as to hold exceptions.
74     *
75     * 2. Waiters are held in a Treiber stack similar to the one used
76 jsr166 1.9 * in FutureTask.
77 dl 1.1 *
78     * 3. Completions are also kept in a list/stack, and pulled off
79 dl 1.17 * and run when completion is triggered. Because post-processing
80     * may race with direct calls, completions extend AtomicInteger so
81     * callers can claim the action via compareAndSet(0, 1).
82 dl 1.1 */
83    
84     static final class AltResult {
85     final Throwable ex; // null only for NIL
86 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
87 dl 1.1 }
88    
89     static final AltResult NIL = new AltResult(null);
90    
91     /**
92     * Simple linked list nodes to record waiting threads in a Treiber
93     * stack. See other classes such as Phaser and SynchronousQueue
94     * for more detailed explanation.
95     */
96     static final class WaitNode {
97     volatile Thread thread;
98     volatile WaitNode next;
99     }
100    
101     /**
102     * Simple linked list nodes to record completions, used in
103     * basically the same way as WaitNodes
104     */
105     static final class CompletionNode {
106 jsr166 1.2 final Completion completion;
107 dl 1.1 volatile CompletionNode next;
108 jsr166 1.2 CompletionNode(Completion completion) { this.completion = completion; }
109 dl 1.1 }
110    
111     volatile Object result; // either the result or boxed AltResult
112     volatile WaitNode waiters; // Treiber stack of threads blocked on get()
113     volatile CompletionNode completions; // list (Treiber stack) of completions
114    
115     /**
116     * Creates a new incomplete CompletableFuture.
117     */
118     public CompletableFuture() {
119     }
120    
121     /**
122     * Asynchronously executes in the {@link
123     * ForkJoinPool#commonPool()}, a task that completes the returned
124     * CompletableFuture with the result of the given Supplier.
125     *
126     * @param supplier a function returning the value to be used
127 jsr166 1.14 * to complete the returned CompletableFuture
128 jsr166 1.11 * @return the CompletableFuture
129 dl 1.1 */
130 dl 1.7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
131 dl 1.1 if (supplier == null) throw new NullPointerException();
132     CompletableFuture<U> f = new CompletableFuture<U>();
133     ForkJoinPool.commonPool().
134 dl 1.7 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
135 dl 1.1 return f;
136     }
137    
138     /**
139     * Asynchronously executes using the given executor, a task that
140     * completes the returned CompletableFuture with the result of the
141     * given Supplier.
142     *
143     * @param supplier a function returning the value to be used
144 jsr166 1.11 * to complete the returned CompletableFuture
145 dl 1.1 * @param executor the executor to use for asynchronous execution
146 jsr166 1.11 * @return the CompletableFuture
147 dl 1.1 */
148 dl 1.7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
149     Executor executor) {
150 dl 1.1 if (executor == null || supplier == null)
151     throw new NullPointerException();
152     CompletableFuture<U> f = new CompletableFuture<U>();
153 dl 1.7 executor.execute(new AsyncSupplier<U>(supplier, f));
154 dl 1.1 return f;
155     }
156    
157     /**
158     * Asynchronously executes in the {@link
159     * ForkJoinPool#commonPool()} a task that runs the given action,
160 jsr166 1.11 * and then completes the returned CompletableFuture.
161 dl 1.1 *
162     * @param runnable the action to run before completing the
163 jsr166 1.11 * returned CompletableFuture
164     * @return the CompletableFuture
165 dl 1.1 */
166 dl 1.7 public static CompletableFuture<Void> runAsync(Runnable runnable) {
167 dl 1.1 if (runnable == null) throw new NullPointerException();
168     CompletableFuture<Void> f = new CompletableFuture<Void>();
169     ForkJoinPool.commonPool().
170     execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
171     return f;
172     }
173    
174     /**
175     * Asynchronously executes using the given executor, a task that
176     * runs the given action, and then completes the returned
177 jsr166 1.11 * CompletableFuture.
178 dl 1.1 *
179     * @param runnable the action to run before completing the
180 jsr166 1.11 * returned CompletableFuture
181 dl 1.1 * @param executor the executor to use for asynchronous execution
182 jsr166 1.11 * @return the CompletableFuture
183 dl 1.1 */
184 dl 1.7 public static CompletableFuture<Void> runAsync(Runnable runnable,
185 jsr166 1.12 Executor executor) {
186 dl 1.1 if (executor == null || runnable == null)
187     throw new NullPointerException();
188     CompletableFuture<Void> f = new CompletableFuture<Void>();
189     executor.execute(new AsyncRunnable(runnable, f));
190     return f;
191     }
192    
193     /**
194     * Returns {@code true} if completed in any fashion: normally,
195 jsr166 1.16 * exceptionally, or via cancellation.
196 dl 1.1 *
197     * @return {@code true} if completed
198     */
199     public boolean isDone() {
200     return result != null;
201     }
202    
203     /**
204 dl 1.19 * Waits if necessary for the computation to complete, and then
205     * retrieves its result.
206     *
207     * @return the computed result
208     * @throws CancellationException if the computation was cancelled
209     * @throws ExecutionException if the computation threw an
210     * exception
211     * @throws InterruptedException if the current thread was interrupted
212     * while waiting
213     */
214     public T get() throws InterruptedException, ExecutionException {
215     Object r; Throwable ex;
216     if ((r = result) == null && (r = waitingGet(true)) == null)
217     throw new InterruptedException();
218     if (r instanceof AltResult) {
219     if ((ex = ((AltResult)r).ex) != null) {
220     if (ex instanceof CancellationException)
221     throw (CancellationException)ex;
222     if (ex instanceof CompletionException) {
223     Throwable cause = ex.getCause();
224     if (cause != null)
225     ex = cause;
226     }
227     throw new ExecutionException(ex);
228     }
229     return null;
230     }
231     return (T)r;
232     }
233    
234     /**
235 dl 1.1 * Returns the result value when complete, or throws an
236     * (unchecked) exception if completed exceptionally. To better
237 dl 1.19 * conform with the use of common functional forms, if a
238     * computation involved in the completion of this
239     * CompletableFuture threw an exception, this method throws an
240     * (unchecked) {@link CompletionException} with the
241     * underlying exception as its cause. Additionally, this method
242     * does not abort on interrupt.
243 dl 1.1 *
244     * @return the result value
245 dl 1.19 * @throws CancellationException if the computation was cancelled
246     * @throws CompletionException if the computation threw an
247     * exception
248 dl 1.1 */
249 dl 1.19 public T getValue() {
250 dl 1.1 Object r; Throwable ex;
251 jsr166 1.2 if ((r = result) == null)
252 dl 1.19 r = waitingGet(false);
253 jsr166 1.2 if (r instanceof AltResult) {
254 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
255 dl 1.19 if (ex instanceof CancellationException)
256     throw (CancellationException)ex;
257     if (ex instanceof CompletionException)
258     throw (CompletionException)ex;
259     throw new CompletionException(ex);
260 dl 1.5 }
261 jsr166 1.2 return null;
262     }
263     return (T)r;
264 dl 1.1 }
265    
266     /**
267     * Returns the result value (or throws any encountered exception)
268     * if completed, else returns the given valueIfAbsent.
269     *
270     * @param valueIfAbsent the value to return if not completed
271     * @return the result value, if completed, else the given valueIfAbsent
272 dl 1.19 * @throws CancellationException if the computation was cancelled
273     * @throws CompletionException if the computation threw an
274     * exception
275 dl 1.1 */
276     public T getNow(T valueIfAbsent) {
277     Object r; Throwable ex;
278 jsr166 1.2 if ((r = result) == null)
279     return valueIfAbsent;
280     if (r instanceof AltResult) {
281 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
282 dl 1.19 if (ex instanceof CancellationException)
283     throw (CancellationException)ex;
284     if (ex instanceof CompletionException)
285     throw (CompletionException)ex;
286     throw new CompletionException(ex);
287 dl 1.5 }
288 jsr166 1.2 return null;
289     }
290     return (T)r;
291 dl 1.1 }
292    
293     /**
294     * Waits if necessary for at most the given time for completion,
295     * and then retrieves its result, if available.
296     *
297     * @param timeout the maximum time to wait
298     * @param unit the time unit of the timeout argument
299     * @return the computed result
300     * @throws CancellationException if the computation was cancelled
301     * @throws ExecutionException if the computation threw an
302     * exception
303     * @throws InterruptedException if the current thread was interrupted
304     * while waiting
305     * @throws TimeoutException if the wait timed out
306     */
307     public T get(long timeout, TimeUnit unit)
308     throws InterruptedException, ExecutionException, TimeoutException {
309     Object r; Throwable ex;
310     long nanos = unit.toNanos(timeout);
311     if (Thread.interrupted())
312     throw new InterruptedException();
313 jsr166 1.2 if ((r = result) == null)
314     r = timedAwaitDone(nanos);
315     if (r instanceof AltResult) {
316 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
317 dl 1.19 if (ex instanceof CancellationException)
318     throw (CancellationException)ex;
319     if (ex instanceof CompletionException) {
320     Throwable cause = ex.getCause();
321     if (cause != null)
322     ex = cause;
323     }
324 dl 1.1 throw new ExecutionException(ex);
325 dl 1.5 }
326 jsr166 1.2 return null;
327     }
328     return (T)r;
329 dl 1.1 }
330    
331     /**
332     * If not already completed, sets the value returned by {@link
333     * #get()} and related methods to the given value.
334     *
335     * @param value the result value
336     * @return true if this invocation caused this CompletableFuture
337 jsr166 1.14 * to transition to a completed state, else false
338 dl 1.1 */
339     public boolean complete(T value) {
340     if (result == null &&
341     UNSAFE.compareAndSwapObject(this, RESULT, null,
342 jsr166 1.2 (value == null) ? NIL : value)) {
343 dl 1.1 postComplete();
344     return true;
345     }
346     return false;
347     }
348    
349     /**
350 dl 1.19 * Internal version of complete; CASes in an existing result or
351     * AltResult.
352     */
353     private void propagateCompletion(Object r) {
354     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r))
355     postComplete();
356     }
357    
358     /**
359 dl 1.1 * If not already completed, causes invocations of {@link #get()}
360     * and related methods to throw the given exception.
361     *
362     * @param ex the exception
363     * @return true if this invocation caused this CompletableFuture
364 jsr166 1.14 * to transition to a completed state, else false
365 dl 1.1 */
366     public boolean completeExceptionally(Throwable ex) {
367     if (ex == null) throw new NullPointerException();
368     if (result == null) {
369 dl 1.19 AltResult r = new AltResult(ex);
370 dl 1.1 if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
371     postComplete();
372     return true;
373     }
374     }
375     return false;
376     }
377    
378     /**
379 dl 1.19 * Internal version of completeExceptionally that avoids creating
380     * chains of CompletionExceptions
381     */
382     private void propagateException(Throwable ex) {
383     if (result == null) {
384     AltResult r = new AltResult
385     ((ex instanceof CompletionException) ? ex :
386     new CompletionException(ex));
387     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r))
388     postComplete();
389     }
390     }
391    
392     /**
393 dl 1.1 * Creates and returns a CompletableFuture that is completed with
394     * the result of the given function of this CompletableFuture.
395     * If this CompletableFuture completes exceptionally,
396     * then the returned CompletableFuture also does so,
397 dl 1.19 * with a CompletionException holding this exception as
398 dl 1.1 * its cause.
399     *
400     * @param fn the function to use to compute the value of
401     * the returned CompletableFuture
402     * @return the new CompletableFuture
403     */
404 dl 1.7 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
405 dl 1.1 return thenFunction(fn, null);
406     }
407    
408     /**
409     * Creates and returns a CompletableFuture that is asynchronously
410     * completed using the {@link ForkJoinPool#commonPool()} with the
411     * result of the given function of this CompletableFuture. If
412     * this CompletableFuture completes exceptionally, then the
413     * returned CompletableFuture also does so, with a
414 dl 1.19 * CompletionException holding this exception as its cause.
415 dl 1.1 *
416     * @param fn the function to use to compute the value of
417     * the returned CompletableFuture
418     * @return the new CompletableFuture
419     */
420 dl 1.7 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
421 dl 1.1 return thenFunction(fn, ForkJoinPool.commonPool());
422     }
423    
424     /**
425     * Creates and returns a CompletableFuture that is asynchronously
426     * completed using the given executor with the result of the given
427     * function of this CompletableFuture. If this CompletableFuture
428     * completes exceptionally, then the returned CompletableFuture
429 dl 1.19 * also does so, with a CompletionException holding this exception as
430 dl 1.1 * its cause.
431     *
432     * @param fn the function to use to compute the value of
433     * the returned CompletableFuture
434     * @param executor the executor to use for asynchronous execution
435     * @return the new CompletableFuture
436     */
437 dl 1.7 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
438 jsr166 1.12 Executor executor) {
439 dl 1.1 if (executor == null) throw new NullPointerException();
440     return thenFunction(fn, executor);
441     }
442    
443     /**
444     * Creates and returns a CompletableFuture that is completed after
445 dl 1.7 * performing the given action with this CompletableFuture's
446     * result when it completes. If this CompletableFuture
447     * completes exceptionally, then the returned CompletableFuture
448 dl 1.19 * also does so, with a CompletionException holding this exception as
449 dl 1.7 * its cause.
450     *
451     * @param block the action to perform before completing the
452     * returned CompletableFuture
453     * @return the new CompletableFuture
454     */
455     public CompletableFuture<Void> thenAccept(Block<? super T> block) {
456     return thenBlock(block, null);
457     }
458    
459     /**
460     * Creates and returns a CompletableFuture that is asynchronously
461     * completed using the {@link ForkJoinPool#commonPool()} with this
462     * CompletableFuture's result when it completes. If this
463     * CompletableFuture completes exceptionally, then the returned
464 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
465 dl 1.7 * this exception as its cause.
466     *
467     * @param block the action to perform before completing the
468     * returned CompletableFuture
469     * @return the new CompletableFuture
470     */
471     public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
472     return thenBlock(block, ForkJoinPool.commonPool());
473     }
474    
475     /**
476     * Creates and returns a CompletableFuture that is asynchronously
477     * completed using the given executor with this
478     * CompletableFuture's result when it completes. If this
479     * CompletableFuture completes exceptionally, then the returned
480 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
481 dl 1.7 * this exception as its cause.
482     *
483     * @param block the action to perform before completing the
484     * returned CompletableFuture
485     * @param executor the executor to use for asynchronous execution
486     * @return the new CompletableFuture
487     */
488     public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
489     Executor executor) {
490     if (executor == null) throw new NullPointerException();
491     return thenBlock(block, executor);
492     }
493    
494     /**
495     * Creates and returns a CompletableFuture that is completed after
496     * performing the given action when this CompletableFuture
497 dl 1.1 * completes. If this CompletableFuture completes exceptionally,
498     * then the returned CompletableFuture also does so, with a
499 dl 1.19 * CompletionException holding this exception as its cause.
500 dl 1.1 *
501     * @param action the action to perform before completing the
502     * returned CompletableFuture
503     * @return the new CompletableFuture
504     */
505 dl 1.7 public CompletableFuture<Void> thenRun(Runnable action) {
506 dl 1.1 return thenRunnable(action, null);
507     }
508    
509     /**
510     * Creates and returns a CompletableFuture that is asynchronously
511     * completed using the {@link ForkJoinPool#commonPool()} after
512 dl 1.7 * performing the given action when this CompletableFuture
513 dl 1.1 * completes. If this CompletableFuture completes exceptionally,
514     * then the returned CompletableFuture also does so, with a
515 dl 1.19 * CompletionException holding this exception as its cause.
516 dl 1.1 *
517     * @param action the action to perform before completing the
518     * returned CompletableFuture
519     * @return the new CompletableFuture
520     */
521 dl 1.7 public CompletableFuture<Void> thenRunAsync(Runnable action) {
522 dl 1.1 return thenRunnable(action, ForkJoinPool.commonPool());
523     }
524    
525     /**
526     * Creates and returns a CompletableFuture that is asynchronously
527     * completed using the given executor after performing the given
528 dl 1.7 * action when this CompletableFuture completes. If this
529 dl 1.1 * CompletableFuture completes exceptionally, then the returned
530 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
531 dl 1.1 * this exception as its cause.
532     *
533     * @param action the action to perform before completing the
534     * returned CompletableFuture
535     * @param executor the executor to use for asynchronous execution
536     * @return the new CompletableFuture
537     */
538 jsr166 1.12 public CompletableFuture<Void> thenRunAsync(Runnable action,
539     Executor executor) {
540 dl 1.1 if (executor == null) throw new NullPointerException();
541     return thenRunnable(action, executor);
542     }
543    
544     /**
545     * Creates and returns a CompletableFuture that is completed with
546     * the result of the given function of this and the other given
547 dl 1.7 * CompletableFuture's results when both complete. If this or
548 dl 1.1 * the other CompletableFuture complete exceptionally, then the
549     * returned CompletableFuture also does so, with a
550 dl 1.19 * CompletionException holding the exception as its cause.
551 dl 1.1 *
552     * @param other the other CompletableFuture
553     * @param fn the function to use to compute the value of
554     * the returned CompletableFuture
555     * @return the new CompletableFuture
556     */
557 dl 1.17 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
558     BiFunction<? super T,? super U,? extends V> fn) {
559 dl 1.1 return andFunction(other, fn, null);
560     }
561    
562     /**
563 jsr166 1.3 * Creates and returns a CompletableFuture that is asynchronously
564 dl 1.1 * completed using the {@link ForkJoinPool#commonPool()} with
565     * the result of the given function of this and the other given
566 dl 1.7 * CompletableFuture's results when both complete. If this or
567 dl 1.1 * the other CompletableFuture complete exceptionally, then the
568     * returned CompletableFuture also does so, with a
569 dl 1.19 * CompletionException holding the exception as its cause.
570 dl 1.1 *
571     * @param other the other CompletableFuture
572     * @param fn the function to use to compute the value of
573     * the returned CompletableFuture
574     * @return the new CompletableFuture
575     */
576 dl 1.17 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
577     BiFunction<? super T,? super U,? extends V> fn) {
578 dl 1.1 return andFunction(other, fn, ForkJoinPool.commonPool());
579     }
580    
581     /**
582 jsr166 1.3 * Creates and returns a CompletableFuture that is
583 dl 1.1 * asynchronously completed using the given executor with the
584     * result of the given function of this and the other given
585 dl 1.7 * CompletableFuture's results when both complete. If this or
586 dl 1.1 * the other CompletableFuture complete exceptionally, then the
587     * returned CompletableFuture also does so, with a
588 dl 1.19 * CompletionException holding the exception as its cause.
589 dl 1.1 *
590     * @param other the other CompletableFuture
591     * @param fn the function to use to compute the value of
592     * the returned CompletableFuture
593     * @param executor the executor to use for asynchronous execution
594     * @return the new CompletableFuture
595     */
596    
597 dl 1.17 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
598     BiFunction<? super T,? super U,? extends V> fn,
599     Executor executor) {
600 dl 1.7 if (executor == null) throw new NullPointerException();
601     return andFunction(other, fn, executor);
602     }
603    
604     /**
605     * Creates and returns a CompletableFuture that is completed with
606     * the results of this and the other given CompletableFuture if
607     * both complete. If this and/or the other CompletableFuture
608     * complete exceptionally, then the returned CompletableFuture
609 dl 1.19 * also does so, with a CompletionException holding one of these
610 dl 1.7 * exceptions as its cause.
611     *
612     * @param other the other CompletableFuture
613     * @param block the action to perform before completing the
614     * returned CompletableFuture
615     * @return the new CompletableFuture
616     */
617 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
618     BiBlock<? super T, ? super U> block) {
619 dl 1.7 return andBlock(other, block, null);
620     }
621    
622     /**
623     * Creates and returns a CompletableFuture that is completed
624     * asynchronously using the {@link ForkJoinPool#commonPool()} with
625     * the results of this and the other given CompletableFuture when
626     * both complete. If this and/or the other CompletableFuture
627     * complete exceptionally, then the returned CompletableFuture
628 dl 1.19 * also does so, with a CompletionException holding one of these
629 dl 1.7 * exceptions as its cause.
630     *
631     * @param other the other CompletableFuture
632     * @param block the action to perform before completing the
633     * returned CompletableFuture
634     * @return the new CompletableFuture
635     */
636 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
637     BiBlock<? super T, ? super U> block) {
638 dl 1.7 return andBlock(other, block, ForkJoinPool.commonPool());
639     }
640    
641     /**
642     * Creates and returns a CompletableFuture that is completed
643     * asynchronously using the given executor with the results of
644     * this and the other given CompletableFuture when both complete.
645     * If this and/or the other CompletableFuture complete
646     * exceptionally, then the returned CompletableFuture also does
647 dl 1.19 * so, with a CompletionException holding one of these exceptions as
648 dl 1.7 * its cause.
649     *
650     * @param other the other CompletableFuture
651     * @param block the action to perform before completing the
652     * returned CompletableFuture
653     * @param executor the executor to use for asynchronous execution
654     * @return the new CompletableFuture
655     */
656 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
657     BiBlock<? super T, ? super U> block,
658     Executor executor) {
659 dl 1.1 if (executor == null) throw new NullPointerException();
660 dl 1.7 return andBlock(other, block, executor);
661 dl 1.1 }
662    
663     /**
664     * Creates and returns a CompletableFuture that is completed
665 dl 1.7 * when this and the other given CompletableFuture both
666 dl 1.1 * complete. If this and/or the other CompletableFuture complete
667     * exceptionally, then the returned CompletableFuture also does
668 dl 1.19 * so, with a CompletionException holding one of these exceptions as
669 dl 1.1 * its cause.
670     *
671     * @param other the other CompletableFuture
672     * @param action the action to perform before completing the
673     * returned CompletableFuture
674     * @return the new CompletableFuture
675     */
676 dl 1.17 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
677     Runnable action) {
678 dl 1.1 return andRunnable(other, action, null);
679     }
680    
681     /**
682     * Creates and returns a CompletableFuture that is completed
683     * asynchronously using the {@link ForkJoinPool#commonPool()}
684 dl 1.7 * when this and the other given CompletableFuture both
685 dl 1.1 * complete. If this and/or the other CompletableFuture complete
686     * exceptionally, then the returned CompletableFuture also does
687 dl 1.19 * so, with a CompletionException holding one of these exceptions as
688 dl 1.1 * its cause.
689     *
690     * @param other the other CompletableFuture
691     * @param action the action to perform before completing the
692     * returned CompletableFuture
693     * @return the new CompletableFuture
694     */
695 dl 1.17 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
696     Runnable action) {
697 dl 1.1 return andRunnable(other, action, ForkJoinPool.commonPool());
698     }
699    
700     /**
701     * Creates and returns a CompletableFuture that is completed
702     * asynchronously using the given executor
703 dl 1.7 * when this and the other given CompletableFuture both
704 dl 1.1 * complete. If this and/or the other CompletableFuture complete
705     * exceptionally, then the returned CompletableFuture also does
706 dl 1.19 * so, with a CompletionException holding one of these exceptions as
707 dl 1.1 * its cause.
708     *
709     * @param other the other CompletableFuture
710     * @param action the action to perform before completing the
711     * returned CompletableFuture
712     * @param executor the executor to use for asynchronous execution
713     * @return the new CompletableFuture
714     */
715 dl 1.17 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
716     Runnable action,
717     Executor executor) {
718 dl 1.1 if (executor == null) throw new NullPointerException();
719     return andRunnable(other, action, executor);
720     }
721    
722     /**
723     * Creates and returns a CompletableFuture that is completed with
724     * the result of the given function of either this or the other
725 dl 1.7 * given CompletableFuture's results when either complete. If
726 dl 1.1 * this and/or the other CompletableFuture complete exceptionally,
727     * then the returned CompletableFuture may also do so, with a
728 dl 1.19 * CompletionException holding one of these exceptions as its cause.
729 dl 1.1 * No guarantees are made about which result or exception is used
730     * in the returned CompletableFuture.
731     *
732     * @param other the other CompletableFuture
733     * @param fn the function to use to compute the value of
734     * the returned CompletableFuture
735     * @return the new CompletableFuture
736     */
737 dl 1.17 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
738     Function<? super T, U> fn) {
739 dl 1.1 return orFunction(other, fn, null);
740     }
741    
742     /**
743     * Creates and returns a CompletableFuture that is completed
744     * asynchronously using the {@link ForkJoinPool#commonPool()} with
745     * the result of the given function of either this or the other
746 dl 1.7 * given CompletableFuture's results when either complete. If
747 dl 1.1 * this and/or the other CompletableFuture complete exceptionally,
748     * then the returned CompletableFuture may also do so, with a
749 dl 1.19 * CompletionException holding one of these exceptions as its cause.
750 dl 1.1 * No guarantees are made about which result or exception is used
751     * in the returned CompletableFuture.
752     *
753     * @param other the other CompletableFuture
754     * @param fn the function to use to compute the value of
755     * the returned CompletableFuture
756     * @return the new CompletableFuture
757     */
758 dl 1.17 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
759     Function<? super T, U> fn) {
760 dl 1.1 return orFunction(other, fn, ForkJoinPool.commonPool());
761     }
762    
763     /**
764     * Creates and returns a CompletableFuture that is completed
765     * asynchronously using the given executor with the result of the
766     * given function of either this or the other given
767 dl 1.7 * CompletableFuture's results when either complete. If this
768 dl 1.1 * and/or the other CompletableFuture complete exceptionally, then
769     * the returned CompletableFuture may also do so, with a
770 dl 1.19 * CompletionException holding one of these exceptions as its cause.
771 dl 1.1 * No guarantees are made about which result or exception is used
772     * in the returned CompletableFuture.
773     *
774     * @param other the other CompletableFuture
775     * @param fn the function to use to compute the value of
776     * the returned CompletableFuture
777     * @param executor the executor to use for asynchronous execution
778     * @return the new CompletableFuture
779     */
780 dl 1.17 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
781     Function<? super T, U> fn,
782     Executor executor) {
783 dl 1.1 if (executor == null) throw new NullPointerException();
784     return orFunction(other, fn, executor);
785     }
786    
787     /**
788 dl 1.7 * Creates and returns a CompletableFuture that is completed after
789     * performing the given action with the result of either this or the
790     * other given CompletableFuture's result, when either complete.
791     * If this and/or the other CompletableFuture complete
792     * exceptionally, then the returned CompletableFuture may also do
793 dl 1.19 * so, with a CompletionException holding one of these exceptions as
794 dl 1.7 * its cause. No guarantees are made about which exception is
795     * used in the returned CompletableFuture.
796     *
797     * @param other the other CompletableFuture
798     * @param block the action to perform before completing the
799     * returned CompletableFuture
800     * @return the new CompletableFuture
801     */
802 dl 1.17 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
803     Block<? super T> block) {
804 dl 1.7 return orBlock(other, block, null);
805     }
806    
807     /**
808     * Creates and returns a CompletableFuture that is completed
809     * asynchronously using the {@link ForkJoinPool#commonPool()},
810     * performing the given action with the result of either this or
811     * the other given CompletableFuture's result, when either
812     * complete. If this and/or the other CompletableFuture complete
813     * exceptionally, then the returned CompletableFuture may also do
814 dl 1.19 * so, with a CompletionException holding one of these exceptions as
815 dl 1.7 * its cause. No guarantees are made about which exception is
816     * used in the returned CompletableFuture.
817     *
818     * @param other the other CompletableFuture
819     * @param block the action to perform before completing the
820     * returned CompletableFuture
821     * @return the new CompletableFuture
822     */
823 dl 1.17 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
824     Block<? super T> block) {
825 dl 1.7 return orBlock(other, block, ForkJoinPool.commonPool());
826     }
827    
828     /**
829     * Creates and returns a CompletableFuture that is completed
830     * asynchronously using the given executor,
831     * performing the given action with the result of either this or
832     * the other given CompletableFuture's result, when either
833     * complete. If this and/or the other CompletableFuture complete
834     * exceptionally, then the returned CompletableFuture may also do
835 dl 1.19 * so, with a CompletionException holding one of these exceptions as
836 dl 1.7 * its cause. No guarantees are made about which exception is
837     * used in the returned CompletableFuture.
838     *
839     * @param other the other CompletableFuture
840     * @param block the action to perform before completing the
841     * returned CompletableFuture
842     * @param executor the executor to use for asynchronous execution
843     * @return the new CompletableFuture
844     */
845 dl 1.17 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
846     Block<? super T> block,
847     Executor executor) {
848 dl 1.7 if (executor == null) throw new NullPointerException();
849     return orBlock(other, block, executor);
850     }
851    
852     /**
853 dl 1.1 * Creates and returns a CompletableFuture that is completed
854     * after this or the other given CompletableFuture complete. If
855     * this and/or the other CompletableFuture complete exceptionally,
856     * then the returned CompletableFuture may also do so, with a
857 dl 1.19 * CompletionException holding one of these exceptions as its cause.
858 dl 1.1 * No guarantees are made about which exception is used in the
859     * returned CompletableFuture.
860     *
861     * @param other the other CompletableFuture
862     * @param action the action to perform before completing the
863     * returned CompletableFuture
864     * @return the new CompletableFuture
865     */
866 dl 1.17 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
867     Runnable action) {
868 dl 1.1 return orRunnable(other, action, null);
869     }
870    
871     /**
872     * Creates and returns a CompletableFuture that is completed
873     * asynchronously using the {@link ForkJoinPool#commonPool()}
874     * after this or the other given CompletableFuture complete. If
875     * this and/or the other CompletableFuture complete exceptionally,
876     * then the returned CompletableFuture may also do so, with a
877 dl 1.19 * CompletionException holding one of these exceptions as its cause.
878 dl 1.1 * No guarantees are made about which exception is used in the
879     * returned CompletableFuture.
880     *
881     * @param other the other CompletableFuture
882     * @param action the action to perform before completing the
883     * returned CompletableFuture
884     * @return the new CompletableFuture
885     */
886 dl 1.17 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
887     Runnable action) {
888 dl 1.1 return orRunnable(other, action, ForkJoinPool.commonPool());
889     }
890    
891     /**
892     * Creates and returns a CompletableFuture that is completed
893     * asynchronously using the given executor after this or the other
894     * given CompletableFuture complete. If this and/or the other
895     * CompletableFuture complete exceptionally, then the returned
896 dl 1.19 * CompletableFuture may also do so, with a CompletionException
897     * holding one of these exceptions as its cause. No guarantees are
898 dl 1.1 * made about which exception is used in the returned
899     * CompletableFuture.
900     *
901     * @param other the other CompletableFuture
902     * @param action the action to perform before completing the
903     * returned CompletableFuture
904     * @param executor the executor to use for asynchronous execution
905     * @return the new CompletableFuture
906     */
907 dl 1.17 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
908     Runnable action,
909     Executor executor) {
910 dl 1.1 if (executor == null) throw new NullPointerException();
911     return orRunnable(other, action, executor);
912     }
913    
914     /**
915 dl 1.17 * Returns a CompletableFuture equal or equivalent to that
916     * produced by the given function of the result of this
917     * CompletableFuture when completed. If this CompletableFuture
918     * completes exceptionally, then the returned CompletableFuture
919 dl 1.19 * also does so, with a CompletionException holding this exception as
920 dl 1.17 * its cause.
921     *
922     * @param fn the function returning a new CompletableFuture.
923     * @return the CompletableFuture, that {@code isDone()} upon
924     * return if completed by the given function, or an exception
925     * occurs.
926     */
927     public <U> CompletableFuture<U> thenCompose(Function<? super T,
928     CompletableFuture<U>> fn) {
929     if (fn == null) throw new NullPointerException();
930     CompletableFuture<U> dst = null;
931     ThenCompose<T,U> d = null;
932     Object r;
933     if ((r = result) == null) {
934     dst = new CompletableFuture<U>();
935     CompletionNode p = new CompletionNode
936     (d = new ThenCompose<T,U>(this, fn, dst));
937     while ((r = result) == null) {
938     if (UNSAFE.compareAndSwapObject
939     (this, COMPLETIONS, p.next = completions, p))
940     break;
941     }
942     }
943     if (r != null && (d == null || d.compareAndSet(0, 1))) {
944     T t; Throwable ex = null;
945     if (r instanceof AltResult) {
946     ex = ((AltResult)r).ex;
947     t = null;
948     }
949     else
950     t = (T) r;
951     if (ex == null) {
952     try {
953     dst = fn.apply(t);
954     } catch (Throwable rex) {
955     ex = rex;
956     }
957     }
958     if (dst == null) {
959     dst = new CompletableFuture<U>();
960     if (ex == null)
961     ex = new NullPointerException();
962     }
963     if (ex != null)
964 dl 1.19 dst.propagateException(ex);
965 dl 1.17 }
966     if (r != null || result != null)
967     postComplete();
968     return dst;
969     }
970    
971     /**
972 dl 1.6 * Creates and returns a CompletableFuture that is completed with
973     * the result of the given function of the exception triggering
974 dl 1.7 * this CompletableFuture's completion when it completes
975 dl 1.6 * exceptionally; Otherwise, if this CompletableFuture completes
976     * normally, then the returned CompletableFuture also completes
977     * normally with the same value.
978     *
979     * @param fn the function to use to compute the value of the
980     * returned CompletableFuture if this CompletableFuture completed
981     * exceptionally
982 dl 1.1 * @return the new CompletableFuture
983     */
984 dl 1.6 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
985     if (fn == null) throw new NullPointerException();
986     CompletableFuture<T> dst = new CompletableFuture<T>();
987 dl 1.1 ExceptionAction<T> d = null;
988 dl 1.6 Object r;
989 jsr166 1.2 if ((r = result) == null) {
990 dl 1.1 CompletionNode p =
991 dl 1.6 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
992 dl 1.1 while ((r = result) == null) {
993     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
994     p.next = completions, p))
995     break;
996     }
997     }
998 dl 1.6 if (r != null && (d == null || d.compareAndSet(0, 1))) {
999     T t; Throwable ex = null;
1000     if (r instanceof AltResult) {
1001     if ((ex = ((AltResult)r).ex) != null) {
1002     try {
1003     dst.complete(fn.apply(ex));
1004     } catch (Throwable rex) {
1005     dst.completeExceptionally(rex);
1006     }
1007     }
1008     t = null;
1009 dl 1.1 }
1010 dl 1.6 else
1011     t = (T) r;
1012     if (ex == null)
1013     dst.complete(t);
1014 dl 1.1 }
1015 dl 1.17 if (r != null || result != null)
1016     postComplete();
1017     return dst;
1018     }
1019    
1020     /**
1021     * Creates and returns a CompletableFuture that is completed with
1022     * the result of the given function of the result and exception of
1023     * this CompletableFuture's completion when it completes. The
1024     * given function is invoked with the result (or {@code null} if
1025     * none) and the exception (or {@code null} if none) of this
1026     * CompletableFuture when complete.
1027     *
1028     * @param fn the function to use to compute the value of the
1029     * returned CompletableFuture
1030    
1031     * @return the new CompletableFuture
1032     */
1033     public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
1034     if (fn == null) throw new NullPointerException();
1035     CompletableFuture<U> dst = new CompletableFuture<U>();
1036     ThenHandle<T,U> d = null;
1037     Object r;
1038     if ((r = result) == null) {
1039     CompletionNode p =
1040     new CompletionNode(d = new ThenHandle<T,U>(this, fn, dst));
1041     while ((r = result) == null) {
1042     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1043     p.next = completions, p))
1044     break;
1045     }
1046     }
1047     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1048     T t; Throwable ex;
1049     if (r instanceof AltResult) {
1050     ex = ((AltResult)r).ex;
1051     t = null;
1052     }
1053     else {
1054     ex = null;
1055     t = (T) r;
1056     }
1057     try {
1058     dst.complete(fn.apply(t, ex));
1059     } catch (Throwable rex) {
1060     dst.completeExceptionally(rex);
1061     }
1062     }
1063     if (r != null || result != null)
1064 dl 1.1 postComplete();
1065     return dst;
1066     }
1067    
1068     /**
1069     * Attempts to complete this CompletableFuture with
1070     * a {@link CancellationException}.
1071     *
1072     * @param mayInterruptIfRunning this value has no effect in this
1073     * implementation because interrupts are not used to control
1074     * processing.
1075     *
1076     * @return {@code true} if this task is now cancelled
1077     */
1078     public boolean cancel(boolean mayInterruptIfRunning) {
1079     Object r;
1080     while ((r = result) == null) {
1081     r = new AltResult(new CancellationException());
1082     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
1083     postComplete();
1084     return true;
1085     }
1086     }
1087     return ((r instanceof AltResult) &&
1088     (((AltResult)r).ex instanceof CancellationException));
1089     }
1090    
1091     /**
1092     * Returns {@code true} if this CompletableFuture was cancelled
1093     * before it completed normally.
1094     *
1095     * @return {@code true} if this CompletableFuture was cancelled
1096     * before it completed normally
1097     */
1098     public boolean isCancelled() {
1099     Object r;
1100     return ((r = result) != null &&
1101     (r instanceof AltResult) &&
1102     (((AltResult)r).ex instanceof CancellationException));
1103     }
1104    
1105     /**
1106 dl 1.6 * Forcibly sets or resets the value subsequently returned by
1107     * method get() and related methods, whether or not already
1108     * completed. This method is designed for use only in error
1109     * recovery actions, and even in such situations may result in
1110     * ongoing dependent completions using established versus
1111     * overwritten values.
1112 dl 1.1 *
1113     * @param value the completion value
1114     */
1115 dl 1.6 public void obtrudeValue(T value) {
1116 jsr166 1.2 result = (value == null) ? NIL : value;
1117 dl 1.1 postComplete();
1118     }
1119    
1120     /**
1121 jsr166 1.10 * Removes and signals all waiting threads and runs all completions.
1122 dl 1.1 */
1123     private void postComplete() {
1124     WaitNode q; Thread t;
1125     while ((q = waiters) != null) {
1126     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
1127     (t = q.thread) != null) {
1128     q.thread = null;
1129     LockSupport.unpark(t);
1130     }
1131     }
1132    
1133     CompletionNode h; Completion c;
1134     while ((h = completions) != null) {
1135     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
1136     (c = h.completion) != null)
1137     c.run();
1138     }
1139     }
1140    
1141     /* ------------- waiting for completions -------------- */
1142    
1143 jsr166 1.2 /**
1144 dl 1.1 * Heuristic spin value for waitingGet() before blocking on
1145     * multiprocessors
1146     */
1147     static final int WAITING_GET_SPINS = 256;
1148    
1149     /**
1150 dl 1.19 * Returns raw result after waiting, or null if interruptible and
1151     * interrupted.
1152 dl 1.1 */
1153 dl 1.19 private Object waitingGet(boolean interruptible) {
1154 dl 1.1 WaitNode q = null;
1155     boolean queued = false, interrupted = false;
1156     int h = 0, spins = 0;
1157     for (Object r;;) {
1158     if ((r = result) != null) {
1159 dl 1.5 Throwable ex;
1160 dl 1.1 if (q != null) // suppress unpark
1161     q.thread = null;
1162     postComplete(); // help release others
1163     if (interrupted)
1164     Thread.currentThread().interrupt();
1165 dl 1.19 return r;
1166 dl 1.1 }
1167     else if (h == 0) {
1168     h = ThreadLocalRandom.current().nextInt();
1169     if (Runtime.getRuntime().availableProcessors() > 1)
1170     spins = WAITING_GET_SPINS;
1171     }
1172     else if (spins > 0) {
1173     h ^= h << 1; // xorshift
1174 jsr166 1.2 h ^= h >>> 3;
1175 dl 1.1 if ((h ^= h << 10) >= 0)
1176     --spins;
1177 jsr166 1.2 }
1178 dl 1.1 else if (q == null)
1179     q = new WaitNode();
1180     else if (!queued)
1181     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1182     q.next = waiters, q);
1183 dl 1.19 else if (Thread.interrupted()) {
1184     if (interruptible)
1185     return null;
1186 dl 1.1 interrupted = true;
1187 dl 1.19 }
1188 dl 1.1 else if (q.thread == null)
1189     q.thread = Thread.currentThread();
1190     else
1191     LockSupport.park(this);
1192     }
1193     }
1194    
1195     /**
1196     * Awaits completion or aborts on interrupt or timeout.
1197     *
1198     * @param nanos time to wait
1199     * @return raw result
1200     */
1201     private Object timedAwaitDone(long nanos)
1202     throws InterruptedException, TimeoutException {
1203     final long deadline = System.nanoTime() + nanos;
1204     WaitNode q = null;
1205     boolean queued = false;
1206     for (Object r;;) {
1207     if (Thread.interrupted()) {
1208     removeWaiter(q);
1209     throw new InterruptedException();
1210     }
1211     else if ((r = result) != null) {
1212     if (q != null)
1213     q.thread = null;
1214     postComplete();
1215     return r;
1216     }
1217     else if (q == null)
1218     q = new WaitNode();
1219     else if (!queued)
1220     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1221     q.next = waiters, q);
1222     else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1223     removeWaiter(q);
1224     throw new TimeoutException();
1225     }
1226     else if (q.thread == null)
1227     q.thread = Thread.currentThread();
1228     else
1229     LockSupport.parkNanos(this, nanos);
1230     }
1231     }
1232    
1233     /**
1234     * Tries to unlink a timed-out or interrupted wait node to avoid
1235     * accumulating garbage. Internal nodes are simply unspliced
1236     * without CAS since it is harmless if they are traversed anyway
1237     * by releasers. To avoid effects of unsplicing from already
1238     * removed nodes, the list is retraversed in case of an apparent
1239     * race. This is slow when there are a lot of nodes, but we don't
1240     * expect lists to be long enough to outweigh higher-overhead
1241     * schemes.
1242     */
1243     private void removeWaiter(WaitNode node) {
1244     if (node != null) {
1245     node.thread = null;
1246     retry:
1247     for (;;) { // restart on removeWaiter race
1248     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
1249     s = q.next;
1250     if (q.thread != null)
1251     pred = q;
1252     else if (pred != null) {
1253     pred.next = s;
1254     if (pred.thread == null) // check for race
1255     continue retry;
1256     }
1257     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
1258     continue retry;
1259     }
1260     break;
1261     }
1262     }
1263     }
1264    
1265     /* ------------- Async tasks -------------- */
1266    
1267     /** Base class can act as either FJ or plain Runnable */
1268     static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1269     public final Void getRawResult() { return null; }
1270     public final void setRawResult(Void v) { }
1271     public final void run() { exec(); }
1272     }
1273    
1274     static final class AsyncRunnable extends Async {
1275     final Runnable runnable;
1276     final CompletableFuture<Void> dst;
1277     AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
1278     this.runnable = runnable; this.dst = dst;
1279     }
1280     public final boolean exec() {
1281     Runnable fn;
1282     CompletableFuture<Void> d;
1283     if ((fn = this.runnable) == null || (d = this.dst) == null)
1284     throw new NullPointerException();
1285     try {
1286     fn.run();
1287     d.complete(null);
1288     } catch (Throwable ex) {
1289     d.completeExceptionally(ex);
1290     }
1291     return true;
1292     }
1293     private static final long serialVersionUID = 5232453952276885070L;
1294     }
1295    
1296     static final class AsyncSupplier<U> extends Async {
1297     final Supplier<U> supplier;
1298     final CompletableFuture<U> dst;
1299     AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
1300     this.supplier = supplier; this.dst = dst;
1301     }
1302     public final boolean exec() {
1303     Supplier<U> fn;
1304     CompletableFuture<U> d;
1305     if ((fn = this.supplier) == null || (d = this.dst) == null)
1306     throw new NullPointerException();
1307     try {
1308     d.complete(fn.get());
1309     } catch (Throwable ex) {
1310     d.completeExceptionally(ex);
1311     }
1312     return true;
1313     }
1314     private static final long serialVersionUID = 5232453952276885070L;
1315     }
1316    
1317     static final class AsyncFunction<T,U> extends Async {
1318     Function<? super T,? extends U> fn;
1319     T arg;
1320     final CompletableFuture<U> dst;
1321     AsyncFunction(T arg, Function<? super T,? extends U> fn,
1322     CompletableFuture<U> dst) {
1323     this.arg = arg; this.fn = fn; this.dst = dst;
1324     }
1325     public final boolean exec() {
1326     Function<? super T,? extends U> fn;
1327     CompletableFuture<U> d;
1328     if ((fn = this.fn) == null || (d = this.dst) == null)
1329     throw new NullPointerException();
1330     try {
1331     d.complete(fn.apply(arg));
1332     } catch (Throwable ex) {
1333     d.completeExceptionally(ex);
1334     }
1335     return true;
1336     }
1337     private static final long serialVersionUID = 5232453952276885070L;
1338     }
1339    
1340     static final class AsyncBiFunction<T,U,V> extends Async {
1341     final BiFunction<? super T,? super U,? extends V> fn;
1342     final T arg1;
1343     final U arg2;
1344     final CompletableFuture<V> dst;
1345     AsyncBiFunction(T arg1, U arg2,
1346     BiFunction<? super T,? super U,? extends V> fn,
1347     CompletableFuture<V> dst) {
1348     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1349     }
1350     public final boolean exec() {
1351     BiFunction<? super T,? super U,? extends V> fn;
1352     CompletableFuture<V> d;
1353     if ((fn = this.fn) == null || (d = this.dst) == null)
1354     throw new NullPointerException();
1355     try {
1356     d.complete(fn.apply(arg1, arg2));
1357     } catch (Throwable ex) {
1358     d.completeExceptionally(ex);
1359     }
1360     return true;
1361     }
1362     private static final long serialVersionUID = 5232453952276885070L;
1363     }
1364    
1365 dl 1.7 static final class AsyncBlock<T> extends Async {
1366     Block<? super T> fn;
1367     T arg;
1368     final CompletableFuture<Void> dst;
1369     AsyncBlock(T arg, Block<? super T> fn,
1370     CompletableFuture<Void> dst) {
1371     this.arg = arg; this.fn = fn; this.dst = dst;
1372     }
1373     public final boolean exec() {
1374     Block<? super T> fn;
1375     CompletableFuture<Void> d;
1376     if ((fn = this.fn) == null || (d = this.dst) == null)
1377     throw new NullPointerException();
1378     try {
1379     fn.accept(arg);
1380     d.complete(null);
1381     } catch (Throwable ex) {
1382     d.completeExceptionally(ex);
1383     }
1384     return true;
1385     }
1386     private static final long serialVersionUID = 5232453952276885070L;
1387     }
1388    
1389     static final class AsyncBiBlock<T,U> extends Async {
1390     final BiBlock<? super T,? super U> fn;
1391     final T arg1;
1392     final U arg2;
1393     final CompletableFuture<Void> dst;
1394     AsyncBiBlock(T arg1, U arg2,
1395     BiBlock<? super T,? super U> fn,
1396     CompletableFuture<Void> dst) {
1397     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1398     }
1399     public final boolean exec() {
1400     BiBlock<? super T,? super U> fn;
1401     CompletableFuture<Void> d;
1402     if ((fn = this.fn) == null || (d = this.dst) == null)
1403     throw new NullPointerException();
1404     try {
1405     fn.accept(arg1, arg2);
1406     d.complete(null);
1407     } catch (Throwable ex) {
1408     d.completeExceptionally(ex);
1409     }
1410     return true;
1411     }
1412     private static final long serialVersionUID = 5232453952276885070L;
1413     }
1414    
1415 dl 1.1 /* ------------- Completions -------------- */
1416    
1417     // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1418     static abstract class Completion extends AtomicInteger implements Runnable {
1419     }
1420    
1421     static final class ThenFunction<T,U> extends Completion {
1422 jsr166 1.2 final CompletableFuture<? extends T> src;
1423     final Function<? super T,? extends U> fn;
1424     final CompletableFuture<U> dst;
1425 dl 1.1 final Executor executor;
1426     ThenFunction(CompletableFuture<? extends T> src,
1427     final Function<? super T,? extends U> fn,
1428     final CompletableFuture<U> dst, Executor executor) {
1429     this.src = src; this.fn = fn; this.dst = dst;
1430     this.executor = executor;
1431     }
1432 jsr166 1.2 public void run() {
1433 dl 1.1 CompletableFuture<? extends T> a;
1434     Function<? super T,? extends U> fn;
1435     CompletableFuture<U> dst;
1436 jsr166 1.2 Object r; T t; Throwable ex;
1437     if ((dst = this.dst) != null &&
1438 dl 1.1 (fn = this.fn) != null &&
1439     (a = this.src) != null &&
1440     (r = a.result) != null &&
1441     compareAndSet(0, 1)) {
1442 jsr166 1.2 if (r instanceof AltResult) {
1443 dl 1.19 ex = ((AltResult)r).ex;
1444 dl 1.1 t = null;
1445     }
1446 dl 1.17 else {
1447     ex = null;
1448 dl 1.1 t = (T) r;
1449     }
1450 dl 1.17 if (ex == null) {
1451     try {
1452     if (executor != null)
1453     executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1454     else
1455     dst.complete(fn.apply(t));
1456     } catch (Throwable rex) {
1457     ex = rex;
1458     }
1459     }
1460     if (ex != null)
1461 dl 1.19 dst.propagateException(ex);
1462 dl 1.1 }
1463     }
1464     }
1465    
1466 dl 1.7 static final class ThenBlock<T> extends Completion {
1467     final CompletableFuture<? extends T> src;
1468     final Block<? super T> fn;
1469     final CompletableFuture<Void> dst;
1470     final Executor executor;
1471     ThenBlock(CompletableFuture<? extends T> src,
1472     final Block<? super T> fn,
1473     final CompletableFuture<Void> dst, Executor executor) {
1474     this.src = src; this.fn = fn; this.dst = dst;
1475     this.executor = executor;
1476     }
1477     public void run() {
1478     CompletableFuture<? extends T> a;
1479     Block<? super T> fn;
1480     CompletableFuture<Void> dst;
1481     Object r; T t; Throwable ex;
1482     if ((dst = this.dst) != null &&
1483     (fn = this.fn) != null &&
1484     (a = this.src) != null &&
1485     (r = a.result) != null &&
1486     compareAndSet(0, 1)) {
1487     if (r instanceof AltResult) {
1488 dl 1.19 ex = ((AltResult)r).ex;
1489 dl 1.7 t = null;
1490     }
1491 dl 1.17 else {
1492     ex = null;
1493 dl 1.7 t = (T) r;
1494 dl 1.17 }
1495     if (ex == null) {
1496     try {
1497     if (executor != null)
1498     executor.execute(new AsyncBlock<T>(t, fn, dst));
1499     else {
1500     fn.accept(t);
1501     dst.complete(null);
1502     }
1503     } catch (Throwable rex) {
1504     ex = rex;
1505 dl 1.7 }
1506     }
1507 dl 1.17 if (ex != null)
1508 dl 1.19 dst.propagateException(ex);
1509 dl 1.7 }
1510     }
1511     }
1512    
1513 dl 1.1 static final class ThenRunnable<T> extends Completion {
1514 jsr166 1.2 final CompletableFuture<? extends T> src;
1515     final Runnable fn;
1516     final CompletableFuture<Void> dst;
1517 dl 1.1 final Executor executor;
1518     ThenRunnable(CompletableFuture<? extends T> src,
1519     Runnable fn,
1520     CompletableFuture<Void> dst,
1521     Executor executor) {
1522     this.src = src; this.fn = fn; this.dst = dst;
1523     this.executor = executor;
1524     }
1525 jsr166 1.2 public void run() {
1526 dl 1.1 CompletableFuture<? extends T> a;
1527     Runnable fn;
1528     CompletableFuture<Void> dst;
1529 jsr166 1.2 Object r; Throwable ex;
1530     if ((dst = this.dst) != null &&
1531 dl 1.1 (fn = this.fn) != null &&
1532     (a = this.src) != null &&
1533     (r = a.result) != null &&
1534     compareAndSet(0, 1)) {
1535 dl 1.19 if (r instanceof AltResult)
1536     ex = ((AltResult)r).ex;
1537 dl 1.17 else
1538     ex = null;
1539     if (ex == null) {
1540     try {
1541     if (executor != null)
1542     executor.execute(new AsyncRunnable(fn, dst));
1543     else {
1544     fn.run();
1545     dst.complete(null);
1546     }
1547     } catch (Throwable rex) {
1548     ex = rex;
1549 dl 1.1 }
1550     }
1551 dl 1.17 if (ex != null)
1552 dl 1.19 dst.propagateException(ex);
1553 dl 1.1 }
1554     }
1555     }
1556    
1557     static final class AndFunction<T,U,V> extends Completion {
1558 jsr166 1.2 final CompletableFuture<? extends T> src;
1559     final CompletableFuture<? extends U> snd;
1560     final BiFunction<? super T,? super U,? extends V> fn;
1561     final CompletableFuture<V> dst;
1562 dl 1.1 final Executor executor;
1563     AndFunction(CompletableFuture<? extends T> src,
1564     CompletableFuture<? extends U> snd,
1565     BiFunction<? super T,? super U,? extends V> fn,
1566     CompletableFuture<V> dst, Executor executor) {
1567     this.src = src; this.snd = snd;
1568     this.fn = fn; this.dst = dst;
1569     this.executor = executor;
1570     }
1571 jsr166 1.2 public void run() {
1572     Object r, s; T t; U u; Throwable ex;
1573 dl 1.1 CompletableFuture<? extends T> a;
1574     CompletableFuture<? extends U> b;
1575     BiFunction<? super T,? super U,? extends V> fn;
1576     CompletableFuture<V> dst;
1577 jsr166 1.2 if ((dst = this.dst) != null &&
1578 dl 1.1 (fn = this.fn) != null &&
1579     (a = this.src) != null &&
1580     (r = a.result) != null &&
1581     (b = this.snd) != null &&
1582     (s = b.result) != null &&
1583     compareAndSet(0, 1)) {
1584 jsr166 1.2 if (r instanceof AltResult) {
1585 dl 1.19 ex = ((AltResult)r).ex;
1586 jsr166 1.2 t = null;
1587     }
1588 dl 1.19 else {
1589     ex = null;
1590 jsr166 1.2 t = (T) r;
1591 dl 1.19 }
1592     if (ex != null)
1593     u = null;
1594     else if (s instanceof AltResult) {
1595     ex = ((AltResult)s).ex;
1596 jsr166 1.2 u = null;
1597     }
1598     else
1599     u = (U) s;
1600 dl 1.19 if (ex == null) {
1601     try {
1602     if (executor != null)
1603     executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1604     else
1605     dst.complete(fn.apply(t, u));
1606     } catch (Throwable rex) {
1607     ex = rex;
1608     }
1609 dl 1.1 }
1610 dl 1.19 if (ex != null)
1611     dst.propagateException(ex);
1612 jsr166 1.2 }
1613     }
1614 dl 1.1 }
1615    
1616 dl 1.7 static final class AndBlock<T,U> extends Completion {
1617     final CompletableFuture<? extends T> src;
1618     final CompletableFuture<? extends U> snd;
1619     final BiBlock<? super T,? super U> fn;
1620     final CompletableFuture<Void> dst;
1621     final Executor executor;
1622     AndBlock(CompletableFuture<? extends T> src,
1623     CompletableFuture<? extends U> snd,
1624     BiBlock<? super T,? super U> fn,
1625     CompletableFuture<Void> dst, Executor executor) {
1626     this.src = src; this.snd = snd;
1627     this.fn = fn; this.dst = dst;
1628     this.executor = executor;
1629     }
1630     public void run() {
1631     Object r, s; T t; U u; Throwable ex;
1632     CompletableFuture<? extends T> a;
1633     CompletableFuture<? extends U> b;
1634     BiBlock<? super T,? super U> fn;
1635     CompletableFuture<Void> dst;
1636     if ((dst = this.dst) != null &&
1637     (fn = this.fn) != null &&
1638     (a = this.src) != null &&
1639     (r = a.result) != null &&
1640     (b = this.snd) != null &&
1641     (s = b.result) != null &&
1642     compareAndSet(0, 1)) {
1643     if (r instanceof AltResult) {
1644 dl 1.19 ex = ((AltResult)r).ex;
1645 dl 1.7 t = null;
1646     }
1647 dl 1.19 else {
1648     ex = null;
1649 dl 1.7 t = (T) r;
1650 dl 1.19 }
1651     if (ex != null)
1652     u = null;
1653     else if (s instanceof AltResult) {
1654     ex = ((AltResult)s).ex;
1655 dl 1.7 u = null;
1656     }
1657     else
1658     u = (U) s;
1659 dl 1.19 if (ex == null) {
1660     try {
1661     if (executor != null)
1662     executor.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1663     else {
1664     fn.accept(t, u);
1665     dst.complete(null);
1666     }
1667     } catch (Throwable rex) {
1668     ex = rex;
1669 dl 1.7 }
1670     }
1671 dl 1.19 if (ex != null)
1672     dst.propagateException(ex);
1673 dl 1.7 }
1674     }
1675     }
1676    
1677 dl 1.1 static final class AndRunnable<T> extends Completion {
1678 jsr166 1.2 final CompletableFuture<? extends T> src;
1679     final CompletableFuture<?> snd;
1680     final Runnable fn;
1681     final CompletableFuture<Void> dst;
1682 dl 1.1 final Executor executor;
1683     AndRunnable(CompletableFuture<? extends T> src,
1684     CompletableFuture<?> snd,
1685     Runnable fn,
1686     CompletableFuture<Void> dst, Executor executor) {
1687     this.src = src; this.snd = snd;
1688     this.fn = fn; this.dst = dst;
1689     this.executor = executor;
1690     }
1691 jsr166 1.2 public void run() {
1692     Object r, s; Throwable ex;
1693 dl 1.1 final CompletableFuture<? extends T> a;
1694     final CompletableFuture<?> b;
1695     final Runnable fn;
1696     final CompletableFuture<Void> dst;
1697 jsr166 1.2 if ((dst = this.dst) != null &&
1698 dl 1.1 (fn = this.fn) != null &&
1699     (a = this.src) != null &&
1700     (r = a.result) != null &&
1701     (b = this.snd) != null &&
1702     (s = b.result) != null &&
1703     compareAndSet(0, 1)) {
1704 dl 1.19 if (r instanceof AltResult)
1705     ex = ((AltResult)r).ex;
1706     else
1707     ex = null;
1708     if (ex == null && (s instanceof AltResult))
1709     ex = ((AltResult)s).ex;
1710     if (ex == null) {
1711     try {
1712     if (executor != null)
1713     executor.execute(new AsyncRunnable(fn, dst));
1714     else {
1715     fn.run();
1716     dst.complete(null);
1717     }
1718     } catch (Throwable rex) {
1719     ex = rex;
1720 dl 1.1 }
1721 jsr166 1.2 }
1722 dl 1.19 if (ex != null)
1723     dst.propagateException(ex);
1724 jsr166 1.2 }
1725     }
1726 dl 1.1 }
1727    
1728     static final class OrFunction<T,U> extends Completion {
1729 jsr166 1.2 final CompletableFuture<? extends T> src;
1730     final CompletableFuture<? extends T> snd;
1731     final Function<? super T,? extends U> fn;
1732     final CompletableFuture<U> dst;
1733 dl 1.1 final Executor executor;
1734     OrFunction(CompletableFuture<? extends T> src,
1735     CompletableFuture<? extends T> snd,
1736     Function<? super T,? extends U> fn,
1737     CompletableFuture<U> dst, Executor executor) {
1738     this.src = src; this.snd = snd;
1739     this.fn = fn; this.dst = dst;
1740     this.executor = executor;
1741     }
1742 jsr166 1.2 public void run() {
1743     Object r; T t; Throwable ex;
1744 dl 1.1 CompletableFuture<? extends T> a;
1745     CompletableFuture<? extends T> b;
1746     Function<? super T,? extends U> fn;
1747     CompletableFuture<U> dst;
1748 jsr166 1.2 if ((dst = this.dst) != null &&
1749 dl 1.1 (fn = this.fn) != null &&
1750     (((a = this.src) != null && (r = a.result) != null) ||
1751     ((b = this.snd) != null && (r = b.result) != null)) &&
1752     compareAndSet(0, 1)) {
1753 jsr166 1.2 if (r instanceof AltResult) {
1754 dl 1.19 ex = ((AltResult)r).ex;
1755 jsr166 1.2 t = null;
1756     }
1757 dl 1.19 else {
1758     ex = null;
1759 jsr166 1.2 t = (T) r;
1760 dl 1.1 }
1761 dl 1.19 if (ex == null) {
1762     try {
1763     if (executor != null)
1764     executor.execute(new AsyncFunction<T,U>(t, fn, dst));
1765     else
1766     dst.complete(fn.apply(t));
1767     } catch (Throwable rex) {
1768     ex = rex;
1769     }
1770     }
1771     if (ex != null)
1772     dst.propagateException(ex);
1773 jsr166 1.2 }
1774     }
1775 dl 1.1 }
1776    
1777 dl 1.7 static final class OrBlock<T> extends Completion {
1778     final CompletableFuture<? extends T> src;
1779     final CompletableFuture<? extends T> snd;
1780     final Block<? super T> fn;
1781     final CompletableFuture<Void> dst;
1782     final Executor executor;
1783     OrBlock(CompletableFuture<? extends T> src,
1784     CompletableFuture<? extends T> snd,
1785     Block<? super T> fn,
1786     CompletableFuture<Void> dst, Executor executor) {
1787     this.src = src; this.snd = snd;
1788     this.fn = fn; this.dst = dst;
1789     this.executor = executor;
1790     }
1791     public void run() {
1792     Object r; T t; Throwable ex;
1793     CompletableFuture<? extends T> a;
1794     CompletableFuture<? extends T> b;
1795     Block<? super T> fn;
1796     CompletableFuture<Void> dst;
1797     if ((dst = this.dst) != null &&
1798     (fn = this.fn) != null &&
1799     (((a = this.src) != null && (r = a.result) != null) ||
1800     ((b = this.snd) != null && (r = b.result) != null)) &&
1801     compareAndSet(0, 1)) {
1802     if (r instanceof AltResult) {
1803 dl 1.19 ex = ((AltResult)r).ex;
1804 dl 1.7 t = null;
1805     }
1806 dl 1.19 else {
1807     ex = null;
1808 dl 1.7 t = (T) r;
1809 dl 1.19 }
1810     if (ex == null) {
1811     try {
1812     if (executor != null)
1813     executor.execute(new AsyncBlock<T>(t, fn, dst));
1814     else {
1815     fn.accept(t);
1816     dst.complete(null);
1817     }
1818     } catch (Throwable rex) {
1819     ex = rex;
1820 dl 1.7 }
1821     }
1822 dl 1.19 if (ex != null)
1823     dst.propagateException(ex);
1824 dl 1.7 }
1825     }
1826     }
1827    
1828 dl 1.1 static final class OrRunnable<T> extends Completion {
1829 jsr166 1.2 final CompletableFuture<? extends T> src;
1830     final CompletableFuture<?> snd;
1831     final Runnable fn;
1832     final CompletableFuture<Void> dst;
1833 dl 1.1 final Executor executor;
1834     OrRunnable(CompletableFuture<? extends T> src,
1835     CompletableFuture<?> snd,
1836     Runnable fn,
1837     CompletableFuture<Void> dst, Executor executor) {
1838     this.src = src; this.snd = snd;
1839     this.fn = fn; this.dst = dst;
1840     this.executor = executor;
1841     }
1842 jsr166 1.2 public void run() {
1843     Object r; Throwable ex;
1844 dl 1.1 CompletableFuture<? extends T> a;
1845     final CompletableFuture<?> b;
1846     final Runnable fn;
1847     final CompletableFuture<Void> dst;
1848 jsr166 1.2 if ((dst = this.dst) != null &&
1849 dl 1.1 (fn = this.fn) != null &&
1850     (((a = this.src) != null && (r = a.result) != null) ||
1851     ((b = this.snd) != null && (r = b.result) != null)) &&
1852     compareAndSet(0, 1)) {
1853 dl 1.19 if (r instanceof AltResult)
1854     ex = ((AltResult)r).ex;
1855     else
1856     ex = null;
1857     if (ex == null) {
1858 dl 1.1 try {
1859     if (executor != null)
1860     executor.execute(new AsyncRunnable(fn, dst));
1861     else {
1862     fn.run();
1863     dst.complete(null);
1864     }
1865     } catch (Throwable rex) {
1866 dl 1.19 ex = rex;
1867 dl 1.1 }
1868     }
1869 dl 1.19 if (ex != null)
1870     dst.propagateException(ex);
1871 jsr166 1.2 }
1872     }
1873 dl 1.1 }
1874    
1875     static final class ExceptionAction<T> extends Completion {
1876 jsr166 1.2 final CompletableFuture<? extends T> src;
1877 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1878     final CompletableFuture<T> dst;
1879 dl 1.1 ExceptionAction(CompletableFuture<? extends T> src,
1880 dl 1.6 Function<? super Throwable, ? extends T> fn,
1881     CompletableFuture<T> dst) {
1882 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1883     }
1884 jsr166 1.2 public void run() {
1885 dl 1.1 CompletableFuture<? extends T> a;
1886 dl 1.6 Function<? super Throwable, ? extends T> fn;
1887     CompletableFuture<T> dst;
1888     Object r; T t; Throwable ex;
1889 jsr166 1.2 if ((dst = this.dst) != null &&
1890 dl 1.1 (fn = this.fn) != null &&
1891     (a = this.src) != null &&
1892     (r = a.result) != null &&
1893     compareAndSet(0, 1)) {
1894 dl 1.6 if (r instanceof AltResult) {
1895     if ((ex = ((AltResult)r).ex) != null) {
1896     try {
1897     dst.complete(fn.apply(ex));
1898     } catch (Throwable rex) {
1899     dst.completeExceptionally(rex);
1900     }
1901     return;
1902 dl 1.1 }
1903 dl 1.6 t = null;
1904 dl 1.1 }
1905 dl 1.6 else
1906     t = (T) r;
1907     dst.complete(t);
1908 dl 1.1 }
1909     }
1910     }
1911    
1912 dl 1.17 static final class ThenCopy<T> extends Completion {
1913     final CompletableFuture<? extends T> src;
1914     final CompletableFuture<T> dst;
1915     ThenCopy(CompletableFuture<? extends T> src,
1916     CompletableFuture<T> dst) {
1917     this.src = src; this.dst = dst;
1918     }
1919     public void run() {
1920     CompletableFuture<? extends T> a;
1921     CompletableFuture<T> dst;
1922     Object r; T t; Throwable ex;
1923     if ((dst = this.dst) != null &&
1924     (a = this.src) != null &&
1925     (r = a.result) != null &&
1926     compareAndSet(0, 1)) {
1927 dl 1.19 dst.propagateCompletion(r);
1928 dl 1.17 }
1929     }
1930     }
1931    
1932     static final class ThenHandle<T,U> extends Completion {
1933     final CompletableFuture<? extends T> src;
1934     final BiFunction<? super T, Throwable, ? extends U> fn;
1935     final CompletableFuture<U> dst;
1936     ThenHandle(CompletableFuture<? extends T> src,
1937     BiFunction<? super T, Throwable, ? extends U> fn,
1938     final CompletableFuture<U> dst) {
1939     this.src = src; this.fn = fn; this.dst = dst;
1940     }
1941     public void run() {
1942     CompletableFuture<? extends T> a;
1943     BiFunction<? super T, Throwable, ? extends U> fn;
1944     CompletableFuture<U> dst;
1945     Object r; T t; Throwable ex;
1946     if ((dst = this.dst) != null &&
1947     (fn = this.fn) != null &&
1948     (a = this.src) != null &&
1949     (r = a.result) != null &&
1950     compareAndSet(0, 1)) {
1951     if (r instanceof AltResult) {
1952     ex = ((AltResult)r).ex;
1953     t = null;
1954     }
1955     else {
1956     ex = null;
1957     t = (T) r;
1958     }
1959     try {
1960     dst.complete(fn.apply(t, ex));
1961     } catch (Throwable rex) {
1962     dst.completeExceptionally(rex);
1963     }
1964     }
1965     }
1966     }
1967    
1968     static final class ThenCompose<T,U> extends Completion {
1969     final CompletableFuture<? extends T> src;
1970     final Function<? super T, CompletableFuture<U>> fn;
1971     final CompletableFuture<U> dst;
1972     ThenCompose(CompletableFuture<? extends T> src,
1973     Function<? super T, CompletableFuture<U>> fn,
1974     final CompletableFuture<U> dst) {
1975     this.src = src; this.fn = fn; this.dst = dst;
1976     }
1977     public void run() {
1978     CompletableFuture<? extends T> a;
1979     Function<? super T, CompletableFuture<U>> fn;
1980     CompletableFuture<U> dst;
1981     Object r; T t; Throwable ex;
1982     if ((dst = this.dst) != null &&
1983     (fn = this.fn) != null &&
1984     (a = this.src) != null &&
1985     (r = a.result) != null &&
1986     compareAndSet(0, 1)) {
1987     if (r instanceof AltResult) {
1988     ex = ((AltResult)r).ex;
1989     t = null;
1990     }
1991     else {
1992     ex = null;
1993     t = (T) r;
1994     }
1995     CompletableFuture<U> c = null;
1996     if (ex == null) {
1997     try {
1998     c = fn.apply(t);
1999     } catch (Throwable rex) {
2000     ex = rex;
2001     }
2002     }
2003     if (ex != null || c == null) {
2004     if (ex == null)
2005     ex = new NullPointerException();
2006     }
2007 dl 1.18 else {
2008     ThenCopy<U> d = null;
2009     Object s;
2010     if ((s = c.result) == null) {
2011     CompletionNode p = new CompletionNode
2012     (d = new ThenCopy<U>(c, dst));
2013     while ((s = c.result) == null) {
2014     if (UNSAFE.compareAndSwapObject
2015     (c, COMPLETIONS, p.next = c.completions, p))
2016     break;
2017     }
2018 dl 1.17 }
2019 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
2020     U u;
2021     if (s instanceof AltResult) {
2022     ex = ((AltResult)s).ex; // no rewrap
2023     u = null;
2024     }
2025     else
2026     u = (U) s;
2027     if (ex == null)
2028     dst.complete(u);
2029 dl 1.17 }
2030     }
2031 dl 1.18 if (ex != null)
2032 dl 1.19 dst.propagateException(ex);
2033     else if (c != null && c.result != null)
2034 dl 1.18 c.postComplete();
2035 dl 1.17 }
2036     }
2037     }
2038    
2039 dl 1.1 /* ------------- then/and/or implementations -------------- */
2040    
2041     private <U> CompletableFuture<U> thenFunction(Function<? super T,? extends U> fn,
2042     Executor executor) {
2043     if (fn == null) throw new NullPointerException();
2044     CompletableFuture<U> dst = new CompletableFuture<U>();
2045     ThenFunction<T,U> d = null;
2046 jsr166 1.2 Object r;
2047     if ((r = result) == null) {
2048 dl 1.1 CompletionNode p = new CompletionNode
2049     (d = new ThenFunction<T,U>(this, fn, dst, executor));
2050     while ((r = result) == null) {
2051     if (UNSAFE.compareAndSwapObject
2052     (this, COMPLETIONS, p.next = completions, p))
2053     break;
2054     }
2055     }
2056 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2057 dl 1.19 T t; Throwable ex;
2058 jsr166 1.2 if (r instanceof AltResult) {
2059 dl 1.19 ex = ((AltResult)r).ex;
2060 jsr166 1.2 t = null;
2061 dl 1.1 }
2062 dl 1.19 else {
2063     ex = null;
2064 jsr166 1.2 t = (T) r;
2065 dl 1.19 }
2066 jsr166 1.2 if (ex == null) {
2067 dl 1.1 try {
2068     if (executor != null)
2069 dl 1.7 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
2070 dl 1.1 else
2071     dst.complete(fn.apply(t));
2072     } catch (Throwable rex) {
2073 dl 1.19 ex = rex;
2074 dl 1.1 }
2075     }
2076 dl 1.19 if (ex != null)
2077     dst.propagateException(ex);
2078 dl 1.17 }
2079     if (r != null || result != null)
2080 jsr166 1.2 postComplete();
2081 dl 1.1 return dst;
2082     }
2083    
2084 dl 1.7 private CompletableFuture<Void> thenBlock(Block<? super T> fn,
2085     Executor executor) {
2086     if (fn == null) throw new NullPointerException();
2087     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2088     ThenBlock<T> d = null;
2089     Object r;
2090     if ((r = result) == null) {
2091     CompletionNode p = new CompletionNode
2092     (d = new ThenBlock<T>(this, fn, dst, executor));
2093     while ((r = result) == null) {
2094     if (UNSAFE.compareAndSwapObject
2095     (this, COMPLETIONS, p.next = completions, p))
2096     break;
2097     }
2098     }
2099     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2100 dl 1.19 T t; Throwable ex;
2101 dl 1.7 if (r instanceof AltResult) {
2102 dl 1.19 ex = ((AltResult)r).ex;
2103 dl 1.7 t = null;
2104     }
2105 dl 1.19 else {
2106     ex = null;
2107 dl 1.7 t = (T) r;
2108 dl 1.19 }
2109 dl 1.7 if (ex == null) {
2110     try {
2111     if (executor != null)
2112     executor.execute(new AsyncBlock<T>(t, fn, dst));
2113     else {
2114     fn.accept(t);
2115     dst.complete(null);
2116     }
2117     } catch (Throwable rex) {
2118 dl 1.19 ex = rex;
2119 dl 1.7 }
2120     }
2121 dl 1.19 if (ex != null)
2122     dst.propagateException(ex);
2123 dl 1.17 }
2124     if (r != null || result != null)
2125 dl 1.7 postComplete();
2126     return dst;
2127     }
2128    
2129 jsr166 1.2 private CompletableFuture<Void> thenRunnable(Runnable action,
2130 dl 1.1 Executor executor) {
2131     if (action == null) throw new NullPointerException();
2132     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2133     ThenRunnable<T> d = null;
2134 jsr166 1.2 Object r;
2135     if ((r = result) == null) {
2136 dl 1.1 CompletionNode p = new CompletionNode
2137     (d = new ThenRunnable<T>(this, action, dst, executor));
2138     while ((r = result) == null) {
2139     if (UNSAFE.compareAndSwapObject
2140     (this, COMPLETIONS, p.next = completions, p))
2141     break;
2142     }
2143     }
2144 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2145 dl 1.19 Throwable ex;
2146     if (r instanceof AltResult)
2147     ex = ((AltResult)r).ex;
2148     else
2149     ex = null;
2150 jsr166 1.2 if (ex == null) {
2151 dl 1.1 try {
2152     if (executor != null)
2153     executor.execute(new AsyncRunnable(action, dst));
2154     else {
2155     action.run();
2156     dst.complete(null);
2157     }
2158     } catch (Throwable rex) {
2159 dl 1.19 ex = rex;
2160 dl 1.1 }
2161     }
2162 dl 1.19 if (ex != null)
2163     dst.propagateException(ex);
2164 dl 1.17 }
2165     if (r != null || result != null)
2166 jsr166 1.2 postComplete();
2167 dl 1.1 return dst;
2168     }
2169    
2170     private <U,V> CompletableFuture<V> andFunction(CompletableFuture<? extends U> other,
2171     BiFunction<? super T,? super U,? extends V> fn,
2172     Executor executor) {
2173     if (other == null || fn == null) throw new NullPointerException();
2174 jsr166 1.2 CompletableFuture<V> dst = new CompletableFuture<V>();
2175     AndFunction<T,U,V> d = null;
2176     Object r, s = null;
2177     if ((r = result) == null || (s = other.result) == null) {
2178 dl 1.1 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
2179     CompletionNode q = null, p = new CompletionNode(d);
2180     while ((r == null && (r = result) == null) ||
2181     (s == null && (s = other.result) == null)) {
2182     if (q != null) {
2183     if (s != null ||
2184     UNSAFE.compareAndSwapObject
2185     (other, COMPLETIONS, q.next = other.completions, q))
2186     break;
2187     }
2188     else if (r != null ||
2189     UNSAFE.compareAndSwapObject
2190     (this, COMPLETIONS, p.next = completions, p)) {
2191     if (s != null)
2192     break;
2193     q = new CompletionNode(d);
2194     }
2195     }
2196     }
2197 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2198 dl 1.19 T t; U u; Throwable ex;
2199 jsr166 1.2 if (r instanceof AltResult) {
2200 dl 1.19 ex = ((AltResult)r).ex;
2201 jsr166 1.2 t = null;
2202 dl 1.1 }
2203 dl 1.19 else {
2204     ex = null;
2205 jsr166 1.2 t = (T) r;
2206 dl 1.19 }
2207 dl 1.1 if (ex != null)
2208     u = null;
2209     else if (s instanceof AltResult) {
2210 dl 1.19 ex = ((AltResult)s).ex;
2211 dl 1.1 u = null;
2212     }
2213     else
2214 jsr166 1.2 u = (U) s;
2215     if (ex == null) {
2216 dl 1.1 try {
2217     if (executor != null)
2218     executor.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
2219     else
2220     dst.complete(fn.apply(t, u));
2221     } catch (Throwable rex) {
2222 dl 1.19 ex = rex;
2223 dl 1.1 }
2224     }
2225 dl 1.19 if (ex != null)
2226     dst.propagateException(ex);
2227 jsr166 1.2 }
2228 dl 1.17 if (r != null || result != null)
2229 dl 1.1 postComplete();
2230 dl 1.17 if (s != null || other.result != null)
2231 dl 1.1 other.postComplete();
2232 jsr166 1.2 return dst;
2233 dl 1.1 }
2234    
2235 dl 1.7 private <U> CompletableFuture<Void> andBlock(CompletableFuture<? extends U> other,
2236     BiBlock<? super T,? super U> fn,
2237     Executor executor) {
2238     if (other == null || fn == null) throw new NullPointerException();
2239     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2240     AndBlock<T,U> d = null;
2241     Object r, s = null;
2242     if ((r = result) == null || (s = other.result) == null) {
2243     d = new AndBlock<T,U>(this, other, fn, dst, executor);
2244     CompletionNode q = null, p = new CompletionNode(d);
2245     while ((r == null && (r = result) == null) ||
2246     (s == null && (s = other.result) == null)) {
2247     if (q != null) {
2248     if (s != null ||
2249     UNSAFE.compareAndSwapObject
2250     (other, COMPLETIONS, q.next = other.completions, q))
2251     break;
2252     }
2253     else if (r != null ||
2254     UNSAFE.compareAndSwapObject
2255     (this, COMPLETIONS, p.next = completions, p)) {
2256     if (s != null)
2257     break;
2258     q = new CompletionNode(d);
2259     }
2260     }
2261     }
2262     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2263 dl 1.19 T t; U u; Throwable ex;
2264 dl 1.7 if (r instanceof AltResult) {
2265 dl 1.19 ex = ((AltResult)r).ex;
2266 dl 1.7 t = null;
2267     }
2268 dl 1.19 else {
2269     ex = null;
2270 dl 1.7 t = (T) r;
2271 dl 1.19 }
2272 dl 1.7 if (ex != null)
2273     u = null;
2274     else if (s instanceof AltResult) {
2275 dl 1.19 ex = ((AltResult)s).ex;
2276 dl 1.7 u = null;
2277     }
2278     else
2279     u = (U) s;
2280     if (ex == null) {
2281     try {
2282     if (executor != null)
2283     executor.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
2284     else {
2285     fn.accept(t, u);
2286     dst.complete(null);
2287     }
2288     } catch (Throwable rex) {
2289 dl 1.19 ex = rex;
2290 dl 1.7 }
2291     }
2292 dl 1.19 if (ex != null)
2293     dst.propagateException(ex);
2294 dl 1.7 }
2295 dl 1.17 if (r != null || result != null)
2296 dl 1.7 postComplete();
2297 dl 1.17 if (s != null || other.result != null)
2298 dl 1.7 other.postComplete();
2299     return dst;
2300     }
2301    
2302 dl 1.1 private CompletableFuture<Void> andRunnable(CompletableFuture<?> other,
2303     Runnable action,
2304     Executor executor) {
2305     if (other == null || action == null) throw new NullPointerException();
2306 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2307     AndRunnable<T> d = null;
2308     Object r, s = null;
2309     if ((r = result) == null || (s = other.result) == null) {
2310 dl 1.1 d = new AndRunnable<T>(this, other, action, dst, executor);
2311     CompletionNode q = null, p = new CompletionNode(d);
2312     while ((r == null && (r = result) == null) ||
2313     (s == null && (s = other.result) == null)) {
2314     if (q != null) {
2315     if (s != null ||
2316     UNSAFE.compareAndSwapObject
2317     (other, COMPLETIONS, q.next = other.completions, q))
2318     break;
2319     }
2320     else if (r != null ||
2321     UNSAFE.compareAndSwapObject
2322     (this, COMPLETIONS, p.next = completions, p)) {
2323     if (s != null)
2324     break;
2325     q = new CompletionNode(d);
2326     }
2327     }
2328     }
2329 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2330 dl 1.19 Throwable ex;
2331     if (r instanceof AltResult)
2332     ex = ((AltResult)r).ex;
2333     else
2334     ex = null;
2335     if (ex == null && (s instanceof AltResult))
2336     ex = ((AltResult)s).ex;
2337     if (ex == null) {
2338 dl 1.1 try {
2339     if (executor != null)
2340     executor.execute(new AsyncRunnable(action, dst));
2341     else {
2342     action.run();
2343     dst.complete(null);
2344     }
2345     } catch (Throwable rex) {
2346 dl 1.19 ex = rex;
2347 dl 1.1 }
2348     }
2349 dl 1.19 if (ex != null)
2350     dst.propagateException(ex);
2351 jsr166 1.2 }
2352 dl 1.17 if (r != null || result != null)
2353 dl 1.1 postComplete();
2354 dl 1.17 if (s != null || other.result != null)
2355 dl 1.1 other.postComplete();
2356 jsr166 1.2 return dst;
2357 dl 1.1 }
2358    
2359     private <U> CompletableFuture<U> orFunction(CompletableFuture<? extends T> other,
2360     Function<? super T, U> fn,
2361     Executor executor) {
2362     if (other == null || fn == null) throw new NullPointerException();
2363 jsr166 1.2 CompletableFuture<U> dst = new CompletableFuture<U>();
2364     OrFunction<T,U> d = null;
2365     Object r;
2366     if ((r = result) == null && (r = other.result) == null) {
2367 dl 1.1 d = new OrFunction<T,U>(this, other, fn, dst, executor);
2368     CompletionNode q = null, p = new CompletionNode(d);
2369     while ((r = result) == null && (r = other.result) == null) {
2370     if (q != null) {
2371     if (UNSAFE.compareAndSwapObject
2372     (other, COMPLETIONS, q.next = other.completions, q))
2373     break;
2374     }
2375     else if (UNSAFE.compareAndSwapObject
2376     (this, COMPLETIONS, p.next = completions, p))
2377     q = new CompletionNode(d);
2378     }
2379 jsr166 1.2 }
2380     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2381 dl 1.19 T t; Throwable ex;
2382 jsr166 1.2 if (r instanceof AltResult) {
2383 dl 1.19 ex = ((AltResult)r).ex;
2384 jsr166 1.2 t = null;
2385 dl 1.1 }
2386 dl 1.19 else {
2387     ex = null;
2388 jsr166 1.2 t = (T) r;
2389 dl 1.19 }
2390 jsr166 1.2 if (ex == null) {
2391 dl 1.1 try {
2392     if (executor != null)
2393 dl 1.7 executor.execute(new AsyncFunction<T,U>(t, fn, dst));
2394 dl 1.1 else
2395     dst.complete(fn.apply(t));
2396     } catch (Throwable rex) {
2397 dl 1.19 ex = rex;
2398 dl 1.1 }
2399     }
2400 dl 1.19 if (ex != null)
2401     dst.propagateException(ex);
2402 jsr166 1.2 }
2403 dl 1.17 if (result != null)
2404     postComplete();
2405     if (other.result != null)
2406     other.postComplete();
2407 jsr166 1.2 return dst;
2408 dl 1.1 }
2409    
2410 dl 1.7 private CompletableFuture<Void> orBlock(CompletableFuture<? extends T> other,
2411     Block<? super T> fn,
2412     Executor executor) {
2413     if (other == null || fn == null) throw new NullPointerException();
2414     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2415     OrBlock<T> d = null;
2416     Object r;
2417     if ((r = result) == null && (r = other.result) == null) {
2418     d = new OrBlock<T>(this, other, fn, dst, executor);
2419     CompletionNode q = null, p = new CompletionNode(d);
2420     while ((r = result) == null && (r = other.result) == null) {
2421     if (q != null) {
2422     if (UNSAFE.compareAndSwapObject
2423     (other, COMPLETIONS, q.next = other.completions, q))
2424     break;
2425     }
2426     else if (UNSAFE.compareAndSwapObject
2427     (this, COMPLETIONS, p.next = completions, p))
2428     q = new CompletionNode(d);
2429     }
2430     }
2431     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2432 dl 1.19 T t; Throwable ex;
2433 dl 1.7 if (r instanceof AltResult) {
2434 dl 1.19 ex = ((AltResult)r).ex;
2435 dl 1.7 t = null;
2436     }
2437 dl 1.19 else {
2438     ex = null;
2439 dl 1.7 t = (T) r;
2440 dl 1.19 }
2441 dl 1.7 if (ex == null) {
2442     try {
2443     if (executor != null)
2444     executor.execute(new AsyncBlock<T>(t, fn, dst));
2445     else {
2446     fn.accept(t);
2447     dst.complete(null);
2448     }
2449     } catch (Throwable rex) {
2450 dl 1.19 ex = rex;
2451 dl 1.7 }
2452     }
2453 dl 1.19 if (ex != null)
2454     dst.propagateException(ex);
2455 dl 1.7 }
2456 dl 1.17 if (result != null)
2457     postComplete();
2458     if (other.result != null)
2459     other.postComplete();
2460 dl 1.7 return dst;
2461     }
2462    
2463 dl 1.1 private CompletableFuture<Void> orRunnable(CompletableFuture<?> other,
2464     Runnable action,
2465     Executor executor) {
2466     if (other == null || action == null) throw new NullPointerException();
2467 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2468     OrRunnable<T> d = null;
2469     Object r;
2470     if ((r = result) == null && (r = other.result) == null) {
2471 dl 1.1 d = new OrRunnable<T>(this, other, action, dst, executor);
2472     CompletionNode q = null, p = new CompletionNode(d);
2473     while ((r = result) == null && (r = other.result) == null) {
2474     if (q != null) {
2475     if (UNSAFE.compareAndSwapObject
2476     (other, COMPLETIONS, q.next = other.completions, q))
2477     break;
2478     }
2479     else if (UNSAFE.compareAndSwapObject
2480     (this, COMPLETIONS, p.next = completions, p))
2481     q = new CompletionNode(d);
2482     }
2483 jsr166 1.2 }
2484     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2485 dl 1.19 Throwable ex;
2486     if (r instanceof AltResult)
2487     ex = ((AltResult)r).ex;
2488     else
2489     ex = null;
2490     if (ex == null) {
2491 dl 1.1 try {
2492     if (executor != null)
2493     executor.execute(new AsyncRunnable(action, dst));
2494     else {
2495     action.run();
2496     dst.complete(null);
2497     }
2498     } catch (Throwable rex) {
2499 dl 1.19 ex = rex;
2500 dl 1.1 }
2501     }
2502 dl 1.19 if (ex != null)
2503     dst.propagateException(ex);
2504 jsr166 1.2 }
2505 dl 1.17 if (result != null)
2506     postComplete();
2507     if (other.result != null)
2508     other.postComplete();
2509 jsr166 1.2 return dst;
2510 dl 1.1 }
2511    
2512     // Unsafe mechanics
2513     private static final sun.misc.Unsafe UNSAFE;
2514     private static final long RESULT;
2515     private static final long WAITERS;
2516     private static final long COMPLETIONS;
2517     static {
2518     try {
2519     UNSAFE = sun.misc.Unsafe.getUnsafe();
2520     Class<?> k = CompletableFuture.class;
2521     RESULT = UNSAFE.objectFieldOffset
2522     (k.getDeclaredField("result"));
2523     WAITERS = UNSAFE.objectFieldOffset
2524     (k.getDeclaredField("waiters"));
2525     COMPLETIONS = UNSAFE.objectFieldOffset
2526     (k.getDeclaredField("completions"));
2527     } catch (Exception e) {
2528     throw new Error(e);
2529     }
2530     }
2531    
2532     }