ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.21
Committed: Mon Dec 31 16:59:29 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.20: +63 -55 lines
Log Message:
More consistent style

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.7 import java.util.function.Block;
10     import java.util.function.BiBlock;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.19 import java.util.concurrent.CompletionException;
23 dl 1.1 import java.util.concurrent.atomic.AtomicInteger;
24     import java.util.concurrent.locks.LockSupport;
25    
26     /**
27     * A {@link Future} that may be explicitly completed (setting its
28     * value and status), and may include dependent functions and actions
29     * that trigger upon its completion.
30     *
31 dl 1.7 * <p>Similar methods are available for Functions, Blocks, and
32     * Runnables, depending on whether actions require arguments and/or
33     * produce results. Functions and actions supplied for dependent
34     * completions (mainly using methods with prefix {@code then}) may be
35 dl 1.1 * performed by the thread that completes the current
36     * CompletableFuture, or by any other caller of these methods. There
37     * are no guarantees about the order of processing completions unless
38 dl 1.17 * constrained by these methods.
39 dl 1.1 *
40 jsr166 1.4 * <p>When two or more threads attempt to {@link #complete} or {@link
41 dl 1.17 * #completeExceptionally} a CompletableFuture, only one of them
42 dl 1.19 * succeeds.
43     *
44     * <p> Upon exceptional completion, or when a completion entails
45 dl 1.5 * computation of a function or action, and it terminates abruptly
46 dl 1.19 * with an (unchecked) exception or error, then further completions
47     * act as {@code completeExceptionally} with a {@link
48     * CompletionException} holding that exception as its cause. If a
49     * CompletableFuture completes exceptionally, and is not followed by a
50     * {@link #exceptionally} or {@link #handle} completion, then all of
51     * its dependents (and their dependents) also complete exceptionally
52 dl 1.20 * with CompletionExceptions holding the ultimate cause. In case of a
53     * CompletionException, methods {@link #get()} and {@link #get(long,
54     * TimeUnit)} throw an {@link ExecutionException} with the same cause
55     * as would be held in the corresponding CompletionException. However,
56     * in these cases, methods {@link #join()} and {@link #getNow} throw
57     * the CompletionException, which simplifies usage especially within
58     * other completion functions.
59 dl 1.1 *
60     * <p>CompletableFutures themselves do not execute asynchronously.
61     * However, the {@code async} methods provide commonly useful ways to
62 jsr166 1.8 * commence asynchronous processing, using either a given {@link
63 dl 1.1 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
64     * function or action that will result in the completion of a new
65     * CompletableFuture.
66     *
67     * @author Doug Lea
68     * @since 1.8
69     */
70     public class CompletableFuture<T> implements Future<T> {
71     /*
72 dl 1.20 * Overview:
73 dl 1.1 *
74 dl 1.20 * 1. Non-nullness of field result (set via CAS) indicates
75     * done. An AltResult is used to box null as a result, as well as
76     * to hold exceptions. Using a single field makes completion fast
77     * and simple to detect and trigger, at the expense of a lot of
78     * encoding and decoding that infiltrates many methods. One minor
79     * simplification relies on the (static) NIL (to box null results)
80     * being the only AltResult with a null exception field, so we
81     * don't usually need explicit comparisons with NIL.
82 dl 1.1 *
83     * 2. Waiters are held in a Treiber stack similar to the one used
84 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
85     * internal documentation for details.
86 dl 1.1 *
87     * 3. Completions are also kept in a list/stack, and pulled off
88 dl 1.20 * and run when completion is triggered. (We could in fact use the
89     * same stack as for waiters, but would give up the potential
90     * parallelism obtained because woken waiters help release/run
91     * others (see method postComplete).) Because post-processing may
92     * race with direct calls, completions extend AtomicInteger so
93     * callers can claim the action via compareAndSet(0, 1). The
94     * Completion.run methods are all written in a boringly similar
95     * uniform way (that sometimes includes unnecessary-looking
96     * checks, kept to maintain uniformity). There are enough
97     * dimensions upon which they differ that factoring to use common
98     * code isn't worthwhile.
99     *
100     * 4. The exported then/and/or methods do support a bit of
101     * factoring (see thenFunction, andBlock, etc). They must cope
102     * with the intrinsic races surrounding addition of a dependent
103     * action versus performing the action directly because the task
104     * is already complete. For example, a CF may not be complete
105     * upon entry, so a dependent completion is added, but by the time
106     * it is added, the target CF is complete, so must be directly
107     * executed. This is all done while avoiding unnecessary object
108     * construction in safe-bypass cases.
109 dl 1.1 */
110    
111 dl 1.20 // preliminary class definitions
112    
113 dl 1.1 static final class AltResult {
114     final Throwable ex; // null only for NIL
115 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
116 dl 1.1 }
117    
118     static final AltResult NIL = new AltResult(null);
119    
120     /**
121     * Simple linked list nodes to record waiting threads in a Treiber
122     * stack. See other classes such as Phaser and SynchronousQueue
123     * for more detailed explanation.
124     */
125     static final class WaitNode {
126     volatile Thread thread;
127     volatile WaitNode next;
128     }
129    
130     /**
131     * Simple linked list nodes to record completions, used in
132 dl 1.20 * basically the same way as WaitNodes. (We separate nodes from
133     * the Completions themselves mainly because for the And and Or
134     * methods, the same Completion object resides in two lists.)
135 dl 1.1 */
136     static final class CompletionNode {
137 jsr166 1.2 final Completion completion;
138 dl 1.1 volatile CompletionNode next;
139 jsr166 1.2 CompletionNode(Completion completion) { this.completion = completion; }
140 dl 1.1 }
141    
142 dl 1.20
143     // Fields
144    
145     volatile Object result; // Either the result or boxed AltResult
146 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
147     volatile CompletionNode completions; // list (Treiber stack) of completions
148    
149 dl 1.20 // Basic utilities for triggering and processing completions
150     // (The Completion and Async classes and internal methods that
151     // use them are defined after the public methods.)
152    
153     /**
154     * Removes and signals all waiting threads and runs all completions.
155     */
156     final void postComplete() {
157     WaitNode q; Thread t;
158     while ((q = waiters) != null) {
159     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
160     (t = q.thread) != null) {
161     q.thread = null;
162     LockSupport.unpark(t);
163     }
164     }
165    
166     CompletionNode h; Completion c;
167     while ((h = completions) != null) {
168     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
169     (c = h.completion) != null)
170     c.run();
171     }
172     }
173    
174     /**
175     * Triggers completion with the encoding of the given arguments:
176     * if the exception is non-null, encodes it as a wrapped
177     * CompletionException unless it is one already. Otherwise uses
178     * the given result, boxed as NIL if null.
179     */
180     final void internalComplete(Object v, Throwable ex) {
181     if (result == null)
182     UNSAFE.compareAndSwapObject
183     (this, RESULT, null,
184     (ex == null) ? (v == null) ? NIL : v :
185     new AltResult((ex instanceof CompletionException) ? ex :
186     new CompletionException(ex)));
187     postComplete(); // help out even if not triggered
188     }
189    
190     /**
191     * If triggered, help release and/or process completions
192     */
193     final void helpPostComplete() {
194     if (result != null)
195     postComplete();
196     }
197    
198     // public methods
199    
200 dl 1.1 /**
201     * Creates a new incomplete CompletableFuture.
202     */
203     public CompletableFuture() {
204     }
205    
206     /**
207     * Asynchronously executes in the {@link
208     * ForkJoinPool#commonPool()}, a task that completes the returned
209     * CompletableFuture with the result of the given Supplier.
210     *
211     * @param supplier a function returning the value to be used
212 jsr166 1.14 * to complete the returned CompletableFuture
213 jsr166 1.11 * @return the CompletableFuture
214 dl 1.1 */
215 dl 1.7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
216 dl 1.1 if (supplier == null) throw new NullPointerException();
217     CompletableFuture<U> f = new CompletableFuture<U>();
218     ForkJoinPool.commonPool().
219 dl 1.7 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
220 dl 1.1 return f;
221     }
222    
223     /**
224     * Asynchronously executes using the given executor, a task that
225     * completes the returned CompletableFuture with the result of the
226     * given Supplier.
227     *
228     * @param supplier a function returning the value to be used
229 jsr166 1.11 * to complete the returned CompletableFuture
230 dl 1.1 * @param executor the executor to use for asynchronous execution
231 jsr166 1.11 * @return the CompletableFuture
232 dl 1.1 */
233 dl 1.7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
234     Executor executor) {
235 dl 1.1 if (executor == null || supplier == null)
236     throw new NullPointerException();
237     CompletableFuture<U> f = new CompletableFuture<U>();
238 dl 1.7 executor.execute(new AsyncSupplier<U>(supplier, f));
239 dl 1.1 return f;
240     }
241    
242     /**
243     * Asynchronously executes in the {@link
244     * ForkJoinPool#commonPool()} a task that runs the given action,
245 jsr166 1.11 * and then completes the returned CompletableFuture.
246 dl 1.1 *
247     * @param runnable the action to run before completing the
248 jsr166 1.11 * returned CompletableFuture
249     * @return the CompletableFuture
250 dl 1.1 */
251 dl 1.7 public static CompletableFuture<Void> runAsync(Runnable runnable) {
252 dl 1.1 if (runnable == null) throw new NullPointerException();
253     CompletableFuture<Void> f = new CompletableFuture<Void>();
254     ForkJoinPool.commonPool().
255     execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
256     return f;
257     }
258    
259     /**
260     * Asynchronously executes using the given executor, a task that
261     * runs the given action, and then completes the returned
262 jsr166 1.11 * CompletableFuture.
263 dl 1.1 *
264     * @param runnable the action to run before completing the
265 jsr166 1.11 * returned CompletableFuture
266 dl 1.1 * @param executor the executor to use for asynchronous execution
267 jsr166 1.11 * @return the CompletableFuture
268 dl 1.1 */
269 dl 1.7 public static CompletableFuture<Void> runAsync(Runnable runnable,
270 jsr166 1.12 Executor executor) {
271 dl 1.1 if (executor == null || runnable == null)
272     throw new NullPointerException();
273     CompletableFuture<Void> f = new CompletableFuture<Void>();
274     executor.execute(new AsyncRunnable(runnable, f));
275     return f;
276     }
277    
278     /**
279     * Returns {@code true} if completed in any fashion: normally,
280 jsr166 1.16 * exceptionally, or via cancellation.
281 dl 1.1 *
282     * @return {@code true} if completed
283     */
284     public boolean isDone() {
285     return result != null;
286     }
287    
288     /**
289 dl 1.19 * Waits if necessary for the computation to complete, and then
290     * retrieves its result.
291     *
292     * @return the computed result
293     * @throws CancellationException if the computation was cancelled
294     * @throws ExecutionException if the computation threw an
295     * exception
296     * @throws InterruptedException if the current thread was interrupted
297     * while waiting
298     */
299 dl 1.20 @SuppressWarnings("unchecked") public T get() throws InterruptedException, ExecutionException {
300     Object r; Throwable ex, cause;
301 dl 1.19 if ((r = result) == null && (r = waitingGet(true)) == null)
302     throw new InterruptedException();
303     if (r instanceof AltResult) {
304     if ((ex = ((AltResult)r).ex) != null) {
305     if (ex instanceof CancellationException)
306     throw (CancellationException)ex;
307 dl 1.20 if ((ex instanceof CompletionException) &&
308     (cause = ex.getCause()) != null)
309     ex = cause;
310 dl 1.19 throw new ExecutionException(ex);
311     }
312     return null;
313     }
314     return (T)r;
315     }
316    
317     /**
318 dl 1.20 * Waits if necessary for at most the given time for completion,
319     * and then retrieves its result, if available.
320 dl 1.1 *
321 dl 1.20 * @param timeout the maximum time to wait
322     * @param unit the time unit of the timeout argument
323     * @return the computed result
324 dl 1.19 * @throws CancellationException if the computation was cancelled
325 dl 1.20 * @throws ExecutionException if the computation threw an
326 dl 1.19 * exception
327 dl 1.20 * @throws InterruptedException if the current thread was interrupted
328     * while waiting
329     * @throws TimeoutException if the wait timed out
330 dl 1.1 */
331 dl 1.20 @SuppressWarnings("unchecked") public T get(long timeout, TimeUnit unit)
332     throws InterruptedException, ExecutionException, TimeoutException {
333     Object r; Throwable ex, cause;
334     long nanos = unit.toNanos(timeout);
335     if (Thread.interrupted())
336     throw new InterruptedException();
337 jsr166 1.2 if ((r = result) == null)
338 dl 1.20 r = timedAwaitDone(nanos);
339 jsr166 1.2 if (r instanceof AltResult) {
340 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
341 dl 1.19 if (ex instanceof CancellationException)
342     throw (CancellationException)ex;
343 dl 1.20 if ((ex instanceof CompletionException) &&
344     (cause = ex.getCause()) != null)
345     ex = cause;
346     throw new ExecutionException(ex);
347 dl 1.5 }
348 jsr166 1.2 return null;
349     }
350     return (T)r;
351 dl 1.1 }
352    
353     /**
354 dl 1.20 * Returns the result value when complete, or throws an
355     * (unchecked) exception if completed exceptionally. To better
356     * conform with the use of common functional forms, if a
357     * computation involved in the completion of this
358     * CompletableFuture threw an exception, this method throws an
359     * (unchecked) {@link CompletionException} with the underlying
360     * exception as its cause.
361 dl 1.1 *
362 dl 1.20 * @return the result value
363 dl 1.19 * @throws CancellationException if the computation was cancelled
364 dl 1.20 * @throws CompletionException if a completion computation threw
365     * an exception
366 dl 1.1 */
367 dl 1.20 @SuppressWarnings("unchecked") public T join() {
368 dl 1.1 Object r; Throwable ex;
369 jsr166 1.2 if ((r = result) == null)
370 dl 1.20 r = waitingGet(false);
371 jsr166 1.2 if (r instanceof AltResult) {
372 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
373 dl 1.19 if (ex instanceof CancellationException)
374     throw (CancellationException)ex;
375     if (ex instanceof CompletionException)
376     throw (CompletionException)ex;
377     throw new CompletionException(ex);
378 dl 1.5 }
379 jsr166 1.2 return null;
380     }
381     return (T)r;
382 dl 1.1 }
383    
384     /**
385 dl 1.20 * Returns the result value (or throws any encountered exception)
386     * if completed, else returns the given valueIfAbsent.
387 dl 1.1 *
388 dl 1.20 * @param valueIfAbsent the value to return if not completed
389     * @return the result value, if completed, else the given valueIfAbsent
390 dl 1.1 * @throws CancellationException if the computation was cancelled
391 dl 1.20 * @throws CompletionException if a completion computation threw
392     * an exception
393 dl 1.1 */
394 dl 1.20 @SuppressWarnings("unchecked") public T getNow(T valueIfAbsent) {
395 dl 1.1 Object r; Throwable ex;
396 jsr166 1.2 if ((r = result) == null)
397 dl 1.20 return valueIfAbsent;
398 jsr166 1.2 if (r instanceof AltResult) {
399 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
400 dl 1.19 if (ex instanceof CancellationException)
401     throw (CancellationException)ex;
402 dl 1.20 if (ex instanceof CompletionException)
403     throw (CompletionException)ex;
404     throw new CompletionException(ex);
405 dl 1.5 }
406 jsr166 1.2 return null;
407     }
408     return (T)r;
409 dl 1.1 }
410    
411     /**
412     * If not already completed, sets the value returned by {@link
413     * #get()} and related methods to the given value.
414     *
415     * @param value the result value
416     * @return true if this invocation caused this CompletableFuture
417 jsr166 1.14 * to transition to a completed state, else false
418 dl 1.1 */
419     public boolean complete(T value) {
420 dl 1.20 boolean triggered =
421     result == null &&
422 dl 1.1 UNSAFE.compareAndSwapObject(this, RESULT, null,
423 dl 1.20 value == null ? NIL : value);
424     postComplete();
425     return triggered;
426 dl 1.19 }
427    
428     /**
429 dl 1.1 * If not already completed, causes invocations of {@link #get()}
430     * and related methods to throw the given exception.
431     *
432     * @param ex the exception
433     * @return true if this invocation caused this CompletableFuture
434 jsr166 1.14 * to transition to a completed state, else false
435 dl 1.1 */
436     public boolean completeExceptionally(Throwable ex) {
437     if (ex == null) throw new NullPointerException();
438 dl 1.20 boolean triggered =
439     result == null &&
440     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
441     postComplete();
442     return triggered;
443 dl 1.19 }
444    
445     /**
446 dl 1.1 * Creates and returns a CompletableFuture that is completed with
447     * the result of the given function of this CompletableFuture.
448     * If this CompletableFuture completes exceptionally,
449     * then the returned CompletableFuture also does so,
450 dl 1.19 * with a CompletionException holding this exception as
451 dl 1.1 * its cause.
452     *
453     * @param fn the function to use to compute the value of
454     * the returned CompletableFuture
455     * @return the new CompletableFuture
456     */
457 dl 1.7 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
458 dl 1.1 return thenFunction(fn, null);
459     }
460    
461     /**
462     * Creates and returns a CompletableFuture that is asynchronously
463     * completed using the {@link ForkJoinPool#commonPool()} with the
464     * result of the given function of this CompletableFuture. If
465     * this CompletableFuture completes exceptionally, then the
466     * returned CompletableFuture also does so, with a
467 dl 1.19 * CompletionException holding this exception as its cause.
468 dl 1.1 *
469     * @param fn the function to use to compute the value of
470     * the returned CompletableFuture
471     * @return the new CompletableFuture
472     */
473 dl 1.7 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
474 dl 1.1 return thenFunction(fn, ForkJoinPool.commonPool());
475     }
476    
477     /**
478     * Creates and returns a CompletableFuture that is asynchronously
479     * completed using the given executor with the result of the given
480     * function of this CompletableFuture. If this CompletableFuture
481     * completes exceptionally, then the returned CompletableFuture
482 dl 1.19 * also does so, with a CompletionException holding this exception as
483 dl 1.1 * its cause.
484     *
485     * @param fn the function to use to compute the value of
486     * the returned CompletableFuture
487     * @param executor the executor to use for asynchronous execution
488     * @return the new CompletableFuture
489     */
490 dl 1.7 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
491 jsr166 1.12 Executor executor) {
492 dl 1.1 if (executor == null) throw new NullPointerException();
493     return thenFunction(fn, executor);
494     }
495    
496     /**
497     * Creates and returns a CompletableFuture that is completed after
498 dl 1.7 * performing the given action with this CompletableFuture's
499     * result when it completes. If this CompletableFuture
500     * completes exceptionally, then the returned CompletableFuture
501 dl 1.19 * also does so, with a CompletionException holding this exception as
502 dl 1.7 * its cause.
503     *
504     * @param block the action to perform before completing the
505     * returned CompletableFuture
506     * @return the new CompletableFuture
507     */
508     public CompletableFuture<Void> thenAccept(Block<? super T> block) {
509     return thenBlock(block, null);
510     }
511    
512     /**
513     * Creates and returns a CompletableFuture that is asynchronously
514     * completed using the {@link ForkJoinPool#commonPool()} with this
515     * CompletableFuture's result when it completes. If this
516     * CompletableFuture completes exceptionally, then the returned
517 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
518 dl 1.7 * this exception as its cause.
519     *
520     * @param block the action to perform before completing the
521     * returned CompletableFuture
522     * @return the new CompletableFuture
523     */
524     public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
525     return thenBlock(block, ForkJoinPool.commonPool());
526     }
527    
528     /**
529     * Creates and returns a CompletableFuture that is asynchronously
530     * completed using the given executor with this
531     * CompletableFuture's result when it completes. If this
532     * CompletableFuture completes exceptionally, then the returned
533 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
534 dl 1.7 * this exception as its cause.
535     *
536     * @param block the action to perform before completing the
537     * returned CompletableFuture
538     * @param executor the executor to use for asynchronous execution
539     * @return the new CompletableFuture
540     */
541     public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
542     Executor executor) {
543     if (executor == null) throw new NullPointerException();
544     return thenBlock(block, executor);
545     }
546    
547     /**
548     * Creates and returns a CompletableFuture that is completed after
549     * performing the given action when this CompletableFuture
550 dl 1.1 * completes. If this CompletableFuture completes exceptionally,
551     * then the returned CompletableFuture also does so, with a
552 dl 1.19 * CompletionException holding this exception as its cause.
553 dl 1.1 *
554     * @param action the action to perform before completing the
555     * returned CompletableFuture
556     * @return the new CompletableFuture
557     */
558 dl 1.7 public CompletableFuture<Void> thenRun(Runnable action) {
559 dl 1.1 return thenRunnable(action, null);
560     }
561    
562     /**
563     * Creates and returns a CompletableFuture that is asynchronously
564     * completed using the {@link ForkJoinPool#commonPool()} after
565 dl 1.7 * performing the given action when this CompletableFuture
566 dl 1.1 * completes. If this CompletableFuture completes exceptionally,
567     * then the returned CompletableFuture also does so, with a
568 dl 1.19 * CompletionException holding this exception as its cause.
569 dl 1.1 *
570     * @param action the action to perform before completing the
571     * returned CompletableFuture
572     * @return the new CompletableFuture
573     */
574 dl 1.7 public CompletableFuture<Void> thenRunAsync(Runnable action) {
575 dl 1.1 return thenRunnable(action, ForkJoinPool.commonPool());
576     }
577    
578     /**
579     * Creates and returns a CompletableFuture that is asynchronously
580     * completed using the given executor after performing the given
581 dl 1.7 * action when this CompletableFuture completes. If this
582 dl 1.1 * CompletableFuture completes exceptionally, then the returned
583 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
584 dl 1.1 * this exception as its cause.
585     *
586     * @param action the action to perform before completing the
587     * returned CompletableFuture
588     * @param executor the executor to use for asynchronous execution
589     * @return the new CompletableFuture
590     */
591 jsr166 1.12 public CompletableFuture<Void> thenRunAsync(Runnable action,
592     Executor executor) {
593 dl 1.1 if (executor == null) throw new NullPointerException();
594     return thenRunnable(action, executor);
595     }
596    
597     /**
598     * Creates and returns a CompletableFuture that is completed with
599     * the result of the given function of this and the other given
600 dl 1.7 * CompletableFuture's results when both complete. If this or
601 dl 1.1 * the other CompletableFuture complete exceptionally, then the
602     * returned CompletableFuture also does so, with a
603 dl 1.19 * CompletionException holding the exception as its cause.
604 dl 1.1 *
605     * @param other the other CompletableFuture
606     * @param fn the function to use to compute the value of
607     * the returned CompletableFuture
608     * @return the new CompletableFuture
609     */
610 dl 1.17 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
611     BiFunction<? super T,? super U,? extends V> fn) {
612 dl 1.1 return andFunction(other, fn, null);
613     }
614    
615     /**
616 jsr166 1.3 * Creates and returns a CompletableFuture that is asynchronously
617 dl 1.1 * completed using the {@link ForkJoinPool#commonPool()} with
618     * the result of the given function of this and the other given
619 dl 1.7 * CompletableFuture's results when both complete. If this or
620 dl 1.1 * the other CompletableFuture complete exceptionally, then the
621     * returned CompletableFuture also does so, with a
622 dl 1.19 * CompletionException holding the exception as its cause.
623 dl 1.1 *
624     * @param other the other CompletableFuture
625     * @param fn the function to use to compute the value of
626     * the returned CompletableFuture
627     * @return the new CompletableFuture
628     */
629 dl 1.17 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
630     BiFunction<? super T,? super U,? extends V> fn) {
631 dl 1.1 return andFunction(other, fn, ForkJoinPool.commonPool());
632     }
633    
634     /**
635 jsr166 1.3 * Creates and returns a CompletableFuture that is
636 dl 1.1 * asynchronously completed using the given executor with the
637     * result of the given function of this and the other given
638 dl 1.7 * CompletableFuture's results when both complete. If this or
639 dl 1.1 * the other CompletableFuture complete exceptionally, then the
640     * returned CompletableFuture also does so, with a
641 dl 1.19 * CompletionException holding the exception as its cause.
642 dl 1.1 *
643     * @param other the other CompletableFuture
644     * @param fn the function to use to compute the value of
645     * the returned CompletableFuture
646     * @param executor the executor to use for asynchronous execution
647     * @return the new CompletableFuture
648     */
649    
650 dl 1.17 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
651     BiFunction<? super T,? super U,? extends V> fn,
652     Executor executor) {
653 dl 1.7 if (executor == null) throw new NullPointerException();
654     return andFunction(other, fn, executor);
655     }
656    
657     /**
658     * Creates and returns a CompletableFuture that is completed with
659     * the results of this and the other given CompletableFuture if
660     * both complete. If this and/or the other CompletableFuture
661     * complete exceptionally, then the returned CompletableFuture
662 dl 1.19 * also does so, with a CompletionException holding one of these
663 dl 1.7 * exceptions as its cause.
664     *
665     * @param other the other CompletableFuture
666     * @param block the action to perform before completing the
667     * returned CompletableFuture
668     * @return the new CompletableFuture
669     */
670 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
671     BiBlock<? super T, ? super U> block) {
672 dl 1.7 return andBlock(other, block, null);
673     }
674    
675     /**
676     * Creates and returns a CompletableFuture that is completed
677     * asynchronously using the {@link ForkJoinPool#commonPool()} with
678     * the results of this and the other given CompletableFuture when
679     * both complete. If this and/or the other CompletableFuture
680     * complete exceptionally, then the returned CompletableFuture
681 dl 1.19 * also does so, with a CompletionException holding one of these
682 dl 1.7 * exceptions as its cause.
683     *
684     * @param other the other CompletableFuture
685     * @param block the action to perform before completing the
686     * returned CompletableFuture
687     * @return the new CompletableFuture
688     */
689 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
690     BiBlock<? super T, ? super U> block) {
691 dl 1.7 return andBlock(other, block, ForkJoinPool.commonPool());
692     }
693    
694     /**
695     * Creates and returns a CompletableFuture that is completed
696     * asynchronously using the given executor with the results of
697     * this and the other given CompletableFuture when both complete.
698     * If this and/or the other CompletableFuture complete
699     * exceptionally, then the returned CompletableFuture also does
700 dl 1.19 * so, with a CompletionException holding one of these exceptions as
701 dl 1.7 * its cause.
702     *
703     * @param other the other CompletableFuture
704     * @param block the action to perform before completing the
705     * returned CompletableFuture
706     * @param executor the executor to use for asynchronous execution
707     * @return the new CompletableFuture
708     */
709 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
710     BiBlock<? super T, ? super U> block,
711     Executor executor) {
712 dl 1.1 if (executor == null) throw new NullPointerException();
713 dl 1.7 return andBlock(other, block, executor);
714 dl 1.1 }
715    
716     /**
717     * Creates and returns a CompletableFuture that is completed
718 dl 1.7 * when this and the other given CompletableFuture both
719 dl 1.1 * complete. If this and/or the other CompletableFuture complete
720     * exceptionally, then the returned CompletableFuture also does
721 dl 1.19 * so, with a CompletionException holding one of these exceptions as
722 dl 1.1 * its cause.
723     *
724     * @param other the other CompletableFuture
725     * @param action the action to perform before completing the
726     * returned CompletableFuture
727     * @return the new CompletableFuture
728     */
729 dl 1.17 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
730     Runnable action) {
731 dl 1.1 return andRunnable(other, action, null);
732     }
733    
734     /**
735     * Creates and returns a CompletableFuture that is completed
736     * asynchronously using the {@link ForkJoinPool#commonPool()}
737 dl 1.7 * when this and the other given CompletableFuture both
738 dl 1.1 * complete. If this and/or the other CompletableFuture complete
739     * exceptionally, then the returned CompletableFuture also does
740 dl 1.19 * so, with a CompletionException holding one of these exceptions as
741 dl 1.1 * its cause.
742     *
743     * @param other the other CompletableFuture
744     * @param action the action to perform before completing the
745     * returned CompletableFuture
746     * @return the new CompletableFuture
747     */
748 dl 1.17 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
749     Runnable action) {
750 dl 1.1 return andRunnable(other, action, ForkJoinPool.commonPool());
751     }
752    
753     /**
754     * Creates and returns a CompletableFuture that is completed
755     * asynchronously using the given executor
756 dl 1.7 * when this and the other given CompletableFuture both
757 dl 1.1 * complete. If this and/or the other CompletableFuture complete
758     * exceptionally, then the returned CompletableFuture also does
759 dl 1.19 * so, with a CompletionException holding one of these exceptions as
760 dl 1.1 * its cause.
761     *
762     * @param other the other CompletableFuture
763     * @param action the action to perform before completing the
764     * returned CompletableFuture
765     * @param executor the executor to use for asynchronous execution
766     * @return the new CompletableFuture
767     */
768 dl 1.17 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
769     Runnable action,
770     Executor executor) {
771 dl 1.1 if (executor == null) throw new NullPointerException();
772     return andRunnable(other, action, executor);
773     }
774    
775     /**
776     * Creates and returns a CompletableFuture that is completed with
777     * the result of the given function of either this or the other
778 dl 1.7 * given CompletableFuture's results when either complete. If
779 dl 1.1 * this and/or the other CompletableFuture complete exceptionally,
780     * then the returned CompletableFuture may also do so, with a
781 dl 1.19 * CompletionException holding one of these exceptions as its cause.
782 dl 1.1 * No guarantees are made about which result or exception is used
783     * in the returned CompletableFuture.
784     *
785     * @param other the other CompletableFuture
786     * @param fn the function to use to compute the value of
787     * the returned CompletableFuture
788     * @return the new CompletableFuture
789     */
790 dl 1.17 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
791     Function<? super T, U> fn) {
792 dl 1.1 return orFunction(other, fn, null);
793     }
794    
795     /**
796     * Creates and returns a CompletableFuture that is completed
797     * asynchronously using the {@link ForkJoinPool#commonPool()} with
798     * the result of the given function of either this or the other
799 dl 1.7 * given CompletableFuture's results when either complete. If
800 dl 1.1 * this and/or the other CompletableFuture complete exceptionally,
801     * then the returned CompletableFuture may also do so, with a
802 dl 1.19 * CompletionException holding one of these exceptions as its cause.
803 dl 1.1 * No guarantees are made about which result or exception is used
804     * in the returned CompletableFuture.
805     *
806     * @param other the other CompletableFuture
807     * @param fn the function to use to compute the value of
808     * the returned CompletableFuture
809     * @return the new CompletableFuture
810     */
811 dl 1.17 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
812     Function<? super T, U> fn) {
813 dl 1.1 return orFunction(other, fn, ForkJoinPool.commonPool());
814     }
815    
816     /**
817     * Creates and returns a CompletableFuture that is completed
818     * asynchronously using the given executor with the result of the
819     * given function of either this or the other given
820 dl 1.7 * CompletableFuture's results when either complete. If this
821 dl 1.1 * and/or the other CompletableFuture complete exceptionally, then
822     * the returned CompletableFuture may also do so, with a
823 dl 1.19 * CompletionException holding one of these exceptions as its cause.
824 dl 1.1 * No guarantees are made about which result or exception is used
825     * in the returned CompletableFuture.
826     *
827     * @param other the other CompletableFuture
828     * @param fn the function to use to compute the value of
829     * the returned CompletableFuture
830     * @param executor the executor to use for asynchronous execution
831     * @return the new CompletableFuture
832     */
833 dl 1.17 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
834     Function<? super T, U> fn,
835     Executor executor) {
836 dl 1.1 if (executor == null) throw new NullPointerException();
837     return orFunction(other, fn, executor);
838     }
839    
840     /**
841 dl 1.7 * Creates and returns a CompletableFuture that is completed after
842     * performing the given action with the result of either this or the
843     * other given CompletableFuture's result, when either complete.
844     * If this and/or the other CompletableFuture complete
845     * exceptionally, then the returned CompletableFuture may also do
846 dl 1.19 * so, with a CompletionException holding one of these exceptions as
847 dl 1.7 * its cause. No guarantees are made about which exception is
848     * used in the returned CompletableFuture.
849     *
850     * @param other the other CompletableFuture
851     * @param block the action to perform before completing the
852     * returned CompletableFuture
853     * @return the new CompletableFuture
854     */
855 dl 1.17 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
856     Block<? super T> block) {
857 dl 1.7 return orBlock(other, block, null);
858     }
859    
860     /**
861     * Creates and returns a CompletableFuture that is completed
862     * asynchronously using the {@link ForkJoinPool#commonPool()},
863     * performing the given action with the result of either this or
864     * the other given CompletableFuture's result, when either
865     * complete. If this and/or the other CompletableFuture complete
866     * exceptionally, then the returned CompletableFuture may also do
867 dl 1.19 * so, with a CompletionException holding one of these exceptions as
868 dl 1.7 * its cause. No guarantees are made about which exception is
869     * used in the returned CompletableFuture.
870     *
871     * @param other the other CompletableFuture
872     * @param block the action to perform before completing the
873     * returned CompletableFuture
874     * @return the new CompletableFuture
875     */
876 dl 1.17 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
877     Block<? super T> block) {
878 dl 1.7 return orBlock(other, block, ForkJoinPool.commonPool());
879     }
880    
881     /**
882     * Creates and returns a CompletableFuture that is completed
883     * asynchronously using the given executor,
884     * performing the given action with the result of either this or
885     * the other given CompletableFuture's result, when either
886     * complete. If this and/or the other CompletableFuture complete
887     * exceptionally, then the returned CompletableFuture may also do
888 dl 1.19 * so, with a CompletionException holding one of these exceptions as
889 dl 1.7 * its cause. No guarantees are made about which exception is
890     * used in the returned CompletableFuture.
891     *
892     * @param other the other CompletableFuture
893     * @param block the action to perform before completing the
894     * returned CompletableFuture
895     * @param executor the executor to use for asynchronous execution
896     * @return the new CompletableFuture
897     */
898 dl 1.17 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
899     Block<? super T> block,
900     Executor executor) {
901 dl 1.7 if (executor == null) throw new NullPointerException();
902     return orBlock(other, block, executor);
903     }
904    
905     /**
906 dl 1.1 * Creates and returns a CompletableFuture that is completed
907     * after this or the other given CompletableFuture complete. If
908     * this and/or the other CompletableFuture complete exceptionally,
909     * then the returned CompletableFuture may also do so, with a
910 dl 1.19 * CompletionException holding one of these exceptions as its cause.
911 dl 1.1 * No guarantees are made about which exception is used in the
912     * returned CompletableFuture.
913     *
914     * @param other the other CompletableFuture
915     * @param action the action to perform before completing the
916     * returned CompletableFuture
917     * @return the new CompletableFuture
918     */
919 dl 1.17 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
920     Runnable action) {
921 dl 1.1 return orRunnable(other, action, null);
922     }
923    
924     /**
925     * Creates and returns a CompletableFuture that is completed
926     * asynchronously using the {@link ForkJoinPool#commonPool()}
927     * after this or the other given CompletableFuture complete. If
928     * this and/or the other CompletableFuture complete exceptionally,
929     * then the returned CompletableFuture may also do so, with a
930 dl 1.19 * CompletionException holding one of these exceptions as its cause.
931 dl 1.1 * No guarantees are made about which exception is used in the
932     * returned CompletableFuture.
933     *
934     * @param other the other CompletableFuture
935     * @param action the action to perform before completing the
936     * returned CompletableFuture
937     * @return the new CompletableFuture
938     */
939 dl 1.17 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
940     Runnable action) {
941 dl 1.1 return orRunnable(other, action, ForkJoinPool.commonPool());
942     }
943    
944     /**
945     * Creates and returns a CompletableFuture that is completed
946     * asynchronously using the given executor after this or the other
947     * given CompletableFuture complete. If this and/or the other
948     * CompletableFuture complete exceptionally, then the returned
949 dl 1.19 * CompletableFuture may also do so, with a CompletionException
950     * holding one of these exceptions as its cause. No guarantees are
951 dl 1.1 * made about which exception is used in the returned
952     * CompletableFuture.
953     *
954     * @param other the other CompletableFuture
955     * @param action the action to perform before completing the
956     * returned CompletableFuture
957     * @param executor the executor to use for asynchronous execution
958     * @return the new CompletableFuture
959     */
960 dl 1.17 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
961     Runnable action,
962     Executor executor) {
963 dl 1.1 if (executor == null) throw new NullPointerException();
964     return orRunnable(other, action, executor);
965     }
966    
967     /**
968 dl 1.20 * Returns a CompletableFuture (or an equivalent one) produced by
969     * the given function of the result of this CompletableFuture when
970     * completed. If this CompletableFuture completes exceptionally,
971     * then the returned CompletableFuture also does so, with a
972     * CompletionException holding this exception as its cause.
973 dl 1.17 *
974     * @param fn the function returning a new CompletableFuture
975     * @return the CompletableFuture, which {@code isDone()} upon
976     * return if it was completed by the given function or if an
977     * exception occurred
978     */
979 dl 1.20 @SuppressWarnings("unchecked") public <U> CompletableFuture<U> thenCompose(Function<? super T,
980 dl 1.17 CompletableFuture<U>> fn) {
981     if (fn == null) throw new NullPointerException();
982     CompletableFuture<U> dst = null;
983     ThenCompose<T,U> d = null;
984     Object r;
985     if ((r = result) == null) {
986     dst = new CompletableFuture<U>();
987     CompletionNode p = new CompletionNode
988     (d = new ThenCompose<T,U>(this, fn, dst));
989     while ((r = result) == null) {
990     if (UNSAFE.compareAndSwapObject
991     (this, COMPLETIONS, p.next = completions, p))
992     break;
993     }
994     }
995     if (r != null && (d == null || d.compareAndSet(0, 1))) {
996     T t; Throwable ex = null;
997     if (r instanceof AltResult) {
998     ex = ((AltResult)r).ex;
999     t = null;
1000     }
1001     else
1002     t = (T) r;
1003     if (ex == null) {
1004     try {
1005     dst = fn.apply(t);
1006     } catch (Throwable rex) {
1007     ex = rex;
1008     }
1009     }
1010     if (dst == null) {
1011     dst = new CompletableFuture<U>();
1012     if (ex == null)
1013     ex = new NullPointerException();
1014     }
1015     if (ex != null)
1016 dl 1.20 dst.internalComplete(null, ex);
1017 dl 1.17 }
1018 dl 1.20 helpPostComplete();
1019     dst.helpPostComplete();
1020 dl 1.17 return dst;
1021     }
1022    
1023     /**
1024 dl 1.6 * Creates and returns a CompletableFuture that is completed with
1025     * the result of the given function of the exception triggering
1026 dl 1.7 * this CompletableFuture's completion when it completes
1027 dl 1.6 * exceptionally; otherwise, if this CompletableFuture completes
1028     * normally, then the returned CompletableFuture also completes
1029     * normally with the same value.
1030     *
1031     * @param fn the function to use to compute the value of the
1032     * returned CompletableFuture if this CompletableFuture completed
1033     * exceptionally
1034 dl 1.1 * @return the new CompletableFuture
1035     */
1036 dl 1.20 @SuppressWarnings("unchecked") public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
1037 dl 1.6 if (fn == null) throw new NullPointerException();
1038     CompletableFuture<T> dst = new CompletableFuture<T>();
1039 dl 1.1 ExceptionAction<T> d = null;
1040 dl 1.6 Object r;
1041 jsr166 1.2 if ((r = result) == null) {
1042 dl 1.1 CompletionNode p =
1043 dl 1.6 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
1044 dl 1.1 while ((r = result) == null) {
1045     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1046     p.next = completions, p))
1047     break;
1048     }
1049     }
1050 dl 1.6 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1051 dl 1.20 T t = null; Throwable ex, dx = null;
1052 dl 1.6 if (r instanceof AltResult) {
1053     if ((ex = ((AltResult)r).ex) != null) {
1054     try {
1055 dl 1.20 t = fn.apply(ex);
1056 dl 1.6 } catch (Throwable rex) {
1057 dl 1.20 dx = rex;
1058 dl 1.6 }
1059     }
1060 dl 1.1 }
1061 dl 1.6 else
1062     t = (T) r;
1063 dl 1.20 dst.internalComplete(t, dx);
1064 dl 1.1 }
1065 dl 1.20 helpPostComplete();
1066 dl 1.17 return dst;
1067     }
1068    
1069     /**
1070     * Creates and returns a CompletableFuture that is completed with
1071     * the result of the given function of the result and exception of
1072     * this CompletableFuture's completion when it completes. The
1073     * given function is invoked with the result (or {@code null} if
1074     * none) and the exception (or {@code null} if none) of this
1075     * CompletableFuture when complete.
1076     *
1077     * @param fn the function to use to compute the value of the
1078     * returned CompletableFuture
1079     *
1080     * @return the new CompletableFuture
1081     */
1082 dl 1.20 @SuppressWarnings("unchecked") public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
1083 dl 1.17 if (fn == null) throw new NullPointerException();
1084     CompletableFuture<U> dst = new CompletableFuture<U>();
1085     ThenHandle<T,U> d = null;
1086     Object r;
1087     if ((r = result) == null) {
1088     CompletionNode p =
1089     new CompletionNode(d = new ThenHandle<T,U>(this, fn, dst));
1090     while ((r = result) == null) {
1091     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1092     p.next = completions, p))
1093     break;
1094     }
1095     }
1096     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1097     T t; Throwable ex;
1098     if (r instanceof AltResult) {
1099     ex = ((AltResult)r).ex;
1100     t = null;
1101     }
1102     else {
1103     ex = null;
1104     t = (T) r;
1105     }
1106 dl 1.20 U u = null; Throwable dx = null;
1107 dl 1.17 try {
1108 dl 1.20 u = fn.apply(t, ex);
1109 dl 1.17 } catch (Throwable rex) {
1110 dl 1.20 dx = rex;
1111 dl 1.17 }
1112 dl 1.20 dst.internalComplete(u, dx);
1113 dl 1.17 }
1114 dl 1.20 helpPostComplete();
1115 dl 1.1 return dst;
1116     }
1117    
1118     /**
1119     * Attempts to complete this CompletableFuture with
1120     * a {@link CancellationException}.
1121     *
1122     * @param mayInterruptIfRunning this value has no effect in this
1123     * implementation because interrupts are not used to control
1124     * processing.
1125     *
1126     * @return {@code true} if this task is now cancelled
1127     */
1128     public boolean cancel(boolean mayInterruptIfRunning) {
1129     Object r;
1130     while ((r = result) == null) {
1131     r = new AltResult(new CancellationException());
1132     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
1133     postComplete();
1134     return true;
1135     }
1136     }
1137     return ((r instanceof AltResult) &&
1138     (((AltResult)r).ex instanceof CancellationException));
1139     }
1140    
1141     /**
1142     * Returns {@code true} if this CompletableFuture was cancelled
1143     * before it completed normally.
1144     *
1145     * @return {@code true} if this CompletableFuture was cancelled
1146     * before it completed normally
1147     */
1148     public boolean isCancelled() {
1149     Object r;
1150     return ((r = result) != null &&
1151     (r instanceof AltResult) &&
1152     (((AltResult)r).ex instanceof CancellationException));
1153     }
1154    
1155     /**
1156 dl 1.6 * Forcibly sets or resets the value subsequently returned by
1157     * method get() and related methods, whether or not already
1158     * completed. This method is designed for use only in error
1159     * recovery actions, and even in such situations may result in
1160     * ongoing dependent completions using established versus
1161     * overwritten values.
1162 dl 1.1 *
1163     * @param value the completion value
1164     */
1165 dl 1.6 public void obtrudeValue(T value) {
1166 jsr166 1.2 result = (value == null) ? NIL : value;
1167 dl 1.1 postComplete();
1168     }
1169    
1170     /* ------------- waiting for completions -------------- */
1171    
1172 jsr166 1.2 /**
1173 dl 1.1 * Heuristic spin value for waitingGet() before blocking on
1174     * multiprocessors
1175     */
1176     static final int WAITING_GET_SPINS = 256;
1177    
1178     /**
1179 dl 1.19 * Returns raw result after waiting, or null if interruptible and
1180     * interrupted.
1181 dl 1.1 */
1182 dl 1.19 private Object waitingGet(boolean interruptible) {
1183 dl 1.1 WaitNode q = null;
1184     boolean queued = false, interrupted = false;
1185     int h = 0, spins = 0;
1186     for (Object r;;) {
1187     if ((r = result) != null) {
1188 dl 1.5 Throwable ex;
1189 dl 1.1 if (q != null) // suppress unpark
1190     q.thread = null;
1191     postComplete(); // help release others
1192     if (interrupted)
1193     Thread.currentThread().interrupt();
1194 dl 1.19 return r;
1195 dl 1.1 }
1196     else if (h == 0) {
1197     h = ThreadLocalRandom.current().nextInt();
1198     if (Runtime.getRuntime().availableProcessors() > 1)
1199     spins = WAITING_GET_SPINS;
1200     }
1201     else if (spins > 0) {
1202     h ^= h << 1; // xorshift
1203 jsr166 1.2 h ^= h >>> 3;
1204 dl 1.1 if ((h ^= h << 10) >= 0)
1205     --spins;
1206 jsr166 1.2 }
1207 dl 1.1 else if (q == null)
1208     q = new WaitNode();
1209     else if (!queued)
1210     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1211     q.next = waiters, q);
1212 dl 1.19 else if (Thread.interrupted()) {
1213 dl 1.20 if (interruptible) {
1214     removeWaiter(q);
1215 dl 1.19 return null;
1216 dl 1.20 }
1217 dl 1.1 interrupted = true;
1218 dl 1.19 }
1219 dl 1.1 else if (q.thread == null)
1220     q.thread = Thread.currentThread();
1221     else
1222     LockSupport.park(this);
1223     }
1224     }
1225    
1226     /**
1227     * Awaits completion or aborts on interrupt or timeout.
1228     *
1229     * @param nanos time to wait
1230     * @return raw result
1231     */
1232     private Object timedAwaitDone(long nanos)
1233     throws InterruptedException, TimeoutException {
1234     final long deadline = System.nanoTime() + nanos;
1235     WaitNode q = null;
1236     boolean queued = false;
1237     for (Object r;;) {
1238     if (Thread.interrupted()) {
1239     removeWaiter(q);
1240     throw new InterruptedException();
1241     }
1242     else if ((r = result) != null) {
1243     if (q != null)
1244     q.thread = null;
1245     postComplete();
1246     return r;
1247     }
1248     else if (q == null)
1249     q = new WaitNode();
1250     else if (!queued)
1251     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1252     q.next = waiters, q);
1253     else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1254     removeWaiter(q);
1255     throw new TimeoutException();
1256     }
1257     else if (q.thread == null)
1258     q.thread = Thread.currentThread();
1259     else
1260     LockSupport.parkNanos(this, nanos);
1261     }
1262     }
1263    
1264     /**
1265     * Tries to unlink a timed-out or interrupted wait node to avoid
1266     * accumulating garbage. Internal nodes are simply unspliced
1267     * without CAS since it is harmless if they are traversed anyway
1268     * by releasers. To avoid effects of unsplicing from already
1269     * removed nodes, the list is retraversed in case of an apparent
1270     * race. This is slow when there are a lot of nodes, but we don't
1271     * expect lists to be long enough to outweigh higher-overhead
1272     * schemes.
1273     */
1274     private void removeWaiter(WaitNode node) {
1275     if (node != null) {
1276     node.thread = null;
1277     retry:
1278     for (;;) { // restart on removeWaiter race
1279     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
1280     s = q.next;
1281     if (q.thread != null)
1282     pred = q;
1283     else if (pred != null) {
1284     pred.next = s;
1285     if (pred.thread == null) // check for race
1286     continue retry;
1287     }
1288     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
1289     continue retry;
1290     }
1291     break;
1292     }
1293     }
1294     }
1295    
1296     /* ------------- Async tasks -------------- */
1297    
1298     /** Base class can act as either FJ or plain Runnable */
1299     static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1300     public final Void getRawResult() { return null; }
1301     public final void setRawResult(Void v) { }
1302     public final void run() { exec(); }
1303     }
1304    
1305     static final class AsyncRunnable extends Async {
1306 dl 1.21 final Runnable fn;
1307 dl 1.1 final CompletableFuture<Void> dst;
1308 dl 1.21 AsyncRunnable(Runnable fn, CompletableFuture<Void> dst) {
1309     this.fn = fn; this.dst = dst;
1310 dl 1.1 }
1311     public final boolean exec() {
1312     CompletableFuture<Void> d;
1313 dl 1.21 if ((d = this.dst) != null) {
1314     Throwable ex;
1315     try {
1316     fn.run();
1317     ex = null;
1318     } catch (Throwable rex) {
1319     ex = rex;
1320     }
1321     d.internalComplete(null, ex);
1322 dl 1.1 }
1323     return true;
1324     }
1325     private static final long serialVersionUID = 5232453952276885070L;
1326     }
1327    
1328     static final class AsyncSupplier<U> extends Async {
1329 dl 1.21 final Supplier<U> fn;
1330 dl 1.1 final CompletableFuture<U> dst;
1331 dl 1.21 AsyncSupplier(Supplier<U> fn, CompletableFuture<U> dst) {
1332     this.fn = fn; this.dst = dst;
1333 dl 1.1 }
1334     public final boolean exec() {
1335     CompletableFuture<U> d;
1336 dl 1.21 if ((d = this.dst) != null) {
1337     U u; Throwable ex;
1338     try {
1339     u = fn.get();
1340     ex = null;
1341     } catch (Throwable rex) {
1342     ex = rex;
1343     u = null;
1344     }
1345     d.internalComplete(u, ex);
1346 dl 1.1 }
1347     return true;
1348     }
1349     private static final long serialVersionUID = 5232453952276885070L;
1350     }
1351    
1352     static final class AsyncFunction<T,U> extends Async {
1353     Function<? super T,? extends U> fn;
1354     T arg;
1355     final CompletableFuture<U> dst;
1356     AsyncFunction(T arg, Function<? super T,? extends U> fn,
1357     CompletableFuture<U> dst) {
1358     this.arg = arg; this.fn = fn; this.dst = dst;
1359     }
1360     public final boolean exec() {
1361 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
1362     if ((d = this.dst) != null) {
1363     try {
1364     u = fn.apply(arg);
1365     ex = null;
1366     } catch (Throwable rex) {
1367     ex = rex;
1368     u = null;
1369     }
1370     d.internalComplete(u, ex);
1371 dl 1.1 }
1372     return true;
1373     }
1374     private static final long serialVersionUID = 5232453952276885070L;
1375     }
1376    
1377     static final class AsyncBiFunction<T,U,V> extends Async {
1378     final BiFunction<? super T,? super U,? extends V> fn;
1379     final T arg1;
1380     final U arg2;
1381     final CompletableFuture<V> dst;
1382     AsyncBiFunction(T arg1, U arg2,
1383     BiFunction<? super T,? super U,? extends V> fn,
1384     CompletableFuture<V> dst) {
1385     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1386     }
1387     public final boolean exec() {
1388 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
1389     if ((d = this.dst) != null) {
1390     try {
1391     v = fn.apply(arg1, arg2);
1392     ex = null;
1393     } catch (Throwable rex) {
1394     ex = rex;
1395     v = null;
1396     }
1397     d.internalComplete(v, ex);
1398 dl 1.1 }
1399     return true;
1400     }
1401     private static final long serialVersionUID = 5232453952276885070L;
1402     }
1403    
1404 dl 1.7 static final class AsyncBlock<T> extends Async {
1405     Block<? super T> fn;
1406     T arg;
1407     final CompletableFuture<Void> dst;
1408     AsyncBlock(T arg, Block<? super T> fn,
1409     CompletableFuture<Void> dst) {
1410     this.arg = arg; this.fn = fn; this.dst = dst;
1411     }
1412     public final boolean exec() {
1413 dl 1.21 CompletableFuture<Void> d; Throwable ex;
1414     if ((d = this.dst) != null) {
1415     try {
1416     fn.accept(arg);
1417     ex = null;
1418     } catch (Throwable rex) {
1419     ex = rex;
1420     }
1421     d.internalComplete(null, ex);
1422 dl 1.7 }
1423     return true;
1424     }
1425     private static final long serialVersionUID = 5232453952276885070L;
1426     }
1427    
1428     static final class AsyncBiBlock<T,U> extends Async {
1429     final BiBlock<? super T,? super U> fn;
1430     final T arg1;
1431     final U arg2;
1432     final CompletableFuture<Void> dst;
1433     AsyncBiBlock(T arg1, U arg2,
1434     BiBlock<? super T,? super U> fn,
1435     CompletableFuture<Void> dst) {
1436     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1437     }
1438     public final boolean exec() {
1439 dl 1.21 CompletableFuture<Void> d; Throwable ex;
1440     if ((d = this.dst) != null) {
1441     try {
1442     fn.accept(arg1, arg2);
1443     ex = null;
1444     } catch (Throwable rex) {
1445     ex = rex;
1446     }
1447     d.internalComplete(null, ex);
1448 dl 1.7 }
1449     return true;
1450     }
1451     private static final long serialVersionUID = 5232453952276885070L;
1452     }
1453    
1454 dl 1.1 /* ------------- Completions -------------- */
1455    
1456     // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1457     static abstract class Completion extends AtomicInteger implements Runnable {
1458     }
1459    
1460     static final class ThenFunction<T,U> extends Completion {
1461 jsr166 1.2 final CompletableFuture<? extends T> src;
1462     final Function<? super T,? extends U> fn;
1463     final CompletableFuture<U> dst;
1464 dl 1.1 final Executor executor;
1465     ThenFunction(CompletableFuture<? extends T> src,
1466     final Function<? super T,? extends U> fn,
1467     final CompletableFuture<U> dst, Executor executor) {
1468     this.src = src; this.fn = fn; this.dst = dst;
1469     this.executor = executor;
1470     }
1471 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1472 dl 1.1 CompletableFuture<? extends T> a;
1473     Function<? super T,? extends U> fn;
1474     CompletableFuture<U> dst;
1475 jsr166 1.2 Object r; T t; Throwable ex;
1476     if ((dst = this.dst) != null &&
1477 dl 1.1 (fn = this.fn) != null &&
1478     (a = this.src) != null &&
1479     (r = a.result) != null &&
1480     compareAndSet(0, 1)) {
1481 jsr166 1.2 if (r instanceof AltResult) {
1482 dl 1.19 ex = ((AltResult)r).ex;
1483 dl 1.1 t = null;
1484     }
1485 dl 1.17 else {
1486     ex = null;
1487 dl 1.1 t = (T) r;
1488     }
1489 dl 1.20 Executor e = executor;
1490     U u = null;
1491 dl 1.17 if (ex == null) {
1492     try {
1493 dl 1.20 if (e != null)
1494     e.execute(new AsyncFunction<T,U>(t, fn, dst));
1495 dl 1.17 else
1496 dl 1.20 u = fn.apply(t);
1497 dl 1.17 } catch (Throwable rex) {
1498     ex = rex;
1499     }
1500     }
1501 dl 1.20 if (e == null || ex != null)
1502     dst.internalComplete(u, ex);
1503 dl 1.1 }
1504     }
1505 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1506 dl 1.1 }
1507    
1508 dl 1.7 static final class ThenBlock<T> extends Completion {
1509     final CompletableFuture<? extends T> src;
1510     final Block<? super T> fn;
1511     final CompletableFuture<Void> dst;
1512     final Executor executor;
1513     ThenBlock(CompletableFuture<? extends T> src,
1514     final Block<? super T> fn,
1515     final CompletableFuture<Void> dst, Executor executor) {
1516     this.src = src; this.fn = fn; this.dst = dst;
1517     this.executor = executor;
1518     }
1519 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1520 dl 1.7 CompletableFuture<? extends T> a;
1521     Block<? super T> fn;
1522     CompletableFuture<Void> dst;
1523     Object r; T t; Throwable ex;
1524     if ((dst = this.dst) != null &&
1525     (fn = this.fn) != null &&
1526     (a = this.src) != null &&
1527     (r = a.result) != null &&
1528     compareAndSet(0, 1)) {
1529     if (r instanceof AltResult) {
1530 dl 1.19 ex = ((AltResult)r).ex;
1531 dl 1.7 t = null;
1532     }
1533 dl 1.17 else {
1534     ex = null;
1535 dl 1.7 t = (T) r;
1536 dl 1.17 }
1537 dl 1.20 Executor e = executor;
1538 dl 1.17 if (ex == null) {
1539     try {
1540 dl 1.20 if (e != null)
1541     e.execute(new AsyncBlock<T>(t, fn, dst));
1542     else
1543 dl 1.17 fn.accept(t);
1544     } catch (Throwable rex) {
1545     ex = rex;
1546 dl 1.7 }
1547     }
1548 dl 1.20 if (e == null || ex != null)
1549     dst.internalComplete(null, ex);
1550 dl 1.7 }
1551     }
1552 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1553 dl 1.7 }
1554    
1555 dl 1.1 static final class ThenRunnable<T> extends Completion {
1556 jsr166 1.2 final CompletableFuture<? extends T> src;
1557     final Runnable fn;
1558     final CompletableFuture<Void> dst;
1559 dl 1.1 final Executor executor;
1560     ThenRunnable(CompletableFuture<? extends T> src,
1561     Runnable fn,
1562     CompletableFuture<Void> dst,
1563     Executor executor) {
1564     this.src = src; this.fn = fn; this.dst = dst;
1565     this.executor = executor;
1566     }
1567 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1568 dl 1.1 CompletableFuture<? extends T> a;
1569     Runnable fn;
1570     CompletableFuture<Void> dst;
1571 jsr166 1.2 Object r; Throwable ex;
1572     if ((dst = this.dst) != null &&
1573 dl 1.1 (fn = this.fn) != null &&
1574     (a = this.src) != null &&
1575     (r = a.result) != null &&
1576     compareAndSet(0, 1)) {
1577 dl 1.19 if (r instanceof AltResult)
1578     ex = ((AltResult)r).ex;
1579 dl 1.17 else
1580     ex = null;
1581 dl 1.20 Executor e = executor;
1582 dl 1.17 if (ex == null) {
1583     try {
1584 dl 1.20 if (e != null)
1585     e.execute(new AsyncRunnable(fn, dst));
1586     else
1587 dl 1.17 fn.run();
1588     } catch (Throwable rex) {
1589     ex = rex;
1590 dl 1.1 }
1591     }
1592 dl 1.20 if (e == null || ex != null)
1593     dst.internalComplete(null, ex);
1594 dl 1.1 }
1595     }
1596 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1597 dl 1.1 }
1598    
1599     static final class AndFunction<T,U,V> extends Completion {
1600 jsr166 1.2 final CompletableFuture<? extends T> src;
1601     final CompletableFuture<? extends U> snd;
1602     final BiFunction<? super T,? super U,? extends V> fn;
1603     final CompletableFuture<V> dst;
1604 dl 1.1 final Executor executor;
1605     AndFunction(CompletableFuture<? extends T> src,
1606     CompletableFuture<? extends U> snd,
1607     BiFunction<? super T,? super U,? extends V> fn,
1608     CompletableFuture<V> dst, Executor executor) {
1609     this.src = src; this.snd = snd;
1610     this.fn = fn; this.dst = dst;
1611     this.executor = executor;
1612     }
1613 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1614 jsr166 1.2 Object r, s; T t; U u; Throwable ex;
1615 dl 1.1 CompletableFuture<? extends T> a;
1616     CompletableFuture<? extends U> b;
1617     BiFunction<? super T,? super U,? extends V> fn;
1618     CompletableFuture<V> dst;
1619 jsr166 1.2 if ((dst = this.dst) != null &&
1620 dl 1.1 (fn = this.fn) != null &&
1621     (a = this.src) != null &&
1622     (r = a.result) != null &&
1623     (b = this.snd) != null &&
1624     (s = b.result) != null &&
1625     compareAndSet(0, 1)) {
1626 jsr166 1.2 if (r instanceof AltResult) {
1627 dl 1.19 ex = ((AltResult)r).ex;
1628 jsr166 1.2 t = null;
1629     }
1630 dl 1.19 else {
1631     ex = null;
1632 jsr166 1.2 t = (T) r;
1633 dl 1.19 }
1634     if (ex != null)
1635     u = null;
1636     else if (s instanceof AltResult) {
1637     ex = ((AltResult)s).ex;
1638 jsr166 1.2 u = null;
1639     }
1640     else
1641     u = (U) s;
1642 dl 1.20 Executor e = executor;
1643     V v = null;
1644 dl 1.19 if (ex == null) {
1645     try {
1646 dl 1.20 if (e != null)
1647     e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1648 dl 1.19 else
1649 dl 1.20 v = fn.apply(t, u);
1650 dl 1.19 } catch (Throwable rex) {
1651     ex = rex;
1652     }
1653 dl 1.1 }
1654 dl 1.20 if (e == null || ex != null)
1655     dst.internalComplete(v, ex);
1656 jsr166 1.2 }
1657     }
1658 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1659 dl 1.1 }
1660    
1661 dl 1.7 static final class AndBlock<T,U> extends Completion {
1662     final CompletableFuture<? extends T> src;
1663     final CompletableFuture<? extends U> snd;
1664     final BiBlock<? super T,? super U> fn;
1665     final CompletableFuture<Void> dst;
1666     final Executor executor;
1667     AndBlock(CompletableFuture<? extends T> src,
1668     CompletableFuture<? extends U> snd,
1669     BiBlock<? super T,? super U> fn,
1670     CompletableFuture<Void> dst, Executor executor) {
1671     this.src = src; this.snd = snd;
1672     this.fn = fn; this.dst = dst;
1673     this.executor = executor;
1674     }
1675 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1676 dl 1.7 Object r, s; T t; U u; Throwable ex;
1677     CompletableFuture<? extends T> a;
1678     CompletableFuture<? extends U> b;
1679     BiBlock<? super T,? super U> fn;
1680     CompletableFuture<Void> dst;
1681     if ((dst = this.dst) != null &&
1682     (fn = this.fn) != null &&
1683     (a = this.src) != null &&
1684     (r = a.result) != null &&
1685     (b = this.snd) != null &&
1686     (s = b.result) != null &&
1687     compareAndSet(0, 1)) {
1688     if (r instanceof AltResult) {
1689 dl 1.19 ex = ((AltResult)r).ex;
1690 dl 1.7 t = null;
1691     }
1692 dl 1.19 else {
1693     ex = null;
1694 dl 1.7 t = (T) r;
1695 dl 1.19 }
1696     if (ex != null)
1697     u = null;
1698     else if (s instanceof AltResult) {
1699     ex = ((AltResult)s).ex;
1700 dl 1.7 u = null;
1701     }
1702     else
1703     u = (U) s;
1704 dl 1.20 Executor e = executor;
1705 dl 1.19 if (ex == null) {
1706     try {
1707 dl 1.20 if (e != null)
1708     e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1709     else
1710 dl 1.19 fn.accept(t, u);
1711     } catch (Throwable rex) {
1712     ex = rex;
1713 dl 1.7 }
1714     }
1715 dl 1.20 if (e == null || ex != null)
1716     dst.internalComplete(null, ex);
1717 dl 1.7 }
1718     }
1719 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1720 dl 1.7 }
1721    
1722 dl 1.1 static final class AndRunnable<T> extends Completion {
1723 jsr166 1.2 final CompletableFuture<? extends T> src;
1724     final CompletableFuture<?> snd;
1725     final Runnable fn;
1726     final CompletableFuture<Void> dst;
1727 dl 1.1 final Executor executor;
1728     AndRunnable(CompletableFuture<? extends T> src,
1729     CompletableFuture<?> snd,
1730     Runnable fn,
1731     CompletableFuture<Void> dst, Executor executor) {
1732     this.src = src; this.snd = snd;
1733     this.fn = fn; this.dst = dst;
1734     this.executor = executor;
1735     }
1736 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1737 jsr166 1.2 Object r, s; Throwable ex;
1738 dl 1.1 final CompletableFuture<? extends T> a;
1739     final CompletableFuture<?> b;
1740     final Runnable fn;
1741     final CompletableFuture<Void> dst;
1742 jsr166 1.2 if ((dst = this.dst) != null &&
1743 dl 1.1 (fn = this.fn) != null &&
1744     (a = this.src) != null &&
1745     (r = a.result) != null &&
1746     (b = this.snd) != null &&
1747     (s = b.result) != null &&
1748     compareAndSet(0, 1)) {
1749 dl 1.19 if (r instanceof AltResult)
1750     ex = ((AltResult)r).ex;
1751     else
1752     ex = null;
1753     if (ex == null && (s instanceof AltResult))
1754     ex = ((AltResult)s).ex;
1755 dl 1.20 Executor e = executor;
1756 dl 1.19 if (ex == null) {
1757     try {
1758 dl 1.20 if (e != null)
1759     e.execute(new AsyncRunnable(fn, dst));
1760     else
1761 dl 1.19 fn.run();
1762     } catch (Throwable rex) {
1763     ex = rex;
1764 dl 1.1 }
1765 jsr166 1.2 }
1766 dl 1.20 if (e == null || ex != null)
1767     dst.internalComplete(null, ex);
1768 jsr166 1.2 }
1769     }
1770 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1771 dl 1.1 }
1772    
1773     static final class OrFunction<T,U> extends Completion {
1774 jsr166 1.2 final CompletableFuture<? extends T> src;
1775     final CompletableFuture<? extends T> snd;
1776     final Function<? super T,? extends U> fn;
1777     final CompletableFuture<U> dst;
1778 dl 1.1 final Executor executor;
1779     OrFunction(CompletableFuture<? extends T> src,
1780     CompletableFuture<? extends T> snd,
1781     Function<? super T,? extends U> fn,
1782     CompletableFuture<U> dst, Executor executor) {
1783     this.src = src; this.snd = snd;
1784     this.fn = fn; this.dst = dst;
1785     this.executor = executor;
1786     }
1787 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1788 jsr166 1.2 Object r; T t; Throwable ex;
1789 dl 1.1 CompletableFuture<? extends T> a;
1790     CompletableFuture<? extends T> b;
1791     Function<? super T,? extends U> fn;
1792     CompletableFuture<U> dst;
1793 jsr166 1.2 if ((dst = this.dst) != null &&
1794 dl 1.1 (fn = this.fn) != null &&
1795     (((a = this.src) != null && (r = a.result) != null) ||
1796     ((b = this.snd) != null && (r = b.result) != null)) &&
1797     compareAndSet(0, 1)) {
1798 jsr166 1.2 if (r instanceof AltResult) {
1799 dl 1.19 ex = ((AltResult)r).ex;
1800 jsr166 1.2 t = null;
1801     }
1802 dl 1.19 else {
1803     ex = null;
1804 jsr166 1.2 t = (T) r;
1805 dl 1.1 }
1806 dl 1.20 Executor e = executor;
1807     U u = null;
1808 dl 1.19 if (ex == null) {
1809     try {
1810 dl 1.20 if (e != null)
1811     e.execute(new AsyncFunction<T,U>(t, fn, dst));
1812 dl 1.19 else
1813 dl 1.20 u = fn.apply(t);
1814 dl 1.19 } catch (Throwable rex) {
1815     ex = rex;
1816     }
1817     }
1818 dl 1.20 if (e == null || ex != null)
1819     dst.internalComplete(u, ex);
1820 jsr166 1.2 }
1821     }
1822 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1823 dl 1.1 }
1824    
1825 dl 1.7 static final class OrBlock<T> extends Completion {
1826     final CompletableFuture<? extends T> src;
1827     final CompletableFuture<? extends T> snd;
1828     final Block<? super T> fn;
1829     final CompletableFuture<Void> dst;
1830     final Executor executor;
1831     OrBlock(CompletableFuture<? extends T> src,
1832     CompletableFuture<? extends T> snd,
1833     Block<? super T> fn,
1834     CompletableFuture<Void> dst, Executor executor) {
1835     this.src = src; this.snd = snd;
1836     this.fn = fn; this.dst = dst;
1837     this.executor = executor;
1838     }
1839 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1840 dl 1.7 Object r; T t; Throwable ex;
1841     CompletableFuture<? extends T> a;
1842     CompletableFuture<? extends T> b;
1843     Block<? super T> fn;
1844     CompletableFuture<Void> dst;
1845     if ((dst = this.dst) != null &&
1846     (fn = this.fn) != null &&
1847     (((a = this.src) != null && (r = a.result) != null) ||
1848     ((b = this.snd) != null && (r = b.result) != null)) &&
1849     compareAndSet(0, 1)) {
1850     if (r instanceof AltResult) {
1851 dl 1.19 ex = ((AltResult)r).ex;
1852 dl 1.7 t = null;
1853     }
1854 dl 1.19 else {
1855     ex = null;
1856 dl 1.7 t = (T) r;
1857 dl 1.19 }
1858 dl 1.20 Executor e = executor;
1859 dl 1.19 if (ex == null) {
1860     try {
1861 dl 1.20 if (e != null)
1862     e.execute(new AsyncBlock<T>(t, fn, dst));
1863     else
1864 dl 1.19 fn.accept(t);
1865     } catch (Throwable rex) {
1866     ex = rex;
1867 dl 1.7 }
1868     }
1869 dl 1.20 if (e == null || ex != null)
1870     dst.internalComplete(null, ex);
1871 dl 1.7 }
1872     }
1873 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1874 dl 1.7 }
1875    
1876 dl 1.1 static final class OrRunnable<T> extends Completion {
1877 jsr166 1.2 final CompletableFuture<? extends T> src;
1878     final CompletableFuture<?> snd;
1879     final Runnable fn;
1880     final CompletableFuture<Void> dst;
1881 dl 1.1 final Executor executor;
1882     OrRunnable(CompletableFuture<? extends T> src,
1883     CompletableFuture<?> snd,
1884     Runnable fn,
1885     CompletableFuture<Void> dst, Executor executor) {
1886     this.src = src; this.snd = snd;
1887     this.fn = fn; this.dst = dst;
1888     this.executor = executor;
1889     }
1890 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1891 jsr166 1.2 Object r; Throwable ex;
1892 dl 1.1 CompletableFuture<? extends T> a;
1893     final CompletableFuture<?> b;
1894     final Runnable fn;
1895     final CompletableFuture<Void> dst;
1896 jsr166 1.2 if ((dst = this.dst) != null &&
1897 dl 1.1 (fn = this.fn) != null &&
1898     (((a = this.src) != null && (r = a.result) != null) ||
1899     ((b = this.snd) != null && (r = b.result) != null)) &&
1900     compareAndSet(0, 1)) {
1901 dl 1.19 if (r instanceof AltResult)
1902     ex = ((AltResult)r).ex;
1903     else
1904     ex = null;
1905 dl 1.20 Executor e = executor;
1906 dl 1.19 if (ex == null) {
1907 dl 1.1 try {
1908 dl 1.20 if (e != null)
1909     e.execute(new AsyncRunnable(fn, dst));
1910     else
1911 dl 1.1 fn.run();
1912     } catch (Throwable rex) {
1913 dl 1.19 ex = rex;
1914 dl 1.1 }
1915     }
1916 dl 1.20 if (e == null || ex != null)
1917     dst.internalComplete(null, ex);
1918 jsr166 1.2 }
1919     }
1920 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1921 dl 1.1 }
1922    
1923     static final class ExceptionAction<T> extends Completion {
1924 jsr166 1.2 final CompletableFuture<? extends T> src;
1925 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1926     final CompletableFuture<T> dst;
1927 dl 1.1 ExceptionAction(CompletableFuture<? extends T> src,
1928 dl 1.6 Function<? super Throwable, ? extends T> fn,
1929     CompletableFuture<T> dst) {
1930 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1931     }
1932 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1933 dl 1.1 CompletableFuture<? extends T> a;
1934 dl 1.6 Function<? super Throwable, ? extends T> fn;
1935     CompletableFuture<T> dst;
1936 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1937 jsr166 1.2 if ((dst = this.dst) != null &&
1938 dl 1.1 (fn = this.fn) != null &&
1939     (a = this.src) != null &&
1940     (r = a.result) != null &&
1941     compareAndSet(0, 1)) {
1942 dl 1.20 if ((r instanceof AltResult) &&
1943     (ex = ((AltResult)r).ex) != null) {
1944     try {
1945     t = fn.apply(ex);
1946     } catch (Throwable rex) {
1947     dx = rex;
1948 dl 1.1 }
1949     }
1950 dl 1.6 else
1951     t = (T) r;
1952 dl 1.20 dst.internalComplete(t, dx);
1953 dl 1.1 }
1954     }
1955 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1956 dl 1.1 }
1957    
1958 dl 1.17 static final class ThenCopy<T> extends Completion {
1959     final CompletableFuture<? extends T> src;
1960     final CompletableFuture<T> dst;
1961     ThenCopy(CompletableFuture<? extends T> src,
1962     CompletableFuture<T> dst) {
1963     this.src = src; this.dst = dst;
1964     }
1965 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1966 dl 1.17 CompletableFuture<? extends T> a;
1967     CompletableFuture<T> dst;
1968 dl 1.20 Object r; Object t; Throwable ex;
1969 dl 1.17 if ((dst = this.dst) != null &&
1970     (a = this.src) != null &&
1971     (r = a.result) != null &&
1972     compareAndSet(0, 1)) {
1973 dl 1.20 if (r instanceof AltResult) {
1974     ex = ((AltResult)r).ex;
1975     t = null;
1976     }
1977     else {
1978     ex = null;
1979     t = r;
1980     }
1981     dst.internalComplete(t, ex);
1982 dl 1.17 }
1983     }
1984 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1985 dl 1.17 }
1986    
1987     static final class ThenHandle<T,U> extends Completion {
1988     final CompletableFuture<? extends T> src;
1989     final BiFunction<? super T, Throwable, ? extends U> fn;
1990     final CompletableFuture<U> dst;
1991     ThenHandle(CompletableFuture<? extends T> src,
1992     BiFunction<? super T, Throwable, ? extends U> fn,
1993     final CompletableFuture<U> dst) {
1994     this.src = src; this.fn = fn; this.dst = dst;
1995     }
1996 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1997 dl 1.17 CompletableFuture<? extends T> a;
1998     BiFunction<? super T, Throwable, ? extends U> fn;
1999     CompletableFuture<U> dst;
2000     Object r; T t; Throwable ex;
2001     if ((dst = this.dst) != null &&
2002     (fn = this.fn) != null &&
2003     (a = this.src) != null &&
2004     (r = a.result) != null &&
2005     compareAndSet(0, 1)) {
2006     if (r instanceof AltResult) {
2007     ex = ((AltResult)r).ex;
2008     t = null;
2009     }
2010     else {
2011     ex = null;
2012     t = (T) r;
2013     }
2014 dl 1.20 U u = null; Throwable dx = null;
2015 dl 1.17 try {
2016 dl 1.20 u = fn.apply(t, ex);
2017 dl 1.17 } catch (Throwable rex) {
2018 dl 1.20 dx = rex;
2019 dl 1.17 }
2020 dl 1.20 dst.internalComplete(u, dx);
2021 dl 1.17 }
2022     }
2023 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
2024 dl 1.17 }
2025    
2026     static final class ThenCompose<T,U> extends Completion {
2027     final CompletableFuture<? extends T> src;
2028     final Function<? super T, CompletableFuture<U>> fn;
2029     final CompletableFuture<U> dst;
2030     ThenCompose(CompletableFuture<? extends T> src,
2031     Function<? super T, CompletableFuture<U>> fn,
2032     final CompletableFuture<U> dst) {
2033     this.src = src; this.fn = fn; this.dst = dst;
2034     }
2035 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
2036 dl 1.17 CompletableFuture<? extends T> a;
2037     Function<? super T, CompletableFuture<U>> fn;
2038     CompletableFuture<U> dst;
2039     Object r; T t; Throwable ex;
2040     if ((dst = this.dst) != null &&
2041     (fn = this.fn) != null &&
2042     (a = this.src) != null &&
2043     (r = a.result) != null &&
2044     compareAndSet(0, 1)) {
2045     if (r instanceof AltResult) {
2046     ex = ((AltResult)r).ex;
2047     t = null;
2048     }
2049     else {
2050     ex = null;
2051     t = (T) r;
2052     }
2053     CompletableFuture<U> c = null;
2054 dl 1.20 U u = null;
2055     boolean complete = false;
2056 dl 1.17 if (ex == null) {
2057     try {
2058     c = fn.apply(t);
2059     } catch (Throwable rex) {
2060     ex = rex;
2061     }
2062     }
2063     if (ex != null || c == null) {
2064     if (ex == null)
2065     ex = new NullPointerException();
2066     }
2067 dl 1.18 else {
2068     ThenCopy<U> d = null;
2069     Object s;
2070     if ((s = c.result) == null) {
2071     CompletionNode p = new CompletionNode
2072     (d = new ThenCopy<U>(c, dst));
2073     while ((s = c.result) == null) {
2074     if (UNSAFE.compareAndSwapObject
2075     (c, COMPLETIONS, p.next = c.completions, p))
2076     break;
2077     }
2078 dl 1.17 }
2079 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
2080 dl 1.20 complete = true;
2081 dl 1.18 if (s instanceof AltResult) {
2082     ex = ((AltResult)s).ex; // no rewrap
2083     u = null;
2084     }
2085     else
2086     u = (U) s;
2087 dl 1.17 }
2088     }
2089 dl 1.20 if (complete || ex != null)
2090     dst.internalComplete(u, ex);
2091     if (c != null)
2092     c.helpPostComplete();
2093 dl 1.17 }
2094     }
2095 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
2096 dl 1.17 }
2097    
2098 dl 1.1 /* ------------- then/and/or implementations -------------- */
2099    
2100 dl 1.20 @SuppressWarnings("unchecked") private <U> CompletableFuture<U> thenFunction
2101     (Function<? super T,? extends U> fn,
2102     Executor executor) {
2103 dl 1.1 if (fn == null) throw new NullPointerException();
2104     CompletableFuture<U> dst = new CompletableFuture<U>();
2105     ThenFunction<T,U> d = null;
2106 jsr166 1.2 Object r;
2107     if ((r = result) == null) {
2108 dl 1.1 CompletionNode p = new CompletionNode
2109     (d = new ThenFunction<T,U>(this, fn, dst, executor));
2110     while ((r = result) == null) {
2111     if (UNSAFE.compareAndSwapObject
2112     (this, COMPLETIONS, p.next = completions, p))
2113     break;
2114     }
2115     }
2116 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2117 dl 1.19 T t; Throwable ex;
2118 jsr166 1.2 if (r instanceof AltResult) {
2119 dl 1.19 ex = ((AltResult)r).ex;
2120 jsr166 1.2 t = null;
2121 dl 1.1 }
2122 dl 1.19 else {
2123     ex = null;
2124 jsr166 1.2 t = (T) r;
2125 dl 1.19 }
2126 dl 1.20 Executor e = executor;
2127     U u = null;
2128 jsr166 1.2 if (ex == null) {
2129 dl 1.1 try {
2130 dl 1.20 if (e != null)
2131     e.execute(new AsyncFunction<T,U>(t, fn, dst));
2132 dl 1.1 else
2133 dl 1.20 u = fn.apply(t);
2134 dl 1.1 } catch (Throwable rex) {
2135 dl 1.19 ex = rex;
2136 dl 1.1 }
2137     }
2138 dl 1.20 if (e == null || ex != null)
2139     dst.internalComplete(u, ex);
2140 dl 1.17 }
2141 dl 1.20 helpPostComplete();
2142 dl 1.1 return dst;
2143     }
2144    
2145 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> thenBlock
2146     (Block<? super T> fn,
2147     Executor executor) {
2148 dl 1.7 if (fn == null) throw new NullPointerException();
2149     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2150     ThenBlock<T> d = null;
2151     Object r;
2152     if ((r = result) == null) {
2153     CompletionNode p = new CompletionNode
2154     (d = new ThenBlock<T>(this, fn, dst, executor));
2155     while ((r = result) == null) {
2156     if (UNSAFE.compareAndSwapObject
2157     (this, COMPLETIONS, p.next = completions, p))
2158     break;
2159     }
2160     }
2161     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2162 dl 1.19 T t; Throwable ex;
2163 dl 1.7 if (r instanceof AltResult) {
2164 dl 1.19 ex = ((AltResult)r).ex;
2165 dl 1.7 t = null;
2166     }
2167 dl 1.19 else {
2168     ex = null;
2169 dl 1.7 t = (T) r;
2170 dl 1.19 }
2171 dl 1.20 Executor e = executor;
2172 dl 1.7 if (ex == null) {
2173     try {
2174 dl 1.20 if (e != null)
2175     e.execute(new AsyncBlock<T>(t, fn, dst));
2176     else
2177 dl 1.7 fn.accept(t);
2178     } catch (Throwable rex) {
2179 dl 1.19 ex = rex;
2180 dl 1.7 }
2181     }
2182 dl 1.20 if (e == null || ex != null)
2183     dst.internalComplete(null, ex);
2184 dl 1.17 }
2185 dl 1.20 helpPostComplete();
2186 dl 1.7 return dst;
2187     }
2188    
2189 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> thenRunnable
2190     (Runnable action,
2191     Executor executor) {
2192 dl 1.1 if (action == null) throw new NullPointerException();
2193     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2194     ThenRunnable<T> d = null;
2195 jsr166 1.2 Object r;
2196     if ((r = result) == null) {
2197 dl 1.1 CompletionNode p = new CompletionNode
2198     (d = new ThenRunnable<T>(this, action, dst, executor));
2199     while ((r = result) == null) {
2200     if (UNSAFE.compareAndSwapObject
2201     (this, COMPLETIONS, p.next = completions, p))
2202     break;
2203     }
2204     }
2205 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2206 dl 1.19 Throwable ex;
2207     if (r instanceof AltResult)
2208     ex = ((AltResult)r).ex;
2209     else
2210     ex = null;
2211 dl 1.20 Executor e = executor;
2212 jsr166 1.2 if (ex == null) {
2213 dl 1.1 try {
2214 dl 1.20 if (e != null)
2215     e.execute(new AsyncRunnable(action, dst));
2216     else
2217 dl 1.1 action.run();
2218     } catch (Throwable rex) {
2219 dl 1.19 ex = rex;
2220 dl 1.1 }
2221     }
2222 dl 1.20 if (e == null || ex != null)
2223     dst.internalComplete(null, ex);
2224 dl 1.17 }
2225 dl 1.20 helpPostComplete();
2226 dl 1.1 return dst;
2227     }
2228    
2229 dl 1.20 @SuppressWarnings("unchecked") private <U,V> CompletableFuture<V> andFunction
2230     (CompletableFuture<? extends U> other,
2231     BiFunction<? super T,? super U,? extends V> fn,
2232     Executor executor) {
2233 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2234 jsr166 1.2 CompletableFuture<V> dst = new CompletableFuture<V>();
2235     AndFunction<T,U,V> d = null;
2236     Object r, s = null;
2237     if ((r = result) == null || (s = other.result) == null) {
2238 dl 1.1 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
2239     CompletionNode q = null, p = new CompletionNode(d);
2240     while ((r == null && (r = result) == null) ||
2241     (s == null && (s = other.result) == null)) {
2242     if (q != null) {
2243     if (s != null ||
2244     UNSAFE.compareAndSwapObject
2245     (other, COMPLETIONS, q.next = other.completions, q))
2246     break;
2247     }
2248     else if (r != null ||
2249     UNSAFE.compareAndSwapObject
2250     (this, COMPLETIONS, p.next = completions, p)) {
2251     if (s != null)
2252     break;
2253     q = new CompletionNode(d);
2254     }
2255     }
2256     }
2257 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2258 dl 1.19 T t; U u; Throwable ex;
2259 jsr166 1.2 if (r instanceof AltResult) {
2260 dl 1.19 ex = ((AltResult)r).ex;
2261 jsr166 1.2 t = null;
2262 dl 1.1 }
2263 dl 1.19 else {
2264     ex = null;
2265 jsr166 1.2 t = (T) r;
2266 dl 1.19 }
2267 dl 1.1 if (ex != null)
2268     u = null;
2269     else if (s instanceof AltResult) {
2270 dl 1.19 ex = ((AltResult)s).ex;
2271 dl 1.1 u = null;
2272     }
2273     else
2274 jsr166 1.2 u = (U) s;
2275 dl 1.20 Executor e = executor;
2276     V v = null;
2277 jsr166 1.2 if (ex == null) {
2278 dl 1.1 try {
2279 dl 1.20 if (e != null)
2280     e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
2281 dl 1.1 else
2282 dl 1.20 v = fn.apply(t, u);
2283 dl 1.1 } catch (Throwable rex) {
2284 dl 1.19 ex = rex;
2285 dl 1.1 }
2286     }
2287 dl 1.20 if (e == null || ex != null)
2288     dst.internalComplete(v, ex);
2289 jsr166 1.2 }
2290 dl 1.20 helpPostComplete();
2291     other.helpPostComplete();
2292 jsr166 1.2 return dst;
2293 dl 1.1 }
2294    
2295 dl 1.20 @SuppressWarnings("unchecked") private <U> CompletableFuture<Void> andBlock
2296     (CompletableFuture<? extends U> other,
2297     BiBlock<? super T,? super U> fn,
2298     Executor executor) {
2299 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2300     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2301     AndBlock<T,U> d = null;
2302     Object r, s = null;
2303     if ((r = result) == null || (s = other.result) == null) {
2304     d = new AndBlock<T,U>(this, other, fn, dst, executor);
2305     CompletionNode q = null, p = new CompletionNode(d);
2306     while ((r == null && (r = result) == null) ||
2307     (s == null && (s = other.result) == null)) {
2308     if (q != null) {
2309     if (s != null ||
2310     UNSAFE.compareAndSwapObject
2311     (other, COMPLETIONS, q.next = other.completions, q))
2312     break;
2313     }
2314     else if (r != null ||
2315     UNSAFE.compareAndSwapObject
2316     (this, COMPLETIONS, p.next = completions, p)) {
2317     if (s != null)
2318     break;
2319     q = new CompletionNode(d);
2320     }
2321     }
2322     }
2323     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2324 dl 1.19 T t; U u; Throwable ex;
2325 dl 1.7 if (r instanceof AltResult) {
2326 dl 1.19 ex = ((AltResult)r).ex;
2327 dl 1.7 t = null;
2328     }
2329 dl 1.19 else {
2330     ex = null;
2331 dl 1.7 t = (T) r;
2332 dl 1.19 }
2333 dl 1.7 if (ex != null)
2334     u = null;
2335     else if (s instanceof AltResult) {
2336 dl 1.19 ex = ((AltResult)s).ex;
2337 dl 1.7 u = null;
2338     }
2339     else
2340     u = (U) s;
2341 dl 1.20 Executor e = executor;
2342 dl 1.7 if (ex == null) {
2343     try {
2344 dl 1.20 if (e != null)
2345     e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
2346     else
2347 dl 1.7 fn.accept(t, u);
2348     } catch (Throwable rex) {
2349 dl 1.19 ex = rex;
2350 dl 1.7 }
2351     }
2352 dl 1.20 if (e == null || ex != null)
2353     dst.internalComplete(null, ex);
2354 dl 1.7 }
2355 dl 1.20 helpPostComplete();
2356     other.helpPostComplete();
2357 dl 1.7 return dst;
2358     }
2359    
2360 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> andRunnable
2361     (CompletableFuture<?> other,
2362     Runnable action,
2363     Executor executor) {
2364 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2365 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2366     AndRunnable<T> d = null;
2367     Object r, s = null;
2368     if ((r = result) == null || (s = other.result) == null) {
2369 dl 1.1 d = new AndRunnable<T>(this, other, action, dst, executor);
2370     CompletionNode q = null, p = new CompletionNode(d);
2371     while ((r == null && (r = result) == null) ||
2372     (s == null && (s = other.result) == null)) {
2373     if (q != null) {
2374     if (s != null ||
2375     UNSAFE.compareAndSwapObject
2376     (other, COMPLETIONS, q.next = other.completions, q))
2377     break;
2378     }
2379     else if (r != null ||
2380     UNSAFE.compareAndSwapObject
2381     (this, COMPLETIONS, p.next = completions, p)) {
2382     if (s != null)
2383     break;
2384     q = new CompletionNode(d);
2385     }
2386     }
2387     }
2388 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2389 dl 1.19 Throwable ex;
2390     if (r instanceof AltResult)
2391     ex = ((AltResult)r).ex;
2392     else
2393     ex = null;
2394     if (ex == null && (s instanceof AltResult))
2395     ex = ((AltResult)s).ex;
2396 dl 1.20 Executor e = executor;
2397 dl 1.19 if (ex == null) {
2398 dl 1.1 try {
2399 dl 1.20 if (e != null)
2400     e.execute(new AsyncRunnable(action, dst));
2401     else
2402 dl 1.1 action.run();
2403     } catch (Throwable rex) {
2404 dl 1.19 ex = rex;
2405 dl 1.1 }
2406     }
2407 dl 1.20 if (e == null || ex != null)
2408     dst.internalComplete(null, ex);
2409 jsr166 1.2 }
2410 dl 1.20 helpPostComplete();
2411     other.helpPostComplete();
2412 jsr166 1.2 return dst;
2413 dl 1.1 }
2414    
2415 dl 1.20 @SuppressWarnings("unchecked") private <U> CompletableFuture<U> orFunction
2416     (CompletableFuture<? extends T> other,
2417     Function<? super T, U> fn,
2418     Executor executor) {
2419 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2420 jsr166 1.2 CompletableFuture<U> dst = new CompletableFuture<U>();
2421     OrFunction<T,U> d = null;
2422     Object r;
2423     if ((r = result) == null && (r = other.result) == null) {
2424 dl 1.1 d = new OrFunction<T,U>(this, other, fn, dst, executor);
2425     CompletionNode q = null, p = new CompletionNode(d);
2426     while ((r = result) == null && (r = other.result) == null) {
2427     if (q != null) {
2428     if (UNSAFE.compareAndSwapObject
2429     (other, COMPLETIONS, q.next = other.completions, q))
2430     break;
2431     }
2432     else if (UNSAFE.compareAndSwapObject
2433     (this, COMPLETIONS, p.next = completions, p))
2434     q = new CompletionNode(d);
2435     }
2436 jsr166 1.2 }
2437     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2438 dl 1.19 T t; Throwable ex;
2439 jsr166 1.2 if (r instanceof AltResult) {
2440 dl 1.19 ex = ((AltResult)r).ex;
2441 jsr166 1.2 t = null;
2442 dl 1.1 }
2443 dl 1.19 else {
2444     ex = null;
2445 jsr166 1.2 t = (T) r;
2446 dl 1.19 }
2447 dl 1.20 Executor e = executor;
2448     U u = null;
2449 jsr166 1.2 if (ex == null) {
2450 dl 1.1 try {
2451 dl 1.20 if (e != null)
2452     e.execute(new AsyncFunction<T,U>(t, fn, dst));
2453 dl 1.1 else
2454 dl 1.20 u = fn.apply(t);
2455 dl 1.1 } catch (Throwable rex) {
2456 dl 1.19 ex = rex;
2457 dl 1.1 }
2458     }
2459 dl 1.20 if (e == null || ex != null)
2460     dst.internalComplete(u, ex);
2461 jsr166 1.2 }
2462 dl 1.20 helpPostComplete();
2463     other.helpPostComplete();
2464 jsr166 1.2 return dst;
2465 dl 1.1 }
2466    
/*
 * Creates and returns a new CompletableFuture<Void> ("dst") that is
 * completed after fn has consumed a result of either this future or
 * "other".
 *
 * If neither source has a non-null result on entry, an OrBlock
 * completion "d" is created and registered (wrapped in CompletionNodes)
 * first on this future's completions stack and then on other's. If a
 * result is seen in this method, the calling thread handles it itself:
 *   - a source result that is an AltResult with a non-null ex is passed
 *     to dst.internalComplete without calling fn;
 *   - otherwise fn.accept is called with the value (null when the result
 *     is an AltResult with a null ex), either directly or, when executor
 *     is non-null, by handing an AsyncBlock to the executor.
 *
 * other:    the second source future; must be non-null
 * fn:       the consumer to run; must be non-null
 * executor: if non-null, fn is submitted to it instead of being called
 *           in the current thread
 * returns:  the new dependent future dst
 * throws:   NullPointerException if other or fn is null
 */
