ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.24
Committed: Mon Dec 31 17:50:56 2012 UTC (11 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.23: +1 -1 lines
Log Message:
double trouble

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.7 import java.util.function.Block;
10     import java.util.function.BiBlock;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.19 import java.util.concurrent.CompletionException;
23 dl 1.1 import java.util.concurrent.atomic.AtomicInteger;
24     import java.util.concurrent.locks.LockSupport;
25    
26     /**
27     * A {@link Future} that may be explicitly completed (setting its
28     * value and status), and may include dependent functions and actions
29     * that trigger upon its completion.
30     *
31 dl 1.7 * <p>Similar methods are available for Functions, Blocks, and
32     * Runnables, depending on whether actions require arguments and/or
33     * produce results. Functions and actions supplied for dependent
34     * completions (mainly using methods with prefix {@code then}) may be
35 dl 1.1 * performed by the thread that completes the current
36     * CompletableFuture, or by any other caller of these methods. There
37     * are no guarantees about the order of processing completions unless
38 dl 1.17 * constrained by these methods.
39 dl 1.1 *
40 jsr166 1.4 * <p>When two or more threads attempt to {@link #complete} or {@link
41 dl 1.17 * #completeExceptionally} a CompletableFuture, only one of them
42 dl 1.19 * succeeds.
43     *
44 jsr166 1.23 * <p>Upon exceptional completion, or when a completion entails
45 dl 1.5 * computation of a function or action, and it terminates abruptly
46 dl 1.19 * with an (unchecked) exception or error, then further completions
47     * act as {@code completeExceptionally} with a {@link
48     * CompletionException} holding that exception as its cause. If a
49     * CompletableFuture completes exceptionally, and is not followed by a
50     * {@link #exceptionally} or {@link #handle} completion, then all of
51     * its dependents (and their dependents) also complete exceptionally
52 dl 1.20 * with CompletionExceptions holding the ultimate cause. In case of a
53     * CompletionException, methods {@link #get()} and {@link #get(long,
54     * TimeUnit)} throw a {@link ExecutionException} with the same cause
55     * as would be held in the corresponding CompletionException. However,
56     * in these cases, methods {@link #join()} and {@link #getNow} throw
57     * the CompletionException, which simplifies usage especially within
58     * other completion functions.
59 dl 1.1 *
60     * <p>CompletableFutures themselves do not execute asynchronously.
61     * However, the {@code async} methods provide commonly useful ways to
62 jsr166 1.8 * commence asynchronous processing, using either a given {@link
63 dl 1.1 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
64     * function or action that will result in the completion of a new
65     * CompletableFuture.
66     *
67     * @author Doug Lea
68     * @since 1.8
69     */
70     public class CompletableFuture<T> implements Future<T> {
71     /*
72 dl 1.20 * Overview:
73 dl 1.1 *
74 dl 1.20 * 1. Non-nullness of field result (set via CAS) indicates
75     * done. An AltResult is used to box null as a result, as well as
76     * to hold exceptions. Using a single field makes completion fast
77     * and simple to detect and trigger, at the expense of a lot of
78     * encoding and decoding that infiltrates many methods. One minor
79     * simplification relies on the (static) NIL (to box null results)
80     * being the only AltResult with a null exception field, so we
81     * don't usually need explicit comparisons with NIL.
82 dl 1.1 *
83     * 2. Waiters are held in a Treiber stack similar to the one used
84 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
85     * internal documentation for details.
86 dl 1.1 *
87     * 3. Completions are also kept in a list/stack, and pulled off
88 dl 1.20 * and run when completion is triggered. (We could in fact use the
89 jsr166 1.24 * same stack as for waiters, but would give up the potential
90 dl 1.20 * parallelism obtained because woken waiters help release/run
91     * others (see method postComplete).) Because post-processing may
92     * race with direct calls, completions extend AtomicInteger so
93     * callers can claim the action via compareAndSet(0, 1). The
94     * Completion.run methods are all written in a boringly similar
95     * uniform way (that sometimes includes unnecessary-looking
96     * checks, kept to maintain uniformity). There are enough
97     * dimensions upon which they differ that factoring to use common
98     * code isn't worthwhile.
99     *
100     * 4. The exported then/and/or methods do support a bit of
101     * factoring (see thenFunction, andBlock, etc). They must cope
102     * with the intrinsic races surrounding addition of a dependent
103     * action versus performing the action directly because the task
104     * is already complete. For example, a CF may not be complete
105     * upon entry, so a dependent completion is added, but by the time
106     * it is added, the target CF is complete, so must be directly
107     * executed. This is all done while avoiding unnecessary object
108     * construction in safe-bypass cases.
109 dl 1.1 */
110    
111 dl 1.20 // preliminary class definitions
112    
113 dl 1.1 static final class AltResult {
114     final Throwable ex; // null only for NIL
115 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
116 dl 1.1 }
117    
118     static final AltResult NIL = new AltResult(null);
119    
120     /**
121     * Simple linked list nodes to record waiting threads in a Treiber
122     * stack. See other classes such as Phaser and SynchronousQueue
123     * for more detailed explanation.
124     */
125     static final class WaitNode {
126     volatile Thread thread;
127     volatile WaitNode next;
128     }
129    
130     /**
131     * Simple linked list nodes to record completions, used in
132 dl 1.20 * basically the same way as WaitNodes. (We separate nodes from
133     * the Completions themselves mainly because for the And and Or
134     * methods, the same Completion object resides in two lists.)
135 dl 1.1 */
136     static final class CompletionNode {
137 jsr166 1.2 final Completion completion;
138 dl 1.1 volatile CompletionNode next;
139 jsr166 1.2 CompletionNode(Completion completion) { this.completion = completion; }
140 dl 1.1 }
141    
142 dl 1.20
143     // Fields
144    
145     volatile Object result; // Either the result or boxed AltResult
146 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
147     volatile CompletionNode completions; // list (Treiber stack) of completions
148    
149 dl 1.20 // Basic utilities for triggering and processing completions
150     // (The Completion and Async classes and internal methods that
151     // use them are defined after the public methods.)
152    
153     /**
154     * Removes and signals all waiting threads and runs all completions.
155     */
156     final void postComplete() {
157     WaitNode q; Thread t;
158     while ((q = waiters) != null) {
159     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
160     (t = q.thread) != null) {
161     q.thread = null;
162     LockSupport.unpark(t);
163     }
164     }
165    
166     CompletionNode h; Completion c;
167     while ((h = completions) != null) {
168     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
169     (c = h.completion) != null)
170     c.run();
171     }
172     }
173    
174     /**
175     * Triggers completion with the encoding of the given arguments:
176     * if the exception is non-null, encodes it as a wrapped
177     * CompletionException unless it is one already. Otherwise uses
178     * the given result, boxed as NIL if null.
179     */
180     final void internalComplete(Object v, Throwable ex) {
181     if (result == null)
182     UNSAFE.compareAndSwapObject
183     (this, RESULT, null,
184     (ex == null) ? (v == null) ? NIL : v :
185     new AltResult((ex instanceof CompletionException) ? ex :
186     new CompletionException(ex)));
187     postComplete(); // help out even if not triggered
188     }
189    
190     /**
191     * If triggered, help release and/or process completions
192     */
193     final void helpPostComplete() {
194     if (result != null)
195     postComplete();
196     }
197    
198     // public methods
199    
200 dl 1.1 /**
201     * Creates a new incomplete CompletableFuture.
202     */
203     public CompletableFuture() {
204     }
205    
206     /**
207     * Asynchronously executes in the {@link
208     * ForkJoinPool#commonPool()}, a task that completes the returned
209     * CompletableFuture with the result of the given Supplier.
210     *
211     * @param supplier a function returning the value to be used
212 jsr166 1.14 * to complete the returned CompletableFuture
213 jsr166 1.11 * @return the CompletableFuture
214 dl 1.1 */
215 dl 1.7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
216 dl 1.1 if (supplier == null) throw new NullPointerException();
217     CompletableFuture<U> f = new CompletableFuture<U>();
218     ForkJoinPool.commonPool().
219 dl 1.7 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
220 dl 1.1 return f;
221     }
222    
223     /**
224     * Asynchronously executes using the given executor, a task that
225     * completes the returned CompletableFuture with the result of the
226     * given Supplier.
227     *
228     * @param supplier a function returning the value to be used
229 jsr166 1.11 * to complete the returned CompletableFuture
230 dl 1.1 * @param executor the executor to use for asynchronous execution
231 jsr166 1.11 * @return the CompletableFuture
232 dl 1.1 */
233 dl 1.7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
234     Executor executor) {
235 dl 1.1 if (executor == null || supplier == null)
236     throw new NullPointerException();
237     CompletableFuture<U> f = new CompletableFuture<U>();
238 dl 1.7 executor.execute(new AsyncSupplier<U>(supplier, f));
239 dl 1.1 return f;
240     }
241    
242     /**
243     * Asynchronously executes in the {@link
244     * ForkJoinPool#commonPool()} a task that runs the given action,
245 jsr166 1.11 * and then completes the returned CompletableFuture.
246 dl 1.1 *
247     * @param runnable the action to run before completing the
248 jsr166 1.11 * returned CompletableFuture
249     * @return the CompletableFuture
250 dl 1.1 */
251 dl 1.7 public static CompletableFuture<Void> runAsync(Runnable runnable) {
252 dl 1.1 if (runnable == null) throw new NullPointerException();
253     CompletableFuture<Void> f = new CompletableFuture<Void>();
254     ForkJoinPool.commonPool().
255     execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
256     return f;
257     }
258    
259     /**
260     * Asynchronously executes using the given executor, a task that
261     * runs the given action, and then completes the returned
262 jsr166 1.11 * CompletableFuture.
263 dl 1.1 *
264     * @param runnable the action to run before completing the
265 jsr166 1.11 * returned CompletableFuture
266 dl 1.1 * @param executor the executor to use for asynchronous execution
267 jsr166 1.11 * @return the CompletableFuture
268 dl 1.1 */
269 dl 1.7 public static CompletableFuture<Void> runAsync(Runnable runnable,
270 jsr166 1.12 Executor executor) {
271 dl 1.1 if (executor == null || runnable == null)
272     throw new NullPointerException();
273     CompletableFuture<Void> f = new CompletableFuture<Void>();
274     executor.execute(new AsyncRunnable(runnable, f));
275     return f;
276     }
277    
278     /**
279     * Returns {@code true} if completed in any fashion: normally,
280 jsr166 1.16 * exceptionally, or via cancellation.
281 dl 1.1 *
282     * @return {@code true} if completed
283     */
284     public boolean isDone() {
285     return result != null;
286     }
287    
288     /**
289 dl 1.19 * Waits if necessary for the computation to complete, and then
290     * retrieves its result.
291     *
292     * @return the computed result
293     * @throws CancellationException if the computation was cancelled
294     * @throws ExecutionException if the computation threw an
295     * exception
296     * @throws InterruptedException if the current thread was interrupted
297     * while waiting
298     */
299 dl 1.20 @SuppressWarnings("unchecked") public T get() throws InterruptedException, ExecutionException {
300     Object r; Throwable ex, cause;
301 dl 1.19 if ((r = result) == null && (r = waitingGet(true)) == null)
302     throw new InterruptedException();
303     if (r instanceof AltResult) {
304     if ((ex = ((AltResult)r).ex) != null) {
305     if (ex instanceof CancellationException)
306     throw (CancellationException)ex;
307 dl 1.20 if ((ex instanceof CompletionException) &&
308     (cause = ex.getCause()) != null)
309     ex = cause;
310 dl 1.19 throw new ExecutionException(ex);
311     }
312     return null;
313     }
314     return (T)r;
315     }
316    
317     /**
318 dl 1.20 * Waits if necessary for at most the given time for completion,
319     * and then retrieves its result, if available.
320 dl 1.1 *
321 dl 1.20 * @param timeout the maximum time to wait
322     * @param unit the time unit of the timeout argument
323     * @return the computed result
324 dl 1.19 * @throws CancellationException if the computation was cancelled
325 dl 1.20 * @throws ExecutionException if the computation threw an
326 dl 1.19 * exception
327 dl 1.20 * @throws InterruptedException if the current thread was interrupted
328     * while waiting
329     * @throws TimeoutException if the wait timed out
330 dl 1.1 */
331 dl 1.20 @SuppressWarnings("unchecked") public T get(long timeout, TimeUnit unit)
332     throws InterruptedException, ExecutionException, TimeoutException {
333     Object r; Throwable ex, cause;
334     long nanos = unit.toNanos(timeout);
335     if (Thread.interrupted())
336     throw new InterruptedException();
337 jsr166 1.2 if ((r = result) == null)
338 dl 1.20 r = timedAwaitDone(nanos);
339 jsr166 1.2 if (r instanceof AltResult) {
340 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
341 dl 1.19 if (ex instanceof CancellationException)
342     throw (CancellationException)ex;
343 dl 1.20 if ((ex instanceof CompletionException) &&
344     (cause = ex.getCause()) != null)
345     ex = cause;
346     throw new ExecutionException(ex);
347 dl 1.5 }
348 jsr166 1.2 return null;
349     }
350     return (T)r;
351 dl 1.1 }
352    
353     /**
354 dl 1.20 * Returns the result value when complete, or throws an
355     * (unchecked) exception if completed exceptionally. To better
356     * conform with the use of common functional forms, if a
357     * computation involved in the completion of this
358     * CompletableFuture threw an exception, this method throws an
359     * (unchecked) {@link CompletionException} with the underlying
360     * exception as its cause.
361 dl 1.1 *
362 dl 1.20 * @return the result value
363 dl 1.19 * @throws CancellationException if the computation was cancelled
364 dl 1.20 * @throws CompletionException if a completion computation threw
365     * an exception
366 dl 1.1 */
367 dl 1.20 @SuppressWarnings("unchecked") public T join() {
368 dl 1.1 Object r; Throwable ex;
369 jsr166 1.2 if ((r = result) == null)
370 dl 1.20 r = waitingGet(false);
371 jsr166 1.2 if (r instanceof AltResult) {
372 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
373 dl 1.19 if (ex instanceof CancellationException)
374     throw (CancellationException)ex;
375     if (ex instanceof CompletionException)
376     throw (CompletionException)ex;
377     throw new CompletionException(ex);
378 dl 1.5 }
379 jsr166 1.2 return null;
380     }
381     return (T)r;
382 dl 1.1 }
383    
384     /**
385 dl 1.20 * Returns the result value (or throws any encountered exception)
386     * if completed, else returns the given valueIfAbsent.
387 dl 1.1 *
388 dl 1.20 * @param valueIfAbsent the value to return if not completed
389     * @return the result value, if completed, else the given valueIfAbsent
390 dl 1.1 * @throws CancellationException if the computation was cancelled
391 dl 1.20 * @throws CompletionException if a completion computation threw
392     * an exception
393 dl 1.1 */
394 dl 1.20 @SuppressWarnings("unchecked") public T getNow(T valueIfAbsent) {
395 dl 1.1 Object r; Throwable ex;
396 jsr166 1.2 if ((r = result) == null)
397 dl 1.20 return valueIfAbsent;
398 jsr166 1.2 if (r instanceof AltResult) {
399 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
400 dl 1.19 if (ex instanceof CancellationException)
401     throw (CancellationException)ex;
402 dl 1.20 if (ex instanceof CompletionException)
403     throw (CompletionException)ex;
404     throw new CompletionException(ex);
405 dl 1.5 }
406 jsr166 1.2 return null;
407     }
408     return (T)r;
409 dl 1.1 }
410    
411     /**
412     * If not already completed, sets the value returned by {@link
413     * #get()} and related methods to the given value.
414     *
415     * @param value the result value
416     * @return true if this invocation caused this CompletableFuture
417 jsr166 1.14 * to transition to a completed state, else false
418 dl 1.1 */
419     public boolean complete(T value) {
420 dl 1.20 boolean triggered =
421     result == null &&
422 dl 1.1 UNSAFE.compareAndSwapObject(this, RESULT, null,
423 dl 1.20 value == null ? NIL : value);
424     postComplete();
425     return triggered;
426 dl 1.19 }
427    
428     /**
429 dl 1.1 * If not already completed, causes invocations of {@link #get()}
430     * and related methods to throw the given exception.
431     *
432     * @param ex the exception
433     * @return true if this invocation caused this CompletableFuture
434 jsr166 1.14 * to transition to a completed state, else false
435 dl 1.1 */
436     public boolean completeExceptionally(Throwable ex) {
437     if (ex == null) throw new NullPointerException();
438 dl 1.20 boolean triggered =
439     result == null &&
440     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
441     postComplete();
442     return triggered;
443 dl 1.19 }
444    
445     /**
446 dl 1.1 * Creates and returns a CompletableFuture that is completed with
447     * the result of the given function of this CompletableFuture.
448     * If this CompletableFuture completes exceptionally,
449     * then the returned CompletableFuture also does so,
450 dl 1.19 * with a CompletionException holding this exception as
451 dl 1.1 * its cause.
452     *
453     * @param fn the function to use to compute the value of
454     * the returned CompletableFuture
455     * @return the new CompletableFuture
456     */
457 dl 1.7 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
458 dl 1.1 return thenFunction(fn, null);
459     }
460    
461     /**
462     * Creates and returns a CompletableFuture that is asynchronously
463     * completed using the {@link ForkJoinPool#commonPool()} with the
464     * result of the given function of this CompletableFuture. If
465     * this CompletableFuture completes exceptionally, then the
466     * returned CompletableFuture also does so, with a
467 dl 1.19 * CompletionException holding this exception as its cause.
468 dl 1.1 *
469     * @param fn the function to use to compute the value of
470     * the returned CompletableFuture
471     * @return the new CompletableFuture
472     */
473 dl 1.7 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
474 dl 1.1 return thenFunction(fn, ForkJoinPool.commonPool());
475     }
476    
477     /**
478     * Creates and returns a CompletableFuture that is asynchronously
479     * completed using the given executor with the result of the given
480     * function of this CompletableFuture. If this CompletableFuture
481     * completes exceptionally, then the returned CompletableFuture
482 dl 1.19 * also does so, with a CompletionException holding this exception as
483 dl 1.1 * its cause.
484     *
485     * @param fn the function to use to compute the value of
486     * the returned CompletableFuture
487     * @param executor the executor to use for asynchronous execution
488     * @return the new CompletableFuture
489     */
490 dl 1.7 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
491 jsr166 1.12 Executor executor) {
492 dl 1.1 if (executor == null) throw new NullPointerException();
493     return thenFunction(fn, executor);
494     }
495    
496     /**
497     * Creates and returns a CompletableFuture that is completed after
498 dl 1.7 * performing the given action with this CompletableFuture's
499     * result when it completes. If this CompletableFuture
500     * completes exceptionally, then the returned CompletableFuture
501 dl 1.19 * also does so, with a CompletionException holding this exception as
502 dl 1.7 * its cause.
503     *
504     * @param block the action to perform before completing the
505     * returned CompletableFuture
506     * @return the new CompletableFuture
507     */
508     public CompletableFuture<Void> thenAccept(Block<? super T> block) {
509     return thenBlock(block, null);
510     }
511    
512     /**
513     * Creates and returns a CompletableFuture that is asynchronously
514     * completed using the {@link ForkJoinPool#commonPool()} with this
515     * CompletableFuture's result when it completes. If this
516     * CompletableFuture completes exceptionally, then the returned
517 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
518 dl 1.7 * this exception as its cause.
519     *
520     * @param block the action to perform before completing the
521     * returned CompletableFuture
522     * @return the new CompletableFuture
523     */
524     public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
525     return thenBlock(block, ForkJoinPool.commonPool());
526     }
527    
528     /**
529     * Creates and returns a CompletableFuture that is asynchronously
530     * completed using the given executor with this
531     * CompletableFuture's result when it completes. If this
532     * CompletableFuture completes exceptionally, then the returned
533 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
534 dl 1.7 * this exception as its cause.
535     *
536     * @param block the action to perform before completing the
537     * returned CompletableFuture
538     * @param executor the executor to use for asynchronous execution
539     * @return the new CompletableFuture
540     */
541     public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
542     Executor executor) {
543     if (executor == null) throw new NullPointerException();
544     return thenBlock(block, executor);
545     }
546    
547     /**
548     * Creates and returns a CompletableFuture that is completed after
549     * performing the given action when this CompletableFuture
550 dl 1.1 * completes. If this CompletableFuture completes exceptionally,
551     * then the returned CompletableFuture also does so, with a
552 dl 1.19 * CompletionException holding this exception as its cause.
553 dl 1.1 *
554     * @param action the action to perform before completing the
555     * returned CompletableFuture
556     * @return the new CompletableFuture
557     */
558 dl 1.7 public CompletableFuture<Void> thenRun(Runnable action) {
559 dl 1.1 return thenRunnable(action, null);
560     }
561    
562     /**
563     * Creates and returns a CompletableFuture that is asynchronously
564     * completed using the {@link ForkJoinPool#commonPool()} after
565 dl 1.7 * performing the given action when this CompletableFuture
566 dl 1.1 * completes. If this CompletableFuture completes exceptionally,
567     * then the returned CompletableFuture also does so, with a
568 dl 1.19 * CompletionException holding this exception as its cause.
569 dl 1.1 *
570     * @param action the action to perform before completing the
571     * returned CompletableFuture
572     * @return the new CompletableFuture
573     */
574 dl 1.7 public CompletableFuture<Void> thenRunAsync(Runnable action) {
575 dl 1.1 return thenRunnable(action, ForkJoinPool.commonPool());
576     }
577    
578     /**
579     * Creates and returns a CompletableFuture that is asynchronously
580     * completed using the given executor after performing the given
581 dl 1.7 * action when this CompletableFuture completes. If this
582 dl 1.1 * CompletableFuture completes exceptionally, then the returned
583 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
584 dl 1.1 * this exception as its cause.
585     *
586     * @param action the action to perform before completing the
587     * returned CompletableFuture
588     * @param executor the executor to use for asynchronous execution
589     * @return the new CompletableFuture
590     */
591 jsr166 1.12 public CompletableFuture<Void> thenRunAsync(Runnable action,
592     Executor executor) {
593 dl 1.1 if (executor == null) throw new NullPointerException();
594     return thenRunnable(action, executor);
595     }
596    
597     /**
598     * Creates and returns a CompletableFuture that is completed with
599     * the result of the given function of this and the other given
600 dl 1.7 * CompletableFuture's results when both complete. If this or
601 dl 1.1 * the other CompletableFuture complete exceptionally, then the
602     * returned CompletableFuture also does so, with a
603 dl 1.19 * CompletionException holding the exception as its cause.
604 dl 1.1 *
605     * @param other the other CompletableFuture
606     * @param fn the function to use to compute the value of
607     * the returned CompletableFuture
608     * @return the new CompletableFuture
609     */
610 dl 1.17 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
611     BiFunction<? super T,? super U,? extends V> fn) {
612 dl 1.1 return andFunction(other, fn, null);
613     }
614    
615     /**
616 jsr166 1.3 * Creates and returns a CompletableFuture that is asynchronously
617 dl 1.1 * completed using the {@link ForkJoinPool#commonPool()} with
618     * the result of the given function of this and the other given
619 dl 1.7 * CompletableFuture's results when both complete. If this or
620 dl 1.1 * the other CompletableFuture complete exceptionally, then the
621     * returned CompletableFuture also does so, with a
622 dl 1.19 * CompletionException holding the exception as its cause.
623 dl 1.1 *
624     * @param other the other CompletableFuture
625     * @param fn the function to use to compute the value of
626     * the returned CompletableFuture
627     * @return the new CompletableFuture
628     */
629 dl 1.17 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
630     BiFunction<? super T,? super U,? extends V> fn) {
631 dl 1.1 return andFunction(other, fn, ForkJoinPool.commonPool());
632     }
633    
634     /**
635 jsr166 1.3 * Creates and returns a CompletableFuture that is
636 dl 1.1 * asynchronously completed using the given executor with the
637     * result of the given function of this and the other given
638 dl 1.7 * CompletableFuture's results when both complete. If this or
639 dl 1.1 * the other CompletableFuture complete exceptionally, then the
640     * returned CompletableFuture also does so, with a
641 dl 1.19 * CompletionException holding the exception as its cause.
642 dl 1.1 *
643     * @param other the other CompletableFuture
644     * @param fn the function to use to compute the value of
645     * the returned CompletableFuture
646     * @param executor the executor to use for asynchronous execution
647     * @return the new CompletableFuture
648     */
649    
650 dl 1.17 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
651     BiFunction<? super T,? super U,? extends V> fn,
652     Executor executor) {
653 dl 1.7 if (executor == null) throw new NullPointerException();
654     return andFunction(other, fn, executor);
655     }
656    
657     /**
658     * Creates and returns a CompletableFuture that is completed with
659     * the results of this and the other given CompletableFuture if
660     * both complete. If this and/or the other CompletableFuture
661     * complete exceptionally, then the returned CompletableFuture
662 dl 1.19 * also does so, with a CompletionException holding one of these
663 dl 1.7 * exceptions as its cause.
664     *
665     * @param other the other CompletableFuture
666     * @param block the action to perform before completing the
667     * returned CompletableFuture
668     * @return the new CompletableFuture
669     */
670 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
671     BiBlock<? super T, ? super U> block) {
672 dl 1.7 return andBlock(other, block, null);
673     }
674    
675     /**
676     * Creates and returns a CompletableFuture that is completed
677     * asynchronously using the {@link ForkJoinPool#commonPool()} with
678     * the results of this and the other given CompletableFuture when
679     * both complete. If this and/or the other CompletableFuture
680     * complete exceptionally, then the returned CompletableFuture
681 dl 1.19 * also does so, with a CompletionException holding one of these
682 dl 1.7 * exceptions as its cause.
683     *
684     * @param other the other CompletableFuture
685     * @param block the action to perform before completing the
686     * returned CompletableFuture
687     * @return the new CompletableFuture
688     */
689 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
690     BiBlock<? super T, ? super U> block) {
691 dl 1.7 return andBlock(other, block, ForkJoinPool.commonPool());
692     }
693    
694     /**
695     * Creates and returns a CompletableFuture that is completed
696     * asynchronously using the given executor with the results of
697     * this and the other given CompletableFuture when both complete.
698     * If this and/or the other CompletableFuture complete
699     * exceptionally, then the returned CompletableFuture also does
700 dl 1.19 * so, with a CompletionException holding one of these exceptions as
701 dl 1.7 * its cause.
702     *
703     * @param other the other CompletableFuture
704     * @param block the action to perform before completing the
705     * returned CompletableFuture
706     * @param executor the executor to use for asynchronous execution
707     * @return the new CompletableFuture
708     */
709 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
710     BiBlock<? super T, ? super U> block,
711     Executor executor) {
712 dl 1.1 if (executor == null) throw new NullPointerException();
713 dl 1.7 return andBlock(other, block, executor);
714 dl 1.1 }
715    
716     /**
717     * Creates and returns a CompletableFuture that is completed
718 dl 1.7 * when this and the other given CompletableFuture both
719 dl 1.1 * complete. If this and/or the other CompletableFuture complete
720     * exceptionally, then the returned CompletableFuture also does
721 dl 1.19 * so, with a CompletionException holding one of these exceptions as
722 dl 1.1 * its cause.
723     *
724     * @param other the other CompletableFuture
725     * @param action the action to perform before completing the
726     * returned CompletableFuture
727     * @return the new CompletableFuture
728     */
729 dl 1.17 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
730     Runnable action) {
731 dl 1.1 return andRunnable(other, action, null);
732     }
733    
734     /**
735     * Creates and returns a CompletableFuture that is completed
736     * asynchronously using the {@link ForkJoinPool#commonPool()}
737 dl 1.7 * when this and the other given CompletableFuture both
738 dl 1.1 * complete. If this and/or the other CompletableFuture complete
739     * exceptionally, then the returned CompletableFuture also does
740 dl 1.19 * so, with a CompletionException holding one of these exceptions as
741 dl 1.1 * its cause.
742     *
743     * @param other the other CompletableFuture
744     * @param action the action to perform before completing the
745     * returned CompletableFuture
746     * @return the new CompletableFuture
747     */
748 dl 1.17 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
749     Runnable action) {
750 dl 1.1 return andRunnable(other, action, ForkJoinPool.commonPool());
751     }
752    
753     /**
754     * Creates and returns a CompletableFuture that is completed
755     * asynchronously using the given executor
756 dl 1.7 * when this and the other given CompletableFuture both
757 dl 1.1 * complete. If this and/or the other CompletableFuture complete
758     * exceptionally, then the returned CompletableFuture also does
759 dl 1.19 * so, with a CompletionException holding one of these exceptions as
760 dl 1.1 * its cause.
761     *
762     * @param other the other CompletableFuture
763     * @param action the action to perform before completing the
764     * returned CompletableFuture
765     * @param executor the executor to use for asynchronous execution
766     * @return the new CompletableFuture
767     */
768 dl 1.17 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
769     Runnable action,
770     Executor executor) {
771 dl 1.1 if (executor == null) throw new NullPointerException();
772     return andRunnable(other, action, executor);
773     }
774    
775     /**
776     * Creates and returns a CompletableFuture that is completed with
777     * the result of the given function of either this or the other
778 dl 1.7 * given CompletableFuture's results when either complete. If
779 dl 1.1 * this and/or the other CompletableFuture complete exceptionally,
780     * then the returned CompletableFuture may also do so, with a
781 dl 1.19 * CompletionException holding one of these exceptions as its cause.
782 dl 1.1 * No guarantees are made about which result or exception is used
783     * in the returned CompletableFuture.
784     *
785     * @param other the other CompletableFuture
786     * @param fn the function to use to compute the value of
787     * the returned CompletableFuture
788     * @return the new CompletableFuture
789     */
790 dl 1.17 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
791     Function<? super T, U> fn) {
792 dl 1.1 return orFunction(other, fn, null);
793     }
794    
795     /**
796     * Creates and returns a CompletableFuture that is completed
797     * asynchronously using the {@link ForkJoinPool#commonPool()} with
798     * the result of the given function of either this or the other
799 dl 1.7 * given CompletableFuture's results when either complete. If
800 dl 1.1 * this and/or the other CompletableFuture complete exceptionally,
801     * then the returned CompletableFuture may also do so, with a
802 dl 1.19 * CompletionException holding one of these exceptions as its cause.
803 dl 1.1 * No guarantees are made about which result or exception is used
804     * in the returned CompletableFuture.
805     *
806     * @param other the other CompletableFuture
807     * @param fn the function to use to compute the value of
808     * the returned CompletableFuture
809     * @return the new CompletableFuture
810     */
811 dl 1.17 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
812     Function<? super T, U> fn) {
813 dl 1.1 return orFunction(other, fn, ForkJoinPool.commonPool());
814     }
815    
816     /**
817     * Creates and returns a CompletableFuture that is completed
818     * asynchronously using the given executor with the result of the
819     * given function of either this or the other given
820 dl 1.7 * CompletableFuture's results when either complete. If this
821 dl 1.1 * and/or the other CompletableFuture complete exceptionally, then
822     * the returned CompletableFuture may also do so, with a
823 dl 1.19 * CompletionException holding one of these exceptions as its cause.
824 dl 1.1 * No guarantees are made about which result or exception is used
825     * in the returned CompletableFuture.
826     *
827     * @param other the other CompletableFuture
828     * @param fn the function to use to compute the value of
829     * the returned CompletableFuture
830     * @param executor the executor to use for asynchronous execution
831     * @return the new CompletableFuture
832     */
833 dl 1.17 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
834     Function<? super T, U> fn,
835     Executor executor) {
836 dl 1.1 if (executor == null) throw new NullPointerException();
837     return orFunction(other, fn, executor);
838     }
839    
840     /**
841 dl 1.7 * Creates and returns a CompletableFuture that is completed after
842     * performing the given action with the result of either this or the
843     * other given CompletableFuture's result, when either complete.
844     * If this and/or the other CompletableFuture complete
845     * exceptionally, then the returned CompletableFuture may also do
846 dl 1.19 * so, with a CompletionException holding one of these exceptions as
847 dl 1.7 * its cause. No guarantees are made about which exception is
848     * used in the returned CompletableFuture.
849     *
850     * @param other the other CompletableFuture
851     * @param block the action to perform before completing the
852     * returned CompletableFuture
853     * @return the new CompletableFuture
854     */
855 dl 1.17 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
856     Block<? super T> block) {
857 dl 1.7 return orBlock(other, block, null);
858     }
859    
860     /**
861     * Creates and returns a CompletableFuture that is completed
862     * asynchronously using the {@link ForkJoinPool#commonPool()},
863     * performing the given action with the result of either this or
864     * the other given CompletableFuture's result, when either
865     * complete. If this and/or the other CompletableFuture complete
866     * exceptionally, then the returned CompletableFuture may also do
867 dl 1.19 * so, with a CompletionException holding one of these exceptions as
868 dl 1.7 * its cause. No guarantees are made about which exception is
869     * used in the returned CompletableFuture.
870     *
871     * @param other the other CompletableFuture
872     * @param block the action to perform before completing the
873     * returned CompletableFuture
874     * @return the new CompletableFuture
875     */
876 dl 1.17 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
877     Block<? super T> block) {
878 dl 1.7 return orBlock(other, block, ForkJoinPool.commonPool());
879     }
880    
881     /**
882     * Creates and returns a CompletableFuture that is completed
883     * asynchronously using the given executor,
884     * performing the given action with the result of either this or
885     * the other given CompletableFuture's result, when either
886     * complete. If this and/or the other CompletableFuture complete
887     * exceptionally, then the returned CompletableFuture may also do
888 dl 1.19 * so, with a CompletionException holding one of these exceptions as
889 dl 1.7 * its cause. No guarantees are made about which exception is
890     * used in the returned CompletableFuture.
891     *
892     * @param other the other CompletableFuture
893     * @param block the action to perform before completing the
894     * returned CompletableFuture
895     * @param executor the executor to use for asynchronous execution
896     * @return the new CompletableFuture
897     */
898 dl 1.17 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
899     Block<? super T> block,
900     Executor executor) {
901 dl 1.7 if (executor == null) throw new NullPointerException();
902     return orBlock(other, block, executor);
903     }
904    
905     /**
906 dl 1.1 * Creates and returns a CompletableFuture that is completed
907     * after this or the other given CompletableFuture complete. If
908     * this and/or the other CompletableFuture complete exceptionally,
909     * then the returned CompletableFuture may also do so, with a
910 dl 1.19 * CompletionException holding one of these exceptions as its cause.
911 dl 1.1 * No guarantees are made about which exception is used in the
912     * returned CompletableFuture.
913     *
914     * @param other the other CompletableFuture
915     * @param action the action to perform before completing the
916     * returned CompletableFuture
917     * @return the new CompletableFuture
918     */
919 dl 1.17 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
920     Runnable action) {
921 dl 1.1 return orRunnable(other, action, null);
922     }
923    
924     /**
925     * Creates and returns a CompletableFuture that is completed
926     * asynchronously using the {@link ForkJoinPool#commonPool()}
927     * after this or the other given CompletableFuture complete. If
928     * this and/or the other CompletableFuture complete exceptionally,
929     * then the returned CompletableFuture may also do so, with a
930 dl 1.19 * CompletionException holding one of these exceptions as its cause.
931 dl 1.1 * No guarantees are made about which exception is used in the
932     * returned CompletableFuture.
933     *
934     * @param other the other CompletableFuture
935     * @param action the action to perform before completing the
936     * returned CompletableFuture
937     * @return the new CompletableFuture
938     */
939 dl 1.17 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
940     Runnable action) {
941 dl 1.1 return orRunnable(other, action, ForkJoinPool.commonPool());
942     }
943    
944     /**
945     * Creates and returns a CompletableFuture that is completed
946     * asynchronously using the given executor after this or the other
947     * given CompletableFuture complete. If this and/or the other
948     * CompletableFuture complete exceptionally, then the returned
949 dl 1.19 * CompletableFuture may also do so, with a CompletionException
950     * holding one of these exceptions as its cause. No guarantees are
951 dl 1.1 * made about which exception is used in the returned
952     * CompletableFuture.
953     *
954     * @param other the other CompletableFuture
955     * @param action the action to perform before completing the
956     * returned CompletableFuture
957     * @param executor the executor to use for asynchronous execution
958     * @return the new CompletableFuture
959     */
960 dl 1.17 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
961     Runnable action,
962     Executor executor) {
963 dl 1.1 if (executor == null) throw new NullPointerException();
964     return orRunnable(other, action, executor);
965     }
966    
967     /**
968 dl 1.20 * Returns a CompletableFuture (or an equivalent one) produced by
969     * the given function of the result of this CompletableFuture when
970     * completed. If this CompletableFuture completes exceptionally,
971     * then the returned CompletableFuture also does so, with a
972     * CompletionException holding this exception as its cause.
973 dl 1.17 *
974     * @param fn the function returning a new CompletableFuture.
975     * @return the CompletableFuture, that {@code isDone()} upon
976     * return if completed by the given function, or an exception
977     * occurs.
978     */
979 dl 1.20 @SuppressWarnings("unchecked") public <U> CompletableFuture<U> thenCompose(Function<? super T,
980 dl 1.17 CompletableFuture<U>> fn) {
981     if (fn == null) throw new NullPointerException();
982     CompletableFuture<U> dst = null;
983     ThenCompose<T,U> d = null;
984     Object r;
985     if ((r = result) == null) {
986     dst = new CompletableFuture<U>();
987     CompletionNode p = new CompletionNode
988     (d = new ThenCompose<T,U>(this, fn, dst));
989     while ((r = result) == null) {
990     if (UNSAFE.compareAndSwapObject
991     (this, COMPLETIONS, p.next = completions, p))
992     break;
993     }
994     }
995     if (r != null && (d == null || d.compareAndSet(0, 1))) {
996 dl 1.22 T t; Throwable ex;
997 dl 1.17 if (r instanceof AltResult) {
998     ex = ((AltResult)r).ex;
999     t = null;
1000     }
1001 dl 1.22 else {
1002     ex = null;
1003 dl 1.17 t = (T) r;
1004 dl 1.22 }
1005 dl 1.17 if (ex == null) {
1006     try {
1007     dst = fn.apply(t);
1008     } catch (Throwable rex) {
1009     ex = rex;
1010     }
1011     }
1012     if (dst == null) {
1013     dst = new CompletableFuture<U>();
1014     if (ex == null)
1015     ex = new NullPointerException();
1016     }
1017     if (ex != null)
1018 dl 1.20 dst.internalComplete(null, ex);
1019 dl 1.17 }
1020 dl 1.20 helpPostComplete();
1021     dst.helpPostComplete();
1022 dl 1.17 return dst;
1023     }
1024    
1025     /**
1026 dl 1.6 * Creates and returns a CompletableFuture that is completed with
1027     * the result of the given function of the exception triggering
1028 dl 1.7 * this CompletableFuture's completion when it completes
1029 dl 1.6 * exceptionally; Otherwise, if this CompletableFuture completes
1030     * normally, then the returned CompletableFuture also completes
1031     * normally with the same value.
1032     *
1033     * @param fn the function to use to compute the value of the
1034     * returned CompletableFuture if this CompletableFuture completed
1035     * exceptionally
1036 dl 1.1 * @return the new CompletableFuture
1037     */
1038 dl 1.20 @SuppressWarnings("unchecked") public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
1039 dl 1.6 if (fn == null) throw new NullPointerException();
1040     CompletableFuture<T> dst = new CompletableFuture<T>();
1041 dl 1.1 ExceptionAction<T> d = null;
1042 dl 1.6 Object r;
1043 jsr166 1.2 if ((r = result) == null) {
1044 dl 1.1 CompletionNode p =
1045 dl 1.6 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
1046 dl 1.1 while ((r = result) == null) {
1047     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1048     p.next = completions, p))
1049     break;
1050     }
1051     }
1052 dl 1.6 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1053 dl 1.20 T t = null; Throwable ex, dx = null;
1054 dl 1.6 if (r instanceof AltResult) {
1055     if ((ex = ((AltResult)r).ex) != null) {
1056     try {
1057 dl 1.20 t = fn.apply(ex);
1058 dl 1.6 } catch (Throwable rex) {
1059 dl 1.20 dx = rex;
1060 dl 1.6 }
1061     }
1062 dl 1.1 }
1063 dl 1.6 else
1064     t = (T) r;
1065 dl 1.20 dst.internalComplete(t, dx);
1066 dl 1.1 }
1067 dl 1.20 helpPostComplete();
1068 dl 1.17 return dst;
1069     }
1070    
1071     /**
1072     * Creates and returns a CompletableFuture that is completed with
1073     * the result of the given function of the result and exception of
1074     * this CompletableFuture's completion when it completes. The
1075     * given function is invoked with the result (or {@code null} if
1076     * none) and the exception (or {@code null} if none) of this
1077     * CompletableFuture when complete.
1078     *
1079     * @param fn the function to use to compute the value of the
1080     * returned CompletableFuture
1081    
1082     * @return the new CompletableFuture
1083     */
1084 dl 1.20 @SuppressWarnings("unchecked") public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
1085 dl 1.17 if (fn == null) throw new NullPointerException();
1086     CompletableFuture<U> dst = new CompletableFuture<U>();
1087     ThenHandle<T,U> d = null;
1088     Object r;
1089     if ((r = result) == null) {
1090     CompletionNode p =
1091     new CompletionNode(d = new ThenHandle<T,U>(this, fn, dst));
1092     while ((r = result) == null) {
1093     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1094     p.next = completions, p))
1095     break;
1096     }
1097     }
1098     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1099     T t; Throwable ex;
1100     if (r instanceof AltResult) {
1101     ex = ((AltResult)r).ex;
1102     t = null;
1103     }
1104     else {
1105     ex = null;
1106     t = (T) r;
1107     }
1108 dl 1.20 U u = null; Throwable dx = null;
1109 dl 1.17 try {
1110 dl 1.20 u = fn.apply(t, ex);
1111 dl 1.17 } catch (Throwable rex) {
1112 dl 1.20 dx = rex;
1113 dl 1.17 }
1114 dl 1.20 dst.internalComplete(u, dx);
1115 dl 1.17 }
1116 dl 1.20 helpPostComplete();
1117 dl 1.1 return dst;
1118     }
1119    
1120     /**
1121     * Attempts to complete this CompletableFuture with
1122     * a {@link CancellationException}.
1123     *
1124     * @param mayInterruptIfRunning this value has no effect in this
1125     * implementation because interrupts are not used to control
1126     * processing.
1127     *
1128     * @return {@code true} if this task is now cancelled
1129     */
1130     public boolean cancel(boolean mayInterruptIfRunning) {
1131     Object r;
1132     while ((r = result) == null) {
1133     r = new AltResult(new CancellationException());
1134     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
1135     postComplete();
1136     return true;
1137     }
1138     }
1139     return ((r instanceof AltResult) &&
1140     (((AltResult)r).ex instanceof CancellationException));
1141     }
1142    
1143     /**
1144     * Returns {@code true} if this CompletableFuture was cancelled
1145     * before it completed normally.
1146     *
1147     * @return {@code true} if this CompletableFuture was cancelled
1148     * before it completed normally
1149     */
1150     public boolean isCancelled() {
1151     Object r;
1152     return ((r = result) != null &&
1153     (r instanceof AltResult) &&
1154     (((AltResult)r).ex instanceof CancellationException));
1155     }
1156    
1157     /**
1158 dl 1.6 * Forcibly sets or resets the value subsequently returned by
1159     * method get() and related methods, whether or not already
1160     * completed. This method is designed for use only in error
1161     * recovery actions, and even in such situations may result in
1162     * ongoing dependent completions using established versus
1163     * overwritten values.
1164 dl 1.1 *
1165     * @param value the completion value
1166     */
1167 dl 1.6 public void obtrudeValue(T value) {
1168 jsr166 1.2 result = (value == null) ? NIL : value;
1169 dl 1.1 postComplete();
1170     }
1171    
1172     /* ------------- waiting for completions -------------- */
1173    
/**
 * Heuristic spin count used by waitingGet() before blocking;
 * applied only on multiprocessors.
 */
