ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.20
Committed: Mon Dec 31 15:01:42 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.19: +398 -354 lines
Log Message:
Rename getValue to join; cleanup pass

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.7 import java.util.function.Block;
10     import java.util.function.BiBlock;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.19 import java.util.concurrent.CompletionException;
23 dl 1.1 import java.util.concurrent.atomic.AtomicInteger;
24     import java.util.concurrent.locks.LockSupport;
25    
26     /**
27     * A {@link Future} that may be explicitly completed (setting its
28     * value and status), and may include dependent functions and actions
29     * that trigger upon its completion.
30     *
31 dl 1.7 * <p>Similar methods are available for Functions, Blocks, and
32     * Runnables, depending on whether actions require arguments and/or
33     * produce results. Functions and actions supplied for dependent
34     * completions (mainly using methods with prefix {@code then}) may be
35 dl 1.1 * performed by the thread that completes the current
36     * CompletableFuture, or by any other caller of these methods. There
37     * are no guarantees about the order of processing completions unless
38 dl 1.17 * constrained by these methods.
39 dl 1.1 *
40 jsr166 1.4 * <p>When two or more threads attempt to {@link #complete} or {@link
41 dl 1.17 * #completeExceptionally} a CompletableFuture, only one of them
42 dl 1.19 * succeeds.
43     *
44     * <p> Upon exceptional completion, or when a completion entails
45 dl 1.5 * computation of a function or action, and it terminates abruptly
46 dl 1.19 * with an (unchecked) exception or error, then further completions
47     * act as {@code completeExceptionally} with a {@link
48     * CompletionException} holding that exception as its cause. If a
49     * CompletableFuture completes exceptionally, and is not followed by a
50     * {@link #exceptionally} or {@link #handle} completion, then all of
51     * its dependents (and their dependents) also complete exceptionally
52 dl 1.20 * with CompletionExceptions holding the ultimate cause. In case of a
53     * CompletionException, methods {@link #get()} and {@link #get(long,
54     * TimeUnit)} throw a {@link ExecutionException} with the same cause
55     * as would be held in the corresponding CompletionException. However,
56     * in these cases, methods {@link #join()} and {@link #getNow} throw
57     * the CompletionException, which simplifies usage especially within
58     * other completion functions.
59 dl 1.1 *
60     * <p>CompletableFutures themselves do not execute asynchronously.
61     * However, the {@code async} methods provide commonly useful ways to
62 jsr166 1.8 * commence asynchronous processing, using either a given {@link
63 dl 1.1 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
64     * function or action that will result in the completion of a new
65     * CompletableFuture.
66     *
67     * @author Doug Lea
68     * @since 1.8
69     */
70     public class CompletableFuture<T> implements Future<T> {
71     /*
72 dl 1.20 * Overview:
73 dl 1.1 *
74 dl 1.20 * 1. Non-nullness of field result (set via CAS) indicates
75     * done. An AltResult is used to box null as a result, as well as
76     * to hold exceptions. Using a single field makes completion fast
77     * and simple to detect and trigger, at the expense of a lot of
78     * encoding and decoding that infiltrates many methods. One minor
79     * simplification relies on the (static) NIL (to box null results)
80     * being the only AltResult with a null exception field, so we
81     * don't usually need explicit comparisons with NIL.
82 dl 1.1 *
83     * 2. Waiters are held in a Treiber stack similar to the one used
84 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
85     * internal documentation for details.
86 dl 1.1 *
87     * 3. Completions are also kept in a list/stack, and pulled off
88 dl 1.20 * and run when completion is triggered. (We could in fact use the
89     * the same stack as for waiters, but would give up the potential
90     * parallelism obtained because woken waiters help release/run
91     * others (see method postComplete). Because post-processing may
92     * race with direct calls, completions extend AtomicInteger so
93     * callers can claim the action via compareAndSet(0, 1). The
94     * Completion.run methods are all written a boringly similar
95     * uniform way (that sometimes includes unnecessary-looking
96     * checks, kept to maintain uniformity). There are enough
97     * dimensions upon which they differ that factoring to use common
98     * code isn't worthwhile.
99     *
100     * 4. The exported then/and/or methods do support a bit of
101     * factoring (see thenFunction, andBlock, etc). They must cope
102     * with the intrinsic races surrounding addition of a dependent
103     * action versus performing the action directly because the task
104     * is already complete. For example, a CF may not be complete
105     * upon entry, so a dependent completion is added, but by the time
106     * it is added, the target CF is complete, so must be directly
107     * executed. This is all done while avoiding unnecessary object
108     * construction in safe-bypass cases.
109 dl 1.1 */
110    
111 dl 1.20 // preliminary class definitions
112    
113 dl 1.1 static final class AltResult {
114     final Throwable ex; // null only for NIL
115 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
116 dl 1.1 }
117    
118     static final AltResult NIL = new AltResult(null);
119    
120     /**
121     * Simple linked list nodes to record waiting threads in a Treiber
122     * stack. See other classes such as Phaser and SynchronousQueue
123     * for more detailed explanation.
124     */
125     static final class WaitNode {
126     volatile Thread thread;
127     volatile WaitNode next;
128     }
129    
130     /**
131     * Simple linked list nodes to record completions, used in
132 dl 1.20 * basically the same way as WaitNodes. (We separate nodes from
133     * the Completions themselves mainly because for the And and Or
134     * methods, the same Completion object resides in two lists.)
135 dl 1.1 */
136     static final class CompletionNode {
137 jsr166 1.2 final Completion completion;
138 dl 1.1 volatile CompletionNode next;
139 jsr166 1.2 CompletionNode(Completion completion) { this.completion = completion; }
140 dl 1.1 }
141    
142 dl 1.20
143     // Fields
144    
145     volatile Object result; // Either the result or boxed AltResult
146 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
147     volatile CompletionNode completions; // list (Treiber stack) of completions
148    
149 dl 1.20 // Basic utilities for triggering and processing completions
150     // (The Completion and Async classes and internal methods that
151     // use them are defined after the public methods.)
152    
153     /**
154     * Removes and signals all waiting threads and runs all completions.
155     */
156     final void postComplete() {
157     WaitNode q; Thread t;
158     while ((q = waiters) != null) {
159     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
160     (t = q.thread) != null) {
161     q.thread = null;
162     LockSupport.unpark(t);
163     }
164     }
165    
166     CompletionNode h; Completion c;
167     while ((h = completions) != null) {
168     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
169     (c = h.completion) != null)
170     c.run();
171     }
172     }
173    
174     /**
175     * Triggers completion with the encoding of the given arguments:
176     * if the exception is non-null, encodes it as a wrapped
177     * CompletionException unless it is one already. Otherwise uses
178     * the given result, boxed as NIL if null.
179     */
180     final void internalComplete(Object v, Throwable ex) {
181     if (result == null)
182     UNSAFE.compareAndSwapObject
183     (this, RESULT, null,
184     (ex == null) ? (v == null) ? NIL : v :
185     new AltResult((ex instanceof CompletionException) ? ex :
186     new CompletionException(ex)));
187     postComplete(); // help out even if not triggered
188     }
189    
190     /**
191     * If triggered, help release and/or process completions
192     */
193     final void helpPostComplete() {
194     if (result != null)
195     postComplete();
196     }
197    
198     // public methods
199    
200 dl 1.1 /**
201     * Creates a new incomplete CompletableFuture.
202     */
203     public CompletableFuture() {
204     }
205    
206     /**
207     * Asynchronously executes in the {@link
208     * ForkJoinPool#commonPool()}, a task that completes the returned
209     * CompletableFuture with the result of the given Supplier.
210     *
211     * @param supplier a function returning the value to be used
212 jsr166 1.14 * to complete the returned CompletableFuture
213 jsr166 1.11 * @return the CompletableFuture
214 dl 1.1 */
215 dl 1.7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
216 dl 1.1 if (supplier == null) throw new NullPointerException();
217     CompletableFuture<U> f = new CompletableFuture<U>();
218     ForkJoinPool.commonPool().
219 dl 1.7 execute((ForkJoinTask<?>)new AsyncSupplier<U>(supplier, f));
220 dl 1.1 return f;
221     }
222    
223     /**
224     * Asynchronously executes using the given executor, a task that
225     * completes the returned CompletableFuture with the result of the
226     * given Supplier.
227     *
228     * @param supplier a function returning the value to be used
229 jsr166 1.11 * to complete the returned CompletableFuture
230 dl 1.1 * @param executor the executor to use for asynchronous execution
231 jsr166 1.11 * @return the CompletableFuture
232 dl 1.1 */
233 dl 1.7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
234     Executor executor) {
235 dl 1.1 if (executor == null || supplier == null)
236     throw new NullPointerException();
237     CompletableFuture<U> f = new CompletableFuture<U>();
238 dl 1.7 executor.execute(new AsyncSupplier<U>(supplier, f));
239 dl 1.1 return f;
240     }
241    
242     /**
243     * Asynchronously executes in the {@link
244     * ForkJoinPool#commonPool()} a task that runs the given action,
245 jsr166 1.11 * and then completes the returned CompletableFuture.
246 dl 1.1 *
247     * @param runnable the action to run before completing the
248 jsr166 1.11 * returned CompletableFuture
249     * @return the CompletableFuture
250 dl 1.1 */
251 dl 1.7 public static CompletableFuture<Void> runAsync(Runnable runnable) {
252 dl 1.1 if (runnable == null) throw new NullPointerException();
253     CompletableFuture<Void> f = new CompletableFuture<Void>();
254     ForkJoinPool.commonPool().
255     execute((ForkJoinTask<?>)new AsyncRunnable(runnable, f));
256     return f;
257     }
258    
259     /**
260     * Asynchronously executes using the given executor, a task that
261     * runs the given action, and then completes the returned
262 jsr166 1.11 * CompletableFuture.
263 dl 1.1 *
264     * @param runnable the action to run before completing the
265 jsr166 1.11 * returned CompletableFuture
266 dl 1.1 * @param executor the executor to use for asynchronous execution
267 jsr166 1.11 * @return the CompletableFuture
268 dl 1.1 */
269 dl 1.7 public static CompletableFuture<Void> runAsync(Runnable runnable,
270 jsr166 1.12 Executor executor) {
271 dl 1.1 if (executor == null || runnable == null)
272     throw new NullPointerException();
273     CompletableFuture<Void> f = new CompletableFuture<Void>();
274     executor.execute(new AsyncRunnable(runnable, f));
275     return f;
276     }
277    
278     /**
279     * Returns {@code true} if completed in any fashion: normally,
280 jsr166 1.16 * exceptionally, or via cancellation.
281 dl 1.1 *
282     * @return {@code true} if completed
283     */
284     public boolean isDone() {
285     return result != null;
286     }
287    
288     /**
289 dl 1.19 * Waits if necessary for the computation to complete, and then
290     * retrieves its result.
291     *
292     * @return the computed result
293     * @throws CancellationException if the computation was cancelled
294     * @throws ExecutionException if the computation threw an
295     * exception
296     * @throws InterruptedException if the current thread was interrupted
297     * while waiting
298     */
299 dl 1.20 @SuppressWarnings("unchecked") public T get() throws InterruptedException, ExecutionException {
300     Object r; Throwable ex, cause;
301 dl 1.19 if ((r = result) == null && (r = waitingGet(true)) == null)
302     throw new InterruptedException();
303     if (r instanceof AltResult) {
304     if ((ex = ((AltResult)r).ex) != null) {
305     if (ex instanceof CancellationException)
306     throw (CancellationException)ex;
307 dl 1.20 if ((ex instanceof CompletionException) &&
308     (cause = ex.getCause()) != null)
309     ex = cause;
310 dl 1.19 throw new ExecutionException(ex);
311     }
312     return null;
313     }
314     return (T)r;
315     }
316    
317     /**
318 dl 1.20 * Waits if necessary for at most the given time for completion,
319     * and then retrieves its result, if available.
320 dl 1.1 *
321 dl 1.20 * @param timeout the maximum time to wait
322     * @param unit the time unit of the timeout argument
323     * @return the computed result
324 dl 1.19 * @throws CancellationException if the computation was cancelled
325 dl 1.20 * @throws ExecutionException if the computation threw an
326 dl 1.19 * exception
327 dl 1.20 * @throws InterruptedException if the current thread was interrupted
328     * while waiting
329     * @throws TimeoutException if the wait timed out
330 dl 1.1 */
331 dl 1.20 @SuppressWarnings("unchecked") public T get(long timeout, TimeUnit unit)
332     throws InterruptedException, ExecutionException, TimeoutException {
333     Object r; Throwable ex, cause;
334     long nanos = unit.toNanos(timeout);
335     if (Thread.interrupted())
336     throw new InterruptedException();
337 jsr166 1.2 if ((r = result) == null)
338 dl 1.20 r = timedAwaitDone(nanos);
339 jsr166 1.2 if (r instanceof AltResult) {
340 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
341 dl 1.19 if (ex instanceof CancellationException)
342     throw (CancellationException)ex;
343 dl 1.20 if ((ex instanceof CompletionException) &&
344     (cause = ex.getCause()) != null)
345     ex = cause;
346     throw new ExecutionException(ex);
347 dl 1.5 }
348 jsr166 1.2 return null;
349     }
350     return (T)r;
351 dl 1.1 }
352    
353     /**
354 dl 1.20 * Returns the result value when complete, or throws an
355     * (unchecked) exception if completed exceptionally. To better
356     * conform with the use of common functional forms, if a
357     * computation involved in the completion of this
358     * CompletableFuture threw an exception, this method throws an
359     * (unchecked) {@link CompletionException} with the underlying
360     * exception as its cause.
361 dl 1.1 *
362 dl 1.20 * @return the result value
363 dl 1.19 * @throws CancellationException if the computation was cancelled
364 dl 1.20 * @throws CompletionException if a completion computation threw
365     * an exception
366 dl 1.1 */
367 dl 1.20 @SuppressWarnings("unchecked") public T join() {
368 dl 1.1 Object r; Throwable ex;
369 jsr166 1.2 if ((r = result) == null)
370 dl 1.20 r = waitingGet(false);
371 jsr166 1.2 if (r instanceof AltResult) {
372 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
373 dl 1.19 if (ex instanceof CancellationException)
374     throw (CancellationException)ex;
375     if (ex instanceof CompletionException)
376     throw (CompletionException)ex;
377     throw new CompletionException(ex);
378 dl 1.5 }
379 jsr166 1.2 return null;
380     }
381     return (T)r;
382 dl 1.1 }
383    
384     /**
385 dl 1.20 * Returns the result value (or throws any encountered exception)
386     * if completed, else returns the given valueIfAbsent.
387 dl 1.1 *
388 dl 1.20 * @param valueIfAbsent the value to return if not completed
389     * @return the result value, if completed, else the given valueIfAbsent
390 dl 1.1 * @throws CancellationException if the computation was cancelled
391 dl 1.20 * @throws CompletionException if a completion computation threw
392     * an exception
393 dl 1.1 */
394 dl 1.20 @SuppressWarnings("unchecked") public T getNow(T valueIfAbsent) {
395 dl 1.1 Object r; Throwable ex;
396 jsr166 1.2 if ((r = result) == null)
397 dl 1.20 return valueIfAbsent;
398 jsr166 1.2 if (r instanceof AltResult) {
399 dl 1.5 if ((ex = ((AltResult)r).ex) != null) {
400 dl 1.19 if (ex instanceof CancellationException)
401     throw (CancellationException)ex;
402 dl 1.20 if (ex instanceof CompletionException)
403     throw (CompletionException)ex;
404     throw new CompletionException(ex);
405 dl 1.5 }
406 jsr166 1.2 return null;
407     }
408     return (T)r;
409 dl 1.1 }
410    
411     /**
412     * If not already completed, sets the value returned by {@link
413     * #get()} and related methods to the given value.
414     *
415     * @param value the result value
416     * @return true if this invocation caused this CompletableFuture
417 jsr166 1.14 * to transition to a completed state, else false
418 dl 1.1 */
419     public boolean complete(T value) {
420 dl 1.20 boolean triggered =
421     result == null &&
422 dl 1.1 UNSAFE.compareAndSwapObject(this, RESULT, null,
423 dl 1.20 value == null ? NIL : value);
424     postComplete();
425     return triggered;
426 dl 1.19 }
427    
428     /**
429 dl 1.1 * If not already completed, causes invocations of {@link #get()}
430     * and related methods to throw the given exception.
431     *
432     * @param ex the exception
433     * @return true if this invocation caused this CompletableFuture
434 jsr166 1.14 * to transition to a completed state, else false
435 dl 1.1 */
436     public boolean completeExceptionally(Throwable ex) {
437     if (ex == null) throw new NullPointerException();
438 dl 1.20 boolean triggered =
439     result == null &&
440     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
441     postComplete();
442     return triggered;
443 dl 1.19 }
444    
445     /**
446 dl 1.1 * Creates and returns a CompletableFuture that is completed with
447     * the result of the given function of this CompletableFuture.
448     * If this CompletableFuture completes exceptionally,
449     * then the returned CompletableFuture also does so,
450 dl 1.19 * with a CompletionException holding this exception as
451 dl 1.1 * its cause.
452     *
453     * @param fn the function to use to compute the value of
454     * the returned CompletableFuture
455     * @return the new CompletableFuture
456     */
457 dl 1.7 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
458 dl 1.1 return thenFunction(fn, null);
459     }
460    
461     /**
462     * Creates and returns a CompletableFuture that is asynchronously
463     * completed using the {@link ForkJoinPool#commonPool()} with the
464     * result of the given function of this CompletableFuture. If
465     * this CompletableFuture completes exceptionally, then the
466     * returned CompletableFuture also does so, with a
467 dl 1.19 * CompletionException holding this exception as its cause.
468 dl 1.1 *
469     * @param fn the function to use to compute the value of
470     * the returned CompletableFuture
471     * @return the new CompletableFuture
472     */
473 dl 1.7 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
474 dl 1.1 return thenFunction(fn, ForkJoinPool.commonPool());
475     }
476    
477     /**
478     * Creates and returns a CompletableFuture that is asynchronously
479     * completed using the given executor with the result of the given
480     * function of this CompletableFuture. If this CompletableFuture
481     * completes exceptionally, then the returned CompletableFuture
482 dl 1.19 * also does so, with a CompletionException holding this exception as
483 dl 1.1 * its cause.
484     *
485     * @param fn the function to use to compute the value of
486     * the returned CompletableFuture
487     * @param executor the executor to use for asynchronous execution
488     * @return the new CompletableFuture
489     */
490 dl 1.7 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
491 jsr166 1.12 Executor executor) {
492 dl 1.1 if (executor == null) throw new NullPointerException();
493     return thenFunction(fn, executor);
494     }
495    
496     /**
497     * Creates and returns a CompletableFuture that is completed after
498 dl 1.7 * performing the given action with this CompletableFuture's
499     * result when it completes. If this CompletableFuture
500     * completes exceptionally, then the returned CompletableFuture
501 dl 1.19 * also does so, with a CompletionException holding this exception as
502 dl 1.7 * its cause.
503     *
504     * @param block the action to perform before completing the
505     * returned CompletableFuture
506     * @return the new CompletableFuture
507     */
508     public CompletableFuture<Void> thenAccept(Block<? super T> block) {
509     return thenBlock(block, null);
510     }
511    
512     /**
513     * Creates and returns a CompletableFuture that is asynchronously
514     * completed using the {@link ForkJoinPool#commonPool()} with this
515     * CompletableFuture's result when it completes. If this
516     * CompletableFuture completes exceptionally, then the returned
517 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
518 dl 1.7 * this exception as its cause.
519     *
520     * @param block the action to perform before completing the
521     * returned CompletableFuture
522     * @return the new CompletableFuture
523     */
524     public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
525     return thenBlock(block, ForkJoinPool.commonPool());
526     }
527    
528     /**
529     * Creates and returns a CompletableFuture that is asynchronously
530     * completed using the given executor with this
531     * CompletableFuture's result when it completes. If this
532     * CompletableFuture completes exceptionally, then the returned
533 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
534 dl 1.7 * this exception as its cause.
535     *
536     * @param block the action to perform before completing the
537     * returned CompletableFuture
538     * @param executor the executor to use for asynchronous execution
539     * @return the new CompletableFuture
540     */
541     public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
542     Executor executor) {
543     if (executor == null) throw new NullPointerException();
544     return thenBlock(block, executor);
545     }
546    
547     /**
548     * Creates and returns a CompletableFuture that is completed after
549     * performing the given action when this CompletableFuture
550 dl 1.1 * completes. If this CompletableFuture completes exceptionally,
551     * then the returned CompletableFuture also does so, with a
552 dl 1.19 * CompletionException holding this exception as its cause.
553 dl 1.1 *
554     * @param action the action to perform before completing the
555     * returned CompletableFuture
556     * @return the new CompletableFuture
557     */
558 dl 1.7 public CompletableFuture<Void> thenRun(Runnable action) {
559 dl 1.1 return thenRunnable(action, null);
560     }
561    
562     /**
563     * Creates and returns a CompletableFuture that is asynchronously
564     * completed using the {@link ForkJoinPool#commonPool()} after
565 dl 1.7 * performing the given action when this CompletableFuture
566 dl 1.1 * completes. If this CompletableFuture completes exceptionally,
567     * then the returned CompletableFuture also does so, with a
568 dl 1.19 * CompletionException holding this exception as its cause.
569 dl 1.1 *
570     * @param action the action to perform before completing the
571     * returned CompletableFuture
572     * @return the new CompletableFuture
573     */
574 dl 1.7 public CompletableFuture<Void> thenRunAsync(Runnable action) {
575 dl 1.1 return thenRunnable(action, ForkJoinPool.commonPool());
576     }
577    
578     /**
579     * Creates and returns a CompletableFuture that is asynchronously
580     * completed using the given executor after performing the given
581 dl 1.7 * action when this CompletableFuture completes. If this
582 dl 1.1 * CompletableFuture completes exceptionally, then the returned
583 dl 1.19 * CompletableFuture also does so, with a CompletionException holding
584 dl 1.1 * this exception as its cause.
585     *
586     * @param action the action to perform before completing the
587     * returned CompletableFuture
588     * @param executor the executor to use for asynchronous execution
589     * @return the new CompletableFuture
590     */
591 jsr166 1.12 public CompletableFuture<Void> thenRunAsync(Runnable action,
592     Executor executor) {
593 dl 1.1 if (executor == null) throw new NullPointerException();
594     return thenRunnable(action, executor);
595     }
596    
597     /**
598     * Creates and returns a CompletableFuture that is completed with
599     * the result of the given function of this and the other given
600 dl 1.7 * CompletableFuture's results when both complete. If this or
601 dl 1.1 * the other CompletableFuture complete exceptionally, then the
602     * returned CompletableFuture also does so, with a
603 dl 1.19 * CompletionException holding the exception as its cause.
604 dl 1.1 *
605     * @param other the other CompletableFuture
606     * @param fn the function to use to compute the value of
607     * the returned CompletableFuture
608     * @return the new CompletableFuture
609     */
610 dl 1.17 public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
611     BiFunction<? super T,? super U,? extends V> fn) {
612 dl 1.1 return andFunction(other, fn, null);
613     }
614    
615     /**
616 jsr166 1.3 * Creates and returns a CompletableFuture that is asynchronously
617 dl 1.1 * completed using the {@link ForkJoinPool#commonPool()} with
618     * the result of the given function of this and the other given
619 dl 1.7 * CompletableFuture's results when both complete. If this or
620 dl 1.1 * the other CompletableFuture complete exceptionally, then the
621     * returned CompletableFuture also does so, with a
622 dl 1.19 * CompletionException holding the exception as its cause.
623 dl 1.1 *
624     * @param other the other CompletableFuture
625     * @param fn the function to use to compute the value of
626     * the returned CompletableFuture
627     * @return the new CompletableFuture
628     */
629 dl 1.17 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
630     BiFunction<? super T,? super U,? extends V> fn) {
631 dl 1.1 return andFunction(other, fn, ForkJoinPool.commonPool());
632     }
633    
634     /**
635 jsr166 1.3 * Creates and returns a CompletableFuture that is
636 dl 1.1 * asynchronously completed using the given executor with the
637     * result of the given function of this and the other given
638 dl 1.7 * CompletableFuture's results when both complete. If this or
639 dl 1.1 * the other CompletableFuture complete exceptionally, then the
640     * returned CompletableFuture also does so, with a
641 dl 1.19 * CompletionException holding the exception as its cause.
642 dl 1.1 *
643     * @param other the other CompletableFuture
644     * @param fn the function to use to compute the value of
645     * the returned CompletableFuture
646     * @param executor the executor to use for asynchronous execution
647     * @return the new CompletableFuture
648     */
649    
650 dl 1.17 public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
651     BiFunction<? super T,? super U,? extends V> fn,
652     Executor executor) {
653 dl 1.7 if (executor == null) throw new NullPointerException();
654     return andFunction(other, fn, executor);
655     }
656    
657     /**
658     * Creates and returns a CompletableFuture that is completed with
659     * the results of this and the other given CompletableFuture if
660     * both complete. If this and/or the other CompletableFuture
661     * complete exceptionally, then the returned CompletableFuture
662 dl 1.19 * also does so, with a CompletionException holding one of these
663 dl 1.7 * exceptions as its cause.
664     *
665     * @param other the other CompletableFuture
666     * @param block the action to perform before completing the
667     * returned CompletableFuture
668     * @return the new CompletableFuture
669     */
670 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
671     BiBlock<? super T, ? super U> block) {
672 dl 1.7 return andBlock(other, block, null);
673     }
674    
675     /**
676     * Creates and returns a CompletableFuture that is completed
677     * asynchronously using the {@link ForkJoinPool#commonPool()} with
678     * the results of this and the other given CompletableFuture when
679     * both complete. If this and/or the other CompletableFuture
680     * complete exceptionally, then the returned CompletableFuture
681 dl 1.19 * also does so, with a CompletionException holding one of these
682 dl 1.7 * exceptions as its cause.
683     *
684     * @param other the other CompletableFuture
685     * @param block the action to perform before completing the
686     * returned CompletableFuture
687     * @return the new CompletableFuture
688     */
689 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
690     BiBlock<? super T, ? super U> block) {
691 dl 1.7 return andBlock(other, block, ForkJoinPool.commonPool());
692     }
693    
694     /**
695     * Creates and returns a CompletableFuture that is completed
696     * asynchronously using the given executor with the results of
697     * this and the other given CompletableFuture when both complete.
698     * If this and/or the other CompletableFuture complete
699     * exceptionally, then the returned CompletableFuture also does
700 dl 1.19 * so, with a CompletionException holding one of these exceptions as
701 dl 1.7 * its cause.
702     *
703     * @param other the other CompletableFuture
704     * @param block the action to perform before completing the
705     * returned CompletableFuture
706     * @param executor the executor to use for asynchronous execution
707     * @return the new CompletableFuture
708     */
709 dl 1.17 public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
710     BiBlock<? super T, ? super U> block,
711     Executor executor) {
712 dl 1.1 if (executor == null) throw new NullPointerException();
713 dl 1.7 return andBlock(other, block, executor);
714 dl 1.1 }
715    
716     /**
717     * Creates and returns a CompletableFuture that is completed
718 dl 1.7 * when this and the other given CompletableFuture both
719 dl 1.1 * complete. If this and/or the other CompletableFuture complete
720     * exceptionally, then the returned CompletableFuture also does
721 dl 1.19 * so, with a CompletionException holding one of these exceptions as
722 dl 1.1 * its cause.
723     *
724     * @param other the other CompletableFuture
725     * @param action the action to perform before completing the
726     * returned CompletableFuture
727     * @return the new CompletableFuture
728     */
729 dl 1.17 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
730     Runnable action) {
731 dl 1.1 return andRunnable(other, action, null);
732     }
733    
734     /**
735     * Creates and returns a CompletableFuture that is completed
736     * asynchronously using the {@link ForkJoinPool#commonPool()}
737 dl 1.7 * when this and the other given CompletableFuture both
738 dl 1.1 * complete. If this and/or the other CompletableFuture complete
739     * exceptionally, then the returned CompletableFuture also does
740 dl 1.19 * so, with a CompletionException holding one of these exceptions as
741 dl 1.1 * its cause.
742     *
743     * @param other the other CompletableFuture
744     * @param action the action to perform before completing the
745     * returned CompletableFuture
746     * @return the new CompletableFuture
747     */
748 dl 1.17 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
749     Runnable action) {
750 dl 1.1 return andRunnable(other, action, ForkJoinPool.commonPool());
751     }
752    
753     /**
754     * Creates and returns a CompletableFuture that is completed
755     * asynchronously using the given executor
756 dl 1.7 * when this and the other given CompletableFuture both
757 dl 1.1 * complete. If this and/or the other CompletableFuture complete
758     * exceptionally, then the returned CompletableFuture also does
759 dl 1.19 * so, with a CompletionException holding one of these exceptions as
760 dl 1.1 * its cause.
761     *
762     * @param other the other CompletableFuture
763     * @param action the action to perform before completing the
764     * returned CompletableFuture
765     * @param executor the executor to use for asynchronous execution
766     * @return the new CompletableFuture
767     */
768 dl 1.17 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
769     Runnable action,
770     Executor executor) {
771 dl 1.1 if (executor == null) throw new NullPointerException();
772     return andRunnable(other, action, executor);
773     }
774    
775     /**
776     * Creates and returns a CompletableFuture that is completed with
777     * the result of the given function of either this or the other
778 dl 1.7 * given CompletableFuture's results when either complete. If
779 dl 1.1 * this and/or the other CompletableFuture complete exceptionally,
780     * then the returned CompletableFuture may also do so, with a
781 dl 1.19 * CompletionException holding one of these exceptions as its cause.
782 dl 1.1 * No guarantees are made about which result or exception is used
783     * in the returned CompletableFuture.
784     *
785     * @param other the other CompletableFuture
786     * @param fn the function to use to compute the value of
787     * the returned CompletableFuture
788     * @return the new CompletableFuture
789     */
790 dl 1.17 public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
791     Function<? super T, U> fn) {
792 dl 1.1 return orFunction(other, fn, null);
793     }
794    
795     /**
796     * Creates and returns a CompletableFuture that is completed
797     * asynchronously using the {@link ForkJoinPool#commonPool()} with
798     * the result of the given function of either this or the other
799 dl 1.7 * given CompletableFuture's results when either complete. If
800 dl 1.1 * this and/or the other CompletableFuture complete exceptionally,
801     * then the returned CompletableFuture may also do so, with a
802 dl 1.19 * CompletionException holding one of these exceptions as its cause.
803 dl 1.1 * No guarantees are made about which result or exception is used
804     * in the returned CompletableFuture.
805     *
806     * @param other the other CompletableFuture
807     * @param fn the function to use to compute the value of
808     * the returned CompletableFuture
809     * @return the new CompletableFuture
810     */
811 dl 1.17 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
812     Function<? super T, U> fn) {
813 dl 1.1 return orFunction(other, fn, ForkJoinPool.commonPool());
814     }
815    
816     /**
817     * Creates and returns a CompletableFuture that is completed
818     * asynchronously using the given executor with the result of the
819     * given function of either this or the other given
820 dl 1.7 * CompletableFuture's results when either complete. If this
821 dl 1.1 * and/or the other CompletableFuture complete exceptionally, then
822     * the returned CompletableFuture may also do so, with a
823 dl 1.19 * CompletionException holding one of these exceptions as its cause.
824 dl 1.1 * No guarantees are made about which result or exception is used
825     * in the returned CompletableFuture.
826     *
827     * @param other the other CompletableFuture
828     * @param fn the function to use to compute the value of
829     * the returned CompletableFuture
830     * @param executor the executor to use for asynchronous execution
831     * @return the new CompletableFuture
832     */
833 dl 1.17 public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
834     Function<? super T, U> fn,
835     Executor executor) {
836 dl 1.1 if (executor == null) throw new NullPointerException();
837     return orFunction(other, fn, executor);
838     }
839    
840     /**
841 dl 1.7 * Creates and returns a CompletableFuture that is completed after
842     * performing the given action with the result of either this or the
843     * other given CompletableFuture's result, when either complete.
844     * If this and/or the other CompletableFuture complete
845     * exceptionally, then the returned CompletableFuture may also do
846 dl 1.19 * so, with a CompletionException holding one of these exceptions as
847 dl 1.7 * its cause. No guarantees are made about which exception is
848     * used in the returned CompletableFuture.
849     *
850     * @param other the other CompletableFuture
851     * @param block the action to perform before completing the
852     * returned CompletableFuture
853     * @return the new CompletableFuture
854     */
855 dl 1.17 public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
856     Block<? super T> block) {
857 dl 1.7 return orBlock(other, block, null);
858     }
859    
860     /**
861     * Creates and returns a CompletableFuture that is completed
862     * asynchronously using the {@link ForkJoinPool#commonPool()},
863     * performing the given action with the result of either this or
864     * the other given CompletableFuture's result, when either
865     * complete. If this and/or the other CompletableFuture complete
866     * exceptionally, then the returned CompletableFuture may also do
867 dl 1.19 * so, with a CompletionException holding one of these exceptions as
868 dl 1.7 * its cause. No guarantees are made about which exception is
869     * used in the returned CompletableFuture.
870     *
871     * @param other the other CompletableFuture
872     * @param block the action to perform before completing the
873     * returned CompletableFuture
874     * @return the new CompletableFuture
875     */
876 dl 1.17 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
877     Block<? super T> block) {
878 dl 1.7 return orBlock(other, block, ForkJoinPool.commonPool());
879     }
880    
881     /**
882     * Creates and returns a CompletableFuture that is completed
883     * asynchronously using the given executor,
884     * performing the given action with the result of either this or
885     * the other given CompletableFuture's result, when either
886     * complete. If this and/or the other CompletableFuture complete
887     * exceptionally, then the returned CompletableFuture may also do
888 dl 1.19 * so, with a CompletionException holding one of these exceptions as
889 dl 1.7 * its cause. No guarantees are made about which exception is
890     * used in the returned CompletableFuture.
891     *
892     * @param other the other CompletableFuture
893     * @param block the action to perform before completing the
894     * returned CompletableFuture
895     * @param executor the executor to use for asynchronous execution
896     * @return the new CompletableFuture
897     */
898 dl 1.17 public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
899     Block<? super T> block,
900     Executor executor) {
901 dl 1.7 if (executor == null) throw new NullPointerException();
902     return orBlock(other, block, executor);
903     }
904    
905     /**
906 dl 1.1 * Creates and returns a CompletableFuture that is completed
907     * after this or the other given CompletableFuture complete. If
908     * this and/or the other CompletableFuture complete exceptionally,
909     * then the returned CompletableFuture may also do so, with a
910 dl 1.19 * CompletionException holding one of these exceptions as its cause.
911 dl 1.1 * No guarantees are made about which exception is used in the
912     * returned CompletableFuture.
913     *
914     * @param other the other CompletableFuture
915     * @param action the action to perform before completing the
916     * returned CompletableFuture
917     * @return the new CompletableFuture
918     */
919 dl 1.17 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
920     Runnable action) {
921 dl 1.1 return orRunnable(other, action, null);
922     }
923    
924     /**
925     * Creates and returns a CompletableFuture that is completed
926     * asynchronously using the {@link ForkJoinPool#commonPool()}
927     * after this or the other given CompletableFuture complete. If
928     * this and/or the other CompletableFuture complete exceptionally,
929     * then the returned CompletableFuture may also do so, with a
930 dl 1.19 * CompletionException holding one of these exceptions as its cause.
931 dl 1.1 * No guarantees are made about which exception is used in the
932     * returned CompletableFuture.
933     *
934     * @param other the other CompletableFuture
935     * @param action the action to perform before completing the
936     * returned CompletableFuture
937     * @return the new CompletableFuture
938     */
939 dl 1.17 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
940     Runnable action) {
941 dl 1.1 return orRunnable(other, action, ForkJoinPool.commonPool());
942     }
943    
944     /**
945     * Creates and returns a CompletableFuture that is completed
946     * asynchronously using the given executor after this or the other
947     * given CompletableFuture complete. If this and/or the other
948     * CompletableFuture complete exceptionally, then the returned
949 dl 1.19 * CompletableFuture may also do so, with a CompletionException
950     * holding one of these exceptions as its cause. No guarantees are
951 dl 1.1 * made about which exception is used in the returned
952     * CompletableFuture.
953     *
954     * @param other the other CompletableFuture
955     * @param action the action to perform before completing the
956     * returned CompletableFuture
957     * @param executor the executor to use for asynchronous execution
958     * @return the new CompletableFuture
959     */
960 dl 1.17 public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
961     Runnable action,
962     Executor executor) {
963 dl 1.1 if (executor == null) throw new NullPointerException();
964     return orRunnable(other, action, executor);
965     }
966    
967     /**
968 dl 1.20 * Returns a CompletableFuture (or an equivalent one) produced by
969     * the given function of the result of this CompletableFuture when
970     * completed. If this CompletableFuture completes exceptionally,
971     * then the returned CompletableFuture also does so, with a
972     * CompletionException holding this exception as its cause.
973 dl 1.17 *
974     * @param fn the function returning a new CompletableFuture.
975     * @return the CompletableFuture, that {@code isDone()} upon
976     * return if completed by the given function, or an exception
977     * occurs.
978     */
979 dl 1.20 @SuppressWarnings("unchecked") public <U> CompletableFuture<U> thenCompose(Function<? super T,
980 dl 1.17 CompletableFuture<U>> fn) {
981     if (fn == null) throw new NullPointerException();
982     CompletableFuture<U> dst = null;
983     ThenCompose<T,U> d = null;
984     Object r;
985     if ((r = result) == null) {
986     dst = new CompletableFuture<U>();
987     CompletionNode p = new CompletionNode
988     (d = new ThenCompose<T,U>(this, fn, dst));
989     while ((r = result) == null) {
990     if (UNSAFE.compareAndSwapObject
991     (this, COMPLETIONS, p.next = completions, p))
992     break;
993     }
994     }
995     if (r != null && (d == null || d.compareAndSet(0, 1))) {
996     T t; Throwable ex = null;
997     if (r instanceof AltResult) {
998     ex = ((AltResult)r).ex;
999     t = null;
1000     }
1001     else
1002     t = (T) r;
1003     if (ex == null) {
1004     try {
1005     dst = fn.apply(t);
1006     } catch (Throwable rex) {
1007     ex = rex;
1008     }
1009     }
1010     if (dst == null) {
1011     dst = new CompletableFuture<U>();
1012     if (ex == null)
1013     ex = new NullPointerException();
1014     }
1015     if (ex != null)
1016 dl 1.20 dst.internalComplete(null, ex);
1017 dl 1.17 }
1018 dl 1.20 helpPostComplete();
1019     dst.helpPostComplete();
1020 dl 1.17 return dst;
1021     }
1022    
1023     /**
1024 dl 1.6 * Creates and returns a CompletableFuture that is completed with
1025     * the result of the given function of the exception triggering
1026 dl 1.7 * this CompletableFuture's completion when it completes
1027 dl 1.6 * exceptionally; Otherwise, if this CompletableFuture completes
1028     * normally, then the returned CompletableFuture also completes
1029     * normally with the same value.
1030     *
1031     * @param fn the function to use to compute the value of the
1032     * returned CompletableFuture if this CompletableFuture completed
1033     * exceptionally
1034 dl 1.1 * @return the new CompletableFuture
1035     */
1036 dl 1.20 @SuppressWarnings("unchecked") public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
1037 dl 1.6 if (fn == null) throw new NullPointerException();
1038     CompletableFuture<T> dst = new CompletableFuture<T>();
1039 dl 1.1 ExceptionAction<T> d = null;
1040 dl 1.6 Object r;
1041 jsr166 1.2 if ((r = result) == null) {
1042 dl 1.1 CompletionNode p =
1043 dl 1.6 new CompletionNode(d = new ExceptionAction<T>(this, fn, dst));
1044 dl 1.1 while ((r = result) == null) {
1045     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1046     p.next = completions, p))
1047     break;
1048     }
1049     }
1050 dl 1.6 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1051 dl 1.20 T t = null; Throwable ex, dx = null;
1052 dl 1.6 if (r instanceof AltResult) {
1053     if ((ex = ((AltResult)r).ex) != null) {
1054     try {
1055 dl 1.20 t = fn.apply(ex);
1056 dl 1.6 } catch (Throwable rex) {
1057 dl 1.20 dx = rex;
1058 dl 1.6 }
1059     }
1060 dl 1.1 }
1061 dl 1.6 else
1062     t = (T) r;
1063 dl 1.20 dst.internalComplete(t, dx);
1064 dl 1.1 }
1065 dl 1.20 helpPostComplete();
1066 dl 1.17 return dst;
1067     }
1068    
1069     /**
1070     * Creates and returns a CompletableFuture that is completed with
1071     * the result of the given function of the result and exception of
1072     * this CompletableFuture's completion when it completes. The
1073     * given function is invoked with the result (or {@code null} if
1074     * none) and the exception (or {@code null} if none) of this
1075     * CompletableFuture when complete.
1076     *
1077     * @param fn the function to use to compute the value of the
1078     * returned CompletableFuture
1079    
1080     * @return the new CompletableFuture
1081     */
1082 dl 1.20 @SuppressWarnings("unchecked") public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
1083 dl 1.17 if (fn == null) throw new NullPointerException();
1084     CompletableFuture<U> dst = new CompletableFuture<U>();
1085     ThenHandle<T,U> d = null;
1086     Object r;
1087     if ((r = result) == null) {
1088     CompletionNode p =
1089     new CompletionNode(d = new ThenHandle<T,U>(this, fn, dst));
1090     while ((r = result) == null) {
1091     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1092     p.next = completions, p))
1093     break;
1094     }
1095     }
1096     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1097     T t; Throwable ex;
1098     if (r instanceof AltResult) {
1099     ex = ((AltResult)r).ex;
1100     t = null;
1101     }
1102     else {
1103     ex = null;
1104     t = (T) r;
1105     }
1106 dl 1.20 U u = null; Throwable dx = null;
1107 dl 1.17 try {
1108 dl 1.20 u = fn.apply(t, ex);
1109 dl 1.17 } catch (Throwable rex) {
1110 dl 1.20 dx = rex;
1111 dl 1.17 }
1112 dl 1.20 dst.internalComplete(u, dx);
1113 dl 1.17 }
1114 dl 1.20 helpPostComplete();
1115 dl 1.1 return dst;
1116     }
1117    
1118     /**
1119     * Attempts to complete this CompletableFuture with
1120     * a {@link CancellationException}.
1121     *
1122     * @param mayInterruptIfRunning this value has no effect in this
1123     * implementation because interrupts are not used to control
1124     * processing.
1125     *
1126     * @return {@code true} if this task is now cancelled
1127     */
1128     public boolean cancel(boolean mayInterruptIfRunning) {
1129     Object r;
1130     while ((r = result) == null) {
1131     r = new AltResult(new CancellationException());
1132     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
1133     postComplete();
1134     return true;
1135     }
1136     }
1137     return ((r instanceof AltResult) &&
1138     (((AltResult)r).ex instanceof CancellationException));
1139     }
1140    
1141     /**
1142     * Returns {@code true} if this CompletableFuture was cancelled
1143     * before it completed normally.
1144     *
1145     * @return {@code true} if this CompletableFuture was cancelled
1146     * before it completed normally
1147     */
1148     public boolean isCancelled() {
1149     Object r;
1150     return ((r = result) != null &&
1151     (r instanceof AltResult) &&
1152     (((AltResult)r).ex instanceof CancellationException));
1153     }
1154    
1155     /**
1156 dl 1.6 * Forcibly sets or resets the value subsequently returned by
1157     * method get() and related methods, whether or not already
1158     * completed. This method is designed for use only in error
1159     * recovery actions, and even in such situations may result in
1160     * ongoing dependent completions using established versus
1161     * overwritten values.
1162 dl 1.1 *
1163     * @param value the completion value
1164     */
1165 dl 1.6 public void obtrudeValue(T value) {
1166 jsr166 1.2 result = (value == null) ? NIL : value;
1167 dl 1.1 postComplete();
1168     }
1169    
1170     /* ------------- waiting for completions -------------- */
1171    
1172 jsr166 1.2 /**
1173 dl 1.1 * Heuristic spin value for waitingGet() before blocking on
1174     * multiprocessors
1175     */
1176     static final int WAITING_GET_SPINS = 256;
1177    
1178     /**
1179 dl 1.19 * Returns raw result after waiting, or null if interruptible and
1180     * interrupted.
1181 dl 1.1 */
1182 dl 1.19 private Object waitingGet(boolean interruptible) {
1183 dl 1.1 WaitNode q = null;
1184     boolean queued = false, interrupted = false;
1185     int h = 0, spins = 0;
1186     for (Object r;;) {
1187     if ((r = result) != null) {
1188 dl 1.5 Throwable ex;
1189 dl 1.1 if (q != null) // suppress unpark
1190     q.thread = null;
1191     postComplete(); // help release others
1192     if (interrupted)
1193     Thread.currentThread().interrupt();
1194 dl 1.19 return r;
1195 dl 1.1 }
1196     else if (h == 0) {
1197     h = ThreadLocalRandom.current().nextInt();
1198     if (Runtime.getRuntime().availableProcessors() > 1)
1199     spins = WAITING_GET_SPINS;
1200     }
1201     else if (spins > 0) {
1202     h ^= h << 1; // xorshift
1203 jsr166 1.2 h ^= h >>> 3;
1204 dl 1.1 if ((h ^= h << 10) >= 0)
1205     --spins;
1206 jsr166 1.2 }
1207 dl 1.1 else if (q == null)
1208     q = new WaitNode();
1209     else if (!queued)
1210     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1211     q.next = waiters, q);
1212 dl 1.19 else if (Thread.interrupted()) {
1213 dl 1.20 if (interruptible) {
1214     removeWaiter(q);
1215 dl 1.19 return null;
1216 dl 1.20 }
1217 dl 1.1 interrupted = true;
1218 dl 1.19 }
1219 dl 1.1 else if (q.thread == null)
1220     q.thread = Thread.currentThread();
1221     else
1222     LockSupport.park(this);
1223     }
1224     }
1225    
1226     /**
1227     * Awaits completion or aborts on interrupt or timeout.
1228     *
1229     * @param nanos time to wait
1230     * @return raw result
1231     */
1232     private Object timedAwaitDone(long nanos)
1233     throws InterruptedException, TimeoutException {
1234     final long deadline = System.nanoTime() + nanos;
1235     WaitNode q = null;
1236     boolean queued = false;
1237     for (Object r;;) {
1238     if (Thread.interrupted()) {
1239     removeWaiter(q);
1240     throw new InterruptedException();
1241     }
1242     else if ((r = result) != null) {
1243     if (q != null)
1244     q.thread = null;
1245     postComplete();
1246     return r;
1247     }
1248     else if (q == null)
1249     q = new WaitNode();
1250     else if (!queued)
1251     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
1252     q.next = waiters, q);
1253     else if ((nanos = deadline - System.nanoTime()) <= 0L) {
1254     removeWaiter(q);
1255     throw new TimeoutException();
1256     }
1257     else if (q.thread == null)
1258     q.thread = Thread.currentThread();
1259     else
1260     LockSupport.parkNanos(this, nanos);
1261     }
1262     }
1263    
1264     /**
1265     * Tries to unlink a timed-out or interrupted wait node to avoid
1266     * accumulating garbage. Internal nodes are simply unspliced
1267     * without CAS since it is harmless if they are traversed anyway
1268     * by releasers. To avoid effects of unsplicing from already
1269     * removed nodes, the list is retraversed in case of an apparent
1270     * race. This is slow when there are a lot of nodes, but we don't
1271     * expect lists to be long enough to outweigh higher-overhead
1272     * schemes.
1273     */
1274     private void removeWaiter(WaitNode node) {
1275     if (node != null) {
1276     node.thread = null;
1277     retry:
1278     for (;;) { // restart on removeWaiter race
1279     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
1280     s = q.next;
1281     if (q.thread != null)
1282     pred = q;
1283     else if (pred != null) {
1284     pred.next = s;
1285     if (pred.thread == null) // check for race
1286     continue retry;
1287     }
1288     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
1289     continue retry;
1290     }
1291     break;
1292     }
1293     }
1294     }
1295    
1296     /* ------------- Async tasks -------------- */
1297    
1298     /** Base class can act as either FJ or plain Runnable */
1299     static abstract class Async extends ForkJoinTask<Void> implements Runnable {
1300     public final Void getRawResult() { return null; }
1301     public final void setRawResult(Void v) { }
1302     public final void run() { exec(); }
1303     }
1304    
1305     static final class AsyncRunnable extends Async {
1306     final Runnable runnable;
1307     final CompletableFuture<Void> dst;
1308     AsyncRunnable(Runnable runnable, CompletableFuture<Void> dst) {
1309     this.runnable = runnable; this.dst = dst;
1310     }
1311     public final boolean exec() {
1312     Runnable fn;
1313     CompletableFuture<Void> d;
1314     if ((fn = this.runnable) == null || (d = this.dst) == null)
1315     throw new NullPointerException();
1316     try {
1317     fn.run();
1318     d.complete(null);
1319     } catch (Throwable ex) {
1320     d.completeExceptionally(ex);
1321     }
1322     return true;
1323     }
1324     private static final long serialVersionUID = 5232453952276885070L;
1325     }
1326    
1327     static final class AsyncSupplier<U> extends Async {
1328     final Supplier<U> supplier;
1329     final CompletableFuture<U> dst;
1330     AsyncSupplier(Supplier<U> supplier, CompletableFuture<U> dst) {
1331     this.supplier = supplier; this.dst = dst;
1332     }
1333     public final boolean exec() {
1334     Supplier<U> fn;
1335     CompletableFuture<U> d;
1336     if ((fn = this.supplier) == null || (d = this.dst) == null)
1337     throw new NullPointerException();
1338     try {
1339     d.complete(fn.get());
1340     } catch (Throwable ex) {
1341     d.completeExceptionally(ex);
1342     }
1343     return true;
1344     }
1345     private static final long serialVersionUID = 5232453952276885070L;
1346     }
1347    
1348     static final class AsyncFunction<T,U> extends Async {
1349     Function<? super T,? extends U> fn;
1350     T arg;
1351     final CompletableFuture<U> dst;
1352     AsyncFunction(T arg, Function<? super T,? extends U> fn,
1353     CompletableFuture<U> dst) {
1354     this.arg = arg; this.fn = fn; this.dst = dst;
1355     }
1356     public final boolean exec() {
1357     Function<? super T,? extends U> fn;
1358     CompletableFuture<U> d;
1359     if ((fn = this.fn) == null || (d = this.dst) == null)
1360     throw new NullPointerException();
1361     try {
1362     d.complete(fn.apply(arg));
1363     } catch (Throwable ex) {
1364     d.completeExceptionally(ex);
1365     }
1366     return true;
1367     }
1368     private static final long serialVersionUID = 5232453952276885070L;
1369     }
1370    
1371     static final class AsyncBiFunction<T,U,V> extends Async {
1372     final BiFunction<? super T,? super U,? extends V> fn;
1373     final T arg1;
1374     final U arg2;
1375     final CompletableFuture<V> dst;
1376     AsyncBiFunction(T arg1, U arg2,
1377     BiFunction<? super T,? super U,? extends V> fn,
1378     CompletableFuture<V> dst) {
1379     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1380     }
1381     public final boolean exec() {
1382     BiFunction<? super T,? super U,? extends V> fn;
1383     CompletableFuture<V> d;
1384     if ((fn = this.fn) == null || (d = this.dst) == null)
1385     throw new NullPointerException();
1386     try {
1387     d.complete(fn.apply(arg1, arg2));
1388     } catch (Throwable ex) {
1389     d.completeExceptionally(ex);
1390     }
1391     return true;
1392     }
1393     private static final long serialVersionUID = 5232453952276885070L;
1394     }
1395    
1396 dl 1.7 static final class AsyncBlock<T> extends Async {
1397     Block<? super T> fn;
1398     T arg;
1399     final CompletableFuture<Void> dst;
1400     AsyncBlock(T arg, Block<? super T> fn,
1401     CompletableFuture<Void> dst) {
1402     this.arg = arg; this.fn = fn; this.dst = dst;
1403     }
1404     public final boolean exec() {
1405     Block<? super T> fn;
1406     CompletableFuture<Void> d;
1407     if ((fn = this.fn) == null || (d = this.dst) == null)
1408     throw new NullPointerException();
1409     try {
1410     fn.accept(arg);
1411     d.complete(null);
1412     } catch (Throwable ex) {
1413     d.completeExceptionally(ex);
1414     }
1415     return true;
1416     }
1417     private static final long serialVersionUID = 5232453952276885070L;
1418     }
1419    
1420     static final class AsyncBiBlock<T,U> extends Async {
1421     final BiBlock<? super T,? super U> fn;
1422     final T arg1;
1423     final U arg2;
1424     final CompletableFuture<Void> dst;
1425     AsyncBiBlock(T arg1, U arg2,
1426     BiBlock<? super T,? super U> fn,
1427     CompletableFuture<Void> dst) {
1428     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
1429     }
1430     public final boolean exec() {
1431     BiBlock<? super T,? super U> fn;
1432     CompletableFuture<Void> d;
1433     if ((fn = this.fn) == null || (d = this.dst) == null)
1434     throw new NullPointerException();
1435     try {
1436     fn.accept(arg1, arg2);
1437     d.complete(null);
1438     } catch (Throwable ex) {
1439     d.completeExceptionally(ex);
1440     }
1441     return true;
1442     }
1443     private static final long serialVersionUID = 5232453952276885070L;
1444     }
1445    
1446 dl 1.1 /* ------------- Completions -------------- */
1447    
1448     // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
1449     static abstract class Completion extends AtomicInteger implements Runnable {
1450     }
1451    
1452     static final class ThenFunction<T,U> extends Completion {
1453 jsr166 1.2 final CompletableFuture<? extends T> src;
1454     final Function<? super T,? extends U> fn;
1455     final CompletableFuture<U> dst;
1456 dl 1.1 final Executor executor;
1457     ThenFunction(CompletableFuture<? extends T> src,
1458     final Function<? super T,? extends U> fn,
1459     final CompletableFuture<U> dst, Executor executor) {
1460     this.src = src; this.fn = fn; this.dst = dst;
1461     this.executor = executor;
1462     }
1463 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1464 dl 1.1 CompletableFuture<? extends T> a;
1465     Function<? super T,? extends U> fn;
1466     CompletableFuture<U> dst;
1467 jsr166 1.2 Object r; T t; Throwable ex;
1468     if ((dst = this.dst) != null &&
1469 dl 1.1 (fn = this.fn) != null &&
1470     (a = this.src) != null &&
1471     (r = a.result) != null &&
1472     compareAndSet(0, 1)) {
1473 jsr166 1.2 if (r instanceof AltResult) {
1474 dl 1.19 ex = ((AltResult)r).ex;
1475 dl 1.1 t = null;
1476     }
1477 dl 1.17 else {
1478     ex = null;
1479 dl 1.1 t = (T) r;
1480     }
1481 dl 1.20 Executor e = executor;
1482     U u = null;
1483 dl 1.17 if (ex == null) {
1484     try {
1485 dl 1.20 if (e != null)
1486     e.execute(new AsyncFunction<T,U>(t, fn, dst));
1487 dl 1.17 else
1488 dl 1.20 u = fn.apply(t);
1489 dl 1.17 } catch (Throwable rex) {
1490     ex = rex;
1491     }
1492     }
1493 dl 1.20 if (e == null || ex != null)
1494     dst.internalComplete(u, ex);
1495 dl 1.1 }
1496     }
1497 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1498 dl 1.1 }
1499    
1500 dl 1.7 static final class ThenBlock<T> extends Completion {
1501     final CompletableFuture<? extends T> src;
1502     final Block<? super T> fn;
1503     final CompletableFuture<Void> dst;
1504     final Executor executor;
1505     ThenBlock(CompletableFuture<? extends T> src,
1506     final Block<? super T> fn,
1507     final CompletableFuture<Void> dst, Executor executor) {
1508     this.src = src; this.fn = fn; this.dst = dst;
1509     this.executor = executor;
1510     }
1511 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1512 dl 1.7 CompletableFuture<? extends T> a;
1513     Block<? super T> fn;
1514     CompletableFuture<Void> dst;
1515     Object r; T t; Throwable ex;
1516     if ((dst = this.dst) != null &&
1517     (fn = this.fn) != null &&
1518     (a = this.src) != null &&
1519     (r = a.result) != null &&
1520     compareAndSet(0, 1)) {
1521     if (r instanceof AltResult) {
1522 dl 1.19 ex = ((AltResult)r).ex;
1523 dl 1.7 t = null;
1524     }
1525 dl 1.17 else {
1526     ex = null;
1527 dl 1.7 t = (T) r;
1528 dl 1.17 }
1529 dl 1.20 Executor e = executor;
1530 dl 1.17 if (ex == null) {
1531     try {
1532 dl 1.20 if (e != null)
1533     e.execute(new AsyncBlock<T>(t, fn, dst));
1534     else
1535 dl 1.17 fn.accept(t);
1536     } catch (Throwable rex) {
1537     ex = rex;
1538 dl 1.7 }
1539     }
1540 dl 1.20 if (e == null || ex != null)
1541     dst.internalComplete(null, ex);
1542 dl 1.7 }
1543     }
1544 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1545 dl 1.7 }
1546    
1547 dl 1.1 static final class ThenRunnable<T> extends Completion {
1548 jsr166 1.2 final CompletableFuture<? extends T> src;
1549     final Runnable fn;
1550     final CompletableFuture<Void> dst;
1551 dl 1.1 final Executor executor;
1552     ThenRunnable(CompletableFuture<? extends T> src,
1553     Runnable fn,
1554     CompletableFuture<Void> dst,
1555     Executor executor) {
1556     this.src = src; this.fn = fn; this.dst = dst;
1557     this.executor = executor;
1558     }
1559 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1560 dl 1.1 CompletableFuture<? extends T> a;
1561     Runnable fn;
1562     CompletableFuture<Void> dst;
1563 jsr166 1.2 Object r; Throwable ex;
1564     if ((dst = this.dst) != null &&
1565 dl 1.1 (fn = this.fn) != null &&
1566     (a = this.src) != null &&
1567     (r = a.result) != null &&
1568     compareAndSet(0, 1)) {
1569 dl 1.19 if (r instanceof AltResult)
1570     ex = ((AltResult)r).ex;
1571 dl 1.17 else
1572     ex = null;
1573 dl 1.20 Executor e = executor;
1574 dl 1.17 if (ex == null) {
1575     try {
1576 dl 1.20 if (e != null)
1577     e.execute(new AsyncRunnable(fn, dst));
1578     else
1579 dl 1.17 fn.run();
1580     } catch (Throwable rex) {
1581     ex = rex;
1582 dl 1.1 }
1583     }
1584 dl 1.20 if (e == null || ex != null)
1585     dst.internalComplete(null, ex);
1586 dl 1.1 }
1587     }
1588 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1589 dl 1.1 }
1590    
1591     static final class AndFunction<T,U,V> extends Completion {
1592 jsr166 1.2 final CompletableFuture<? extends T> src;
1593     final CompletableFuture<? extends U> snd;
1594     final BiFunction<? super T,? super U,? extends V> fn;
1595     final CompletableFuture<V> dst;
1596 dl 1.1 final Executor executor;
1597     AndFunction(CompletableFuture<? extends T> src,
1598     CompletableFuture<? extends U> snd,
1599     BiFunction<? super T,? super U,? extends V> fn,
1600     CompletableFuture<V> dst, Executor executor) {
1601     this.src = src; this.snd = snd;
1602     this.fn = fn; this.dst = dst;
1603     this.executor = executor;
1604     }
1605 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1606 jsr166 1.2 Object r, s; T t; U u; Throwable ex;
1607 dl 1.1 CompletableFuture<? extends T> a;
1608     CompletableFuture<? extends U> b;
1609     BiFunction<? super T,? super U,? extends V> fn;
1610     CompletableFuture<V> dst;
1611 jsr166 1.2 if ((dst = this.dst) != null &&
1612 dl 1.1 (fn = this.fn) != null &&
1613     (a = this.src) != null &&
1614     (r = a.result) != null &&
1615     (b = this.snd) != null &&
1616     (s = b.result) != null &&
1617     compareAndSet(0, 1)) {
1618 jsr166 1.2 if (r instanceof AltResult) {
1619 dl 1.19 ex = ((AltResult)r).ex;
1620 jsr166 1.2 t = null;
1621     }
1622 dl 1.19 else {
1623     ex = null;
1624 jsr166 1.2 t = (T) r;
1625 dl 1.19 }
1626     if (ex != null)
1627     u = null;
1628     else if (s instanceof AltResult) {
1629     ex = ((AltResult)s).ex;
1630 jsr166 1.2 u = null;
1631     }
1632     else
1633     u = (U) s;
1634 dl 1.20 Executor e = executor;
1635     V v = null;
1636 dl 1.19 if (ex == null) {
1637     try {
1638 dl 1.20 if (e != null)
1639     e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
1640 dl 1.19 else
1641 dl 1.20 v = fn.apply(t, u);
1642 dl 1.19 } catch (Throwable rex) {
1643     ex = rex;
1644     }
1645 dl 1.1 }
1646 dl 1.20 if (e == null || ex != null)
1647     dst.internalComplete(v, ex);
1648 jsr166 1.2 }
1649     }
1650 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1651 dl 1.1 }
1652    
1653 dl 1.7 static final class AndBlock<T,U> extends Completion {
1654     final CompletableFuture<? extends T> src;
1655     final CompletableFuture<? extends U> snd;
1656     final BiBlock<? super T,? super U> fn;
1657     final CompletableFuture<Void> dst;
1658     final Executor executor;
1659     AndBlock(CompletableFuture<? extends T> src,
1660     CompletableFuture<? extends U> snd,
1661     BiBlock<? super T,? super U> fn,
1662     CompletableFuture<Void> dst, Executor executor) {
1663     this.src = src; this.snd = snd;
1664     this.fn = fn; this.dst = dst;
1665     this.executor = executor;
1666     }
1667 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1668 dl 1.7 Object r, s; T t; U u; Throwable ex;
1669     CompletableFuture<? extends T> a;
1670     CompletableFuture<? extends U> b;
1671     BiBlock<? super T,? super U> fn;
1672     CompletableFuture<Void> dst;
1673     if ((dst = this.dst) != null &&
1674     (fn = this.fn) != null &&
1675     (a = this.src) != null &&
1676     (r = a.result) != null &&
1677     (b = this.snd) != null &&
1678     (s = b.result) != null &&
1679     compareAndSet(0, 1)) {
1680     if (r instanceof AltResult) {
1681 dl 1.19 ex = ((AltResult)r).ex;
1682 dl 1.7 t = null;
1683     }
1684 dl 1.19 else {
1685     ex = null;
1686 dl 1.7 t = (T) r;
1687 dl 1.19 }
1688     if (ex != null)
1689     u = null;
1690     else if (s instanceof AltResult) {
1691     ex = ((AltResult)s).ex;
1692 dl 1.7 u = null;
1693     }
1694     else
1695     u = (U) s;
1696 dl 1.20 Executor e = executor;
1697 dl 1.19 if (ex == null) {
1698     try {
1699 dl 1.20 if (e != null)
1700     e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
1701     else
1702 dl 1.19 fn.accept(t, u);
1703     } catch (Throwable rex) {
1704     ex = rex;
1705 dl 1.7 }
1706     }
1707 dl 1.20 if (e == null || ex != null)
1708     dst.internalComplete(null, ex);
1709 dl 1.7 }
1710     }
1711 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1712 dl 1.7 }
1713    
1714 dl 1.1 static final class AndRunnable<T> extends Completion {
1715 jsr166 1.2 final CompletableFuture<? extends T> src;
1716     final CompletableFuture<?> snd;
1717     final Runnable fn;
1718     final CompletableFuture<Void> dst;
1719 dl 1.1 final Executor executor;
1720     AndRunnable(CompletableFuture<? extends T> src,
1721     CompletableFuture<?> snd,
1722     Runnable fn,
1723     CompletableFuture<Void> dst, Executor executor) {
1724     this.src = src; this.snd = snd;
1725     this.fn = fn; this.dst = dst;
1726     this.executor = executor;
1727     }
1728 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1729 jsr166 1.2 Object r, s; Throwable ex;
1730 dl 1.1 final CompletableFuture<? extends T> a;
1731     final CompletableFuture<?> b;
1732     final Runnable fn;
1733     final CompletableFuture<Void> dst;
1734 jsr166 1.2 if ((dst = this.dst) != null &&
1735 dl 1.1 (fn = this.fn) != null &&
1736     (a = this.src) != null &&
1737     (r = a.result) != null &&
1738     (b = this.snd) != null &&
1739     (s = b.result) != null &&
1740     compareAndSet(0, 1)) {
1741 dl 1.19 if (r instanceof AltResult)
1742     ex = ((AltResult)r).ex;
1743     else
1744     ex = null;
1745     if (ex == null && (s instanceof AltResult))
1746     ex = ((AltResult)s).ex;
1747 dl 1.20 Executor e = executor;
1748 dl 1.19 if (ex == null) {
1749     try {
1750 dl 1.20 if (e != null)
1751     e.execute(new AsyncRunnable(fn, dst));
1752     else
1753 dl 1.19 fn.run();
1754     } catch (Throwable rex) {
1755     ex = rex;
1756 dl 1.1 }
1757 jsr166 1.2 }
1758 dl 1.20 if (e == null || ex != null)
1759     dst.internalComplete(null, ex);
1760 jsr166 1.2 }
1761     }
1762 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1763 dl 1.1 }
1764    
1765     static final class OrFunction<T,U> extends Completion {
1766 jsr166 1.2 final CompletableFuture<? extends T> src;
1767     final CompletableFuture<? extends T> snd;
1768     final Function<? super T,? extends U> fn;
1769     final CompletableFuture<U> dst;
1770 dl 1.1 final Executor executor;
1771     OrFunction(CompletableFuture<? extends T> src,
1772     CompletableFuture<? extends T> snd,
1773     Function<? super T,? extends U> fn,
1774     CompletableFuture<U> dst, Executor executor) {
1775     this.src = src; this.snd = snd;
1776     this.fn = fn; this.dst = dst;
1777     this.executor = executor;
1778     }
1779 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1780 jsr166 1.2 Object r; T t; Throwable ex;
1781 dl 1.1 CompletableFuture<? extends T> a;
1782     CompletableFuture<? extends T> b;
1783     Function<? super T,? extends U> fn;
1784     CompletableFuture<U> dst;
1785 jsr166 1.2 if ((dst = this.dst) != null &&
1786 dl 1.1 (fn = this.fn) != null &&
1787     (((a = this.src) != null && (r = a.result) != null) ||
1788     ((b = this.snd) != null && (r = b.result) != null)) &&
1789     compareAndSet(0, 1)) {
1790 jsr166 1.2 if (r instanceof AltResult) {
1791 dl 1.19 ex = ((AltResult)r).ex;
1792 jsr166 1.2 t = null;
1793     }
1794 dl 1.19 else {
1795     ex = null;
1796 jsr166 1.2 t = (T) r;
1797 dl 1.1 }
1798 dl 1.20 Executor e = executor;
1799     U u = null;
1800 dl 1.19 if (ex == null) {
1801     try {
1802 dl 1.20 if (e != null)
1803     e.execute(new AsyncFunction<T,U>(t, fn, dst));
1804 dl 1.19 else
1805 dl 1.20 u = fn.apply(t);
1806 dl 1.19 } catch (Throwable rex) {
1807     ex = rex;
1808     }
1809     }
1810 dl 1.20 if (e == null || ex != null)
1811     dst.internalComplete(u, ex);
1812 jsr166 1.2 }
1813     }
1814 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1815 dl 1.1 }
1816    
1817 dl 1.7 static final class OrBlock<T> extends Completion {
1818     final CompletableFuture<? extends T> src;
1819     final CompletableFuture<? extends T> snd;
1820     final Block<? super T> fn;
1821     final CompletableFuture<Void> dst;
1822     final Executor executor;
1823     OrBlock(CompletableFuture<? extends T> src,
1824     CompletableFuture<? extends T> snd,
1825     Block<? super T> fn,
1826     CompletableFuture<Void> dst, Executor executor) {
1827     this.src = src; this.snd = snd;
1828     this.fn = fn; this.dst = dst;
1829     this.executor = executor;
1830     }
1831 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1832 dl 1.7 Object r; T t; Throwable ex;
1833     CompletableFuture<? extends T> a;
1834     CompletableFuture<? extends T> b;
1835     Block<? super T> fn;
1836     CompletableFuture<Void> dst;
1837     if ((dst = this.dst) != null &&
1838     (fn = this.fn) != null &&
1839     (((a = this.src) != null && (r = a.result) != null) ||
1840     ((b = this.snd) != null && (r = b.result) != null)) &&
1841     compareAndSet(0, 1)) {
1842     if (r instanceof AltResult) {
1843 dl 1.19 ex = ((AltResult)r).ex;
1844 dl 1.7 t = null;
1845     }
1846 dl 1.19 else {
1847     ex = null;
1848 dl 1.7 t = (T) r;
1849 dl 1.19 }
1850 dl 1.20 Executor e = executor;
1851 dl 1.19 if (ex == null) {
1852     try {
1853 dl 1.20 if (e != null)
1854     e.execute(new AsyncBlock<T>(t, fn, dst));
1855     else
1856 dl 1.19 fn.accept(t);
1857     } catch (Throwable rex) {
1858     ex = rex;
1859 dl 1.7 }
1860     }
1861 dl 1.20 if (e == null || ex != null)
1862     dst.internalComplete(null, ex);
1863 dl 1.7 }
1864     }
1865 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1866 dl 1.7 }
1867    
1868 dl 1.1 static final class OrRunnable<T> extends Completion {
1869 jsr166 1.2 final CompletableFuture<? extends T> src;
1870     final CompletableFuture<?> snd;
1871     final Runnable fn;
1872     final CompletableFuture<Void> dst;
1873 dl 1.1 final Executor executor;
1874     OrRunnable(CompletableFuture<? extends T> src,
1875     CompletableFuture<?> snd,
1876     Runnable fn,
1877     CompletableFuture<Void> dst, Executor executor) {
1878     this.src = src; this.snd = snd;
1879     this.fn = fn; this.dst = dst;
1880     this.executor = executor;
1881     }
1882 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1883 jsr166 1.2 Object r; Throwable ex;
1884 dl 1.1 CompletableFuture<? extends T> a;
1885     final CompletableFuture<?> b;
1886     final Runnable fn;
1887     final CompletableFuture<Void> dst;
1888 jsr166 1.2 if ((dst = this.dst) != null &&
1889 dl 1.1 (fn = this.fn) != null &&
1890     (((a = this.src) != null && (r = a.result) != null) ||
1891     ((b = this.snd) != null && (r = b.result) != null)) &&
1892     compareAndSet(0, 1)) {
1893 dl 1.19 if (r instanceof AltResult)
1894     ex = ((AltResult)r).ex;
1895     else
1896     ex = null;
1897 dl 1.20 Executor e = executor;
1898 dl 1.19 if (ex == null) {
1899 dl 1.1 try {
1900 dl 1.20 if (e != null)
1901     e.execute(new AsyncRunnable(fn, dst));
1902     else
1903 dl 1.1 fn.run();
1904     } catch (Throwable rex) {
1905 dl 1.19 ex = rex;
1906 dl 1.1 }
1907     }
1908 dl 1.20 if (e == null || ex != null)
1909     dst.internalComplete(null, ex);
1910 jsr166 1.2 }
1911     }
1912 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1913 dl 1.1 }
1914    
1915     static final class ExceptionAction<T> extends Completion {
1916 jsr166 1.2 final CompletableFuture<? extends T> src;
1917 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1918     final CompletableFuture<T> dst;
1919 dl 1.1 ExceptionAction(CompletableFuture<? extends T> src,
1920 dl 1.6 Function<? super Throwable, ? extends T> fn,
1921     CompletableFuture<T> dst) {
1922 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1923     }
1924 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1925 dl 1.1 CompletableFuture<? extends T> a;
1926 dl 1.6 Function<? super Throwable, ? extends T> fn;
1927     CompletableFuture<T> dst;
1928 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1929 jsr166 1.2 if ((dst = this.dst) != null &&
1930 dl 1.1 (fn = this.fn) != null &&
1931     (a = this.src) != null &&
1932     (r = a.result) != null &&
1933     compareAndSet(0, 1)) {
1934 dl 1.20 if ((r instanceof AltResult) &&
1935     (ex = ((AltResult)r).ex) != null) {
1936     try {
1937     t = fn.apply(ex);
1938     } catch (Throwable rex) {
1939     dx = rex;
1940 dl 1.1 }
1941     }
1942 dl 1.6 else
1943     t = (T) r;
1944 dl 1.20 dst.internalComplete(t, dx);
1945 dl 1.1 }
1946     }
1947 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1948 dl 1.1 }
1949    
1950 dl 1.17 static final class ThenCopy<T> extends Completion {
1951     final CompletableFuture<? extends T> src;
1952     final CompletableFuture<T> dst;
1953     ThenCopy(CompletableFuture<? extends T> src,
1954     CompletableFuture<T> dst) {
1955     this.src = src; this.dst = dst;
1956     }
1957 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1958 dl 1.17 CompletableFuture<? extends T> a;
1959     CompletableFuture<T> dst;
1960 dl 1.20 Object r; Object t; Throwable ex;
1961 dl 1.17 if ((dst = this.dst) != null &&
1962     (a = this.src) != null &&
1963     (r = a.result) != null &&
1964     compareAndSet(0, 1)) {
1965 dl 1.20 if (r instanceof AltResult) {
1966     ex = ((AltResult)r).ex;
1967     t = null;
1968     }
1969     else {
1970     ex = null;
1971     t = r;
1972     }
1973     dst.internalComplete(t, ex);
1974 dl 1.17 }
1975     }
1976 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1977 dl 1.17 }
1978    
1979     static final class ThenHandle<T,U> extends Completion {
1980     final CompletableFuture<? extends T> src;
1981     final BiFunction<? super T, Throwable, ? extends U> fn;
1982     final CompletableFuture<U> dst;
1983     ThenHandle(CompletableFuture<? extends T> src,
1984     BiFunction<? super T, Throwable, ? extends U> fn,
1985     final CompletableFuture<U> dst) {
1986     this.src = src; this.fn = fn; this.dst = dst;
1987     }
1988 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
1989 dl 1.17 CompletableFuture<? extends T> a;
1990     BiFunction<? super T, Throwable, ? extends U> fn;
1991     CompletableFuture<U> dst;
1992     Object r; T t; Throwable ex;
1993     if ((dst = this.dst) != null &&
1994     (fn = this.fn) != null &&
1995     (a = this.src) != null &&
1996     (r = a.result) != null &&
1997     compareAndSet(0, 1)) {
1998     if (r instanceof AltResult) {
1999     ex = ((AltResult)r).ex;
2000     t = null;
2001     }
2002     else {
2003     ex = null;
2004     t = (T) r;
2005     }
2006 dl 1.20 U u = null; Throwable dx = null;
2007 dl 1.17 try {
2008 dl 1.20 u = fn.apply(t, ex);
2009 dl 1.17 } catch (Throwable rex) {
2010 dl 1.20 dx = rex;
2011 dl 1.17 }
2012 dl 1.20 dst.internalComplete(u, dx);
2013 dl 1.17 }
2014     }
2015 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
2016 dl 1.17 }
2017    
2018     static final class ThenCompose<T,U> extends Completion {
2019     final CompletableFuture<? extends T> src;
2020     final Function<? super T, CompletableFuture<U>> fn;
2021     final CompletableFuture<U> dst;
2022     ThenCompose(CompletableFuture<? extends T> src,
2023     Function<? super T, CompletableFuture<U>> fn,
2024     final CompletableFuture<U> dst) {
2025     this.src = src; this.fn = fn; this.dst = dst;
2026     }
2027 dl 1.20 @SuppressWarnings("unchecked") public final void run() {
2028 dl 1.17 CompletableFuture<? extends T> a;
2029     Function<? super T, CompletableFuture<U>> fn;
2030     CompletableFuture<U> dst;
2031     Object r; T t; Throwable ex;
2032     if ((dst = this.dst) != null &&
2033     (fn = this.fn) != null &&
2034     (a = this.src) != null &&
2035     (r = a.result) != null &&
2036     compareAndSet(0, 1)) {
2037     if (r instanceof AltResult) {
2038     ex = ((AltResult)r).ex;
2039     t = null;
2040     }
2041     else {
2042     ex = null;
2043     t = (T) r;
2044     }
2045     CompletableFuture<U> c = null;
2046 dl 1.20 U u = null;
2047     boolean complete = false;
2048 dl 1.17 if (ex == null) {
2049     try {
2050     c = fn.apply(t);
2051     } catch (Throwable rex) {
2052     ex = rex;
2053     }
2054     }
2055     if (ex != null || c == null) {
2056     if (ex == null)
2057     ex = new NullPointerException();
2058     }
2059 dl 1.18 else {
2060     ThenCopy<U> d = null;
2061     Object s;
2062     if ((s = c.result) == null) {
2063     CompletionNode p = new CompletionNode
2064     (d = new ThenCopy<U>(c, dst));
2065     while ((s = c.result) == null) {
2066     if (UNSAFE.compareAndSwapObject
2067     (c, COMPLETIONS, p.next = c.completions, p))
2068     break;
2069     }
2070 dl 1.17 }
2071 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
2072 dl 1.20 complete = true;
2073 dl 1.18 if (s instanceof AltResult) {
2074     ex = ((AltResult)s).ex; // no rewrap
2075     u = null;
2076     }
2077     else
2078     u = (U) s;
2079 dl 1.17 }
2080     }
2081 dl 1.20 if (complete || ex != null)
2082     dst.internalComplete(u, ex);
2083     if (c != null)
2084     c.helpPostComplete();
2085 dl 1.17 }
2086     }
2087 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
2088 dl 1.17 }
2089    
2090 dl 1.1 /* ------------- then/and/or implementations -------------- */
2091    
2092 dl 1.20 @SuppressWarnings("unchecked") private <U> CompletableFuture<U> thenFunction
2093     (Function<? super T,? extends U> fn,
2094     Executor executor) {
2095 dl 1.1 if (fn == null) throw new NullPointerException();
2096     CompletableFuture<U> dst = new CompletableFuture<U>();
2097     ThenFunction<T,U> d = null;
2098 jsr166 1.2 Object r;
2099     if ((r = result) == null) {
2100 dl 1.1 CompletionNode p = new CompletionNode
2101     (d = new ThenFunction<T,U>(this, fn, dst, executor));
2102     while ((r = result) == null) {
2103     if (UNSAFE.compareAndSwapObject
2104     (this, COMPLETIONS, p.next = completions, p))
2105     break;
2106     }
2107     }
2108 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2109 dl 1.19 T t; Throwable ex;
2110 jsr166 1.2 if (r instanceof AltResult) {
2111 dl 1.19 ex = ((AltResult)r).ex;
2112 jsr166 1.2 t = null;
2113 dl 1.1 }
2114 dl 1.19 else {
2115     ex = null;
2116 jsr166 1.2 t = (T) r;
2117 dl 1.19 }
2118 dl 1.20 Executor e = executor;
2119     U u = null;
2120 jsr166 1.2 if (ex == null) {
2121 dl 1.1 try {
2122 dl 1.20 if (e != null)
2123     e.execute(new AsyncFunction<T,U>(t, fn, dst));
2124 dl 1.1 else
2125 dl 1.20 u = fn.apply(t);
2126 dl 1.1 } catch (Throwable rex) {
2127 dl 1.19 ex = rex;
2128 dl 1.1 }
2129     }
2130 dl 1.20 if (e == null || ex != null)
2131     dst.internalComplete(u, ex);
2132 dl 1.17 }
2133 dl 1.20 helpPostComplete();
2134 dl 1.1 return dst;
2135     }
2136    
2137 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> thenBlock
2138     (Block<? super T> fn,
2139     Executor executor) {
2140 dl 1.7 if (fn == null) throw new NullPointerException();
2141     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2142     ThenBlock<T> d = null;
2143     Object r;
2144     if ((r = result) == null) {
2145     CompletionNode p = new CompletionNode
2146     (d = new ThenBlock<T>(this, fn, dst, executor));
2147     while ((r = result) == null) {
2148     if (UNSAFE.compareAndSwapObject
2149     (this, COMPLETIONS, p.next = completions, p))
2150     break;
2151     }
2152     }
2153     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2154 dl 1.19 T t; Throwable ex;
2155 dl 1.7 if (r instanceof AltResult) {
2156 dl 1.19 ex = ((AltResult)r).ex;
2157 dl 1.7 t = null;
2158     }
2159 dl 1.19 else {
2160     ex = null;
2161 dl 1.7 t = (T) r;
2162 dl 1.19 }
2163 dl 1.20 Executor e = executor;
2164 dl 1.7 if (ex == null) {
2165     try {
2166 dl 1.20 if (e != null)
2167     e.execute(new AsyncBlock<T>(t, fn, dst));
2168     else
2169 dl 1.7 fn.accept(t);
2170     } catch (Throwable rex) {
2171 dl 1.19 ex = rex;
2172 dl 1.7 }
2173     }
2174 dl 1.20 if (e == null || ex != null)
2175     dst.internalComplete(null, ex);
2176 dl 1.17 }
2177 dl 1.20 helpPostComplete();
2178 dl 1.7 return dst;
2179     }
2180    
2181 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> thenRunnable
2182     (Runnable action,
2183     Executor executor) {
2184 dl 1.1 if (action == null) throw new NullPointerException();
2185     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2186     ThenRunnable<T> d = null;
2187 jsr166 1.2 Object r;
2188     if ((r = result) == null) {
2189 dl 1.1 CompletionNode p = new CompletionNode
2190     (d = new ThenRunnable<T>(this, action, dst, executor));
2191     while ((r = result) == null) {
2192     if (UNSAFE.compareAndSwapObject
2193     (this, COMPLETIONS, p.next = completions, p))
2194     break;
2195     }
2196     }
2197 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2198 dl 1.19 Throwable ex;
2199     if (r instanceof AltResult)
2200     ex = ((AltResult)r).ex;
2201     else
2202     ex = null;
2203 dl 1.20 Executor e = executor;
2204 jsr166 1.2 if (ex == null) {
2205 dl 1.1 try {
2206 dl 1.20 if (e != null)
2207     e.execute(new AsyncRunnable(action, dst));
2208     else
2209 dl 1.1 action.run();
2210     } catch (Throwable rex) {
2211 dl 1.19 ex = rex;
2212 dl 1.1 }
2213     }
2214 dl 1.20 if (e == null || ex != null)
2215     dst.internalComplete(null, ex);
2216 dl 1.17 }
2217 dl 1.20 helpPostComplete();
2218 dl 1.1 return dst;
2219     }
2220    
2221 dl 1.20 @SuppressWarnings("unchecked") private <U,V> CompletableFuture<V> andFunction
2222     (CompletableFuture<? extends U> other,
2223     BiFunction<? super T,? super U,? extends V> fn,
2224     Executor executor) {
2225 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2226 jsr166 1.2 CompletableFuture<V> dst = new CompletableFuture<V>();
2227     AndFunction<T,U,V> d = null;
2228     Object r, s = null;
2229     if ((r = result) == null || (s = other.result) == null) {
2230 dl 1.1 d = new AndFunction<T,U,V>(this, other, fn, dst, executor);
2231     CompletionNode q = null, p = new CompletionNode(d);
2232     while ((r == null && (r = result) == null) ||
2233     (s == null && (s = other.result) == null)) {
2234     if (q != null) {
2235     if (s != null ||
2236     UNSAFE.compareAndSwapObject
2237     (other, COMPLETIONS, q.next = other.completions, q))
2238     break;
2239     }
2240     else if (r != null ||
2241     UNSAFE.compareAndSwapObject
2242     (this, COMPLETIONS, p.next = completions, p)) {
2243     if (s != null)
2244     break;
2245     q = new CompletionNode(d);
2246     }
2247     }
2248     }
2249 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2250 dl 1.19 T t; U u; Throwable ex;
2251 jsr166 1.2 if (r instanceof AltResult) {
2252 dl 1.19 ex = ((AltResult)r).ex;
2253 jsr166 1.2 t = null;
2254 dl 1.1 }
2255 dl 1.19 else {
2256     ex = null;
2257 jsr166 1.2 t = (T) r;
2258 dl 1.19 }
2259 dl 1.1 if (ex != null)
2260     u = null;
2261     else if (s instanceof AltResult) {
2262 dl 1.19 ex = ((AltResult)s).ex;
2263 dl 1.1 u = null;
2264     }
2265     else
2266 jsr166 1.2 u = (U) s;
2267 dl 1.20 Executor e = executor;
2268     V v = null;
2269 jsr166 1.2 if (ex == null) {
2270 dl 1.1 try {
2271 dl 1.20 if (e != null)
2272     e.execute(new AsyncBiFunction<T,U,V>(t, u, fn, dst));
2273 dl 1.1 else
2274 dl 1.20 v = fn.apply(t, u);
2275 dl 1.1 } catch (Throwable rex) {
2276 dl 1.19 ex = rex;
2277 dl 1.1 }
2278     }
2279 dl 1.20 if (e == null || ex != null)
2280     dst.internalComplete(v, ex);
2281 jsr166 1.2 }
2282 dl 1.20 helpPostComplete();
2283     other.helpPostComplete();
2284 jsr166 1.2 return dst;
2285 dl 1.1 }
2286    
2287 dl 1.20 @SuppressWarnings("unchecked") private <U> CompletableFuture<Void> andBlock
2288     (CompletableFuture<? extends U> other,
2289     BiBlock<? super T,? super U> fn,
2290     Executor executor) {
2291 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2292     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2293     AndBlock<T,U> d = null;
2294     Object r, s = null;
2295     if ((r = result) == null || (s = other.result) == null) {
2296     d = new AndBlock<T,U>(this, other, fn, dst, executor);
2297     CompletionNode q = null, p = new CompletionNode(d);
2298     while ((r == null && (r = result) == null) ||
2299     (s == null && (s = other.result) == null)) {
2300     if (q != null) {
2301     if (s != null ||
2302     UNSAFE.compareAndSwapObject
2303     (other, COMPLETIONS, q.next = other.completions, q))
2304     break;
2305     }
2306     else if (r != null ||
2307     UNSAFE.compareAndSwapObject
2308     (this, COMPLETIONS, p.next = completions, p)) {
2309     if (s != null)
2310     break;
2311     q = new CompletionNode(d);
2312     }
2313     }
2314     }
2315     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2316 dl 1.19 T t; U u; Throwable ex;
2317 dl 1.7 if (r instanceof AltResult) {
2318 dl 1.19 ex = ((AltResult)r).ex;
2319 dl 1.7 t = null;
2320     }
2321 dl 1.19 else {
2322     ex = null;
2323 dl 1.7 t = (T) r;
2324 dl 1.19 }
2325 dl 1.7 if (ex != null)
2326     u = null;
2327     else if (s instanceof AltResult) {
2328 dl 1.19 ex = ((AltResult)s).ex;
2329 dl 1.7 u = null;
2330     }
2331     else
2332     u = (U) s;
2333 dl 1.20 Executor e = executor;
2334 dl 1.7 if (ex == null) {
2335     try {
2336 dl 1.20 if (e != null)
2337     e.execute(new AsyncBiBlock<T,U>(t, u, fn, dst));
2338     else
2339 dl 1.7 fn.accept(t, u);
2340     } catch (Throwable rex) {
2341 dl 1.19 ex = rex;
2342 dl 1.7 }
2343     }
2344 dl 1.20 if (e == null || ex != null)
2345     dst.internalComplete(null, ex);
2346 dl 1.7 }
2347 dl 1.20 helpPostComplete();
2348     other.helpPostComplete();
2349 dl 1.7 return dst;
2350     }
2351    
2352 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> andRunnable
2353     (CompletableFuture<?> other,
2354     Runnable action,
2355     Executor executor) {
2356 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2357 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2358     AndRunnable<T> d = null;
2359     Object r, s = null;
2360     if ((r = result) == null || (s = other.result) == null) {
2361 dl 1.1 d = new AndRunnable<T>(this, other, action, dst, executor);
2362     CompletionNode q = null, p = new CompletionNode(d);
2363     while ((r == null && (r = result) == null) ||
2364     (s == null && (s = other.result) == null)) {
2365     if (q != null) {
2366     if (s != null ||
2367     UNSAFE.compareAndSwapObject
2368     (other, COMPLETIONS, q.next = other.completions, q))
2369     break;
2370     }
2371     else if (r != null ||
2372     UNSAFE.compareAndSwapObject
2373     (this, COMPLETIONS, p.next = completions, p)) {
2374     if (s != null)
2375     break;
2376     q = new CompletionNode(d);
2377     }
2378     }
2379     }
2380 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2381 dl 1.19 Throwable ex;
2382     if (r instanceof AltResult)
2383     ex = ((AltResult)r).ex;
2384     else
2385     ex = null;
2386     if (ex == null && (s instanceof AltResult))
2387     ex = ((AltResult)s).ex;
2388 dl 1.20 Executor e = executor;
2389 dl 1.19 if (ex == null) {
2390 dl 1.1 try {
2391 dl 1.20 if (e != null)
2392     e.execute(new AsyncRunnable(action, dst));
2393     else
2394 dl 1.1 action.run();
2395     } catch (Throwable rex) {
2396 dl 1.19 ex = rex;
2397 dl 1.1 }
2398     }
2399 dl 1.20 if (e == null || ex != null)
2400     dst.internalComplete(null, ex);
2401 jsr166 1.2 }
2402 dl 1.20 helpPostComplete();
2403     other.helpPostComplete();
2404 jsr166 1.2 return dst;
2405 dl 1.1 }
2406    
2407 dl 1.20 @SuppressWarnings("unchecked") private <U> CompletableFuture<U> orFunction
2408     (CompletableFuture<? extends T> other,
2409     Function<? super T, U> fn,
2410     Executor executor) {
2411 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2412 jsr166 1.2 CompletableFuture<U> dst = new CompletableFuture<U>();
2413     OrFunction<T,U> d = null;
2414     Object r;
2415     if ((r = result) == null && (r = other.result) == null) {
2416 dl 1.1 d = new OrFunction<T,U>(this, other, fn, dst, executor);
2417     CompletionNode q = null, p = new CompletionNode(d);
2418     while ((r = result) == null && (r = other.result) == null) {
2419     if (q != null) {
2420     if (UNSAFE.compareAndSwapObject
2421     (other, COMPLETIONS, q.next = other.completions, q))
2422     break;
2423     }
2424     else if (UNSAFE.compareAndSwapObject
2425     (this, COMPLETIONS, p.next = completions, p))
2426     q = new CompletionNode(d);
2427     }
2428 jsr166 1.2 }
2429     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2430 dl 1.19 T t; Throwable ex;
2431 jsr166 1.2 if (r instanceof AltResult) {
2432 dl 1.19 ex = ((AltResult)r).ex;
2433 jsr166 1.2 t = null;
2434 dl 1.1 }
2435 dl 1.19 else {
2436     ex = null;
2437 jsr166 1.2 t = (T) r;
2438 dl 1.19 }
2439 dl 1.20 Executor e = executor;
2440     U u = null;
2441 jsr166 1.2 if (ex == null) {
2442 dl 1.1 try {
2443 dl 1.20 if (e != null)
2444     e.execute(new AsyncFunction<T,U>(t, fn, dst));
2445 dl 1.1 else
2446 dl 1.20 u = fn.apply(t);
2447 dl 1.1 } catch (Throwable rex) {
2448 dl 1.19 ex = rex;
2449 dl 1.1 }
2450     }
2451 dl 1.20 if (e == null || ex != null)
2452     dst.internalComplete(u, ex);
2453 jsr166 1.2 }
2454 dl 1.20 helpPostComplete();
2455     other.helpPostComplete();
2456 jsr166 1.2 return dst;
2457 dl 1.1 }
2458    
2459 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> orBlock
2460     (CompletableFuture<? extends T> other,
2461     Block<? super T> fn,
2462     Executor executor) {
2463 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2464     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2465     OrBlock<T> d = null;
2466     Object r;
2467     if ((r = result) == null && (r = other.result) == null) {
2468     d = new OrBlock<T>(this, other, fn, dst, executor);
2469     CompletionNode q = null, p = new CompletionNode(d);
2470     while ((r = result) == null && (r = other.result) == null) {
2471     if (q != null) {
2472     if (UNSAFE.compareAndSwapObject
2473     (other, COMPLETIONS, q.next = other.completions, q))
2474     break;
2475     }
2476     else if (UNSAFE.compareAndSwapObject
2477     (this, COMPLETIONS, p.next = completions, p))
2478     q = new CompletionNode(d);
2479     }
2480     }
2481     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2482 dl 1.19 T t; Throwable ex;
2483 dl 1.7 if (r instanceof AltResult) {
2484 dl 1.19 ex = ((AltResult)r).ex;
2485 dl 1.7 t = null;
2486     }
2487 dl 1.19 else {
2488     ex = null;
2489 dl 1.7 t = (T) r;
2490 dl 1.19 }
2491 dl 1.20 Executor e = executor;
2492 dl 1.7 if (ex == null) {
2493     try {
2494 dl 1.20 if (e != null)
2495     e.execute(new AsyncBlock<T>(t, fn, dst));
2496     else
2497 dl 1.7 fn.accept(t);
2498     } catch (Throwable rex) {
2499 dl 1.19 ex = rex;
2500 dl 1.7 }
2501     }
2502 dl 1.20 if (e == null || ex != null)
2503     dst.internalComplete(null, ex);
2504 dl 1.7 }
2505 dl 1.20 helpPostComplete();
2506     other.helpPostComplete();
2507 dl 1.7 return dst;
2508     }
2509    
2510 dl 1.20 @SuppressWarnings("unchecked") private CompletableFuture<Void> orRunnable
2511     (CompletableFuture<?> other,
2512     Runnable action,
2513     Executor executor) {
2514 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2515 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2516     OrRunnable<T> d = null;
2517     Object r;
2518     if ((r = result) == null && (r = other.result) == null) {
2519 dl 1.1 d = new OrRunnable<T>(this, other, action, dst, executor);
2520     CompletionNode q = null, p = new CompletionNode(d);
2521     while ((r = result) == null && (r = other.result) == null) {
2522     if (q != null) {
2523     if (UNSAFE.compareAndSwapObject
2524     (other, COMPLETIONS, q.next = other.completions, q))
2525     break;
2526     }
2527     else if (UNSAFE.compareAndSwapObject
2528     (this, COMPLETIONS, p.next = completions, p))
2529     q = new CompletionNode(d);
2530     }
2531 jsr166 1.2 }
2532     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2533 dl 1.19 Throwable ex;
2534     if (r instanceof AltResult)
2535     ex = ((AltResult)r).ex;
2536     else
2537     ex = null;
2538 dl 1.20 Executor e = executor;
2539 dl 1.19 if (ex == null) {
2540 dl 1.1 try {
2541 dl 1.20 if (e != null)
2542     e.execute(new AsyncRunnable(action, dst));
2543     else
2544 dl 1.1 action.run();
2545     } catch (Throwable rex) {
2546 dl 1.19 ex = rex;
2547 dl 1.1 }
2548     }
2549 dl 1.20 if (e == null || ex != null)
2550     dst.internalComplete(null, ex);
2551 jsr166 1.2 }
2552 dl 1.20 helpPostComplete();
2553     other.helpPostComplete();
2554 jsr166 1.2 return dst;
2555 dl 1.1 }
2556    
2557     // Unsafe mechanics
2558     private static final sun.misc.Unsafe UNSAFE;
2559     private static final long RESULT;
2560     private static final long WAITERS;
2561     private static final long COMPLETIONS;
2562     static {
2563     try {
2564     UNSAFE = sun.misc.Unsafe.getUnsafe();
2565     Class<?> k = CompletableFuture.class;
2566     RESULT = UNSAFE.objectFieldOffset
2567     (k.getDeclaredField("result"));
2568     WAITERS = UNSAFE.objectFieldOffset
2569     (k.getDeclaredField("waiters"));
2570     COMPLETIONS = UNSAFE.objectFieldOffset
2571     (k.getDeclaredField("completions"));
2572     } catch (Exception e) {
2573     throw new Error(e);
2574     }
2575     }
2576     }