2467 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> orBlock
2468     (CompletableFuture<? extends T> other,
2469     Block<? super T> fn,
2470     Executor executor) {
2471 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2472     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2473     OrBlock<T> d = null;
2474     Object r;
     // Fast path: if either source already has a result, r holds it and no
     // completion object or node is allocated.
2475     if ((r = result) == null && (r = other.result) == null) {
2476     d = new OrBlock<T>(this, other, fn, dst, executor);
2477     CompletionNode q = null, p = new CompletionNode(d);
     // Registration loop. Both results are re-read on every iteration, so
     // the loop stops as soon as either source is seen to have a result.
     // Phase 1 (q == null): CAS node p onto this.completions.
     // Phase 2 (q != null): CAS node q onto other.completions, then break.
     // After a break r is null, so the block below is skipped.
2478     while ((r = result) == null && (r = other.result) == null) {
2479     if (q != null) {
2480     if (UNSAFE.compareAndSwapObject
2481     (other, COMPLETIONS, q.next = other.completions, q))
2482     break;
2483     }
2484     else if (UNSAFE.compareAndSwapObject
2485     (this, COMPLETIONS, p.next = completions, p))
2486     q = new CompletionNode(d);
2487     }
2488     }
     // Run here only if a result was observed. When d was created, the
     // 0 -> 1 compareAndSet must succeed first.
     // NOTE(review): presumably the same CAS is used by OrBlock itself so
     // that fn is triggered at most once -- confirm against OrBlock.
2489     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2490 dl 1.19 T t; Throwable ex;
     // Decode r: an AltResult supplies the exception (possibly null) and a
     // null value; any other object is the value itself.
2491 dl 1.7 if (r instanceof AltResult) {
2492 dl 1.19 ex = ((AltResult)r).ex;
2493 dl 1.7 t = null;
2494     }
2495 dl 1.19 else {
2496     ex = null;
2497 dl 1.7 t = (T) r;
2498 dl 1.19 }
2499 dl 1.20 Executor e = executor;
2500 dl 1.7 if (ex == null) {
     // Anything thrown by fn.accept or by e.execute is captured in ex and
     // becomes the exceptional outcome passed to dst below.
2501     try {
2502 dl 1.20 if (e != null)
2503     e.execute(new AsyncBlock<T>(t, fn, dst));
2504     else
2505 dl 1.7 fn.accept(t);
2506     } catch (Throwable rex) {
2507 dl 1.19 ex = rex;
2508 dl 1.7 }
2509     }
     // dst is completed here (with a null value) unless the work was handed
     // to the executor without error; in that case the AsyncBlock was given
     // dst. NOTE(review): presumably AsyncBlock completes dst -- confirm.
2510 dl 1.20 if (e == null || ex != null)
2511     dst.internalComplete(null, ex);
2512 dl 1.7 }
     // Called unconditionally on both sources before returning.
     // NOTE(review): helpPostComplete is defined elsewhere; presumably it
     // runs pending completions of an already-completed future -- confirm.