1178     static final int WAITING_GET_SPINS = 256;
1179    
1180     /**
1181 dl 1.19 * Returns raw result after waiting, or null if interruptible and
1182     * interrupted.
1183 dl 1.1 */
1184 dl 1.19 private Object waitingGet(boolean interruptible) {
1185 dl 1.1 WaitNode q = null;
1186     boolean queued = false, interrupted = false;
1187     int h = 0, spins = 0;
1188     for (Object r;;) {
1189     if ((r = result) != null) {
1190 dl 1.5 Throwable ex;
1191 dl 1.1 if (q != null) // suppress unpark
1192     q.thread = null;
1193     postComplete(); // help release others
1194     if (interrupted)
1195     Thread.currentThread().interrupt();
1196 dl 1.19 return r;
1197 dl 1.1 }
1198     else if (h == 0) {
1199     h = ThreadLocalRandom.current().nextInt();
1200     if (Runtime.getRuntime().availableProcessors() > 1)
1201     spins = WAITING_GET_SPINS;
1202     }
1203     else if (spins > 0) {
1204     h ^= h << 1; // xorshift
1205 jsr166 1.2 h ^= h >>> 3;
1206 dl 1.1 if ((h ^= h << 10) >= 0)
1207     --spins;
1208 jsr166 1.2 }
1209 dl 1.1 else if (q == null)
1210     q = new WaitNode();
1211     else if (!queued)
1212     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1213     q.next = waiters, q);
1214 dl 1.19 else if (Thread.interrupted()) {
1215 dl 1.20 if (interruptible) {
1216     removeWaiter(q);
1217 dl 1.19 return null;
1218 dl 1.20 }
1219 dl 1.1 interrupted = true;
1220 dl 1.19 }
1221 dl 1.1 else if (q.thread == null)
1222     q.thread = Thread.currentThread();
1223     else
1224     LockSupport.park(this);
1225     }
1226     }
1227    
1228     /**
1229     * Awaits completion or aborts on interrupt or timeout.
1230     *
1231     * @param nanos time to wait
1232     * @return raw result
1233     */
1234     private Object timedAwaitDone(long nanos)
1235     throws InterruptedException, TimeoutException {
1236     final long deadline = System.nanoTime() + nanos;
1237     WaitNode q = null;
1238     boolean queued = false;
1239     for (Object r;;) {
1240     if (Thread.interrupted()) {
1241     removeWaiter(q);
1242     throw new InterruptedException();
1243     }
1244     else if ((r = result) != null) {
1245     if (q != null)
1246     q.thread = null;
1247     postComplete();
1248     return r;
1249     }
1250     else if (q == null)
1251     q = new WaitNode();
1252     else if (!queued)
1253     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1254     q.next = waiters, q);
1255     else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1256     removeWaiter(q);
1257     throw new TimeoutException();
1258     }
1259     else if (q.thread == null)
1260     q.thread = Thread.currentThread();
1261     else
1262     LockSupport.parkNanos(this, nanos);
1263     }
1264     }
1265    
1266     /**
1267     * Tries to unlink a timed-out or interrupted wait node to avoid
1268     * accumulating garbage. Internal nodes are simply unspliced
1269     * without CAS since it is harmless if they are traversed anyway
1270     * by releasers. To avoid effects of unsplicing from already
1271     * removed nodes, the list is retraversed in case of an apparent
1272     * race. This is slow when there are a lot of nodes, but we don't
1273     * expect lists to be long enough to outweigh higher-overhead
1274     * schemes.
1275     */
1276     private void removeWaiter(WaitNode node) {
1277     if (node != null) {
1278     node.thread = null;
1279     retry:
1280     for (;;) { // restart on removeWaiter race
1281     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
1282     s = q.next;
1283     if (q.thread != null)
1284     pred = q;
1285     else if (pred != null) {
1286     pred.next = s;
1287     if (pred.thread == null) // check for race
1288     continue retry;
1289     }
1290     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
1291     continue retry;
1292     }
1293     break;
1294     }
1295     }
1296     }
1297    
1298     /* ------------- Async tasks -------------- */
1299    
1300     /** Base class can act as either FJ or plain Runnable */
1301     static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1302     public final Void getRawResult() { return null; }
1303     public final void setRawResult(Void v) { }
1304     public final void run() { exec(); }
1305     }
1306    
1307     static final class AsyncRunnable extends Async {
1308 dl 1.21 final Runnable fn;
1309 dl 1.1 final CompletableFuture<Void> dst;
1310 dl 1.21 AsyncRunnable(Runnable fn, CompletableFuture<Void> dst) {
1311     this.fn = fn; this.dst = dst;
1312 dl 1.1 }
1313     public final boolean exec() {
1314 dl 1.22 CompletableFuture<Void> d; Throwable ex;
1315 dl 1.21 if ((d = this.dst) != null) {
1316     try {
1317     fn.run();
1318     ex = null;
1319     } catch (Throwable rex) {
1320     ex = rex;
1321     }
1322     d.internalComplete(null, ex);
1323 dl 1.1 }
1324     return true;
1325     }
1326     private static final long serialVersionUID = 5232453952276885070L;
1327     }
1328    
1329     static final class AsyncSupplier<U> extends Async {
1330 dl 1.21 final Supplier<U> fn;
1331 dl 1.1 final CompletableFuture<U> dst;
1332 dl 1.21 AsyncSupplier(Supplier<U> fn, CompletableFuture<U> dst) {
1333     this.fn = fn; this.dst = dst;
1334 dl 1.1 }
1335     public final boolean exec() {
1336 dl 1.22 CompletableFuture<U> d; U u; Throwable ex;
1337 dl 1.21 if ((d = this.dst) != null) {
1338     try {
1339     u = fn.get();
1340     ex = null;
1341     } catch (Throwable rex) {
1342     ex = rex;
1343     u = null;
1344     }
1345     d.internalComplete(u, ex);
1346 dl 1.1 }
1347     return true;
1348     }
1349     private static final long serialVersionUID = 5232453952276885070L;
1350     }
1351    
1352     static final class AsyncFunction<T,U> extends Async {
1353     Function<? super T,? extends U> fn;
1354     T arg;
1355     final CompletableFuture<U> dst;
1356     AsyncFunction(T arg, Function<? super T,? extends U> fn,
1357     CompletableFuture<U> dst) {
1358     this.arg = arg; this.fn = fn; this.dst = dst;
1359     }
1360     public final boolean exec() {
1361 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
1362     if ((d = this.dst) != null) {
1363     try {
1364     u = fn.apply(arg);
1365     ex = null;
1366     } catch (Throwable rex) {
1367     ex = rex;
1368     u = null;
1369     }
1370     d.internalComplete(u, ex);
1371 dl 1.1 }
1372     return true;
1373     }
1374     private static final long serialVersionUID = 5232453952276885070L;
1375     }
1376    
1377     static final class AsyncBiFunction<T,U,V> extends Async {
1378     final BiFunction<? super T,? super U,? extends V> fn;
1379     final T arg1;
1380     final U arg2;
1381     final CompletableFuture<V> dst;
1382     AsyncBiFunction(T arg1, U arg2,
1383     BiFunction<? super T,? super U,? extends V> fn,
1384     CompletableFuture<V> dst) {
1385     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1386     }
1387     public final boolean exec() {
1388 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
1389     if ((d = this.dst) != null) {
1390     try {
1391     v = fn.apply(arg1, arg2);
1392     ex = null;
1393     } catch (Throwable rex) {
1394     ex = rex;
1395     v = null;
1396     }
1397     d.internalComplete(v, ex);
1398 dl 1.1 }
1399     return true;
1400     }
1401     private static final long serialVersionUID = 5232453952276885070L;
1402     }
1403    
1404 dl 1.7 static final class AsyncBlock<T> extends Async {
1405     Block<? super T> fn;
1406     T arg;
1407     final CompletableFuture<Void> dst;
1408     AsyncBlock(T arg, Block<? super T> fn,
1409     CompletableFuture<Void> dst) {
1410     this.arg = arg; this.fn = fn; this.dst = dst;
1411     }
1412     public final boolean exec() {
1413 dl 1.21 CompletableFuture<Void> d; Throwable ex;
1414     if ((d = this.dst) != null) {
1415     try {
1416     fn.accept(arg);
1417     ex = null;
1418     } catch (Throwable rex) {
1419     ex = rex;
1420     }
1421     d.internalComplete(null, ex);
1422 dl 1.7 }
1423     return true;
1424     }
1425     private static final long serialVersionUID = 5232453952276885070L;
1426     }
1427    
1428     static final class AsyncBiBlock<T,U> extends Async {
1429     final BiBlock<? super T,? super U> fn;
1430     final T arg1;
1431     final U arg2;
1432     final CompletableFuture<Void> dst;
1433     AsyncBiBlock(T arg1, U arg2,
1434     BiBlock<? super T,? super U> fn,
1435     CompletableFuture<Void> dst) {
1436     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1437     }
1438     public final boolean exec() {
1439 dl 1.21 CompletableFuture<Void> d; Throwable ex;
1440     if ((d = this.dst) != null) {
1441     try {
1442     fn.accept(arg1, arg2);
1443     ex = null;
1444     } catch (Throwable rex) {
1445     ex = rex;
1446     }
1447     d.internalComplete(null, ex);
1448 dl 1.7 }
1449     return true;
1450     }
1451     private static final long serialVersionUID = 5232453952276885070L;
1452     }
1453    
1454 dl 1.1 /* ------------- Completions -------------- */
1455    
1456     // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1457     static abstract class Completion extends AtomicInteger implements Runnable {
1458     }
1459    
1460     static final class ThenFunction<T,U> extends Completion {
1461 jsr166 1.2 final CompletableFuture<? extends T> src;
1462     final Function<? super T,? extends U> fn;
1463     final CompletableFuture<U> dst;
1464 dl 1.1 final Executor executor;
1465     ThenFunction(CompletableFuture<? extends T> src,
1466     final Function<? super T,? extends U> fn,
1467     final CompletableFuture<U> dst, Executor executor) {
1468     this.src = src; this.fn = fn; this.dst = dst;
1469     this.executor = executor;
1470     }
1471 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1472 dl 1.1 CompletableFuture<? extends T> a;
1473     Function<? super T,? extends U> fn;
1474     CompletableFuture<U> dst;
1475 jsr166 1.2 Object r; T t; Throwable ex;
1476     if ((dst = this.dst) != null &&
1477 dl 1.1 (fn = this.fn) != null &&
1478     (a = this.src) != null &&
1479     (r = a.result) != null &&
1480     compareAndSet(0, 1)) {
1481 jsr166 1.2 if (r instanceof AltResult) {
1482 dl 1.19 ex = ((AltResult)r).ex;
1483 dl 1.1 t = null;
1484     }
1485 dl 1.17 else {
1486     ex = null;
1487 dl 1.1 t = (T) r;
1488     }
1489 dl 1.20 Executor e = executor;
1490     U u = null;
1491 dl 1.17 if (ex == null) {
1492     try {
1493 dl 1.20 if (e != null)
1494     e.execute(new AsyncFunction<T,U>(t, fn, dst));
1495 dl 1.17 else
1496 dl 1.20 u = fn.apply(t);
1497 dl 1.17 } catch (Throwable rex) {
1498     ex = rex;
1499     }
1500     }
1501 dl 1.20 if (e == null || ex != null)
1502     dst.internalComplete(u, ex);
1503 dl 1.1 }
1504     }
1505 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1506 dl 1.1 }
1507    
1508 dl 1.7 static final class ThenBlock<T> extends Completion {
1509     final CompletableFuture<? extends T> src;
1510     final Block<? super T> fn;
1511     final CompletableFuture<Void> dst;
1512     final Executor executor;
1513     ThenBlock(CompletableFuture<? extends T> src,
1514     final Block<? super T> fn,
1515     final CompletableFuture<Void> dst, Executor executor) {
1516     this.src = src; this.fn = fn; this.dst = dst;
1517     this.executor = executor;
1518     }
1519 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1520 dl 1.7 CompletableFuture<? extends T> a;
1521     Block<? super T> fn;
1522     CompletableFuture<Void> dst;
1523     Object r; T t; Throwable ex;
1524     if ((dst = this.dst) != null &&
1525     (fn = this.fn) != null &&
1526     (a = this.src) != null &&
1527     (r = a.result) != null &&
1528     compareAndSet(0, 1)) {
1529     if (r instanceof AltResult) {
1530 dl 1.19 ex = ((AltResult)r).ex;
1531 dl 1.7 t = null;
1532     }
1533 dl 1.17 else {
1534     ex = null;
1535 dl 1.7 t = (T) r;
1536 dl 1.17 }
1537 dl 1.20 Executor e = executor;
1538 dl 1.17 if (ex == null) {
1539     try {
1540 dl 1.20 if (e != null)
1541     e.execute(new AsyncBlock<T>(t, fn, dst));
1542     else
1543 dl 1.17 fn.accept(t);
1544     } catch (Throwable rex) {
1545     ex = rex;
1546 dl 1.7 }
1547     }
1548 dl 1.20 if (e == null || ex != null)
1549     dst.internalComplete(null, ex);
1550 dl 1.7 }
1551     }
1552 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1553 dl 1.7 }
1554    
1555 dl 1.1 static final class ThenRunnable<T> extends Completion {
1556 jsr166 1.2 final CompletableFuture<? extends T> src;
1557     final Runnable fn;
1558     final CompletableFuture<Void> dst;
1559 dl 1.1 final Executor executor;
1560     ThenRunnable(CompletableFuture<? extends T> src,
1561     Runnable fn,
1562     CompletableFuture<Void> dst,
1563     Executor executor) {
1564     this.src = src; this.fn = fn; this.dst = dst;
1565     this.executor = executor;
1566     }
1567 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1568 dl 1.1 CompletableFuture<? extends T> a;
1569     Runnable fn;
1570     CompletableFuture<Void> dst;
1571 jsr166 1.2 Object r; Throwable ex;
1572     if ((dst = this.dst) != null &&
1573 dl 1.1 (fn = this.fn) != null &&
1574     (a = this.src) != null &&
1575     (r = a.result) != null &&
1576     compareAndSet(0, 1)) {
1577 dl 1.19 if (r instanceof AltResult)
1578     ex = ((AltResult)r).ex;
1579 dl 1.17 else
1580     ex = null;
1581 dl 1.20 Executor e = executor;
1582 dl 1.17 if (ex == null) {
1583     try {
1584 dl 1.20 if (e != null)
1585     e.execute(new AsyncRunnable(fn, dst));
1586     else
1587 dl 1.17 fn.run();
1588     } catch (Throwable rex) {
1589     ex = rex;
1590 dl 1.1 }
1591     }
1592 dl 1.20 if (e == null || ex != null)
1593     dst.internalComplete(null, ex);
1594 dl 1.1 }
1595     }
1596 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1597 dl 1.1 }
1598    
1599     static final class AndFunction<T,U,V> extends Completion {
1600 jsr166 1.2 final CompletableFuture<? extends T> src;
1601     final CompletableFuture<? extends U> snd;
1602     final BiFunction<? super T,? super U,? extends V> fn;
1603     final CompletableFuture<V> dst;
1604 dl 1.1 final Executor executor;
1605     AndFunction(CompletableFuture<? extends T> src,
1606     CompletableFuture<? extends U> snd,
1607     BiFunction<? super T,? super U,? extends V> fn,
1608     CompletableFuture<V> dst, Executor executor) {
1609     this.src = src; this.snd = snd;
1610     this.fn = fn; this.dst = dst;
1611     this.executor = executor;
1612     }
1613 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1614 jsr166 1.2 Object r, s; T t; U u; Throwable ex;
1615 dl 1.1 CompletableFuture<? extends T> a;
1616     CompletableFuture<? extends U> b;
1617     BiFunction<? super T,? super U,? extends V> fn;
1618     CompletableFuture<V> dst;
1619 jsr166 1.2 if ((dst = this.dst) != null &&
1620 dl 1.1 (fn = this.fn) != null &&
1621     (a = this.src) != null &&
1622     (r = a.result) != null &&
1623     (b = this.snd) != null &&
1624     (s = b.result) != null &&
1625     compareAndSet(0, 1)) {
1626 jsr166 1.2 if (r instanceof AltResult) {
1627 dl 1.19 ex = ((AltResult)r).ex;
1628 jsr166 1.2 t = null;
1629     }
1630 dl 1.19 else {
1631     ex = null;
1632 jsr166 1.2 t = (T) r;
1633 dl 1.19 }
1634     if (ex != null)
1635     u = null;
1636     else if (s instanceof AltResult) {
1637     ex = ((AltResult)s).ex;
1638 jsr166 1.2 u = null;
1639     }
1640     else
1641     u = (U) s;
1642 dl 1.20 Executor e = executor;
1643     V v = null;
1644 dl 1.19 if (ex == null) {
1645     try {
1646 dl 1.20 if (e != null)
1647     e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1648 dl 1.19 else
1649 dl 1.20 v = fn.apply(t, u);
1650 dl 1.19 } catch (Throwable rex) {
1651     ex = rex;
1652     }
1653 dl 1.1 }
1654 dl 1.20 if (e == null || ex != null)
1655     dst.internalComplete(v, ex);
1656 jsr166 1.2 }
1657     }
1658 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1659 dl 1.1 }
1660    
1661 dl 1.7 static final class AndBlock<T,U> extends Completion {
1662     final CompletableFuture<? extends T> src;
1663     final CompletableFuture<? extends U> snd;
1664     final BiBlock<? super T,? super U> fn;
1665     final CompletableFuture<Void> dst;
1666     final Executor executor;
1667     AndBlock(CompletableFuture<? extends T> src,
1668     CompletableFuture<? extends U> snd,
1669     BiBlock<? super T,? super U> fn,
1670     CompletableFuture<Void> dst, Executor executor) {
1671     this.src = src; this.snd = snd;
1672     this.fn = fn; this.dst = dst;
1673     this.executor = executor;
1674     }
1675 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1676 dl 1.7 Object r, s; T t; U u; Throwable ex;
1677     CompletableFuture<? extends T> a;
1678     CompletableFuture<? extends U> b;
1679     BiBlock<? super T,? super U> fn;
1680     CompletableFuture<Void> dst;
1681     if ((dst = this.dst) != null &&
1682     (fn = this.fn) != null &&
1683     (a = this.src) != null &&
1684     (r = a.result) != null &&
1685     (b = this.snd) != null &&
1686     (s = b.result) != null &&
1687     compareAndSet(0, 1)) {
1688     if (r instanceof AltResult) {
1689 dl 1.19 ex = ((AltResult)r).ex;
1690 dl 1.7 t = null;
1691     }
1692 dl 1.19 else {
1693     ex = null;
1694 dl 1.7 t = (T) r;
1695 dl 1.19 }
1696     if (ex != null)
1697     u = null;
1698     else if (s instanceof AltResult) {
1699     ex = ((AltResult)s).ex;
1700 dl 1.7 u = null;
1701     }
1702     else
1703     u = (U) s;
1704 dl 1.20 Executor e = executor;
1705 dl 1.19 if (ex == null) {
1706     try {
1707 dl 1.20 if (e != null)
1708     e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1709     else
1710 dl 1.19 fn.accept(t, u);
1711     } catch (Throwable rex) {
1712     ex = rex;
1713 dl 1.7 }
1714     }
1715 dl 1.20 if (e == null || ex != null)
1716     dst.internalComplete(null, ex);
1717 dl 1.7 }
1718     }
1719 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1720 dl 1.7 }
1721    
1722 dl 1.1 static final class AndRunnable<T> extends Completion {
1723 jsr166 1.2 final CompletableFuture<? extends T> src;
1724     final CompletableFuture<?> snd;
1725     final Runnable fn;
1726     final CompletableFuture<Void> dst;
1727 dl 1.1 final Executor executor;
1728     AndRunnable(CompletableFuture<? extends T> src,
1729     CompletableFuture<?> snd,
1730     Runnable fn,
1731     CompletableFuture<Void> dst, Executor executor) {
1732     this.src = src; this.snd = snd;
1733     this.fn = fn; this.dst = dst;
1734     this.executor = executor;
1735     }
1736 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1737 jsr166 1.2 Object r, s; Throwable ex;
1738 dl 1.1 final CompletableFuture<? extends T> a;
1739     final CompletableFuture<?> b;
1740     final Runnable fn;
1741     final CompletableFuture<Void> dst;
1742 jsr166 1.2 if ((dst = this.dst) != null &&
1743 dl 1.1 (fn = this.fn) != null &&
1744     (a = this.src) != null &&
1745     (r = a.result) != null &&
1746     (b = this.snd) != null &&
1747     (s = b.result) != null &&
1748     compareAndSet(0, 1)) {
1749 dl 1.19 if (r instanceof AltResult)
1750     ex = ((AltResult)r).ex;
1751     else
1752     ex = null;
1753     if (ex == null && (s instanceof AltResult))
1754     ex = ((AltResult)s).ex;
1755 dl 1.20 Executor e = executor;
1756 dl 1.19 if (ex == null) {
1757     try {
1758 dl 1.20 if (e != null)
1759     e.execute(new AsyncRunnable(fn, dst));
1760     else
1761 dl 1.19 fn.run();
1762     } catch (Throwable rex) {
1763     ex = rex;
1764 dl 1.1 }
1765 jsr166 1.2 }
1766 dl 1.20 if (e == null || ex != null)
1767     dst.internalComplete(null, ex);
1768 jsr166 1.2 }
1769     }
1770 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1771 dl 1.1 }
1772    
1773     static final class OrFunction<T,U> extends Completion {
1774 jsr166 1.2 final CompletableFuture<? extends T> src;
1775     final CompletableFuture<? extends T> snd;
1776     final Function<? super T,? extends U> fn;
1777     final CompletableFuture<U> dst;
1778 dl 1.1 final Executor executor;
1779     OrFunction(CompletableFuture<? extends T> src,
1780     CompletableFuture<? extends T> snd,
1781     Function<? super T,? extends U> fn,
1782     CompletableFuture<U> dst, Executor executor) {
1783     this.src = src; this.snd = snd;
1784     this.fn = fn; this.dst = dst;
1785     this.executor = executor;
1786     }
1787 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1788 jsr166 1.2 Object r; T t; Throwable ex;
1789 dl 1.1 CompletableFuture<? extends T> a;
1790     CompletableFuture<? extends T> b;
1791     Function<? super T,? extends U> fn;
1792     CompletableFuture<U> dst;
1793 jsr166 1.2 if ((dst = this.dst) != null &&
1794 dl 1.1 (fn = this.fn) != null &&
1795     (((a = this.src) != null && (r = a.result) != null) ||
1796     ((b = this.snd) != null && (r = b.result) != null)) &&
1797     compareAndSet(0, 1)) {
1798 jsr166 1.2 if (r instanceof AltResult) {
1799 dl 1.19 ex = ((AltResult)r).ex;
1800 jsr166 1.2 t = null;
1801     }
1802 dl 1.19 else {
1803     ex = null;
1804 jsr166 1.2 t = (T) r;
1805 dl 1.1 }
1806 dl 1.20 Executor e = executor;
1807     U u = null;
1808 dl 1.19 if (ex == null) {
1809     try {
1810 dl 1.20 if (e != null)
1811     e.execute(new AsyncFunction<T,U>(t, fn, dst));
1812 dl 1.19 else
1813 dl 1.20 u = fn.apply(t);
1814 dl 1.19 } catch (Throwable rex) {
1815     ex = rex;
1816     }
1817     }
1818 dl 1.20 if (e == null || ex != null)
1819     dst.internalComplete(u, ex);
1820 jsr166 1.2 }
1821     }
1822 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1823 dl 1.1 }
1824    
1825 dl 1.7 static final class OrBlock<T> extends Completion {
1826     final CompletableFuture<? extends T> src;
1827     final CompletableFuture<? extends T> snd;
1828     final Block<? super T> fn;
1829     final CompletableFuture<Void> dst;
1830     final Executor executor;
1831     OrBlock(CompletableFuture<? extends T> src,
1832     CompletableFuture<? extends T> snd,
1833     Block<? super T> fn,
1834     CompletableFuture<Void> dst, Executor executor) {
1835     this.src = src; this.snd = snd;
1836     this.fn = fn; this.dst = dst;
1837     this.executor = executor;
1838     }
1839 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1840 dl 1.7 Object r; T t; Throwable ex;
1841     CompletableFuture<? extends T> a;
1842     CompletableFuture<? extends T> b;
1843     Block<? super T> fn;
1844     CompletableFuture<Void> dst;
1845     if ((dst = this.dst) != null &&
1846     (fn = this.fn) != null &&
1847     (((a = this.src) != null && (r = a.result) != null) ||
1848     ((b = this.snd) != null && (r = b.result) != null)) &&
1849     compareAndSet(0, 1)) {
1850     if (r instanceof AltResult) {
1851 dl 1.19 ex = ((AltResult)r).ex;
1852 dl 1.7 t = null;
1853     }
1854 dl 1.19 else {
1855     ex = null;
1856 dl 1.7 t = (T) r;
1857 dl 1.19 }
1858 dl 1.20 Executor e = executor;
1859 dl 1.19 if (ex == null) {
1860     try {
1861 dl 1.20 if (e != null)
1862     e.execute(new AsyncBlock<T>(t, fn, dst));
1863     else
1864 dl 1.19 fn.accept(t);
1865     } catch (Throwable rex) {
1866     ex = rex;
1867 dl 1.7 }
1868     }
1869 dl 1.20 if (e == null || ex != null)
1870     dst.internalComplete(null, ex);
1871 dl 1.7 }
1872     }
1873 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1874 dl 1.7 }
1875    
1876 dl 1.1 static final class OrRunnable<T> extends Completion {
1877 jsr166 1.2 final CompletableFuture<? extends T> src;
1878     final CompletableFuture<?> snd;
1879     final Runnable fn;
1880     final CompletableFuture<Void> dst;
1881 dl 1.1 final Executor executor;
1882     OrRunnable(CompletableFuture<? extends T> src,
1883     CompletableFuture<?> snd,
1884     Runnable fn,
1885     CompletableFuture<Void> dst, Executor executor) {
1886     this.src = src; this.snd = snd;
1887     this.fn = fn; this.dst = dst;
1888     this.executor = executor;
1889     }
1890 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1891 jsr166 1.2 Object r; Throwable ex;
1892 dl 1.1 CompletableFuture<? extends T> a;
1893     final CompletableFuture<?> b;
1894     final Runnable fn;
1895     final CompletableFuture<Void> dst;
1896 jsr166 1.2 if ((dst = this.dst) != null &&
1897 dl 1.1 (fn = this.fn) != null &&
1898     (((a = this.src) != null && (r = a.result) != null) ||
1899     ((b = this.snd) != null && (r = b.result) != null)) &&
1900     compareAndSet(0, 1)) {
1901 dl 1.19 if (r instanceof AltResult)
1902     ex = ((AltResult)r).ex;
1903     else
1904     ex = null;
1905 dl 1.20 Executor e = executor;
1906 dl 1.19 if (ex == null) {
1907 dl 1.1 try {
1908 dl 1.20 if (e != null)
1909     e.execute(new AsyncRunnable(fn, dst));
1910     else
1911 dl 1.1 fn.run();
1912     } catch (Throwable rex) {
1913 dl 1.19 ex = rex;
1914 dl 1.1 }
1915     }
1916 dl 1.20 if (e == null || ex != null)
1917     dst.internalComplete(null, ex);
1918 jsr166 1.2 }
1919     }
1920 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1921 dl 1.1 }
1922    
1923     static final class ExceptionAction<T> extends Completion {
1924 jsr166 1.2 final CompletableFuture<? extends T> src;
1925 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1926     final CompletableFuture<T> dst;
1927 dl 1.1 ExceptionAction(CompletableFuture<? extends T> src,
1928 dl 1.6 Function<? super Throwable, ? extends T> fn,
1929     CompletableFuture<T> dst) {
1930 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1931     }
1932 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1933 dl 1.1 CompletableFuture<? extends T> a;
1934 dl 1.6 Function<? super Throwable, ? extends T> fn;
1935     CompletableFuture<T> dst;
1936 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1937 jsr166 1.2 if ((dst = this.dst) != null &&
1938 dl 1.1 (fn = this.fn) != null &&
1939     (a = this.src) != null &&
1940     (r = a.result) != null &&
1941     compareAndSet(0, 1)) {
1942 dl 1.20 if ((r instanceof AltResult) &&
1943     (ex = ((AltResult)r).ex) != null) {
1944     try {
1945     t = fn.apply(ex);
1946     } catch (Throwable rex) {
1947     dx = rex;
1948 dl 1.1 }
1949     }
1950 dl 1.6 else
1951     t = (T) r;
1952 dl 1.20 dst.internalComplete(t, dx);
1953 dl 1.1 }
1954     }
1955 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1956 dl 1.1 }
1957    
1958 dl 1.17 static final class ThenCopy<T> extends Completion {
1959     final CompletableFuture<? extends T> src;
1960     final CompletableFuture<T> dst;
1961     ThenCopy(CompletableFuture<? extends T> src,
1962     CompletableFuture<T> dst) {
1963     this.src = src; this.dst = dst;
1964     }
1965 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1966 dl 1.17 CompletableFuture<? extends T> a;
1967     CompletableFuture<T> dst;
1968 dl 1.20 Object r; Object t; Throwable ex;
1969 dl 1.17 if ((dst = this.dst) != null &&
1970     (a = this.src) != null &&
1971     (r = a.result) != null &&
1972     compareAndSet(0, 1)) {
1973 dl 1.20 if (r instanceof AltResult) {
1974     ex = ((AltResult)r).ex;
1975     t = null;
1976     }
1977     else {
1978     ex = null;
1979     t = r;
1980     }
1981     dst.internalComplete(t, ex);
1982 dl 1.17 }
1983     }
1984 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1985 dl 1.17 }
1986    
1987     static final class ThenHandle<T,U> extends Completion {
1988     final CompletableFuture<? extends T> src;
1989     final BiFunction<? super T, Throwable, ? extends U> fn;
1990     final CompletableFuture<U> dst;
1991     ThenHandle(CompletableFuture<? extends T> src,
1992     BiFunction<? super T, Throwable, ? extends U> fn,
1993     final CompletableFuture<U> dst) {
1994     this.src = src; this.fn = fn; this.dst = dst;
1995     }
1996 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1997 dl 1.17 CompletableFuture<? extends T> a;
1998     BiFunction<? super T, Throwable, ? extends U> fn;
1999     CompletableFuture<U> dst;
2000     Object r; T t; Throwable ex;
2001     if ((dst = this.dst) != null &&
2002     (fn = this.fn) != null &&
2003     (a = this.src) != null &&
2004     (r = a.result) != null &&
2005     compareAndSet(0, 1)) {
2006     if (r instanceof AltResult) {
2007     ex = ((AltResult)r).ex;
2008     t = null;
2009     }
2010     else {
2011     ex = null;
2012     t = (T) r;
2013     }
2014 dl 1.20 U u = null; Throwable dx = null;
2015 dl 1.17 try {
2016 dl 1.20 u = fn.apply(t, ex);
2017 dl 1.17 } catch (Throwable rex) {
2018 dl 1.20 dx = rex;
2019 dl 1.17 }
2020 dl 1.20 dst.internalComplete(u, dx);
2021 dl 1.17 }
2022     }
2023 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
2024 dl 1.17 }
2025    
2026     static final class ThenCompose<T,U> extends Completion {
2027     final CompletableFuture<? extends T> src;
2028     final Function<? super T, CompletableFuture<U>> fn;
2029     final CompletableFuture<U> dst;
2030     ThenCompose(CompletableFuture<? extends T> src,
2031     Function<? super T, CompletableFuture<U>> fn,
2032     final CompletableFuture<U> dst) {
2033     this.src = src; this.fn = fn; this.dst = dst;
2034     }
2035 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
2036 dl 1.17 CompletableFuture<? extends T> a;
2037     Function<? super T, CompletableFuture<U>> fn;
2038     CompletableFuture<U> dst;
2039     Object r; T t; Throwable ex;
2040     if ((dst = this.dst) != null &&
2041     (fn = this.fn) != null &&
2042     (a = this.src) != null &&
2043     (r = a.result) != null &&
2044     compareAndSet(0, 1)) {
2045     if (r instanceof AltResult) {
2046     ex = ((AltResult)r).ex;
2047     t = null;
2048     }
2049     else {
2050     ex = null;
2051     t = (T) r;
2052     }
2053     CompletableFuture<U> c = null;
2054 dl 1.20 U u = null;
2055     boolean complete = false;
2056 dl 1.17 if (ex == null) {
2057     try {
2058     c = fn.apply(t);
2059     } catch (Throwable rex) {
2060     ex = rex;
2061     }
2062     }
2063     if (ex != null || c == null) {
2064     if (ex == null)
2065     ex = new NullPointerException();
2066     }
2067 dl 1.18 else {
2068     ThenCopy<U> d = null;
2069     Object s;
2070     if ((s = c.result) == null) {
2071     CompletionNode p = new CompletionNode
2072     (d = new ThenCopy<U>(c, dst));
2073     while ((s = c.result) == null) {
2074     if (UNSAFE.compareAndSwapObject
2075     (c, COMPLETIONS, p.next = c.completions, p))
2076     break;
2077     }
2078 dl 1.17 }
2079 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
2080 dl 1.20 complete = true;
2081 dl 1.18 if (s instanceof AltResult) {
2082     ex = ((AltResult)s).ex; // no rewrap
2083     u = null;
2084     }
2085     else
2086     u = (U) s;
2087 dl 1.17 }
2088     }
2089 dl 1.20 if (complete || ex != null)
2090     dst.internalComplete(u, ex);
2091     if (c != null)
2092     c.helpPostComplete();
2093 dl 1.17 }
2094     }
2095 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
2096 dl 1.17 }
2097    
2098 dl 1.1 /* ------------- then/and/or implementations -------------- */
2099    
2100 dl 1.20 @SuppressWarnings("unchecked") private <U> CompletableFuture<U> thenFunction
2101     (Function<? super T,? extends U> fn,
2102 dl 1.22 Executor e) {
2103 dl 1.1 if (fn == null) throw new NullPointerException();
2104     CompletableFuture<U> dst = new CompletableFuture<U>();
2105     ThenFunction<T,U> d = null;
2106 jsr166 1.2 Object r;
2107     if ((r = result) == null) {
2108 dl 1.1 CompletionNode p = new CompletionNode
2109 dl 1.22 (d = new ThenFunction<T,U>(this, fn, dst, e));
2110 dl 1.1 while ((r = result) == null) {
2111     if (UNSAFE.compareAndSwapObject
2112     (this, COMPLETIONS, p.next = completions, p))
2113     break;
2114     }
2115     }
2116 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2117 dl 1.19 T t; Throwable ex;
2118 jsr166 1.2 if (r instanceof AltResult) {
2119 dl 1.19 ex = ((AltResult)r).ex;
2120 jsr166 1.2 t = null;
2121 dl 1.1 }
2122 dl 1.19 else {
2123     ex = null;
2124 jsr166 1.2 t = (T) r;
2125 dl 1.19 }
2126 dl 1.20 U u = null;
2127 jsr166 1.2 if (ex == null) {
2128 dl 1.1 try {
2129 dl 1.20 if (e != null)
2130     e.execute(new AsyncFunction<T,U>(t, fn, dst));
2131 dl 1.1 else
2132 dl 1.20 u = fn.apply(t);
2133 dl 1.1 } catch (Throwable rex) {
2134 dl 1.19 ex = rex;
2135 dl 1.1 }
2136     }
2137 dl 1.20 if (e == null || ex != null)
2138     dst.internalComplete(u, ex);
2139 dl 1.17 }
2140 dl 1.20 helpPostComplete();
2141 dl 1.1 return dst;
2142     }
2143    
2144 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> thenBlock
2145     (Block<? super T> fn,
2146 dl 1.22 Executor e) {
2147 dl 1.7 if (fn == null) throw new NullPointerException();
2148     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2149     ThenBlock<T> d = null;
2150     Object r;
2151     if ((r = result) == null) {
2152     CompletionNode p = new CompletionNode
2153 dl 1.22 (d = new ThenBlock<T>(this, fn, dst, e));
2154 dl 1.7 while ((r = result) == null) {
2155     if (UNSAFE.compareAndSwapObject
2156     (this, COMPLETIONS, p.next = completions, p))
2157     break;
2158     }
2159     }
2160     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2161 dl 1.19 T t; Throwable ex;
2162 dl 1.7 if (r instanceof AltResult) {
2163 dl 1.19 ex = ((AltResult)r).ex;
2164 dl 1.7 t = null;
2165     }
2166 dl 1.19 else {
2167     ex = null;
2168 dl 1.7 t = (T) r;
2169 dl 1.19 }
2170 dl 1.7 if (ex == null) {
2171     try {
2172 dl 1.20 if (e != null)
2173     e.execute(new AsyncBlock<T>(t, fn, dst));
2174     else
2175 dl 1.7 fn.accept(t);
2176     } catch (Throwable rex) {
2177 dl 1.19 ex = rex;
2178 dl 1.7 }
2179     }
2180 dl 1.20 if (e == null || ex != null)
2181     dst.internalComplete(null, ex);
2182 dl 1.17 }
2183 dl 1.20 helpPostComplete();
2184 dl 1.7 return dst;
2185     }
2186    
2187 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> thenRunnable
2188     (Runnable action,
2189 dl 1.22 Executor e) {
2190 dl 1.1 if (action == null) throw new NullPointerException();
2191     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2192     ThenRunnable<T> d = null;
2193 jsr166 1.2 Object r;
2194     if ((r = result) == null) {
2195 dl 1.1 CompletionNode p = new CompletionNode
2196 dl 1.22 (d = new ThenRunnable<T>(this, action, dst, e));
2197 dl 1.1 while ((r = result) == null) {
2198     if (UNSAFE.compareAndSwapObject
2199     (this, COMPLETIONS, p.next = completions, p))
2200     break;
2201     }
2202     }
2203 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2204 dl 1.19 Throwable ex;
2205     if (r instanceof AltResult)
2206     ex = ((AltResult)r).ex;
2207     else
2208     ex = null;
2209 jsr166 1.2 if (ex == null) {
2210 dl 1.1 try {
2211 dl 1.20 if (e != null)
2212     e.execute(new AsyncRunnable(action, dst));
2213     else
2214 dl 1.1 action.run();
2215     } catch (Throwable rex) {
2216 dl 1.19 ex = rex;
2217 dl 1.1 }
2218     }
2219 dl 1.20 if (e == null || ex != null)
2220     dst.internalComplete(null, ex);
2221 dl 1.17 }
2222 dl 1.20 helpPostComplete();
2223 dl 1.1 return dst;
2224     }
2225    
2226 dl 1.20 @SuppressWarnings("unchecked") private <U,V> CompletableFuture<V> andFunction
2227     (CompletableFuture<? extends U> other,
2228     BiFunction<? super T,? super U,? extends V> fn,
2229 dl 1.22 Executor e) {
2230 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2231 jsr166 1.2 CompletableFuture<V> dst = new CompletableFuture<V>();
2232     AndFunction<T,U,V> d = null;
2233     Object r, s = null;
2234     if ((r = result) == null || (s = other.result) == null) {
2235 dl 1.22 d = new AndFunction<T,U,V>(this, other, fn, dst, e);
2236 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2237     while ((r == null && (r = result) == null) ||
2238     (s == null && (s = other.result) == null)) {
2239     if (q != null) {
2240     if (s != null ||
2241     UNSAFE.compareAndSwapObject
2242     (other, COMPLETIONS, q.next = other.completions, q))
2243     break;
2244     }
2245     else if (r != null ||
2246     UNSAFE.compareAndSwapObject
2247     (this, COMPLETIONS, p.next = completions, p)) {
2248     if (s != null)
2249     break;
2250     q = new CompletionNode(d);
2251     }
2252     }
2253     }
2254 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2255 dl 1.19 T t; U u; Throwable ex;
2256 jsr166 1.2 if (r instanceof AltResult) {
2257 dl 1.19 ex = ((AltResult)r).ex;
2258 jsr166 1.2 t = null;
2259 dl 1.1 }
2260 dl 1.19 else {
2261     ex = null;
2262 jsr166 1.2 t = (T) r;
2263 dl 1.19 }
2264 dl 1.1 if (ex != null)
2265     u = null;
2266     else if (s instanceof AltResult) {
2267 dl 1.19 ex = ((AltResult)s).ex;
2268 dl 1.1 u = null;
2269     }
2270     else
2271 jsr166 1.2 u = (U) s;
2272 dl 1.20 V v = null;
2273 jsr166 1.2 if (ex == null) {
2274 dl 1.1 try {
2275 dl 1.20 if (e != null)
2276     e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
2277 dl 1.1 else
2278 dl 1.20 v = fn.apply(t, u);
2279 dl 1.1 } catch (Throwable rex) {
2280 dl 1.19 ex = rex;
2281 dl 1.1 }
2282     }
2283 dl 1.20 if (e == null || ex != null)
2284     dst.internalComplete(v, ex);
2285 jsr166 1.2 }
2286 dl 1.20 helpPostComplete();
2287     other.helpPostComplete();
2288 jsr166 1.2 return dst;
2289 dl 1.1 }
2290    
2291 dl 1.20 @SuppressWarnings("unchecked") private <U> CompletableFuture<Void> andBlock
2292     (CompletableFuture<? extends U> other,
2293     BiBlock<? super T,? super U> fn,
2294 dl 1.22 Executor e) {
2295 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2296     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2297     AndBlock<T,U> d = null;
2298     Object r, s = null;
2299     if ((r = result) == null || (s = other.result) == null) {
2300 dl 1.22 d = new AndBlock<T,U>(this, other, fn, dst, e);
2301 dl 1.7 CompletionNode q = null, p = new CompletionNode(d);
2302     while ((r == null && (r = result) == null) ||
2303     (s == null && (s = other.result) == null)) {
2304     if (q != null) {
2305     if (s != null ||
2306     UNSAFE.compareAndSwapObject
2307     (other, COMPLETIONS, q.next = other.completions, q))
2308     break;
2309     }
2310     else if (r != null ||
2311     UNSAFE.compareAndSwapObject
2312     (this, COMPLETIONS, p.next = completions, p)) {
2313     if (s != null)
2314     break;
2315     q = new CompletionNode(d);
2316     }
2317     }
2318     }
2319     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2320 dl 1.19 T t; U u; Throwable ex;
2321 dl 1.7 if (r instanceof AltResult) {
2322 dl 1.19 ex = ((AltResult)r).ex;
2323 dl 1.7 t = null;
2324     }
2325 dl 1.19 else {
2326     ex = null;
2327 dl 1.7 t = (T) r;
2328 dl 1.19 }
2329 dl 1.7 if (ex != null)
2330     u = null;
2331     else if (s instanceof AltResult) {
2332 dl 1.19 ex = ((AltResult)s).ex;
2333 dl 1.7 u = null;
2334     }
2335     else
2336     u = (U) s;
2337     if (ex == null) {
2338     try {
2339 dl 1.20 if (e != null)
2340     e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
2341     else
2342 dl 1.7 fn.accept(t, u);
2343     } catch (Throwable rex) {
2344 dl 1.19 ex = rex;
2345 dl 1.7 }
2346     }
2347 dl 1.20 if (e == null || ex != null)
2348     dst.internalComplete(null, ex);
2349 dl 1.7 }
2350 dl 1.20 helpPostComplete();
2351     other.helpPostComplete();
2352 dl 1.7 return dst;
2353     }
2354    
2355 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> andRunnable
2356     (CompletableFuture<?> other,
2357     Runnable action,
2358 dl 1.22 Executor e) {
2359 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2360 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2361     AndRunnable<T> d = null;
2362     Object r, s = null;
2363     if ((r = result) == null || (s = other.result) == null) {
2364 dl 1.22 d = new AndRunnable<T>(this, other, action, dst, e);
2365 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2366     while ((r == null && (r = result) == null) ||
2367     (s == null && (s = other.result) == null)) {
2368     if (q != null) {
2369     if (s != null ||
2370     UNSAFE.compareAndSwapObject
2371     (other, COMPLETIONS, q.next = other.completions, q))
2372     break;
2373     }
2374     else if (r != null ||
2375     UNSAFE.compareAndSwapObject
2376     (this, COMPLETIONS, p.next = completions, p)) {
2377     if (s != null)
2378     break;
2379     q = new CompletionNode(d);
2380     }
2381     }
2382     }
2383 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2384 dl 1.19 Throwable ex;
2385     if (r instanceof AltResult)
2386     ex = ((AltResult)r).ex;
2387     else
2388     ex = null;
2389     if (ex == null && (s instanceof AltResult))
2390     ex = ((AltResult)s).ex;
2391     if (ex == null) {
2392 dl 1.1 try {
2393 dl 1.20 if (e != null)
2394     e.execute(new AsyncRunnable(action, dst));
2395     else
2396 dl 1.1 action.run();
2397     } catch (Throwable rex) {
2398 dl 1.19 ex = rex;
2399 dl 1.1 }
2400     }
2401 dl 1.20 if (e == null || ex != null)
2402     dst.internalComplete(null, ex);
2403 jsr166 1.2 }
2404 dl 1.20 helpPostComplete();
2405     other.helpPostComplete();
2406 jsr166 1.2 return dst;
2407 dl 1.1 }
2408    
2409 dl 1.20 @SuppressWarnings("unchecked") private <U> CompletableFuture<U> orFunction
2410     (CompletableFuture<? extends T> other,
2411     Function<? super T, U> fn,
2412 dl 1.22 Executor e) {
2413 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2414 jsr166 1.2 CompletableFuture<U> dst = new CompletableFuture<U>();
2415     OrFunction<T,U> d = null;
2416     Object r;
2417     if ((r = result) == null && (r = other.result) == null) {
2418 dl 1.22 d = new OrFunction<T,U>(this, other, fn, dst, e);
2419 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2420     while ((r = result) == null && (r = other.result) == null) {
2421     if (q != null) {
2422     if (UNSAFE.compareAndSwapObject
2423     (other, COMPLETIONS, q.next = other.completions, q))
2424     break;
2425     }
2426     else if (UNSAFE.compareAndSwapObject
2427     (this, COMPLETIONS, p.next = completions, p))
2428     q = new CompletionNode(d);
2429     }
2430 jsr166 1.2 }
2431     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2432 dl 1.19 T t; Throwable ex;
2433 jsr166 1.2 if (r instanceof AltResult) {
2434 dl 1.19 ex = ((AltResult)r).ex;
2435 jsr166 1.2 t = null;
2436 dl 1.1 }
2437 dl 1.19 else {
2438     ex = null;
2439 jsr166 1.2 t = (T) r;
2440 dl 1.19 }
2441 dl 1.20 U u = null;
2442 jsr166 1.2 if (ex == null) {
2443 dl 1.1 try {
2444 dl 1.20 if (e != null)
2445     e.execute(new AsyncFunction<T,U>(t, fn, dst));
2446 dl 1.1 else
2447 dl 1.20 u = fn.apply(t);
2448 dl 1.1 } catch (Throwable rex) {
2449 dl 1.19 ex = rex;
2450 dl 1.1 }
2451     }
2452 dl 1.20 if (e == null || ex != null)
2453     dst.internalComplete(u, ex);
2454 jsr166 1.2 }
2455 dl 1.20 helpPostComplete();
2456     other.helpPostComplete();
2457 jsr166 1.2 return dst;
2458 dl 1.1 }
2459    
2460 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> orBlock
2461     (CompletableFuture<? extends T> other,
2462     Block<? super T> fn,
2463 dl 1.22 Executor e) {
2464 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2465     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2466     OrBlock<T> d = null;
2467     Object r;
2468     if ((r = result) == null && (r = other.result) == null) {
2469 dl 1.22 d = new OrBlock<T>(this, other, fn, dst, e);
2470 dl 1.7 CompletionNode q = null, p = new CompletionNode(d);
2471     while ((r = result) == null && (r = other.result) == null) {
2472     if (q != null) {
2473     if (UNSAFE.compareAndSwapObject
2474     (other, COMPLETIONS, q.next = other.completions, q))
2475     break;
2476     }
2477     else if (UNSAFE.compareAndSwapObject
2478     (this, COMPLETIONS, p.next = completions, p))
2479     q = new CompletionNode(d);
2480     }
2481     }
2482     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2483 dl 1.19 T t; Throwable ex;
2484 dl 1.7 if (r instanceof AltResult) {
2485 dl 1.19 ex = ((AltResult)r).ex;
2486 dl 1.7 t = null;
2487     }
2488 dl 1.19 else {
2489     ex = null;
2490 dl 1.7 t = (T) r;
2491 dl 1.19 }
2492 dl 1.7 if (ex == null) {
2493     try {
2494 dl 1.20 if (e != null)
2495     e.execute(new AsyncBlock<T>(t, fn, dst));
2496     else
2497 dl 1.7 fn.accept(t);
2498     } catch (Throwable rex) {
2499 dl 1.19 ex = rex;
2500 dl 1.7 }
2501     }
2502 dl 1.20 if (e == null || ex != null)
2503     dst.internalComplete(null, ex);
2504 dl 1.7 }
2505 dl 1.20 helpPostComplete();
2506     other.helpPostComplete();
2507 dl 1.7 return dst;
2508     }
2509    
2510 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> orRunnable
2511     (CompletableFuture<?> other,
2512     Runnable action,
2513 dl 1.22 Executor e) {
2514 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2515 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2516     OrRunnable<T> d = null;
2517     Object r;
2518     if ((r = result) == null && (r = other.result) == null) {
2519 dl 1.22 d = new OrRunnable<T>(this, other, action, dst, e);
2520 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2521     while ((r = result) == null && (r = other.result) == null) {
2522     if (q != null) {
2523     if (UNSAFE.compareAndSwapObject
2524     (other, COMPLETIONS, q.next = other.completions, q))
2525     break;
2526     }
2527     else if (UNSAFE.compareAndSwapObject
2528     (this, COMPLETIONS, p.next = completions, p))
2529     q = new CompletionNode(d);
2530     }
2531 jsr166 1.2 }
2532     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2533 dl 1.19 Throwable ex;
2534     if (r instanceof AltResult)
2535     ex = ((AltResult)r).ex;
2536     else
2537     ex = null;
2538     if (ex == null) {
2539 dl 1.1 try {
2540 dl 1.20 if (e != null)
2541     e.execute(new AsyncRunnable(action, dst));
2542     else
2543 dl 1.1 action.run();
2544     } catch (Throwable rex) {
2545 dl 1.19 ex = rex;
2546 dl 1.1 }
2547     }
2548 dl 1.20 if (e == null || ex != null)
2549     dst.internalComplete(null, ex);
2550 jsr166 1.2 }
2551 dl 1.20 helpPostComplete();
2552     other.helpPostComplete();
2553 jsr166 1.2 return dst;
2554 dl 1.1 }
2555    
// Unsafe mechanics: the Unsafe handle plus the field offsets of
// "result", "waiters" and "completions" within this class, resolved
// once during class initialization.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
        Class<?> self = CompletableFuture.class;
        long resultOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("result"));
        long waitersOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        long completionsOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
        UNSAFE = unsafe;
        RESULT = resultOffset;
        WAITERS = waitersOffset;
        COMPLETIONS = completionsOffset;
    } catch (Exception failure) {
        // Any lookup failure aborts class initialization.
        throw new Error(failure);
    }
}
2575     }