2513 dl 1.20 helpPostComplete();
2514     other.helpPostComplete();
2515 dl 1.7 return dst;
2516     }
2517    
/*
 * Creates and returns a new CompletableFuture<Void> ("dst") that is
 * completed after action has run in response to a result of either this
 * future or "other". The value of the source result is not used; only
 * an exception carried by an AltResult is examined.
 *
 * If neither source has a non-null result on entry, an OrRunnable
 * completion "d" is created and registered (wrapped in CompletionNodes)
 * first on this future's completions stack and then on other's. If a
 * result is seen in this method, the calling thread handles it itself:
 *   - a source result that is an AltResult with a non-null ex is passed
 *     to dst.internalComplete without running action;
 *   - otherwise action is run, either directly or, when executor is
 *     non-null, by handing an AsyncRunnable to the executor.
 *
 * other:    the second source future; must be non-null
 * action:   the action to run; must be non-null
 * executor: if non-null, action is submitted to it instead of being run
 *           in the current thread
 * returns:  the new dependent future dst
 * throws:   NullPointerException if other or action is null
 */
2518 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> orRunnable
2519     (CompletableFuture<?> other,
2520     Runnable action,
2521     Executor executor) {
2522 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2523 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2524     OrRunnable<T> d = null;
2525     Object r;
     // Fast path: if either source already has a result, r holds it and no
     // completion object or node is allocated.
2526     if ((r = result) == null && (r = other.result) == null) {
2527 dl 1.1 d = new OrRunnable<T>(this, other, action, dst, executor);
2528     CompletionNode q = null, p = new CompletionNode(d);
     // Registration loop. Both results are re-read on every iteration, so
     // the loop stops as soon as either source is seen to have a result.
     // Phase 1 (q == null): CAS node p onto this.completions.
     // Phase 2 (q != null): CAS node q onto other.completions, then break.
     // After a break r is null, so the block below is skipped.
2529     while ((r = result) == null && (r = other.result) == null) {
2530     if (q != null) {
2531     if (UNSAFE.compareAndSwapObject
2532     (other, COMPLETIONS, q.next = other.completions, q))
2533     break;
2534     }
2535     else if (UNSAFE.compareAndSwapObject
2536     (this, COMPLETIONS, p.next = completions, p))
2537     q = new CompletionNode(d);
2538     }
2539 jsr166 1.2 }
     // Run here only if a result was observed. When d was created, the
     // 0 -> 1 compareAndSet must succeed first.
     // NOTE(review): presumably the same CAS is used by OrRunnable itself so
     // that action is triggered at most once -- confirm against OrRunnable.
2540     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2541 dl 1.19 Throwable ex;
     // Only the exception matters here: taken from an AltResult (it may be
     // null), and null for any other result object.
2542     if (r instanceof AltResult)
2543     ex = ((AltResult)r).ex;
2544     else
2545     ex = null;
2546 dl 1.20 Executor e = executor;
2547 dl 1.19 if (ex == null) {
     // Anything thrown by action.run or by e.execute is captured in ex and
     // becomes the exceptional outcome passed to dst below.
2548 dl 1.1 try {
2549 dl 1.20 if (e != null)
2550     e.execute(new AsyncRunnable(action, dst));
2551     else
2552 dl 1.1 action.run();
2553     } catch (Throwable rex) {
2554 dl 1.19 ex = rex;
2555 dl 1.1 }
2556     }
     // dst is completed here (with a null value) unless the work was handed
     // to the executor without error; in that case the AsyncRunnable was
     // given dst. NOTE(review): presumably AsyncRunnable completes dst --
     // confirm against its definition.
2557 dl 1.20 if (e == null || ex != null)
2558     dst.internalComplete(null, ex);
2559 jsr166 1.2 }
     // Called unconditionally on both sources before returning.
     // NOTE(review): helpPostComplete is defined elsewhere; presumably it
     // runs pending completions of an already-completed future -- confirm.
2560 dl 1.20 helpPostComplete();
2561     other.helpPostComplete();
2562 jsr166 1.2 return dst;
2563 dl 1.1 }
2564    
2565     // Unsafe mechanics
     // UNSAFE is the sun.misc.Unsafe instance used for the compare-and-swap
     // calls in this class. RESULT, WAITERS and COMPLETIONS are the offsets
     // of the "result", "waiters" and "completions" fields, in the form
     // that UNSAFE.compareAndSwapObject takes as its offset argument.
2566     private static final sun.misc.Unsafe UNSAFE;
2567     private static final long RESULT;
2568     private static final long WAITERS;
2569     private static final long COMPLETIONS;
     // The offsets are resolved once, at class initialization, by reflective
     // lookup of the declared fields by name. The string literals must match
     // the field names exactly, or getDeclaredField throws.
2570     static {
2571     try {
2572     UNSAFE = sun.misc.Unsafe.getUnsafe();
2573     Class<?> k = CompletableFuture.class;
2574     RESULT = UNSAFE.objectFieldOffset
2575     (k.getDeclaredField("result"));
2576     WAITERS = UNSAFE.objectFieldOffset
2577     (k.getDeclaredField("waiters"));
2578     COMPLETIONS = UNSAFE.objectFieldOffset
2579     (k.getDeclaredField("completions"));
2580     } catch (Exception e) {
     // Any failure to obtain Unsafe or to resolve a field is rethrown as an
     // Error that keeps the original exception as its cause.
2581     throw new Error(e);
2582     }
2583     }
2584     }